You are viewing a plain-text version of this content. The canonical version is available in the commits@iceberg.apache.org mailing-list archive.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/05/17 16:49:30 UTC
[iceberg] branch 0.13.x updated: Spark: Update commit state unknown handling (backport to 0.13) (#4787)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch 0.13.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/0.13.x by this push:
new 304fc89d8 Spark: Update commit state unknown handling (backport to 0.13) (#4787)
304fc89d8 is described below
commit 304fc89d833e084dd59edb7f0aaa431199c65cb7
Author: Eduard Tudenhöfner <et...@gmail.com>
AuthorDate: Tue May 17 18:49:25 2022 +0200
Spark: Update commit state unknown handling (backport to 0.13) (#4787)
Co-authored-by: Russell Spitzer <rs...@apple.com>
---
.../java/org/apache/iceberg/PendingUpdate.java | 3 +
.../org/apache/iceberg/spark/source/Writer.java | 44 ++++++----
.../apache/iceberg/spark/source/ManualSource.java | 99 ++++++++++++++++++++++
.../iceberg/spark/source/TestSparkDataWrite.java | 61 +++++++++++++
.../apache/iceberg/spark/source/SparkWrite.java | 44 ++++++----
.../apache/iceberg/spark/source/ManualSource.java | 74 ++++++++++++++++
.../iceberg/spark/source/TestSparkDataWrite.java | 62 ++++++++++++++
.../apache/iceberg/spark/source/SparkWrite.java | 44 ++++++----
.../apache/iceberg/spark/source/ManualSource.java | 74 ++++++++++++++++
.../iceberg/spark/source/TestSparkDataWrite.java | 62 ++++++++++++++
.../apache/iceberg/spark/source/SparkWrite.java | 44 ++++++----
.../apache/iceberg/spark/source/ManualSource.java | 74 ++++++++++++++++
.../iceberg/spark/source/TestSparkDataWrite.java | 62 ++++++++++++++
13 files changed, 683 insertions(+), 64 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/PendingUpdate.java b/api/src/main/java/org/apache/iceberg/PendingUpdate.java
index 8e47dd006..9c1b18434 100644
--- a/api/src/main/java/org/apache/iceberg/PendingUpdate.java
+++ b/api/src/main/java/org/apache/iceberg/PendingUpdate.java
@@ -20,6 +20,7 @@
package org.apache.iceberg;
import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.ValidationException;
/**
@@ -49,6 +50,8 @@ public interface PendingUpdate<T> {
*
* @throws ValidationException If the update cannot be applied to the current table metadata.
* @throws CommitFailedException If the update cannot be committed due to conflicts.
+ * @throws CommitStateUnknownException If the update success or failure is unknown, no cleanup should be done in
+ * this case.
*/
void commit();
diff --git a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
index 451487ecf..c4259a9bf 100644
--- a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
+++ b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
@@ -33,6 +33,7 @@ import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.UnpartitionedWriter;
@@ -78,6 +79,8 @@ class Writer implements DataSourceWriter {
private final Map<String, String> extraSnapshotMetadata;
private final boolean partitionedFanoutEnabled;
+ private boolean cleanupOnAbort = true;
+
Writer(SparkSession spark, Table table, SparkWriteConf writeConf, boolean replacePartitions,
String applicationId, Schema writeSchema, StructType dsSchema) {
this(spark, table, writeConf, replacePartitions, applicationId, null, writeSchema, dsSchema);
@@ -136,10 +139,15 @@ class Writer implements DataSourceWriter {
operation.stageOnly();
}
- long start = System.currentTimeMillis();
- operation.commit(); // abort is automatically called if this fails
- long duration = System.currentTimeMillis() - start;
- LOG.info("Committed in {} ms", duration);
+ try {
+ long start = System.currentTimeMillis();
+ operation.commit(); // abort is automatically called if this fails
+ long duration = System.currentTimeMillis() - start;
+ LOG.info("Committed in {} ms", duration);
+ } catch (CommitStateUnknownException commitStateUnknownException) {
+ cleanupOnAbort = false;
+ throw commitStateUnknownException;
+ }
}
private void append(WriterCommitMessage[] messages) {
@@ -175,18 +183,22 @@ class Writer implements DataSourceWriter {
@Override
public void abort(WriterCommitMessage[] messages) {
- Map<String, String> props = table.properties();
- Tasks.foreach(files(messages))
- .retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
- .exponentialBackoff(
- PropertyUtil.propertyAsInt(props, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
- PropertyUtil.propertyAsInt(props, COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
- PropertyUtil.propertyAsInt(props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
- 2.0 /* exponential */)
- .throwFailureWhenFinished()
- .run(file -> {
- table.io().deleteFile(file.path().toString());
- });
+ if (cleanupOnAbort) {
+ Map<String, String> props = table.properties();
+ Tasks.foreach(files(messages))
+ .retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
+ .exponentialBackoff(
+ PropertyUtil.propertyAsInt(props, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
+ PropertyUtil.propertyAsInt(props, COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
+ PropertyUtil.propertyAsInt(props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
+ 2.0 /* exponential */)
+ .throwFailureWhenFinished()
+ .run(file -> {
+ table.io().deleteFile(file.path().toString());
+ });
+ } else {
+ LOG.warn("Skipping cleaning up of data files because Iceberg was unable to determine the final commit state");
+ }
}
protected Table table() {
diff --git a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/ManualSource.java b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/ManualSource.java
new file mode 100644
index 000000000..158d57473
--- /dev/null
+++ b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/ManualSource.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.Optional;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.spark.SparkWriteConf;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.sources.DataSourceRegister;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.WriteSupport;
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
+import org.apache.spark.sql.types.StructType;
+
+public class ManualSource implements WriteSupport, DataSourceRegister, DataSourceV2 {
+ public static final String SHORT_NAME = "manual_source";
+ public static final String TABLE_NAME = "table_name";
+ private static final Map<String, Table> tableMap = Maps.newHashMap();
+
+ private SparkSession lazySpark = null;
+ private Configuration lazyConf = null;
+
+ public static void setTable(String name, Table table) {
+ Preconditions.checkArgument(!tableMap.containsKey(name), "Cannot set " + name + ". It is already set");
+ tableMap.put(name, table);
+ }
+
+ public static void clearTables() {
+ tableMap.clear();
+ }
+
+ @Override
+ public String shortName() {
+ return SHORT_NAME;
+ }
+
+ @Override
+ public Optional<DataSourceWriter> createWriter(String writeUUID, StructType dsStruct, SaveMode mode,
+ DataSourceOptions options) {
+
+ Map<String, String> properties = options.asMap();
+ Preconditions.checkArgument(properties.containsKey(TABLE_NAME), "Missing property " + TABLE_NAME);
+ String tableName = properties.get(TABLE_NAME);
+ Preconditions.checkArgument(tableMap.containsKey(tableName), "Table missing " + tableName);
+ Table table = tableMap.get(tableName);
+
+ SparkWriteConf writeConf = new SparkWriteConf(lazySparkSession(), table, options.asMap());
+ Schema writeSchema = SparkSchemaUtil.convert(table.schema(), dsStruct);
+ TypeUtil.validateWriteSchema(table.schema(), writeSchema, writeConf.checkNullability(), writeConf.checkOrdering());
+ SparkUtil.validatePartitionTransforms(table.spec());
+ String appId = lazySparkSession().sparkContext().applicationId();
+ String wapId = writeConf.wapId();
+ boolean replacePartitions = mode == SaveMode.Overwrite;
+
+ return Optional.of(new Writer(
+ lazySparkSession(), table, writeConf, replacePartitions, appId, wapId, writeSchema, dsStruct));
+ }
+
+ private SparkSession lazySparkSession() {
+ if (lazySpark == null) {
+ this.lazySpark = SparkSession.builder().getOrCreate();
+ }
+ return lazySpark;
+ }
+
+ private Configuration lazyBaseConf() {
+ if (lazyConf == null) {
+ this.lazyConf = lazySparkSession().sessionState().newHadoopConf();
+ }
+ return lazyConf;
+ }
+}
diff --git a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
index 5e99dac5c..6d1d9df63 100644
--- a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
+++ b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
@@ -24,6 +24,8 @@ import java.io.IOException;
import java.util.List;
import java.util.Locale;
import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.ManifestFile;
@@ -32,11 +34,13 @@ import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.iceberg.types.Types;
+import org.apache.spark.SparkException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
@@ -54,6 +58,9 @@ import org.junit.runners.Parameterized;
import static org.apache.iceberg.TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED;
import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
@RunWith(Parameterized.class)
public class TestSparkDataWrite {
@@ -78,6 +85,11 @@ public class TestSparkDataWrite {
TestSparkDataWrite.spark = SparkSession.builder().master("local[2]").getOrCreate();
}
+ @Parameterized.AfterParam
+ public static void clearSourceCache() {
+ ManualSource.clearTables();
+ }
+
@AfterClass
public static void stopSpark() {
SparkSession currentSpark = TestSparkDataWrite.spark;
@@ -592,6 +604,55 @@ public class TestSparkDataWrite {
}
}
+ @Test
+ public void testCommitUnknownException() throws IOException {
+ File parent = temp.newFolder(format.toString());
+ File location = new File(parent, "commitunknown");
+
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+ Table table = tables.create(SCHEMA, spec, location.toString());
+
+ List<SimpleRecord> records = Lists.newArrayList(
+ new SimpleRecord(1, "a"),
+ new SimpleRecord(2, "b"),
+ new SimpleRecord(3, "c")
+ );
+
+ Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
+
+ AppendFiles append = table.newFastAppend();
+ AppendFiles spyAppend = spy(append);
+ doAnswer(invocation -> {
+ append.commit();
+ throw new CommitStateUnknownException(new RuntimeException("Datacenter on Fire"));
+ }).when(spyAppend).commit();
+
+ Table spyTable = spy(table);
+ when(spyTable.newAppend()).thenReturn(spyAppend);
+
+ String manualTableName = "unknown_exception";
+ ManualSource.setTable(manualTableName, spyTable);
+
+ // Although an exception is thrown here, write and commit have succeeded
+ AssertHelpers.assertThrowsWithCause("Should throw a Commit State Unknown Exception",
+ SparkException.class,
+ "Writing job aborted",
+ CommitStateUnknownException.class,
+ "Datacenter on Fire",
+ () -> df.select("id", "data").sort("data").write()
+ .format("org.apache.iceberg.spark.source.ManualSource")
+ .option(ManualSource.TABLE_NAME, manualTableName)
+ .mode(SaveMode.Append)
+ .save(location.toString()));
+
+ // Since write and commit succeeded, the rows should be readable
+ Dataset<Row> result = spark.read().format("iceberg").load(location.toString());
+ List<SimpleRecord> actual = result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ Assert.assertEquals("Number of rows should match", records.size(), actual.size());
+ Assert.assertEquals("Result rows should match", records, actual);
+ }
+
public enum IcebergOptionsType {
NONE,
TABLE,
diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index 24475899e..87fcd9cad 100644
--- a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -44,6 +44,7 @@ import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.ClusteredDataWriter;
@@ -103,6 +104,8 @@ class SparkWrite {
private final Map<String, String> extraSnapshotMetadata;
private final boolean partitionedFanoutEnabled;
+ private boolean cleanupOnAbort = true;
+
SparkWrite(SparkSession spark, Table table, SparkWriteConf writeConf,
LogicalWriteInfo writeInfo, String applicationId,
Schema writeSchema, StructType dsSchema) {
@@ -176,25 +179,34 @@ class SparkWrite {
operation.stageOnly();
}
- long start = System.currentTimeMillis();
- operation.commit(); // abort is automatically called if this fails
- long duration = System.currentTimeMillis() - start;
- LOG.info("Committed in {} ms", duration);
+ try {
+ long start = System.currentTimeMillis();
+ operation.commit(); // abort is automatically called if this fails
+ long duration = System.currentTimeMillis() - start;
+ LOG.info("Committed in {} ms", duration);
+ } catch (CommitStateUnknownException commitStateUnknownException) {
+ cleanupOnAbort = false;
+ throw commitStateUnknownException;
+ }
}
private void abort(WriterCommitMessage[] messages) {
- Map<String, String> props = table.properties();
- Tasks.foreach(files(messages))
- .retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
- .exponentialBackoff(
- PropertyUtil.propertyAsInt(props, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
- PropertyUtil.propertyAsInt(props, COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
- PropertyUtil.propertyAsInt(props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
- 2.0 /* exponential */)
- .throwFailureWhenFinished()
- .run(file -> {
- table.io().deleteFile(file.path().toString());
- });
+ if (cleanupOnAbort) {
+ Map<String, String> props = table.properties();
+ Tasks.foreach(files(messages))
+ .retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
+ .exponentialBackoff(
+ PropertyUtil.propertyAsInt(props, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
+ PropertyUtil.propertyAsInt(props, COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
+ PropertyUtil.propertyAsInt(props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
+ 2.0 /* exponential */)
+ .throwFailureWhenFinished()
+ .run(file -> {
+ table.io().deleteFile(file.path().toString());
+ });
+ } else {
+ LOG.warn("Skipping cleaning up of data files because Iceberg was unable to determine the final commit state");
+ }
}
private Iterable<DataFile> files(WriterCommitMessage[] messages) {
diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/ManualSource.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/ManualSource.java
new file mode 100644
index 000000000..0f8c8b3b6
--- /dev/null
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/ManualSource.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableProvider;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.sources.DataSourceRegister;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+public class ManualSource implements TableProvider, DataSourceRegister {
+ public static final String SHORT_NAME = "manual_source";
+ public static final String TABLE_NAME = "TABLE_NAME";
+ private static final Map<String, Table> tableMap = Maps.newHashMap();
+
+ public static void setTable(String name, Table table) {
+ Preconditions.checkArgument(!tableMap.containsKey(name), "Cannot set " + name + ". It is already set");
+ tableMap.put(name, table);
+ }
+
+ public static void clearTables() {
+ tableMap.clear();
+ }
+
+ @Override
+ public String shortName() {
+ return SHORT_NAME;
+ }
+
+ @Override
+ public StructType inferSchema(CaseInsensitiveStringMap options) {
+ return getTable(null, null, options).schema();
+ }
+
+ @Override
+ public Transform[] inferPartitioning(CaseInsensitiveStringMap options) {
+ return getTable(null, null, options).partitioning();
+ }
+
+ @Override
+ public org.apache.spark.sql.connector.catalog.Table getTable(
+ StructType schema, Transform[] partitioning, Map<String, String> properties) {
+ Preconditions.checkArgument(properties.containsKey(TABLE_NAME), "Missing property " + TABLE_NAME);
+ String tableName = properties.get(TABLE_NAME);
+ Preconditions.checkArgument(tableMap.containsKey(tableName), "Table missing " + tableName);
+ return tableMap.get(tableName);
+ }
+
+ @Override
+ public boolean supportsExternalMetadata() {
+ return false;
+ }
+}
diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
index 5e99dac5c..0082394fb 100644
--- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
@@ -24,6 +24,8 @@ import java.io.IOException;
import java.util.List;
import java.util.Locale;
import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.ManifestFile;
@@ -32,11 +34,13 @@ import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.iceberg.types.Types;
+import org.apache.spark.SparkException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
@@ -54,6 +58,9 @@ import org.junit.runners.Parameterized;
import static org.apache.iceberg.TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED;
import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
@RunWith(Parameterized.class)
public class TestSparkDataWrite {
@@ -78,6 +85,11 @@ public class TestSparkDataWrite {
TestSparkDataWrite.spark = SparkSession.builder().master("local[2]").getOrCreate();
}
+ @Parameterized.AfterParam
+ public static void clearSourceCache() {
+ ManualSource.clearTables();
+ }
+
@AfterClass
public static void stopSpark() {
SparkSession currentSpark = TestSparkDataWrite.spark;
@@ -592,6 +604,56 @@ public class TestSparkDataWrite {
}
}
+ @Test
+ public void testCommitUnknownException() throws IOException {
+ File parent = temp.newFolder(format.toString());
+ File location = new File(parent, "commitunknown");
+
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+ Table table = tables.create(SCHEMA, spec, location.toString());
+
+ List<SimpleRecord> records = Lists.newArrayList(
+ new SimpleRecord(1, "a"),
+ new SimpleRecord(2, "b"),
+ new SimpleRecord(3, "c")
+ );
+
+ Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
+
+ AppendFiles append = table.newFastAppend();
+ AppendFiles spyAppend = spy(append);
+ doAnswer(invocation -> {
+ append.commit();
+ throw new CommitStateUnknownException(new RuntimeException("Datacenter on Fire"));
+ }).when(spyAppend).commit();
+
+ Table spyTable = spy(table);
+ when(spyTable.newAppend()).thenReturn(spyAppend);
+ SparkTable sparkTable = new SparkTable(spyTable, false);
+
+ String manualTableName = "unknown_exception";
+ ManualSource.setTable(manualTableName, sparkTable);
+
+ // Although an exception is thrown here, write and commit have succeeded
+ AssertHelpers.assertThrowsWithCause("Should throw a Commit State Unknown Exception",
+ SparkException.class,
+ "Writing job aborted",
+ CommitStateUnknownException.class,
+ "Datacenter on Fire",
+ () -> df.select("id", "data").sort("data").write()
+ .format("org.apache.iceberg.spark.source.ManualSource")
+ .option(ManualSource.TABLE_NAME, manualTableName)
+ .mode(SaveMode.Append)
+ .save(location.toString()));
+
+ // Since write and commit succeeded, the rows should be readable
+ Dataset<Row> result = spark.read().format("iceberg").load(location.toString());
+ List<SimpleRecord> actual = result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ Assert.assertEquals("Number of rows should match", records.size(), actual.size());
+ Assert.assertEquals("Result rows should match", records, actual);
+ }
+
public enum IcebergOptionsType {
NONE,
TABLE,
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index 24475899e..87fcd9cad 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -44,6 +44,7 @@ import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.ClusteredDataWriter;
@@ -103,6 +104,8 @@ class SparkWrite {
private final Map<String, String> extraSnapshotMetadata;
private final boolean partitionedFanoutEnabled;
+ private boolean cleanupOnAbort = true;
+
SparkWrite(SparkSession spark, Table table, SparkWriteConf writeConf,
LogicalWriteInfo writeInfo, String applicationId,
Schema writeSchema, StructType dsSchema) {
@@ -176,25 +179,34 @@ class SparkWrite {
operation.stageOnly();
}
- long start = System.currentTimeMillis();
- operation.commit(); // abort is automatically called if this fails
- long duration = System.currentTimeMillis() - start;
- LOG.info("Committed in {} ms", duration);
+ try {
+ long start = System.currentTimeMillis();
+ operation.commit(); // abort is automatically called if this fails
+ long duration = System.currentTimeMillis() - start;
+ LOG.info("Committed in {} ms", duration);
+ } catch (CommitStateUnknownException commitStateUnknownException) {
+ cleanupOnAbort = false;
+ throw commitStateUnknownException;
+ }
}
private void abort(WriterCommitMessage[] messages) {
- Map<String, String> props = table.properties();
- Tasks.foreach(files(messages))
- .retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
- .exponentialBackoff(
- PropertyUtil.propertyAsInt(props, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
- PropertyUtil.propertyAsInt(props, COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
- PropertyUtil.propertyAsInt(props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
- 2.0 /* exponential */)
- .throwFailureWhenFinished()
- .run(file -> {
- table.io().deleteFile(file.path().toString());
- });
+ if (cleanupOnAbort) {
+ Map<String, String> props = table.properties();
+ Tasks.foreach(files(messages))
+ .retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
+ .exponentialBackoff(
+ PropertyUtil.propertyAsInt(props, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
+ PropertyUtil.propertyAsInt(props, COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
+ PropertyUtil.propertyAsInt(props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
+ 2.0 /* exponential */)
+ .throwFailureWhenFinished()
+ .run(file -> {
+ table.io().deleteFile(file.path().toString());
+ });
+ } else {
+ LOG.warn("Skipping cleaning up of data files because Iceberg was unable to determine the final commit state");
+ }
}
private Iterable<DataFile> files(WriterCommitMessage[] messages) {
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/ManualSource.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/ManualSource.java
new file mode 100644
index 000000000..0f8c8b3b6
--- /dev/null
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/ManualSource.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableProvider;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.sources.DataSourceRegister;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+public class ManualSource implements TableProvider, DataSourceRegister {
+ public static final String SHORT_NAME = "manual_source";
+ public static final String TABLE_NAME = "TABLE_NAME";
+ private static final Map<String, Table> tableMap = Maps.newHashMap();
+
+ public static void setTable(String name, Table table) {
+ Preconditions.checkArgument(!tableMap.containsKey(name), "Cannot set " + name + ". It is already set");
+ tableMap.put(name, table);
+ }
+
+ public static void clearTables() {
+ tableMap.clear();
+ }
+
+ @Override
+ public String shortName() {
+ return SHORT_NAME;
+ }
+
+ @Override
+ public StructType inferSchema(CaseInsensitiveStringMap options) {
+ return getTable(null, null, options).schema();
+ }
+
+ @Override
+ public Transform[] inferPartitioning(CaseInsensitiveStringMap options) {
+ return getTable(null, null, options).partitioning();
+ }
+
+ @Override
+ public org.apache.spark.sql.connector.catalog.Table getTable(
+ StructType schema, Transform[] partitioning, Map<String, String> properties) {
+ Preconditions.checkArgument(properties.containsKey(TABLE_NAME), "Missing property " + TABLE_NAME);
+ String tableName = properties.get(TABLE_NAME);
+ Preconditions.checkArgument(tableMap.containsKey(tableName), "Table missing " + tableName);
+ return tableMap.get(tableName);
+ }
+
+ @Override
+ public boolean supportsExternalMetadata() {
+ return false;
+ }
+}
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
index 5e99dac5c..0082394fb 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
@@ -24,6 +24,8 @@ import java.io.IOException;
import java.util.List;
import java.util.Locale;
import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.ManifestFile;
@@ -32,11 +34,13 @@ import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.iceberg.types.Types;
+import org.apache.spark.SparkException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
@@ -54,6 +58,9 @@ import org.junit.runners.Parameterized;
import static org.apache.iceberg.TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED;
import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
@RunWith(Parameterized.class)
public class TestSparkDataWrite {
@@ -78,6 +85,11 @@ public class TestSparkDataWrite {
TestSparkDataWrite.spark = SparkSession.builder().master("local[2]").getOrCreate();
}
+ @Parameterized.AfterParam
+ public static void clearSourceCache() {
+ ManualSource.clearTables();
+ }
+
@AfterClass
public static void stopSpark() {
SparkSession currentSpark = TestSparkDataWrite.spark;
@@ -592,6 +604,56 @@ public class TestSparkDataWrite {
}
}
+ @Test
+ public void testCommitUnknownException() throws IOException {
+ File parent = temp.newFolder(format.toString());
+ File location = new File(parent, "commitunknown");
+
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+ Table table = tables.create(SCHEMA, spec, location.toString());
+
+ List<SimpleRecord> records = Lists.newArrayList(
+ new SimpleRecord(1, "a"),
+ new SimpleRecord(2, "b"),
+ new SimpleRecord(3, "c")
+ );
+
+ Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
+
+ AppendFiles append = table.newFastAppend();
+ AppendFiles spyAppend = spy(append);
+ doAnswer(invocation -> {
+ append.commit();
+ throw new CommitStateUnknownException(new RuntimeException("Datacenter on Fire"));
+ }).when(spyAppend).commit();
+
+ Table spyTable = spy(table);
+ when(spyTable.newAppend()).thenReturn(spyAppend);
+ SparkTable sparkTable = new SparkTable(spyTable, false);
+
+ String manualTableName = "unknown_exception";
+ ManualSource.setTable(manualTableName, sparkTable);
+
+ // Although an exception is thrown here, write and commit have succeeded
+ AssertHelpers.assertThrowsWithCause("Should throw a Commit State Unknown Exception",
+ SparkException.class,
+ "Writing job aborted",
+ CommitStateUnknownException.class,
+ "Datacenter on Fire",
+ () -> df.select("id", "data").sort("data").write()
+ .format("org.apache.iceberg.spark.source.ManualSource")
+ .option(ManualSource.TABLE_NAME, manualTableName)
+ .mode(SaveMode.Append)
+ .save(location.toString()));
+
+ // Since write and commit succeeded, the rows should be readable
+ Dataset<Row> result = spark.read().format("iceberg").load(location.toString());
+ List<SimpleRecord> actual = result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ Assert.assertEquals("Number of rows should match", records.size(), actual.size());
+ Assert.assertEquals("Result rows should match", records, actual);
+ }
+
public enum IcebergOptionsType {
NONE,
TABLE,
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index 1504ba9a8..e1f0ae9fd 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -43,6 +43,7 @@ import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.ClusteredDataWriter;
@@ -109,6 +110,8 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
private final Distribution requiredDistribution;
private final SortOrder[] requiredOrdering;
+ private boolean cleanupOnAbort = true;
+
SparkWrite(SparkSession spark, Table table, SparkWriteConf writeConf,
LogicalWriteInfo writeInfo, String applicationId,
Schema writeSchema, StructType dsSchema,
@@ -191,25 +194,34 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
operation.stageOnly();
}
- long start = System.currentTimeMillis();
- operation.commit(); // abort is automatically called if this fails
- long duration = System.currentTimeMillis() - start;
- LOG.info("Committed in {} ms", duration);
+ try {
+ long start = System.currentTimeMillis();
+ operation.commit(); // abort is automatically called if this fails
+ long duration = System.currentTimeMillis() - start;
+ LOG.info("Committed in {} ms", duration);
+ } catch (CommitStateUnknownException commitStateUnknownException) {
+ cleanupOnAbort = false;
+ throw commitStateUnknownException;
+ }
}
private void abort(WriterCommitMessage[] messages) {
- Map<String, String> props = table.properties();
- Tasks.foreach(files(messages))
- .retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
- .exponentialBackoff(
- PropertyUtil.propertyAsInt(props, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
- PropertyUtil.propertyAsInt(props, COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
- PropertyUtil.propertyAsInt(props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
- 2.0 /* exponential */)
- .throwFailureWhenFinished()
- .run(file -> {
- table.io().deleteFile(file.path().toString());
- });
+ if (cleanupOnAbort) {
+ Map<String, String> props = table.properties();
+ Tasks.foreach(files(messages))
+ .retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
+ .exponentialBackoff(
+ PropertyUtil.propertyAsInt(props, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
+ PropertyUtil.propertyAsInt(props, COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
+ PropertyUtil.propertyAsInt(props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
+ 2.0 /* exponential */)
+ .throwFailureWhenFinished()
+ .run(file -> {
+ table.io().deleteFile(file.path().toString());
+ });
+ } else {
+ LOG.warn("Skipping cleaning up of data files because Iceberg was unable to determine the final commit state");
+ }
}
private Iterable<DataFile> files(WriterCommitMessage[] messages) {
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/ManualSource.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/ManualSource.java
new file mode 100644
index 000000000..0f8c8b3b6
--- /dev/null
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/ManualSource.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableProvider;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.sources.DataSourceRegister;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+public class ManualSource implements TableProvider, DataSourceRegister {
+ public static final String SHORT_NAME = "manual_source";
+ public static final String TABLE_NAME = "TABLE_NAME";
+ private static final Map<String, Table> tableMap = Maps.newHashMap();
+
+ public static void setTable(String name, Table table) {
+ Preconditions.checkArgument(!tableMap.containsKey(name), "Cannot set " + name + ". It is already set");
+ tableMap.put(name, table);
+ }
+
+ public static void clearTables() {
+ tableMap.clear();
+ }
+
+ @Override
+ public String shortName() {
+ return SHORT_NAME;
+ }
+
+ @Override
+ public StructType inferSchema(CaseInsensitiveStringMap options) {
+ return getTable(null, null, options).schema();
+ }
+
+ @Override
+ public Transform[] inferPartitioning(CaseInsensitiveStringMap options) {
+ return getTable(null, null, options).partitioning();
+ }
+
+ @Override
+ public org.apache.spark.sql.connector.catalog.Table getTable(
+ StructType schema, Transform[] partitioning, Map<String, String> properties) {
+ Preconditions.checkArgument(properties.containsKey(TABLE_NAME), "Missing property " + TABLE_NAME);
+ String tableName = properties.get(TABLE_NAME);
+ Preconditions.checkArgument(tableMap.containsKey(tableName), "Table missing " + tableName);
+ return tableMap.get(tableName);
+ }
+
+ @Override
+ public boolean supportsExternalMetadata() {
+ return false;
+ }
+}
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
index 5e99dac5c..0082394fb 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
@@ -24,6 +24,8 @@ import java.io.IOException;
import java.util.List;
import java.util.Locale;
import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.ManifestFile;
@@ -32,11 +34,13 @@ import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.iceberg.types.Types;
+import org.apache.spark.SparkException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
@@ -54,6 +58,9 @@ import org.junit.runners.Parameterized;
import static org.apache.iceberg.TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED;
import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
@RunWith(Parameterized.class)
public class TestSparkDataWrite {
@@ -78,6 +85,11 @@ public class TestSparkDataWrite {
TestSparkDataWrite.spark = SparkSession.builder().master("local[2]").getOrCreate();
}
+ @Parameterized.AfterParam
+ public static void clearSourceCache() {
+ ManualSource.clearTables();
+ }
+
@AfterClass
public static void stopSpark() {
SparkSession currentSpark = TestSparkDataWrite.spark;
@@ -592,6 +604,56 @@ public class TestSparkDataWrite {
}
}
+ @Test
+ public void testCommitUnknownException() throws IOException {
+ File parent = temp.newFolder(format.toString());
+ File location = new File(parent, "commitunknown");
+
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+ Table table = tables.create(SCHEMA, spec, location.toString());
+
+ List<SimpleRecord> records = Lists.newArrayList(
+ new SimpleRecord(1, "a"),
+ new SimpleRecord(2, "b"),
+ new SimpleRecord(3, "c")
+ );
+
+ Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
+
+ AppendFiles append = table.newFastAppend();
+ AppendFiles spyAppend = spy(append);
+ doAnswer(invocation -> {
+ append.commit();
+ throw new CommitStateUnknownException(new RuntimeException("Datacenter on Fire"));
+ }).when(spyAppend).commit();
+
+ Table spyTable = spy(table);
+ when(spyTable.newAppend()).thenReturn(spyAppend);
+ SparkTable sparkTable = new SparkTable(spyTable, false);
+
+ String manualTableName = "unknown_exception";
+ ManualSource.setTable(manualTableName, sparkTable);
+
+ // Although an exception is thrown here, write and commit have succeeded
+ AssertHelpers.assertThrowsWithCause("Should throw a Commit State Unknown Exception",
+ SparkException.class,
+ "Writing job aborted",
+ CommitStateUnknownException.class,
+ "Datacenter on Fire",
+ () -> df.select("id", "data").sort("data").write()
+ .format("org.apache.iceberg.spark.source.ManualSource")
+ .option(ManualSource.TABLE_NAME, manualTableName)
+ .mode(SaveMode.Append)
+ .save(location.toString()));
+
+ // Since write and commit succeeded, the rows should be readable
+ Dataset<Row> result = spark.read().format("iceberg").load(location.toString());
+ List<SimpleRecord> actual = result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ Assert.assertEquals("Number of rows should match", records.size(), actual.size());
+ Assert.assertEquals("Result rows should match", records, actual);
+ }
+
public enum IcebergOptionsType {
NONE,
TABLE,