Posted to commits@hudi.apache.org by "sivabalan narayanan (Jira)" <ji...@apache.org> on 2022/03/27 07:12:00 UTC

[jira] [Created] (HUDI-3723) MOR MergeOnRead FilteringIterator StackOverflowError

sivabalan narayanan created HUDI-3723:
-----------------------------------------

             Summary: MOR MergeOnRead FilteringIterator StackOverflowError
                 Key: HUDI-3723
                 URL: https://issues.apache.org/jira/browse/HUDI-3723
             Project: Apache Hudi
          Issue Type: Bug
            Reporter: sivabalan narayanan


We run integration tests against Hudi at a regular cadence, and recently we have been seeing a StackOverflowError with the MOR table in the Spark long-running YAML suite.

{code:java}
22/03/26 14:27:04 INFO ValidateDatasetNode: Validate data in target hudi path basaePath/*/*/*
22/03/26 14:28:51 ERROR Executor: Exception in task 2.0 in stage 975.0 (TID 17933)
java.lang.StackOverflowError
        at java.util.HashMap.removeNode(HashMap.java:821)
        at java.util.HashMap.remove(HashMap.java:800)
        at org.apache.hudi.common.util.collection.ExternalSpillableMap.remove(ExternalSpillableMap.java:238)
        at org.apache.hudi.common.util.collection.ExternalSpillableMap.remove(ExternalSpillableMap.java:55)
        at scala.collection.convert.Wrappers$JMapWrapperLike.remove(Wrappers.scala:296)
        at scala.collection.convert.Wrappers$JMapWrapperLike.remove$(Wrappers.scala:296)
        at scala.collection.convert.Wrappers$JMapWrapper.remove(Wrappers.scala:317)
        at org.apache.hudi.HoodieMergeOnReadRDD$LogFileIterator.removeLogRecord(HoodieMergeOnReadRDD.scala:187)
        at org.apache.hudi.HoodieMergeOnReadRDD$RecordMergingFileIterator.hasNext(HoodieMergeOnReadRDD.scala:262)
        at org.apache.hudi.HoodieMergeOnReadRDD$RecordMergingFileIterator.hasNext(HoodieMergeOnReadRDD.scala:271)
        at org.apache.hudi.HoodieMergeOnReadRDD$RecordMergingFileIterator.hasNext(HoodieMergeOnReadRDD.scala:271)
        at org.apache.hudi.HoodieMergeOnReadRDD$RecordMergingFileIterator.hasNext(HoodieMergeOnReadRDD.scala:271)
        at org.apache.hudi.HoodieMergeOnReadRDD$RecordMergingFileIterator.hasNext(HoodieMergeOnReadRDD.scala:271)
        at org.apache.hudi.HoodieMergeOnReadRDD$RecordMergingFileIterator.hasNext(HoodieMergeOnReadRDD.scala:271)
        at org.apache.hudi.HoodieMergeOnReadRDD$RecordMergingFileIterator.hasNext(HoodieMergeOnReadRDD.scala:271) 
        ...
{code}
This repeats for some time, and the job eventually fails.


The likely root cause: in our iterator, when we encounter a delete record, we call hasNext() again to skip the current record and move on to the next one. Each such call adds a frame to the call stack, so if this repeats for 8k or more invocations and the stack size in the corresponding JVM is small, the job fails. In reality there could be millions of delete records, so we need a non-recursive fix. For now, we are temporarily experimenting with the "-Xss100m" Java option to increase the JVM stack size.
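As a stop-gap, the larger stack can be applied to the Spark driver and executors through the standard extraJavaOptions settings (exact sizing is still being experimented with), e.g.:

{code}
spark-submit \
  --conf spark.driver.extraJavaOptions=-Xss100m \
  --conf spark.executor.extraJavaOptions=-Xss100m \
  ...
{code}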


Code snippet from HoodieMergeOnReadRDD, specifically the line

{code:java}
if (mergedAvroRecordOpt.isEmpty) { // Record has been deleted, skipping
  this.hasNext
{code}

in the snippet below.

{code:java}
override def hasNext: Boolean = {
  if (baseFileIterator.hasNext) {
    val curRowRecord = baseFileIterator.next()
    val curKey = curRowRecord.getString(recordKeyOrdinal)
    val updatedRecordOpt = removeLogRecord(curKey)
    if (updatedRecordOpt.isEmpty) {
      // No merge needed, load current row with required projected schema
      recordToLoad = unsafeProjection(projectRowUnsafe(curRowRecord, requiredSchema.structTypeSchema, requiredSchemaFieldOrdinals))
      true
    } else {
      val mergedAvroRecordOpt = merge(serialize(curRowRecord), updatedRecordOpt.get)
      if (mergedAvroRecordOpt.isEmpty) {
        // Record has been deleted, skipping
        this.hasNext
      } else {
        // NOTE: In occurrence of a merge we can't know the schema of the record being returned, b/c
        //       record from the Delta Log will bear (full) Table schema, while record from the Base file
        //       might already be read in projected one (as an optimization).
        //       As such we can't use more performant [[projectAvroUnsafe]], and instead have to fallback
        //       to [[projectAvro]]
        val projectedAvroRecord = projectAvro(mergedAvroRecordOpt.get, requiredAvroSchema, recordBuilder)
        recordToLoad = unsafeProjection(deserialize(projectedAvroRecord))
        true
      }
    }
  } else {
    super[LogFileIterator].hasNext
  }
}
{code}
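A possible direction for the fix is to replace the recursive this.hasNext call with a loop that keeps pulling from the base iterator until it finds a non-deleted record, so the stack depth stays constant regardless of how many consecutive deletes appear. A minimal, self-contained sketch of that shape (generic names, not the actual Hudi classes):

{code:java}
// Hypothetical illustration: a filtering iterator that skips "deleted"
// records iteratively instead of recursing into hasNext.
class SkipDeletedIterator[T](underlying: Iterator[T], isDeleted: T => Boolean) extends Iterator[T] {
  // Buffered next live record, if one has already been found.
  private var nextRecord: Option[T] = None

  override def hasNext: Boolean = {
    // Loop (rather than recurse) until a live record is found or input is
    // exhausted; a run of N deletes costs N iterations but O(1) stack frames.
    while (nextRecord.isEmpty && underlying.hasNext) {
      val candidate = underlying.next()
      if (!isDeleted(candidate)) {
        nextRecord = Some(candidate)
      }
    }
    nextRecord.isDefined
  }

  override def next(): T = {
    if (!hasNext) throw new NoSuchElementException("exhausted")
    val record = nextRecord.get
    nextRecord = None
    record
  }
}
{code}

Applying the same loop-and-buffer shape inside RecordMergingFileIterator.hasNext would keep the stack bounded even with millions of consecutive delete records.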



--
This message was sent by Atlassian Jira
(v8.20.1#820001)