You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@iceberg.apache.org by ru...@apache.org on 2022/05/09 22:09:40 UTC
[iceberg] branch master updated: Spark 2.4 - 3.X: Backport Fix CommitStateUnknown Handling (#4687) (#4719)
This is an automated email from the ASF dual-hosted git repository.
russellspitzer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 56933c179 Spark 2.4 - 3.X: Backport Fix CommitStateUnknown Handling (#4687) (#4719)
56933c179 is described below
commit 56933c179aedcfd3433f599111099785d2abb575
Author: Russell Spitzer <rs...@apple.com>
AuthorDate: Mon May 9 17:09:34 2022 -0500
Spark 2.4 - 3.X: Backport Fix CommitStateUnknown Handling (#4687) (#4719)
---
.../org/apache/iceberg/spark/source/Writer.java | 44 ++++++----
.../apache/iceberg/spark/source/ManualSource.java | 99 ++++++++++++++++++++++
.../iceberg/spark/source/TestSparkDataWrite.java | 61 +++++++++++++
.../apache/iceberg/spark/source/SparkWrite.java | 44 ++++++----
.../apache/iceberg/spark/source/ManualSource.java | 74 ++++++++++++++++
.../iceberg/spark/source/TestSparkDataWrite.java | 62 ++++++++++++++
.../apache/iceberg/spark/source/SparkWrite.java | 44 ++++++----
.../apache/iceberg/spark/source/ManualSource.java | 74 ++++++++++++++++
.../iceberg/spark/source/TestSparkDataWrite.java | 62 ++++++++++++++
9 files changed, 516 insertions(+), 48 deletions(-)
diff --git a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
index 451487ecf..c4259a9bf 100644
--- a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
+++ b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
@@ -33,6 +33,7 @@ import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.UnpartitionedWriter;
@@ -78,6 +79,8 @@ class Writer implements DataSourceWriter {
private final Map<String, String> extraSnapshotMetadata;
private final boolean partitionedFanoutEnabled;
+ private boolean cleanupOnAbort = true;
+
Writer(SparkSession spark, Table table, SparkWriteConf writeConf, boolean replacePartitions,
String applicationId, Schema writeSchema, StructType dsSchema) {
this(spark, table, writeConf, replacePartitions, applicationId, null, writeSchema, dsSchema);
@@ -136,10 +139,15 @@ class Writer implements DataSourceWriter {
operation.stageOnly();
}
- long start = System.currentTimeMillis();
- operation.commit(); // abort is automatically called if this fails
- long duration = System.currentTimeMillis() - start;
- LOG.info("Committed in {} ms", duration);
+ try {
+ long start = System.currentTimeMillis();
+ operation.commit(); // abort is automatically called if this fails
+ long duration = System.currentTimeMillis() - start;
+ LOG.info("Committed in {} ms", duration);
+ } catch (CommitStateUnknownException commitStateUnknownException) {
+ cleanupOnAbort = false;
+ throw commitStateUnknownException;
+ }
}
private void append(WriterCommitMessage[] messages) {
@@ -175,18 +183,22 @@ class Writer implements DataSourceWriter {
@Override
public void abort(WriterCommitMessage[] messages) {
- Map<String, String> props = table.properties();
- Tasks.foreach(files(messages))
- .retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
- .exponentialBackoff(
- PropertyUtil.propertyAsInt(props, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
- PropertyUtil.propertyAsInt(props, COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
- PropertyUtil.propertyAsInt(props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
- 2.0 /* exponential */)
- .throwFailureWhenFinished()
- .run(file -> {
- table.io().deleteFile(file.path().toString());
- });
+ if (cleanupOnAbort) {
+ Map<String, String> props = table.properties();
+ Tasks.foreach(files(messages))
+ .retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
+ .exponentialBackoff(
+ PropertyUtil.propertyAsInt(props, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
+ PropertyUtil.propertyAsInt(props, COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
+ PropertyUtil.propertyAsInt(props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
+ 2.0 /* exponential */)
+ .throwFailureWhenFinished()
+ .run(file -> {
+ table.io().deleteFile(file.path().toString());
+ });
+ } else {
+ LOG.warn("Skipping cleaning up of data files because Iceberg was unable to determine the final commit state");
+ }
}
protected Table table() {
diff --git a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/ManualSource.java b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/ManualSource.java
new file mode 100644
index 000000000..158d57473
--- /dev/null
+++ b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/ManualSource.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.Optional;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.spark.SparkWriteConf;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.sources.DataSourceRegister;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.WriteSupport;
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
+import org.apache.spark.sql.types.StructType;
+
+public class ManualSource implements WriteSupport, DataSourceRegister, DataSourceV2 {
+ public static final String SHORT_NAME = "manual_source";
+ public static final String TABLE_NAME = "table_name";
+ private static final Map<String, Table> tableMap = Maps.newHashMap();
+
+ private SparkSession lazySpark = null;
+ private Configuration lazyConf = null;
+
+ public static void setTable(String name, Table table) {
+ Preconditions.checkArgument(!tableMap.containsKey(name), "Cannot set " + name + ". It is already set");
+ tableMap.put(name, table);
+ }
+
+ public static void clearTables() {
+ tableMap.clear();
+ }
+
+ @Override
+ public String shortName() {
+ return SHORT_NAME;
+ }
+
+ @Override
+ public Optional<DataSourceWriter> createWriter(String writeUUID, StructType dsStruct, SaveMode mode,
+ DataSourceOptions options) {
+
+ Map<String, String> properties = options.asMap();
+ Preconditions.checkArgument(properties.containsKey(TABLE_NAME), "Missing property " + TABLE_NAME);
+ String tableName = properties.get(TABLE_NAME);
+ Preconditions.checkArgument(tableMap.containsKey(tableName), "Table missing " + tableName);
+ Table table = tableMap.get(tableName);
+
+ SparkWriteConf writeConf = new SparkWriteConf(lazySparkSession(), table, options.asMap());
+ Schema writeSchema = SparkSchemaUtil.convert(table.schema(), dsStruct);
+ TypeUtil.validateWriteSchema(table.schema(), writeSchema, writeConf.checkNullability(), writeConf.checkOrdering());
+ SparkUtil.validatePartitionTransforms(table.spec());
+ String appId = lazySparkSession().sparkContext().applicationId();
+ String wapId = writeConf.wapId();
+ boolean replacePartitions = mode == SaveMode.Overwrite;
+
+ return Optional.of(new Writer(
+ lazySparkSession(), table, writeConf, replacePartitions, appId, wapId, writeSchema, dsStruct));
+ }
+
+ private SparkSession lazySparkSession() {
+ if (lazySpark == null) {
+ this.lazySpark = SparkSession.builder().getOrCreate();
+ }
+ return lazySpark;
+ }
+
+ private Configuration lazyBaseConf() {
+ if (lazyConf == null) {
+ this.lazyConf = lazySparkSession().sessionState().newHadoopConf();
+ }
+ return lazyConf;
+ }
+}
diff --git a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
index f9241758c..e7a6d1623 100644
--- a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
+++ b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
@@ -24,6 +24,8 @@ import java.io.IOException;
import java.util.List;
import java.util.Locale;
import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.ManifestFile;
@@ -32,11 +34,13 @@ import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.iceberg.types.Types;
+import org.apache.spark.SparkException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
@@ -54,6 +58,9 @@ import org.junit.runners.Parameterized;
import static org.apache.iceberg.TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED;
import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
@RunWith(Parameterized.class)
public class TestSparkDataWrite {
@@ -78,6 +85,11 @@ public class TestSparkDataWrite {
TestSparkDataWrite.spark = SparkSession.builder().master("local[2]").getOrCreate();
}
+ @Parameterized.AfterParam
+ public static void clearSourceCache() {
+ ManualSource.clearTables();
+ }
+
@AfterClass
public static void stopSpark() {
SparkSession currentSpark = TestSparkDataWrite.spark;
@@ -588,6 +600,55 @@ public class TestSparkDataWrite {
Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
}
+ @Test
+ public void testCommitUnknownException() throws IOException {
+ File parent = temp.newFolder(format.toString());
+ File location = new File(parent, "commitunknown");
+
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+ Table table = tables.create(SCHEMA, spec, location.toString());
+
+ List<SimpleRecord> records = Lists.newArrayList(
+ new SimpleRecord(1, "a"),
+ new SimpleRecord(2, "b"),
+ new SimpleRecord(3, "c")
+ );
+
+ Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
+
+ AppendFiles append = table.newFastAppend();
+ AppendFiles spyAppend = spy(append);
+ doAnswer(invocation -> {
+ append.commit();
+ throw new CommitStateUnknownException(new RuntimeException("Datacenter on Fire"));
+ }).when(spyAppend).commit();
+
+ Table spyTable = spy(table);
+ when(spyTable.newAppend()).thenReturn(spyAppend);
+
+ String manualTableName = "unknown_exception";
+ ManualSource.setTable(manualTableName, spyTable);
+
+ // Although an exception is thrown here, write and commit have succeeded
+ AssertHelpers.assertThrowsWithCause("Should throw a Commit State Unknown Exception",
+ SparkException.class,
+ "Writing job aborted",
+ CommitStateUnknownException.class,
+ "Datacenter on Fire",
+ () -> df.select("id", "data").sort("data").write()
+ .format("org.apache.iceberg.spark.source.ManualSource")
+ .option(ManualSource.TABLE_NAME, manualTableName)
+ .mode(SaveMode.Append)
+ .save(location.toString()));
+
+ // Since write and commit succeeded, the rows should be readable
+ Dataset<Row> result = spark.read().format("iceberg").load(location.toString());
+ List<SimpleRecord> actual = result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ Assert.assertEquals("Number of rows should match", records.size(), actual.size());
+ Assert.assertEquals("Result rows should match", records, actual);
+ }
+
public enum IcebergOptionsType {
NONE,
TABLE,
diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index 934a03490..80e6de96a 100644
--- a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -43,6 +43,7 @@ import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.ClusteredDataWriter;
@@ -105,6 +106,8 @@ class SparkWrite {
private final Map<String, String> extraSnapshotMetadata;
private final boolean partitionedFanoutEnabled;
+ private boolean cleanupOnAbort = true;
+
SparkWrite(SparkSession spark, Table table, SparkWriteConf writeConf,
LogicalWriteInfo writeInfo, String applicationId,
Schema writeSchema, StructType dsSchema) {
@@ -178,25 +181,34 @@ class SparkWrite {
operation.stageOnly();
}
- long start = System.currentTimeMillis();
- operation.commit(); // abort is automatically called if this fails
- long duration = System.currentTimeMillis() - start;
- LOG.info("Committed in {} ms", duration);
+ try {
+ long start = System.currentTimeMillis();
+ operation.commit(); // abort is automatically called if this fails
+ long duration = System.currentTimeMillis() - start;
+ LOG.info("Committed in {} ms", duration);
+ } catch (CommitStateUnknownException commitStateUnknownException) {
+ cleanupOnAbort = false;
+ throw commitStateUnknownException;
+ }
}
private void abort(WriterCommitMessage[] messages) {
- Map<String, String> props = table.properties();
- Tasks.foreach(files(messages))
- .retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
- .exponentialBackoff(
- PropertyUtil.propertyAsInt(props, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
- PropertyUtil.propertyAsInt(props, COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
- PropertyUtil.propertyAsInt(props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
- 2.0 /* exponential */)
- .throwFailureWhenFinished()
- .run(file -> {
- table.io().deleteFile(file.path().toString());
- });
+ if (cleanupOnAbort) {
+ Map<String, String> props = table.properties();
+ Tasks.foreach(files(messages))
+ .retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
+ .exponentialBackoff(
+ PropertyUtil.propertyAsInt(props, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
+ PropertyUtil.propertyAsInt(props, COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
+ PropertyUtil.propertyAsInt(props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
+ 2.0 /* exponential */)
+ .throwFailureWhenFinished()
+ .run(file -> {
+ table.io().deleteFile(file.path().toString());
+ });
+ } else {
+ LOG.warn("Skipping cleaning up of data files because Iceberg was unable to determine the final commit state");
+ }
}
private Iterable<DataFile> files(WriterCommitMessage[] messages) {
diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/ManualSource.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/ManualSource.java
new file mode 100644
index 000000000..0f8c8b3b6
--- /dev/null
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/ManualSource.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableProvider;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.sources.DataSourceRegister;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+public class ManualSource implements TableProvider, DataSourceRegister {
+ public static final String SHORT_NAME = "manual_source";
+ public static final String TABLE_NAME = "TABLE_NAME";
+ private static final Map<String, Table> tableMap = Maps.newHashMap();
+
+ public static void setTable(String name, Table table) {
+ Preconditions.checkArgument(!tableMap.containsKey(name), "Cannot set " + name + ". It is already set");
+ tableMap.put(name, table);
+ }
+
+ public static void clearTables() {
+ tableMap.clear();
+ }
+
+ @Override
+ public String shortName() {
+ return SHORT_NAME;
+ }
+
+ @Override
+ public StructType inferSchema(CaseInsensitiveStringMap options) {
+ return getTable(null, null, options).schema();
+ }
+
+ @Override
+ public Transform[] inferPartitioning(CaseInsensitiveStringMap options) {
+ return getTable(null, null, options).partitioning();
+ }
+
+ @Override
+ public org.apache.spark.sql.connector.catalog.Table getTable(
+ StructType schema, Transform[] partitioning, Map<String, String> properties) {
+ Preconditions.checkArgument(properties.containsKey(TABLE_NAME), "Missing property " + TABLE_NAME);
+ String tableName = properties.get(TABLE_NAME);
+ Preconditions.checkArgument(tableMap.containsKey(tableName), "Table missing " + tableName);
+ return tableMap.get(tableName);
+ }
+
+ @Override
+ public boolean supportsExternalMetadata() {
+ return false;
+ }
+}
diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
index f9241758c..8bf57ba65 100644
--- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
@@ -24,6 +24,8 @@ import java.io.IOException;
import java.util.List;
import java.util.Locale;
import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.ManifestFile;
@@ -32,11 +34,13 @@ import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.iceberg.types.Types;
+import org.apache.spark.SparkException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
@@ -54,6 +58,9 @@ import org.junit.runners.Parameterized;
import static org.apache.iceberg.TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED;
import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
@RunWith(Parameterized.class)
public class TestSparkDataWrite {
@@ -78,6 +85,11 @@ public class TestSparkDataWrite {
TestSparkDataWrite.spark = SparkSession.builder().master("local[2]").getOrCreate();
}
+ @Parameterized.AfterParam
+ public static void clearSourceCache() {
+ ManualSource.clearTables();
+ }
+
@AfterClass
public static void stopSpark() {
SparkSession currentSpark = TestSparkDataWrite.spark;
@@ -588,6 +600,56 @@ public class TestSparkDataWrite {
Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
}
+ @Test
+ public void testCommitUnknownException() throws IOException {
+ File parent = temp.newFolder(format.toString());
+ File location = new File(parent, "commitunknown");
+
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+ Table table = tables.create(SCHEMA, spec, location.toString());
+
+ List<SimpleRecord> records = Lists.newArrayList(
+ new SimpleRecord(1, "a"),
+ new SimpleRecord(2, "b"),
+ new SimpleRecord(3, "c")
+ );
+
+ Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
+
+ AppendFiles append = table.newFastAppend();
+ AppendFiles spyAppend = spy(append);
+ doAnswer(invocation -> {
+ append.commit();
+ throw new CommitStateUnknownException(new RuntimeException("Datacenter on Fire"));
+ }).when(spyAppend).commit();
+
+ Table spyTable = spy(table);
+ when(spyTable.newAppend()).thenReturn(spyAppend);
+ SparkTable sparkTable = new SparkTable(spyTable, false);
+
+ String manualTableName = "unknown_exception";
+ ManualSource.setTable(manualTableName, sparkTable);
+
+ // Although an exception is thrown here, write and commit have succeeded
+ AssertHelpers.assertThrowsWithCause("Should throw a Commit State Unknown Exception",
+ SparkException.class,
+ "Writing job aborted",
+ CommitStateUnknownException.class,
+ "Datacenter on Fire",
+ () -> df.select("id", "data").sort("data").write()
+ .format("org.apache.iceberg.spark.source.ManualSource")
+ .option(ManualSource.TABLE_NAME, manualTableName)
+ .mode(SaveMode.Append)
+ .save(location.toString()));
+
+ // Since write and commit succeeded, the rows should be readable
+ Dataset<Row> result = spark.read().format("iceberg").load(location.toString());
+ List<SimpleRecord> actual = result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ Assert.assertEquals("Number of rows should match", records.size(), actual.size());
+ Assert.assertEquals("Result rows should match", records, actual);
+ }
+
public enum IcebergOptionsType {
NONE,
TABLE,
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index 934a03490..80e6de96a 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -43,6 +43,7 @@ import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.ClusteredDataWriter;
@@ -105,6 +106,8 @@ class SparkWrite {
private final Map<String, String> extraSnapshotMetadata;
private final boolean partitionedFanoutEnabled;
+ private boolean cleanupOnAbort = true;
+
SparkWrite(SparkSession spark, Table table, SparkWriteConf writeConf,
LogicalWriteInfo writeInfo, String applicationId,
Schema writeSchema, StructType dsSchema) {
@@ -178,25 +181,34 @@ class SparkWrite {
operation.stageOnly();
}
- long start = System.currentTimeMillis();
- operation.commit(); // abort is automatically called if this fails
- long duration = System.currentTimeMillis() - start;
- LOG.info("Committed in {} ms", duration);
+ try {
+ long start = System.currentTimeMillis();
+ operation.commit(); // abort is automatically called if this fails
+ long duration = System.currentTimeMillis() - start;
+ LOG.info("Committed in {} ms", duration);
+ } catch (CommitStateUnknownException commitStateUnknownException) {
+ cleanupOnAbort = false;
+ throw commitStateUnknownException;
+ }
}
private void abort(WriterCommitMessage[] messages) {
- Map<String, String> props = table.properties();
- Tasks.foreach(files(messages))
- .retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
- .exponentialBackoff(
- PropertyUtil.propertyAsInt(props, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
- PropertyUtil.propertyAsInt(props, COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
- PropertyUtil.propertyAsInt(props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
- 2.0 /* exponential */)
- .throwFailureWhenFinished()
- .run(file -> {
- table.io().deleteFile(file.path().toString());
- });
+ if (cleanupOnAbort) {
+ Map<String, String> props = table.properties();
+ Tasks.foreach(files(messages))
+ .retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
+ .exponentialBackoff(
+ PropertyUtil.propertyAsInt(props, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
+ PropertyUtil.propertyAsInt(props, COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
+ PropertyUtil.propertyAsInt(props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
+ 2.0 /* exponential */)
+ .throwFailureWhenFinished()
+ .run(file -> {
+ table.io().deleteFile(file.path().toString());
+ });
+ } else {
+ LOG.warn("Skipping cleaning up of data files because Iceberg was unable to determine the final commit state");
+ }
}
private Iterable<DataFile> files(WriterCommitMessage[] messages) {
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/ManualSource.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/ManualSource.java
new file mode 100644
index 000000000..0f8c8b3b6
--- /dev/null
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/ManualSource.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableProvider;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.sources.DataSourceRegister;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+public class ManualSource implements TableProvider, DataSourceRegister {
+ public static final String SHORT_NAME = "manual_source";
+ public static final String TABLE_NAME = "TABLE_NAME";
+ private static final Map<String, Table> tableMap = Maps.newHashMap();
+
+ public static void setTable(String name, Table table) {
+ Preconditions.checkArgument(!tableMap.containsKey(name), "Cannot set " + name + ". It is already set");
+ tableMap.put(name, table);
+ }
+
+ public static void clearTables() {
+ tableMap.clear();
+ }
+
+ @Override
+ public String shortName() {
+ return SHORT_NAME;
+ }
+
+ @Override
+ public StructType inferSchema(CaseInsensitiveStringMap options) {
+ return getTable(null, null, options).schema();
+ }
+
+ @Override
+ public Transform[] inferPartitioning(CaseInsensitiveStringMap options) {
+ return getTable(null, null, options).partitioning();
+ }
+
+ @Override
+ public org.apache.spark.sql.connector.catalog.Table getTable(
+ StructType schema, Transform[] partitioning, Map<String, String> properties) {
+ Preconditions.checkArgument(properties.containsKey(TABLE_NAME), "Missing property " + TABLE_NAME);
+ String tableName = properties.get(TABLE_NAME);
+ Preconditions.checkArgument(tableMap.containsKey(tableName), "Table missing " + tableName);
+ return tableMap.get(tableName);
+ }
+
+ @Override
+ public boolean supportsExternalMetadata() {
+ return false;
+ }
+}
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
index f9241758c..8bf57ba65 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
@@ -24,6 +24,8 @@ import java.io.IOException;
import java.util.List;
import java.util.Locale;
import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.ManifestFile;
@@ -32,11 +34,13 @@ import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.iceberg.types.Types;
+import org.apache.spark.SparkException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
@@ -54,6 +58,9 @@ import org.junit.runners.Parameterized;
import static org.apache.iceberg.TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED;
import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
@RunWith(Parameterized.class)
public class TestSparkDataWrite {
@@ -78,6 +85,11 @@ public class TestSparkDataWrite {
TestSparkDataWrite.spark = SparkSession.builder().master("local[2]").getOrCreate();
}
+ @Parameterized.AfterParam
+ public static void clearSourceCache() {
+ ManualSource.clearTables();
+ }
+
@AfterClass
public static void stopSpark() {
SparkSession currentSpark = TestSparkDataWrite.spark;
@@ -588,6 +600,56 @@ public class TestSparkDataWrite {
Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
}
+ @Test
+ public void testCommitUnknownException() throws IOException {
+ File parent = temp.newFolder(format.toString());
+ File location = new File(parent, "commitunknown");
+
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+ Table table = tables.create(SCHEMA, spec, location.toString());
+
+ List<SimpleRecord> records = Lists.newArrayList(
+ new SimpleRecord(1, "a"),
+ new SimpleRecord(2, "b"),
+ new SimpleRecord(3, "c")
+ );
+
+ Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
+
+ AppendFiles append = table.newFastAppend();
+ AppendFiles spyAppend = spy(append);
+ doAnswer(invocation -> {
+ append.commit();
+ throw new CommitStateUnknownException(new RuntimeException("Datacenter on Fire"));
+ }).when(spyAppend).commit();
+
+ Table spyTable = spy(table);
+ when(spyTable.newAppend()).thenReturn(spyAppend);
+ SparkTable sparkTable = new SparkTable(spyTable, false);
+
+ String manualTableName = "unknown_exception";
+ ManualSource.setTable(manualTableName, sparkTable);
+
+ // Although an exception is thrown here, write and commit have succeeded
+ AssertHelpers.assertThrowsWithCause("Should throw a Commit State Unknown Exception",
+ SparkException.class,
+ "Writing job aborted",
+ CommitStateUnknownException.class,
+ "Datacenter on Fire",
+ () -> df.select("id", "data").sort("data").write()
+ .format("org.apache.iceberg.spark.source.ManualSource")
+ .option(ManualSource.TABLE_NAME, manualTableName)
+ .mode(SaveMode.Append)
+ .save(location.toString()));
+
+ // Since write and commit succeeded, the rows should be readable
+ Dataset<Row> result = spark.read().format("iceberg").load(location.toString());
+ List<SimpleRecord> actual = result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ Assert.assertEquals("Number of rows should match", records.size(), actual.size());
+ Assert.assertEquals("Result rows should match", records, actual);
+ }
+
public enum IcebergOptionsType {
NONE,
TABLE,