Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/05/03 22:11:15 UTC

[GitHub] [iceberg] RussellSpitzer opened a new issue, #4686: Spark: CommitFailedException in Spark Commit Operation Can Lead to Data Loss

RussellSpitzer opened a new issue, #4686:
URL: https://github.com/apache/iceberg/issues/4686

   @stevenzwu pointed this out while we were discussing incorrect aborts when non-runtime exceptions are thrown. He correctly noticed that the same sort of problem would appear if we threw a commit-state-unknown exception: the Spark writer would abort and delete the underlying data files.
   
   cc : @stevenzwu @rdblue @aokolnychyi @flyrain
   
   ## Reproduction
   
   I tried for a while to see if I could mock this, but I was unable to.
   
   Modify https://github.com/apache/iceberg/blob/f6e11148d31b408a7aea57a0efcb4428134f6a99/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java#L181
   
   And add as the last line of this method
   ```java
       throw new CommitFailedException("Fun failure");
   ```
   
   This means that our metadata will be properly committed and the data files written before the exception is thrown.
   
   Following this I added a small test
   
   ```java
     @Test
     public void testCommitUnknownException() throws IOException {
       File parent = temp.newFolder(format.toString());
       File location = new File(parent, "commitunknown");
   
       HadoopTables tables = new HadoopTables(CONF);
       PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
       Table table = tables.create(SCHEMA, spec, location.toString());
   
       List<SimpleRecord> expected = Lists.newArrayListWithCapacity(8000);
       for (int i = 0; i < 2000; i++) {
         expected.add(new SimpleRecord(i, "a"));
         expected.add(new SimpleRecord(i, "b"));
         expected.add(new SimpleRecord(i, "c"));
         expected.add(new SimpleRecord(i, "d"));
       }
   
       Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);
   
       SparkTable sparkTable = spy(new SparkTable(table, true));
   
       try {
         df.select("id", "data").sort("data").write()
           .format("iceberg")
           .option(SparkWriteOptions.WRITE_FORMAT, format.toString())
           .mode(SaveMode.Append)
           .save(location.toString());
       } catch (Throwable t) {
         System.out.println("Error Expected");
       }
   
       long count = spark.read().format("iceberg").load(location.toString()).count();
       // JUnit expects (expected, actual); all 8000 appended rows should be readable.
       Assert.assertEquals(8000, count);
     }
   ```
   
   This test will fail because of missing data files:
   
   ```
   [Executor task launch worker for task 0.0 in stage 4.0 (TID 5)] ERROR org.apache.iceberg.spark.source.BaseDataReader - Error reading file: /var/folders/yl/6cwgks7919s1td2mfdq86cbm0000gn/T/junit11371694369070946472/PARQUET/commitunknown/data/data=a/00000-4-66cc0b01-7df4-4f52-bfe0-21af515cdf0b-00001.parquet
   org.apache.iceberg.exceptions.RuntimeIOException: Failed to get status for file: /var/folders/yl/6cwgks7919s1td2mfdq86cbm0000gn/T/junit11371694369070946472/PARQUET/commitunknown/data/data=a/00000-4-66cc0b01-7df4-4f52-bfe0-21af515cdf0b-00001.parquet
   	at org.apache.iceberg.hadoop.HadoopInputFile.lazyStat(HadoopInputFile.java:161)
   	at org.apache.iceberg.hadoop.HadoopInputFile.getStat(HadoopInputFile.java:195)
   	at org.apache.iceberg.parquet.ParquetIO.file(ParquetIO.java:54)
   	at org.apache.iceberg.parquet.ReadConf.newReader(ReadConf.java:218)
   	at org.apache.iceberg.parquet.ReadConf.<init>(ReadConf.java:74)
   	at org.apache.iceberg.parquet.ParquetReader.init(ParquetReader.java:66)
   	at org.apache.iceberg.parquet.ParquetReader.iterator(ParquetReader.java:77)
   	at org.apache.iceberg.parquet.ParquetReader.iterator(ParquetReader.java:38)
   	at org.apache.iceberg.util.Filter.lambda$filter$0(Filter.java:35)
   	at org.apache.iceberg.io.CloseableIterable$2.iterator(CloseableIterable.java:73)
   	at org.apache.iceberg.spark.source.RowDataReader.open(RowDataReader.java:77)
   	at org.apache.iceberg.spark.source.BaseDataReader.next(BaseDataReader.java:107)
   	at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:93)
   	at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:130)
   	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
   	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
   	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_1$(Unknown Source)
   	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
   	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
   	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
   	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
   	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:349)
   	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
   	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
   	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
   	at org.apache.spark.scheduler.Task.run(Task.scala:131)
   	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
   	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
   	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
   	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
   	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
   	at java.base/java.lang.Thread.run(Thread.java:834)
   Caused by: java.io.FileNotFoundException: File /var/folders/yl/6cwgks7919s1td2mfdq86cbm0000gn/T/junit11371694369070946472/PARQUET/commitunknown/data/data=a/00000-4-66cc0b01-7df4-4f52-bfe0-21af515cdf0b-00001.parquet does not exist
   	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
   	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
   	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:601)
   	at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)
   	at org.apache.iceberg.hadoop.HadoopInputFile.lazyStat(HadoopInputFile.java:159)
   	... 34 more
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on issue #4686: Spark: CommitFailedException in Spark Commit Operation Can Lead to Data Loss

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on issue #4686:
URL: https://github.com/apache/iceberg/issues/4686#issuecomment-1116721402

   Sorry, incorrect exception there; it should have been CommitStateUnknown. The exception is not swallowed. The issue is that the operation.commit code in SnapshotProducer can throw CommitStateUnknown, which is surfaced up to Spark. It is explicitly rethrown; see
   
   https://github.com/apache/iceberg/blob/674dcf1c2e3cb0444a183c55a24b521e297880de/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L321
   
   Once rethrown into the SparkWrite commitOperation code, it triggers the data source abort method, which deletes the data files even though the commit may actually have succeeded.
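   
   The failure mode, and one possible guard against it, can be sketched without any Iceberg or Spark dependency. Everything in the block below is hypothetical and for illustration only: `UnknownCommitStateException` stands in for CommitStateUnknown, `GuardedWriter` stands in for the writer, and a `Set<String>` stands in for the file system. It is a sketch of the idea of skipping cleanup when the commit outcome is unknown, not a description of how SparkWrite is or should be implemented.
   
   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Set;
   
   // Hypothetical stand-in for an exception meaning "the commit may or may not have succeeded".
   class UnknownCommitStateException extends RuntimeException {
     UnknownCommitStateException(String message) {
       super(message);
     }
   }
   
   // Hypothetical writer that models the commit/abort interaction described above.
   class GuardedWriter {
     private final Set<String> storage; // stands in for the file system
     private boolean cleanupOnAbort = true;
   
     GuardedWriter(Set<String> storage) {
       this.storage = storage;
     }
   
     void commit(Runnable commitOperation) {
       try {
         commitOperation.run();
       } catch (UnknownCommitStateException e) {
         // The table metadata may already reference the data files, so remember
         // that deleting them is unsafe, then rethrow so the caller still sees it.
         cleanupOnAbort = false;
         throw e;
       }
     }
   
     void abort(List<String> files) {
       if (!cleanupOnAbort) {
         return; // outcome unknown: leave the files in place
       }
       storage.removeAll(files); // definite failure: the files are orphans
     }
   }
   
   class CommitStateDemo {
     // Returns the files that remain after a failed commit followed by abort.
     static Set<String> run(boolean unknownState) {
       Set<String> storage = new HashSet<>(Arrays.asList("a.parquet", "b.parquet"));
       List<String> files = new ArrayList<>(storage);
       GuardedWriter writer = new GuardedWriter(storage);
       try {
         writer.commit(() -> {
           if (unknownState) {
             throw new UnknownCommitStateException("commit outcome unknown");
           }
           throw new IllegalStateException("commit definitely failed");
         });
       } catch (RuntimeException e) {
         writer.abort(files); // models the engine calling abort after any commit failure
       }
       return storage;
     }
   
     public static void main(String[] args) {
       System.out.println(run(true).size());  // prints 2: files kept
       System.out.println(run(false).size()); // prints 0: files cleaned up
     }
   }
   ```
   
   Without the `cleanupOnAbort` flag, both cases would delete the files, which is the data loss reproduced by the test in the original report.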




[GitHub] [iceberg] RussellSpitzer closed issue #4686: Spark: CommitStateUnknown in Spark Commit Operation Can Lead to Data Loss

Posted by GitBox <gi...@apache.org>.
RussellSpitzer closed issue #4686: Spark: CommitStateUnknown in Spark Commit Operation Can Lead to Data Loss
URL: https://github.com/apache/iceberg/issues/4686




[GitHub] [iceberg] flyrain commented on issue #4686: Spark: CommitStateUnknown in Spark Commit Operation Can Lead to Data Loss

Posted by GitBox <gi...@apache.org>.
flyrain commented on issue #4686:
URL: https://github.com/apache/iceberg/issues/4686#issuecomment-1116733218

   Got it. The `SparkWrite` aborts and deletes the data files because of the exception, but the table operation commit may have succeeded (its state is unknown).




[GitHub] [iceberg] flyrain commented on issue #4686: Spark: CommitFailedException in Spark Commit Operation Can Lead to Data Loss

Posted by GitBox <gi...@apache.org>.
flyrain commented on issue #4686:
URL: https://github.com/apache/iceberg/issues/4686#issuecomment-1116717358

   ```
   throw new CommitFailedException("Fun failure");
   ```
   Is this runtime exception swallowed somehow?

