Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/05/03 22:11:15 UTC
[GitHub] [iceberg] RussellSpitzer opened a new issue, #4686: Spark: CommitFailedException in Spark Commit Operation Can Lead to Data Loss
RussellSpitzer opened a new issue, #4686:
URL: https://github.com/apache/iceberg/issues/4686
The issue here was pointed out by @stevenzwu while we were discussing incorrect aborts when non-runtime exceptions are thrown. He correctly noticed that the same sort of issue would appear if a commit-state-unknown exception were thrown and the Spark writer then aborted and deleted the underlying data files.
cc : @stevenzwu @rdblue @aokolnychyi @flyrain
## Reproduction
I tried for a while to see if I could mock this, but I was unable to. To reproduce it manually, modify
https://github.com/apache/iceberg/blob/f6e11148d31b408a7aea57a0efcb4428134f6a99/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java#L181
and add the following as the last line of that method:
```java
throw new CommitFailedException("Fun failure");
```
Because the exception is thrown after the commit has finished, our metadata will be properly committed and the data files written.
Following this, I added a small test:
```java
@Test
public void testCommitUnknownException() throws IOException {
  File parent = temp.newFolder(format.toString());
  File location = new File(parent, "commitunknown");

  HadoopTables tables = new HadoopTables(CONF);
  PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
  Table table = tables.create(SCHEMA, spec, location.toString());

  // 2000 ids x 4 partition values = 8000 records
  List<SimpleRecord> expected = Lists.newArrayListWithCapacity(8000);
  for (int i = 0; i < 2000; i++) {
    expected.add(new SimpleRecord(i, "a"));
    expected.add(new SimpleRecord(i, "b"));
    expected.add(new SimpleRecord(i, "c"));
    expected.add(new SimpleRecord(i, "d"));
  }

  Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);

  // Not used by the path-based write below
  SparkTable sparkTable = spy(new SparkTable(table, true));

  try {
    df.select("id", "data").sort("data").write()
        .format("iceberg")
        .option(SparkWriteOptions.WRITE_FORMAT, format.toString())
        .mode(SaveMode.Append)
        .save(location.toString());
  } catch (Throwable t) {
    // The exception injected into SparkWrite surfaces here
    System.out.println("Error Expected");
  }

  // The commit went through, so all 8000 rows should still be readable
  long count = spark.read().format("iceberg").load(location.toString()).count();
  Assert.assertEquals(8000L, count);
}
```
This test fails because the data files are missing:
```
[Executor task launch worker for task 0.0 in stage 4.0 (TID 5)] ERROR org.apache.iceberg.spark.source.BaseDataReader - Error reading file: /var/folders/yl/6cwgks7919s1td2mfdq86cbm0000gn/T/junit11371694369070946472/PARQUET/commitunknown/data/data=a/00000-4-66cc0b01-7df4-4f52-bfe0-21af515cdf0b-00001.parquet
org.apache.iceberg.exceptions.RuntimeIOException: Failed to get status for file: /var/folders/yl/6cwgks7919s1td2mfdq86cbm0000gn/T/junit11371694369070946472/PARQUET/commitunknown/data/data=a/00000-4-66cc0b01-7df4-4f52-bfe0-21af515cdf0b-00001.parquet
at org.apache.iceberg.hadoop.HadoopInputFile.lazyStat(HadoopInputFile.java:161)
at org.apache.iceberg.hadoop.HadoopInputFile.getStat(HadoopInputFile.java:195)
at org.apache.iceberg.parquet.ParquetIO.file(ParquetIO.java:54)
at org.apache.iceberg.parquet.ReadConf.newReader(ReadConf.java:218)
at org.apache.iceberg.parquet.ReadConf.<init>(ReadConf.java:74)
at org.apache.iceberg.parquet.ParquetReader.init(ParquetReader.java:66)
at org.apache.iceberg.parquet.ParquetReader.iterator(ParquetReader.java:77)
at org.apache.iceberg.parquet.ParquetReader.iterator(ParquetReader.java:38)
at org.apache.iceberg.util.Filter.lambda$filter$0(Filter.java:35)
at org.apache.iceberg.io.CloseableIterable$2.iterator(CloseableIterable.java:73)
at org.apache.iceberg.spark.source.RowDataReader.open(RowDataReader.java:77)
at org.apache.iceberg.spark.source.BaseDataReader.next(BaseDataReader.java:107)
at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:93)
at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:130)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_1$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:349)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.io.FileNotFoundException: File /var/folders/yl/6cwgks7919s1td2mfdq86cbm0000gn/T/junit11371694369070946472/PARQUET/commitunknown/data/data=a/00000-4-66cc0b01-7df4-4f52-bfe0-21af515cdf0b-00001.parquet does not exist
at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:601)
at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)
at org.apache.iceberg.hadoop.HadoopInputFile.lazyStat(HadoopInputFile.java:159)
... 34 more
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org
[GitHub] [iceberg] RussellSpitzer commented on issue #4686: Spark: CommitFailedException in Spark Commit Operation Can Lead to Data Loss
Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on issue #4686:
URL: https://github.com/apache/iceberg/issues/4686#issuecomment-1116721402
Sorry, incorrect exception there; it should have been CommitStateUnknown. The exception is not swallowed. The issue is that the operation.commit code in SnapshotProducer can throw CommitStateUnknown, which is surfaced up to Spark. It is explicitly rethrown; see
https://github.com/apache/iceberg/blob/674dcf1c2e3cb0444a183c55a24b521e297880de/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L321
Once rethrown into the SparkWrite commitOperation code, it triggers the data source abort method, which deletes the data files.
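One possible way to avoid the deletion is to remember that the commit state is unknown and skip file cleanup during abort. The following is only a minimal, self-contained sketch of that idea, not the actual SparkWrite code: `SketchWrite`, its `cleanupOnAbort` flag, the in-memory `storage` set, and the local `CommitStateUnknownException` stand-in are all hypothetical names introduced for illustration.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Local stand-in for Iceberg's commit-state-unknown exception (assumption, not the real class).
class CommitStateUnknownException extends RuntimeException {
  CommitStateUnknownException(String message) {
    super(message);
  }
}

// Hypothetical, simplified model of a writer with commit and abort hooks.
class SketchWrite {
  private final Set<String> storage; // stands in for the file system
  private boolean cleanupOnAbort = true;

  SketchWrite(Set<String> storage) {
    this.storage = storage;
  }

  void commitOperation(Runnable commit) {
    try {
      commit.run();
    } catch (CommitStateUnknownException e) {
      // The commit may have succeeded, so the files may already be referenced by the table.
      cleanupOnAbort = false;
      throw e;
    }
  }

  void abort(List<String> writtenFiles) {
    // Delete files only when the commit is known to have failed.
    if (cleanupOnAbort) {
      storage.removeAll(writtenFiles);
    }
  }
}

class CommitUnknownSketch {
  // Returns true if the data file is still present after a failed commit followed by abort.
  static boolean fileSurvives(RuntimeException failure) {
    String file = "data/file-a.parquet";
    Set<String> storage = new HashSet<>(List.of(file));
    SketchWrite write = new SketchWrite(storage);
    try {
      write.commitOperation(() -> {
        throw failure;
      });
    } catch (RuntimeException e) {
      // Models the engine calling abort after commit throws.
      write.abort(List.of(file));
    }
    return storage.contains(file);
  }

  public static void main(String[] args) {
    System.out.println(fileSurvives(new CommitStateUnknownException("timeout")));
    System.out.println(fileSurvives(new IllegalStateException("definite failure")));
  }
}
```

In this model the file is kept when the commit state is unknown and removed on any other failure. The trade-off is that an unknown commit that actually failed leaves orphaned files behind, which is recoverable, whereas deleting files that a successful commit references is not.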
[GitHub] [iceberg] RussellSpitzer closed issue #4686: Spark: CommitStateUnknown in Spark Commit Operation Can Lead to Data Loss
Posted by GitBox <gi...@apache.org>.
RussellSpitzer closed issue #4686: Spark: CommitStateUnknown in Spark Commit Operation Can Lead to Data Loss
URL: https://github.com/apache/iceberg/issues/4686
[GitHub] [iceberg] flyrain commented on issue #4686: Spark: CommitStateUnknown in Spark Commit Operation Can Lead to Data Loss
Posted by GitBox <gi...@apache.org>.
flyrain commented on issue #4686:
URL: https://github.com/apache/iceberg/issues/4686#issuecomment-1116733218
Got it. The `SparkWrite` aborts and deletes the data files because of the exception, but the table operation commit may have succeeded (its state is unknown).
[GitHub] [iceberg] flyrain commented on issue #4686: Spark: CommitFailedException in Spark Commit Operation Can Lead to Data Loss
Posted by GitBox <gi...@apache.org>.
flyrain commented on issue #4686:
URL: https://github.com/apache/iceberg/issues/4686#issuecomment-1116717358
```
throw new CommitFailedException("Fun failure");
```
Is this runtime exception swallowed somehow?