Posted to commits@hudi.apache.org by "Alexey Kudinkin (Jira)" <ji...@apache.org> on 2022/03/28 17:36:00 UTC

[jira] [Assigned] (HUDI-3723) MOR MergeOnRead FilteringIterator stackoverflow error

     [ https://issues.apache.org/jira/browse/HUDI-3723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexey Kudinkin reassigned HUDI-3723:
-------------------------------------

    Assignee: Alexey Kudinkin

> MOR MergeOnRead FilteringIterator stackoverflow error
> -----------------------------------------------------
>
>                 Key: HUDI-3723
>                 URL: https://issues.apache.org/jira/browse/HUDI-3723
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: sivabalan narayanan
>            Assignee: Alexey Kudinkin
>            Priority: Major
>             Fix For: 0.11.0
>
>
> We run integration tests against Hudi at a regular cadence, and recently we have been seeing a StackOverflowError with MOR tables in the Spark long-running YAML test. 
>  
> {code:java}
> 22/03/26 14:27:04 INFO ValidateDatasetNode: Validate data in target hudi path basaePath/*/*/*
> 22/03/26 14:28:51 ERROR Executor: Exception in task 2.0 in stage 975.0 (TID 17933)
> java.lang.StackOverflowError
>         at java.util.HashMap.removeNode(HashMap.java:821)
>         at java.util.HashMap.remove(HashMap.java:800)
>         at org.apache.hudi.common.util.collection.ExternalSpillableMap.remove(ExternalSpillableMap.java:238)
>         at org.apache.hudi.common.util.collection.ExternalSpillableMap.remove(ExternalSpillableMap.java:55)
>         at scala.collection.convert.Wrappers$JMapWrapperLike.remove(Wrappers.scala:296)
>         at scala.collection.convert.Wrappers$JMapWrapperLike.remove$(Wrappers.scala:296)
>         at scala.collection.convert.Wrappers$JMapWrapper.remove(Wrappers.scala:317)
>         at org.apache.hudi.HoodieMergeOnReadRDD$LogFileIterator.removeLogRecord(HoodieMergeOnReadRDD.scala:187)
>         at org.apache.hudi.HoodieMergeOnReadRDD$RecordMergingFileIterator.hasNext(HoodieMergeOnReadRDD.scala:262)
>         at org.apache.hudi.HoodieMergeOnReadRDD$RecordMergingFileIterator.hasNext(HoodieMergeOnReadRDD.scala:271)
>         at org.apache.hudi.HoodieMergeOnReadRDD$RecordMergingFileIterator.hasNext(HoodieMergeOnReadRDD.scala:271)
>         at org.apache.hudi.HoodieMergeOnReadRDD$RecordMergingFileIterator.hasNext(HoodieMergeOnReadRDD.scala:271)
>         at org.apache.hudi.HoodieMergeOnReadRDD$RecordMergingFileIterator.hasNext(HoodieMergeOnReadRDD.scala:271)
>         at org.apache.hudi.HoodieMergeOnReadRDD$RecordMergingFileIterator.hasNext(HoodieMergeOnReadRDD.scala:271)
>         at org.apache.hudi.HoodieMergeOnReadRDD$RecordMergingFileIterator.hasNext(HoodieMergeOnReadRDD.scala:271) 
> .
> .
> .
> .{code}
> This repeats for some time and the job eventually fails. 
>  
> Likely the root cause is that in our iterator, when we encounter a delete record, we call hasNext() again so that we skip the current record and move on to the next one. Each of these calls adds a new frame to the call stack, so if this repeats 8k or more times and the stack size of the corresponding JVM is small, our test will fail. In reality, there could be millions of delete records, so we need to find a way to fix this (one possible loop-based reshaping is sketched after the snippet below). For now, we are temporarily experimenting with the "-Xss100m" Java option to increase the JVM stack size. 
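>  
> For reference, a minimal sketch (assuming the long-running test builds its own SparkSession, which may not match the actual harness) of how that temporary workaround can be passed to the executors, where the trace above shows the overflow happening:
> {code:java}
> import org.apache.spark.sql.SparkSession
> 
> // Sketch only: bump the executor thread stack size to 100 MB so the
> // recursive hasNext() calls have more headroom until the real fix lands.
> // "spark.executor.extraJavaOptions" is standard Spark configuration.
> val spark = SparkSession.builder()
>   .config("spark.executor.extraJavaOptions", "-Xss100m")
>   .getOrCreate()
> {code}
> (A driver-side -Xss would have to be supplied at launch time, e.g. through spark-submit, since the driver JVM is already running by the time this code executes.)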
>  
> Code snippet from HoodieMergeOnReadRDD, especially the recursive call
> {code:java}
> if (mergedAvroRecordOpt.isEmpty) {
>   // Record has been deleted, skipping
>   this.hasNext
> {code}
> in the snippet below. 
>  
> {code:java}
> override def hasNext: Boolean = {
>   if (baseFileIterator.hasNext) {
>     val curRowRecord = baseFileIterator.next()
>     val curKey = curRowRecord.getString(recordKeyOrdinal)
>     val updatedRecordOpt = removeLogRecord(curKey)
>     if (updatedRecordOpt.isEmpty) {
>       // No merge needed, load current row with required projected schema
>       recordToLoad = unsafeProjection(projectRowUnsafe(curRowRecord, requiredSchema.structTypeSchema, requiredSchemaFieldOrdinals))
>       true
>     } else {
>       val mergedAvroRecordOpt = merge(serialize(curRowRecord), updatedRecordOpt.get)
>       if (mergedAvroRecordOpt.isEmpty) {
>         // Record has been deleted, skipping
>         this.hasNext
>       } else {
>         // NOTE: In occurrence of a merge we can't know the schema of the record being returned, b/c
>         //       record from the Delta Log will bear (full) Table schema, while record from the Base file
>         //       might already be read in projected one (as an optimization).
>         //       As such we can't use more performant [[projectAvroUnsafe]], and instead have to fallback
>         //       to [[projectAvro]]
>         val projectedAvroRecord = projectAvro(mergedAvroRecordOpt.get, requiredAvroSchema, recordBuilder)
>         recordToLoad = unsafeProjection(deserialize(projectedAvroRecord))
>         true
>       }
>     }
>   } else {
>     super[LogFileIterator].hasNext
>   }
> }
> {code}
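>  
> As a follow-up to the root-cause note above, here is a minimal sketch (not the actual fix, and reusing the member names from the snippet above) of how the same logic could be reshaped into a while loop, so that skipping a run of delete records advances the base-file iterator without growing the call stack:
> {code:java}
> override def hasNext: Boolean = {
>   while (baseFileIterator.hasNext) {
>     val curRowRecord = baseFileIterator.next()
>     val curKey = curRowRecord.getString(recordKeyOrdinal)
>     val updatedRecordOpt = removeLogRecord(curKey)
>     if (updatedRecordOpt.isEmpty) {
>       // No merge needed, load current row with required projected schema
>       recordToLoad = unsafeProjection(projectRowUnsafe(curRowRecord, requiredSchema.structTypeSchema, requiredSchemaFieldOrdinals))
>       return true
>     } else {
>       val mergedAvroRecordOpt = merge(serialize(curRowRecord), updatedRecordOpt.get)
>       if (mergedAvroRecordOpt.isEmpty) {
>         // Record has been deleted: fall through and pick up the next
>         // base-file row on the next loop iteration instead of recursing
>       } else {
>         val projectedAvroRecord = projectAvro(mergedAvroRecordOpt.get, requiredAvroSchema, recordBuilder)
>         recordToLoad = unsafeProjection(deserialize(projectedAvroRecord))
>         return true
>       }
>     }
>   }
>   super[LogFileIterator].hasNext
> }
> {code}
> The per-record work is unchanged; only the control flow becomes iterative, so the number of consecutive delete records no longer depends on the JVM stack size (-Xss).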



--
This message was sent by Atlassian Jira
(v8.20.1#820001)