Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/11/10 02:11:38 UTC

[GitHub] [hudi] zuyanton opened a new issue #2237: [SUPPORT] DiskBasedMap doesn't serialize/deserialize correctly

zuyanton opened a new issue #2237:
URL: https://github.com/apache/hudi/issues/2237


   
   **Describe the problem you faced**
   We are running a MoR table with inline compaction. We have noticed that whenever a log group grows large (either by upserting a large amount of data or by postponing compaction for a while), compaction on the table fails with the error ``` ERROR AbstractHoodieLogRecordScanner: Got exception when reading log file
   com.esotericsoftware.kryo.KryoException: Unable to find class: org.apache.hudi.common.model.OverwriteWithLatestAvroPayload$$Lambda$119/1991639008``` After inspecting the stack trace and running a few tests, we think the problem is in ```ExternalSpillableMap```, more specifically in ```DiskBasedMap```. Whenever ```ExternalSpillableMap``` grows too large, it starts spilling data to disk via ```DiskBasedMap.put```. The error appears when that data is read back via ```DiskBasedMap.get```.
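
The class name in the error ends in `$$Lambda$119/1991639008`, the kind of name the JVM gives to classes it generates at runtime for lambda expressions. The following is a minimal Java sketch, not Hudi or Kryo code, of why writing such a value can succeed while reading it back fails. It assumes a toy store that records only the class name on `put` and resolves it with `Class.forName` on `get`, which is the call visible in the `Caused by` section of the stack trace. `NameBasedSpillSketch` and its methods are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class NameBasedSpillSketch {
    // Toy "disk": only the runtime class name of each written value is kept.
    // This is an assumption made for illustration, not Kryo's real format.
    private final List<String> spilledClassNames = new ArrayList<>();

    // Write path: recording a class name succeeds for any object.
    public int put(Object value) {
        spilledClassNames.add(value.getClass().getName());
        return spilledClassNames.size() - 1;
    }

    // Read path: the class has to be found again by its recorded name.
    public Class<?> get(int slot) throws ClassNotFoundException {
        return Class.forName(spilledClassNames.get(slot));
    }

    // Returns true if a value written with put() can be resolved by get().
    public static boolean readable(Object value) {
        NameBasedSpillSketch disk = new NameBasedSpillSketch();
        int slot = disk.put(value);
        try {
            disk.get(slot);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Comparable has a single abstract method, so a lambda can implement it.
        Comparable<Object> lambda = other -> 0;
        System.out.println("lambda class name: " + lambda.getClass().getName());
        System.out.println("Integer readable:  " + readable(Integer.valueOf(3)));
        System.out.println("lambda readable:   " + readable(lambda));
    }
}
```

On a HotSpot JVM the `Integer` value should resolve while the lambda should not, which mirrors the `put` succeeding and the later `get` failing in this report.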
   
   **To Reproduce**
   1. Set the Hudi property ```hoodie.memory.compaction.max.size->1``` to ensure that ```ExternalSpillableMap``` spills everything to disk.   
   2. Set ```"hoodie.compact.inline.max.delta.commits"->"1"``` to speed up compaction triggering.   
   3. Create a row: ```var df = Seq((1, 2, 3)).toDF("pk", "partition", "sort_key")```
   4. Create a Hudi table from this row by running ```df.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Append).save("s3://testBucket/testTable_zuyanton_1")```  
   5. Upsert the same row so that Hudi creates a log group, by running the same command again: ```df.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Append).save("s3://testBucket/testTable_zuyanton_1")```  
   6. Upsert the row a third time to trigger compaction, again by running ```df.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Append).save("s3://testBucket/testTable_zuyanton_1")```  
   7. Observe the error.  
   
   The stack trace and full code snippet are below.  
   
   **Additional info**  
   Possibly related to:   
   https://issues.apache.org/jira/browse/HUDI-1205  
   https://github.com/apache/hudi/issues/1890  
   https://github.com/apache/hudi/issues/1823
   
   **Environment Description**
   
   * Hudi version : 0.6.0
   
   * Spark version :
   
   * Hive version :
   
   * Hadoop version :
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : no
   
   
   
   **Stacktrace**
   
   ```
   20/11/04 03:50:06 ERROR AbstractHoodieLogRecordScanner: Got exception when reading log file
   com.esotericsoftware.kryo.KryoException: Unable to find class: org.apache.hudi.common.model.OverwriteWithLatestAvroPayload$$Lambda$119/1991639008
   Serialization trace:
   orderingVal (org.apache.hudi.common.model.OverwriteWithLatestAvroPayload)
   data (org.apache.hudi.common.model.HoodieRecord)
   	at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:160)
   	at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:133)
   	at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:693)
   	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:118)
   	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
   	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
   	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
   	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
   	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
   	at org.apache.hudi.common.util.SerializationUtils$KryoSerializerInstance.deserialize(SerializationUtils.java:107)
   	at org.apache.hudi.common.util.SerializationUtils.deserialize(SerializationUtils.java:81)
   	at org.apache.hudi.common.util.collection.DiskBasedMap.get(DiskBasedMap.java:222)
   	at org.apache.hudi.common.util.collection.DiskBasedMap.get(DiskBasedMap.java:215)
   	at org.apache.hudi.common.util.collection.DiskBasedMap.get(DiskBasedMap.java:211)
   	at org.apache.hudi.common.util.collection.ExternalSpillableMap.get(ExternalSpillableMap.java:169)
   	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.processNextRecord(HoodieMergedLogRecordScanner.java:115)
   	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.processDataBlock(AbstractHoodieLogRecordScanner.java:278)
   	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.processQueuedBlocksForInstant(AbstractHoodieLogRecordScanner.java:306)
   	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:153)
   	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:81)
   	at org.apache.hudi.table.action.compact.HoodieMergeOnReadTableCompactor.compact(HoodieMergeOnReadTableCompactor.java:128)
   	at org.apache.hudi.table.action.compact.HoodieMergeOnReadTableCompactor.lambda$compact$644ebad7$1(HoodieMergeOnReadTableCompactor.java:99)
   	at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
   	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
   	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
   	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
   	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
   	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349)
   	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
   	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
   	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
   	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
   	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
   	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
   	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
   	at org.apache.spark.scheduler.Task.run(Task.scala:123)
   	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
   	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
   	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   	at java.lang.Thread.run(Thread.java:748)
   Caused by: java.lang.ClassNotFoundException: org.apache.hudi.common.model.OverwriteWithLatestAvroPayload$$Lambda$119/1991639008
   	at java.lang.Class.forName0(Native Method)
   	at java.lang.Class.forName(Class.java:348)
   	at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:154) 
   ``` 
   
   **Full code snippet**  
   
   ```
     import org.apache.spark.sql.functions._
     import org.apache.hudi.hive.MultiPartKeysValueExtractor
     import org.apache.hudi.QuickstartUtils._
     import scala.collection.JavaConversions._
     import org.apache.spark.sql.SaveMode
     import org.apache.hudi.DataSourceReadOptions._
     import org.apache.hudi.DataSourceWriteOptions._
     import org.apache.hudi.DataSourceWriteOptions
     import org.apache.hudi.config.HoodieWriteConfig._
     import org.apache.hudi.config.HoodieWriteConfig
     import org.apache.hudi.keygen.ComplexKeyGenerator
     import org.apache.hadoop.hive.conf.HiveConf
     val hiveConf = new HiveConf()
     val hiveMetastoreURI = hiveConf.get("hive.metastore.uris").replaceAll("thrift://", "")
     val hiveServer2URI = hiveMetastoreURI.substring(0, hiveMetastoreURI.lastIndexOf(":"))
     var hudiOptions = Map[String,String](
       HoodieWriteConfig.TABLE_NAME -> "testTable_zuyanton",
       "hoodie.consistency.check.enabled" -> "true",
       "hoodie.compact.inline.max.delta.commits" -> "1",
       "hoodie.compact.inline" -> "true",
       DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY -> "MERGE_ON_READ",
       DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "pk",
       DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> classOf[ComplexKeyGenerator].getName,
       DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
       DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "sort_key",
       DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true",
       DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "testTable_zuyanton",
       DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "partition",
       DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName,
       DataSourceWriteOptions.HIVE_URL_OPT_KEY -> s"jdbc:hive2://$hiveServer2URI:10000",
       "hoodie.memory.compaction.max.size" -> "1"
     )
   
     //spark.sql("drop table if exists testTable_zuyanton_ro")
     //spark.sql("drop table if exists testTable_zuyanton_rt")
     var df = Seq((1, 2, 3)).toDF("pk", "partition", "sort_key")
     df.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Append).save("s3://testbucket/testTable_zuyanton_1")
     df.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Append).save("s3://testbucket/testTable_zuyanton_1")
     df.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Append).save("s3://testbucket/testTable_zuyanton_1")
   ```
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] bvaradar commented on issue #2237: [SUPPORT] DiskBasedMap doesn't serialize/deserialize correctly

Posted by GitBox <gi...@apache.org>.
bvaradar commented on issue #2237:
URL: https://github.com/apache/hudi/issues/2237#issuecomment-726711950


   @zuyanton : The exception suggests that OverwriteWithLatestAvroPayload holds a reference to a lambda function, but the one pointed to in HoodieSparkSQLWriter (in your link) would have resulted in an actual field, right?
   
   The only place I saw a lambda function being used was https://github.com/apache/hudi/blob/release-0.6.0/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java#L47 (0.6.0), which has since been changed in master. 
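
To make the point about a lambda ending up in a field concrete, here is a small hypothetical Java sketch. It is not the Hudi source: `SketchPayload`, `withLambdaDefault`, and `withConstantDefault` are invented names, and the sketch only assumes that a payload's ordering value is a `Comparable` whose default could be given either as a lambda or as a plain value. It checks whether the runtime class of that field can be found again by name, which is what a name-based deserializer needs.

```java
public class OrderingValueSketch {
    // Hypothetical stand-in for a payload with an ordering field (not Hudi code).
    public static final class SketchPayload {
        public final String data;
        public final Comparable<?> orderingVal;

        public SketchPayload(String data, Comparable<?> orderingVal) {
            this.data = data;
            this.orderingVal = orderingVal;
        }

        // Default ordering supplied as a lambda: the field's runtime class
        // is generated by the JVM and has a synthetic name.
        public static SketchPayload withLambdaDefault(String data) {
            Comparable<Object> natural = other -> 0;
            return new SketchPayload(data, natural);
        }

        // Default ordering supplied as a plain value: the field's runtime
        // class is java.lang.Integer.
        public static SketchPayload withConstantDefault(String data) {
            return new SketchPayload(data, Integer.valueOf(0));
        }
    }

    // True if the class of the ordering field can be looked up by its name.
    public static boolean orderingClassResolvable(SketchPayload payload) {
        String name = payload.orderingVal.getClass().getName();
        try {
            Class.forName(name);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        SketchPayload lambdaBacked = SketchPayload.withLambdaDefault("row-1");
        SketchPayload constantBacked = SketchPayload.withConstantDefault("row-1");
        System.out.println(lambdaBacked.orderingVal.getClass().getName()
                + " resolvable: " + orderingClassResolvable(lambdaBacked));
        System.out.println(constantBacked.orderingVal.getClass().getName()
                + " resolvable: " + orderingClassResolvable(constantBacked));
    }
}
```

Under these assumptions, using a plain value such as an `Integer` as the default is one way to keep the field's class resolvable. Whether that matches the change made in master is not confirmed here.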
   
   





[GitHub] [hudi] zuyanton commented on issue #2237: [SUPPORT] DiskBasedMap doesn't serialize/deserialize correctly

Posted by GitBox <gi...@apache.org>.
zuyanton commented on issue #2237:
URL: https://github.com/apache/hudi/issues/2237#issuecomment-727004471


   @bvaradar That makes sense. Let me run my tests once again.





[GitHub] [hudi] bvaradar commented on issue #2237: [SUPPORT] DiskBasedMap doesn't serialize/deserialize correctly

Posted by GitBox <gi...@apache.org>.
bvaradar commented on issue #2237:
URL: https://github.com/apache/hudi/issues/2237#issuecomment-752990151


   Closing due to inactivity.





[GitHub] [hudi] bvaradar closed issue #2237: [SUPPORT] DiskBasedMap doesn't serialize/deserialize correctly

Posted by GitBox <gi...@apache.org>.
bvaradar closed issue #2237:
URL: https://github.com/apache/hudi/issues/2237


   





[GitHub] [hudi] bvaradar commented on issue #2237: [SUPPORT] DiskBasedMap doesn't serialize/deserialize correctly

Posted by GitBox <gi...@apache.org>.
bvaradar commented on issue #2237:
URL: https://github.com/apache/hudi/issues/2237#issuecomment-725874107


   @zuyanton : I think this has been fixed in master as part of https://github.com/apache/hudi/commit/b335459c805748815ccc858ff1a9ef4cd830da8c
   
   Can you please try it and let me know? I can close HUDI-1205 if it works. 
   
   Thanks,
   Balaji.V





[GitHub] [hudi] zuyanton commented on issue #2237: [SUPPORT] DiskBasedMap doesn't serialize/deserialize correctly

Posted by GitBox <gi...@apache.org>.
zuyanton commented on issue #2237:
URL: https://github.com/apache/hudi/issues/2237#issuecomment-726436517


   @bvaradar, I just pulled and built master, and the error is still there. I looked at the pull request you linked, and it doesn't seem to solve the issue: it only makes pre-combining logs from the same log group optional, which I don't think we can make optional, since according to [this line](https://github.com/apache/hudi/pull/2088/files#diff-89c27ffa1f8da07a7003d4476cfad7b95c8bd89c904ae25e03749ec1e69366f4R137) dedupe takes place whenever the operation type is upsert.





[GitHub] [hudi] bvaradar commented on issue #2237: [SUPPORT] DiskBasedMap doesn't serialize/deserialize correctly

Posted by GitBox <gi...@apache.org>.
bvaradar commented on issue #2237:
URL: https://github.com/apache/hudi/issues/2237#issuecomment-744702620


   @zuyanton : Any update on this ?

