Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/08/28 12:06:20 UTC

[GitHub] [iceberg] davseitsev opened a new issue #1398: No retries on snapshot commit on eventual consistent file system

davseitsev opened a new issue #1398:
URL: https://github.com/apache/iceberg/issues/1398


   I'm building a Spark Structured Streaming application that writes data to Amazon S3 in Iceberg format. Sometimes the query fails without any retries because of S3 eventual consistency:
   ```
   20/08/28 10:15:08 ERROR MicroBatchExecution: Query event-streaming-query-76-v0 [id = bea88375-dd55-4eb1-bc4a-44ed9f5fdbf9, runId = 7536644f-1ff8-42fd-b9b7-812fcdab1e21] terminated with error
   org.apache.spark.SparkException: Writing job aborted.
   	at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:92)
   	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
   	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
   	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
   	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
   	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
   	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
   	at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:283)
   	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:332)
   	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3389)
   	at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2788)
   	at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2788)
   	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
   	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
   	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
   	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
   	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
   	at org.apache.spark.sql.Dataset.collect(Dataset.scala:2788)
   	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:540)
   	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
   	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
   	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
   	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:535)
   	at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
   	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
   	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:534)
   	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
   	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
   	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
   	at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
   	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
   	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
   	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
   	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
   	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
   	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
   Caused by: org.apache.iceberg.exceptions.NotFoundException: Failed to open input stream for file: s3a://my_bucket/warehouse.db/table_76/metadata/snap-8502691184173814269-1-8b57c8f4-1f87-4d61-9708-d890f251d101.avro
   	at org.apache.iceberg.hadoop.HadoopInputFile.newStream(HadoopInputFile.java:159)
   	at org.apache.iceberg.avro.AvroIterable.newFileReader(AvroIterable.java:95)
   	at org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:77)
   	at org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:37)
   	at org.apache.iceberg.relocated.com.google.common.collect.Iterables.addAll(Iterables.java:320)
   	at org.apache.iceberg.relocated.com.google.common.collect.Lists.newLinkedList(Lists.java:237)
   	at org.apache.iceberg.ManifestLists.read(ManifestLists.java:46)
   	at org.apache.iceberg.BaseSnapshot.cacheManifests(BaseSnapshot.java:127)
   	at org.apache.iceberg.BaseSnapshot.allManifests(BaseSnapshot.java:141)
   	at org.apache.iceberg.FastAppend.apply(FastAppend.java:142)
   	at org.apache.iceberg.SnapshotProducer.apply(SnapshotProducer.java:149)
   	at org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:262)
   	at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:403)
   	at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:212)
   	at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196)
   	at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:188)
   	at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:261)
   	at org.apache.iceberg.spark.source.Writer.commitOperation(Writer.java:149)
   	at org.apache.iceberg.spark.source.StreamingWriter.commit(StreamingWriter.java:93)
   	at org.apache.iceberg.spark.source.StreamingWriter.commit(StreamingWriter.java:86)
   	at org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter.commit(MicroBatchWriter.scala:31)
   	at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:76)
   	... 35 more
   Caused by: java.io.FileNotFoundException: No such file or directory: s3a://my_bucket/warehouse.db/table_76/metadata/snap-8502691184173814269-1-8b57c8f4-1f87-4d61-9708-d890f251d101.avro
   	at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1642)
   	at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:521)
   	at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:790)
   	at org.apache.iceberg.hadoop.HadoopInputFile.newStream(HadoopInputFile.java:157)
   	... 56 more
   ```
   As far as I understand, this happens in `SnapshotProducer` during the commit operation. `FastAppend` tries to read the manifest list in its `apply()` method and gets a `java.io.FileNotFoundException` from `S3AFileSystem.getFileStatus(S3AFileSystem.java:1642)`, but the file actually exists.
   
   This is a known issue with `S3AFileSystem`: it checks whether a file exists before creating it, and that check breaks S3's read-after-write consistency for new objects (S3 only guarantees it when no HEAD or GET was issued for the key before the object was created). As a result, a reader that opens the newly created file shortly afterwards can get a `FileNotFoundException`.
   The problem is that `SnapshotProducer` retries only `CommitFailedException` and doesn't retry any `IOException`.
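   
   A minimal, hypothetical sketch of why the failure is fatal: a retry loop that only retries the exception types in its retry set lets a `FileNotFoundException` escape on the first attempt. `RetryScopeDemo`, `runWithRetry`, and the local `CommitFailedException` class are invented for illustration; this is not Iceberg's `Tasks` API, and in the trace above the `FileNotFoundException` actually reaches the commit wrapped in Iceberg's `NotFoundException`, which the sketch skips.
   
   ```java
   import java.util.Set;
   import java.util.concurrent.Callable;

   // Hypothetical sketch: a retry loop whose retries are limited to specific exception types.
   class RetryScopeDemo {

     // Stand-in for Iceberg's CommitFailedException, defined here so the sketch compiles on its own.
     static class CommitFailedException extends RuntimeException {
       CommitFailedException(String message) {
         super(message);
       }
     }

     // Calls task up to maxAttempts times. An exception is retried only if it is an
     // instance of one of the classes in retryOn; anything else is rethrown at once.
     static <T> T runWithRetry(Callable<T> task, int maxAttempts,
                               Set<Class<? extends Exception>> retryOn) throws Exception {
       for (int attempt = 1; ; attempt++) {
         try {
           return task.call();
         } catch (Exception e) {
           boolean retryable = retryOn.stream().anyMatch(c -> c.isInstance(e));
           if (!retryable || attempt >= maxAttempts) {
             throw e;
           }
         }
       }
     }
   }
   ```
   
   With only the commit exception in the retry set, a task that fails once with a `FileNotFoundException` aborts immediately; adding `FileNotFoundException.class` to the set lets the second attempt succeed.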
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] johnclara commented on issue #1398: No retries on snapshot commit on eventual consistent file system

johnclara commented on issue #1398:
URL: https://github.com/apache/iceberg/issues/1398#issuecomment-725785845


   Hi @davseitsev, we're also using S3A (version 2.8.4). Do you see this frequently, or within a single operation? We're starting to see an issue that we can't pin down to a single create(), but it sounds like yours happens on a create() followed by an open()?



[GitHub] [iceberg] aokolnychyi edited a comment on issue #1398: No retries on snapshot commit on eventual consistent file system

aokolnychyi edited a comment on issue #1398:
URL: https://github.com/apache/iceberg/issues/1398#issuecomment-682815249


   Here is the snippet from `create` in `S3AFileSystem`:
   
   ```java
       try {
         // get the status or throw an FNFE
         status = getFileStatus(path);
   
         // if the thread reaches here, there is something at the path
         if (status.isDirectory()) {
           // path references a directory: automatic error
           throw new FileAlreadyExistsException(path + " is a directory");
         }
         if (!overwrite) {
           // path references a file and overwrite is disabled
           throw new FileAlreadyExistsException(path + " already exists");
         }
         LOG.debug("Overwriting file {}", path);
       } catch (FileNotFoundException e) {
         // this means the file is not found
   
       }
   ```
   
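   It looks like the unconditional `getFileStatus` call is what breaks this logic, and even setting the overwrite flag does not help, because the probe runs before the flag is checked. At least, that is the case in `S3AFileSystem`.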
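   
   A toy, deterministic model can show why the probe matters even with overwrite enabled. It assumes S3's documented caveat at the time: if a HEAD or GET is issued for a key before the object exists, a GET shortly after the object is created may still return not-found. `NegativeCacheStore` and its methods are hypothetical; real S3 behavior is time-based and probabilistic rather than cleared by an explicit `settle()` call.
   
   ```java
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Set;

   // Toy, deterministic model of S3's pre-create lookup caveat (hypothetical, not real S3 code).
   class NegativeCacheStore {
     private final Map<String, byte[]> objects = new HashMap<>();
     // Keys whose "not found" answer was cached by a lookup made before the object existed.
     private final Set<String> staleMisses = new HashSet<>();

     // HEAD-style existence probe, playing the role of getFileStatus() in create().
     boolean head(String key) {
       if (staleMisses.contains(key)) {
         return false;                 // stale negative answer
       }
       if (!objects.containsKey(key)) {
         staleMisses.add(key);         // probing a missing key caches the miss
         return false;
       }
       return true;
     }

     // GET: keeps returning null while a stale miss for the key is cached.
     byte[] get(String key) {
       return staleMisses.contains(key) ? null : objects.get(key);
     }

     // Writing the object does NOT clear the cached miss in this model.
     void put(String key, byte[] data) {
       objects.put(key, data);
     }

     // Stands in for time passing: cached misses eventually expire.
     void settle() {
       staleMisses.clear();
     }

     // Mirrors the snippet: probe first, even when overwrite is true, then write.
     void createWithProbe(String key, byte[] data, boolean overwrite) {
       if (head(key) && !overwrite) {
         throw new IllegalStateException(key + " already exists");
       }
       put(key, data);
     }

     // Skips the probe entirely, as an overwrite-only create could.
     void createWithoutProbe(String key, byte[] data) {
       put(key, data);
     }
   }
   ```
   
   In this model, `createWithProbe(key, data, true)` followed immediately by `get(key)` returns `null` even though the object was written, while `createWithoutProbe` followed by `get` returns the data right away.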



[GitHub] [iceberg] rdblue commented on issue #1398: No retries on snapshot commit on eventual consistent file system

rdblue commented on issue #1398:
URL: https://github.com/apache/iceberg/issues/1398#issuecomment-682837392


   Our S3 file system doesn't do the `getFileStatus` check when overwriting, which is how we avoid this. I think the best way to handle this is to add a limited number of retries at the `FileIO` layer.
   
   We already have retries for `table.refresh()` (in `BaseMetastoreTableOperations`) that take care of most of these issues for the root metadata. We added the retries there, rather than more broadly, to avoid adding extra latency when there actually is a problem, for example when a data file is missing. But it looks like it would be better to have retries for at least metadata files when using S3.
   
   I think it makes sense to add retries to `HadoopInputFile` and to make the number of retries and the total retry timeout configurable, either in table properties or in the Hadoop Configuration. The downside of adding retries there is that data files would be retried as well, but that seems like a reasonable trade-off to me.
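   
   A minimal sketch of the kind of retry wrapper described above, assuming retries are triggered only by `FileNotFoundException` and bounded by both a retry count and a total timeout, with exponential backoff. `RetryingOpener`, its `Opener` interface, and the constructor parameters are hypothetical names for illustration, not an existing Iceberg or Hadoop API; in Iceberg this logic would presumably wrap the `FileSystem.open` call made by `HadoopInputFile.newStream()`.
   
   ```java
   import java.io.FileNotFoundException;
   import java.io.IOException;
   import java.io.InputStream;
   import java.util.concurrent.TimeUnit;

   // Hypothetical wrapper: retries opening a path while it is reported as missing.
   class RetryingOpener {

     // Stand-in for "open this path", e.g. the FileSystem.open call in HadoopInputFile.newStream().
     interface Opener {
       InputStream open(String path) throws IOException;
     }

     private final Opener delegate;
     private final int maxRetries;          // hypothetical setting (table property or Hadoop conf)
     private final long totalTimeoutMs;     // hypothetical setting (table property or Hadoop conf)
     private final long initialBackoffMs;

     RetryingOpener(Opener delegate, int maxRetries, long totalTimeoutMs, long initialBackoffMs) {
       this.delegate = delegate;
       this.maxRetries = maxRetries;
       this.totalTimeoutMs = totalTimeoutMs;
       this.initialBackoffMs = initialBackoffMs;
     }

     // Retries only FileNotFoundException, with exponential backoff, until maxRetries
     // retries have been made or totalTimeoutMs has elapsed. Any other IOException
     // propagates immediately without a retry.
     InputStream open(String path) throws IOException {
       long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(totalTimeoutMs);
       long backoffMs = initialBackoffMs;
       int retries = 0;
       while (true) {
         try {
           return delegate.open(path);
         } catch (FileNotFoundException e) {
           long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
           if (retries >= maxRetries || remainingMs <= 0) {
             throw e;  // give up: the file really seems to be missing
           }
           retries++;
           try {
             Thread.sleep(Math.min(backoffMs, remainingMs));
           } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
             throw e;
           }
           backoffMs = Math.min(backoffMs * 2, totalTimeoutMs);
         }
       }
     }
   }
   ```
   
   Other `IOException`s are not retried, so a read that fails for a different reason is not delayed; a genuinely missing data file still pays the full retry budget before failing, which is the trade-off mentioned above.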



[GitHub] [iceberg] aokolnychyi commented on issue #1398: No retries on snapshot commit on eventual consistent file system

aokolnychyi commented on issue #1398:
URL: https://github.com/apache/iceberg/issues/1398#issuecomment-682803226


   I remember we had this issue earlier. [Here](https://github.com/apache/iceberg/pull/779) is the PR with some discussion. The idea suggested there was either to contribute a custom `FileIO` implementation to Iceberg or to use a Hadoop FS that already handles it.



[GitHub] [iceberg] aokolnychyi commented on issue #1398: No retries on snapshot commit on eventual consistent file system

aokolnychyi commented on issue #1398:
URL: https://github.com/apache/iceberg/issues/1398#issuecomment-682807416


   I feel like we need to agree on the best solution and document it.



[GitHub] [iceberg] aokolnychyi commented on issue #1398: No retries on snapshot commit on eventual consistent file system

aokolnychyi commented on issue #1398:
URL: https://github.com/apache/iceberg/issues/1398#issuecomment-682803791


   cc @rdblue 


