Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/05/27 03:58:46 UTC

[GitHub] [hudi] a-uddhav opened a new issue #1669: [SUPPORT] IOException when PAYLOAD_CLASS_OPT_KEY is used to write to MOR tbls

a-uddhav opened a new issue #1669:
URL: https://github.com/apache/hudi/issues/1669


   **_Tips before filing an issue_**
   
   - Have you gone through our [FAQs](https://cwiki.apache.org/confluence/display/HUDI/FAQ)?
   Yes
   
   - Join the mailing list to engage in conversations and get faster support at dev-subscribe@hudi.apache.org.
   
   - If you have triaged this as a bug, then file an [issue](https://issues.apache.org/jira/projects/HUDI/issues) directly.
   
   **Describe the problem you faced**
   
   I am writing records to a MOR table using the Spark DataSource while overriding the logic for `preCombine()` and `combineAndGetUpdateValue()` instead of relying on the default behavior. However, the Spark application runs into an unexpected error during compaction:
   
   ```
   org.apache.hudi.exception.HoodieIOException: IOException when reading log file 
   	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:244)
   	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:81)
   	at org.apache.hudi.table.compact.HoodieMergeOnReadTableCompactor.compact(HoodieMergeOnReadTableCompactor.java:126)
   	at org.apache.hudi.table.compact.HoodieMergeOnReadTableCompactor.lambda$compact$644ebad7$1(HoodieMergeOnReadTableCompactor.java:98)
   
   ```
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   I am running the Spark application in IntelliJ.
   1. Write the first set of records to the MOR table in OVERWRITE mode.
   I used the following record class and data:
   ```
   // record class
   case class RecordClass(id: Int,
                          name: String,
                          age: Int,
                          event_type: Int,
                          created_at: Timestamp)

   // first batch of records
   val records = Seq(
     RecordClass(1, "Uddhav-1", 30, 0, Timestamp.valueOf("2020-09-07 00:00:00")),
     RecordClass(1, "Uddhav0", 40, 1, Timestamp.valueOf("2020-09-07 00:00:00")),
     RecordClass(2, "Uddhav1", 31, 0, Timestamp.valueOf("2021-09-07 00:00:00")),
     RecordClass(3, "Uddhav2", 32, 1, Timestamp.valueOf("2022-09-07 00:00:00")),
     RecordClass(4, "Uddhav3", 33, 0, Timestamp.valueOf("2023-09-07 00:00:00"))
   )
   ```
   with
   ```
   // Hudi options
   val hudiOptions = Map(
     HoodieWriteConfig.TABLE_NAME -> "<tblName>",
     DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
     DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id",
     DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "<date part of the created_at_column>",
     DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "created_at",
     DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY -> "<payload class>"
   )
   ```
   and 
   ```
   // payload class
   class RecordPayloadClass(val genericRecord: GenericRecord, val comparable: Comparable[Int])
     extends BaseAvroPayload(genericRecord, comparable)
       with HoodieRecordPayload[RecordPayloadClass] {

     // called when multiple incoming records share the same key
     override def preCombine(another: RecordPayloadClass): RecordPayloadClass = {
       val latestRecord: GenericRecord = pickLatestRecord(this.genericRecord, another.genericRecord)
       if (latestRecord.equals(this.genericRecord)) this else another
     }

     // called when an incoming record has to be merged with the record already on disk
     override def combineAndGetUpdateValue(currentValue: IndexedRecord, schema: Schema): util.Option[IndexedRecord] = {
       val diskRecord: GenericRecord = currentValue.asInstanceOf[GenericRecord]
       val inFlightRecord: GenericRecord = this.genericRecord

       Option.of(pickLatestRecord(diskRecord, inFlightRecord))
     }

     override def getInsertValue(schema: Schema): util.Option[IndexedRecord] =
       Option.of(HoodieAvroUtils.bytesToAvro(recordBytes, schema))

     // prefer the record with the higher event_type; break ties on the higher age
     private def pickLatestRecord(diskRecord: GenericRecord, inFlightRecord: GenericRecord): GenericRecord = {
       val diskEventType: Int = diskRecord.get("event_type").asInstanceOf[Int]
       val inFlightEventType: Int = inFlightRecord.get("event_type").asInstanceOf[Int]

       if (inFlightEventType > diskEventType) inFlightRecord
       else if (diskEventType == inFlightEventType) {
         val diskAge: Int = diskRecord.get("age").asInstanceOf[Int]
         val inFlightAge: Int = inFlightRecord.get("age").asInstanceOf[Int]
         // on a tie, always pick the incoming record
         if (inFlightAge >= diskAge) inFlightRecord else diskRecord
       }
       else diskRecord
     }
   }
   ```
   This write should succeed; the write call I used for this step is sketched below.
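   Roughly, the write looks like the following (a sketch; `spark`, `records`, `hudiOptions`, and `basePath` are the session, data, options map, and local table path assumed from the snippets above):
   ```
   import org.apache.spark.sql.SaveMode

   // Initial write of the first batch to the MOR table (sketch).
   spark.createDataFrame(records)       // records: Seq[RecordClass] from above
     .write
     .format("org.apache.hudi")
     .options(hudiOptions)              // includes PAYLOAD_CLASS_OPT_KEY
     .mode(SaveMode.Overwrite)
     .save(basePath)                    // basePath: local FS path of the table
   ```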
   
   2. Then write another set of records in APPEND mode. I used the following:
   ```
   // second batch of records
   val secondBatch = Seq(
     RecordClass(1, "Uddhav4", 41, 1, Timestamp.valueOf("2020-09-07 00:00:00")),
     RecordClass(1, "Uddhav5", 42, 1, Timestamp.valueOf("2020-09-07 00:00:00")),

     RecordClass(3, "Uddhav2", 32, 0, Timestamp.valueOf("2022-09-07 00:00:00")),

     RecordClass(4, "Uddhav3", 33, 2, Timestamp.valueOf("2023-09-07 00:00:00")),

     RecordClass(1, "Uddhav5", 42, 1, Timestamp.valueOf("2020-09-07 00:00:00")),
     RecordClass(1, "Uddhav5", 43, 1, Timestamp.valueOf("2020-09-07 00:00:00")),
     RecordClass(1, "Uddhav5", 44, 1, Timestamp.valueOf("2020-09-07 00:00:00")),
     RecordClass(1, "Uddhav5", 45, 1, Timestamp.valueOf("2020-09-07 00:00:00")),
     RecordClass(1, "Uddhav5", 46, 1, Timestamp.valueOf("2020-09-07 00:00:00")),

     RecordClass(3, "Uddhav2", 32, 1, Timestamp.valueOf("2022-09-07 00:00:00")),
     RecordClass(3, "Uddhav21", 33, 1, Timestamp.valueOf("2022-09-07 00:00:00")),
     RecordClass(3, "Uddhav22", 34, 1, Timestamp.valueOf("2022-09-07 00:00:00")),
     RecordClass(3, "Uddhav23", 35, 1, Timestamp.valueOf("2022-09-07 00:00:00")),
     RecordClass(3, "Uddhav24", 36, 1, Timestamp.valueOf("2022-09-07 00:00:00")),
     RecordClass(3, "Uddhav25", 38, 1, Timestamp.valueOf("2022-09-07 00:00:00")),
     RecordClass(3, "Uddhav26", 39, 1, Timestamp.valueOf("2022-09-07 00:00:00")),

     RecordClass(2, "Uddhav1", 31, 0, Timestamp.valueOf("2021-09-07 00:00:00")),
     RecordClass(2, "Uddhav1", 32, 0, Timestamp.valueOf("2021-09-07 00:00:00")),
     RecordClass(2, "Uddhav1", 33, 0, Timestamp.valueOf("2021-09-07 00:00:00")),
     RecordClass(2, "Uddhav1", 34, 0, Timestamp.valueOf("2021-09-07 00:00:00")),
     RecordClass(2, "Uddhav1", 35, 0, Timestamp.valueOf("2021-09-07 00:00:00")),
     RecordClass(2, "Uddhav1", 36, 0, Timestamp.valueOf("2021-09-07 00:00:00")),
     RecordClass(2, "Uddhav1", 37, 0, Timestamp.valueOf("2021-09-07 00:00:00")),
     RecordClass(2, "Uddhav1", 38, 0, Timestamp.valueOf("2021-09-07 00:00:00")),
     RecordClass(2, "Uddhav1", 39, 0, Timestamp.valueOf("2021-09-07 00:00:00"))
   )
   ```
   This write should fail with the aforementioned `HoodieIOException`.
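   The second write is identical to the first except for the save mode (same assumed variables as in step 1); in my run this is what triggers the inline compaction where the exception surfaces, as the stack trace below shows:
   ```
   // Upsert of the second batch (sketch); compaction runs as part of post-commit.
   spark.createDataFrame(secondBatch)   // secondBatch: Seq[RecordClass] from above
     .write
     .format("org.apache.hudi")
     .options(hudiOptions)
     .mode(SaveMode.Append)
     .save(basePath)
   ```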
   
   **Expected behavior**
   
   The expectation was that, for each key, the record with the higher `event_type` (and, on ties, the higher `age`) would be written out to the Hudi table. Instead, I ran into the exception above. The same logic works fine when the table type is COW.
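   To make the expectation concrete, this is how I check the merged result for one key (a sketch; it applies to the COW table where the logic works, or to the MOR table once the issue is resolved; the glob depth depends on the partition path):
   ```
   // Walk-through for id = 1: all appended id=1 rows have event_type = 1, and among equal
   // event_type the higher age wins, so age 46 beats the age-40 row already on disk and
   // (1, "Uddhav5", 46, 1, 2020-09-07) should survive the merge.
   spark.read
     .format("org.apache.hudi")
     .load(basePath + "/*/*")           // adjust the glob to the partition depth
     .filter("id = 1")
     .show(false)
   ```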
   
   **Environment Description**
   
   * Hudi version : 0.5.2
   
   * Spark version : 2.4.4
   
   * Hive version : N/A (not used)

   * Hadoop version : N/A (not used)
   
   * Storage (HDFS/S3/GCS..) : Local FS
   
   * Running on Docker? (yes/no) : No
   
   
   **Additional context**
   
   None.
   
   **Stacktrace**
   
   ```
   Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 28.0 failed 1 times, most recent failure: Lost task 1.0 in stage 28.0 (TID 13534, localhost, executor driver): org.apache.hudi.exception.HoodieIOException: IOException when reading log file 
   	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:244)
   	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:81)
   	at org.apache.hudi.table.compact.HoodieMergeOnReadTableCompactor.compact(HoodieMergeOnReadTableCompactor.java:126)
   	at org.apache.hudi.table.compact.HoodieMergeOnReadTableCompactor.lambda$compact$644ebad7$1(HoodieMergeOnReadTableCompactor.java:98)
   	at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
   	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
   	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
   	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
   	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:378)
   	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1111)
   	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1085)
   	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1020)
   	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1085)
   	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:811)
   	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
   	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
   	at org.apache.spark.scheduler.Task.run(Task.scala:109)
   	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   	at java.lang.Thread.run(Thread.java:748)
   
   Driver stacktrace:
   	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1651)
   	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1639)
   	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1638)
   	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
   	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
   	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1638)
   	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
   	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
   	at scala.Option.foreach(Option.scala:257)
   	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
   	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1872)
   	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1821)
   	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1810)
   	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
   	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
   	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
   	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
   	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
   	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
   	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
   	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
   	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
   	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
   	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
   	at org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:361)
   	at org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45)
   	at org.apache.hudi.client.HoodieWriteClient.doCompactionCommit(HoodieWriteClient.java:1123)
   	at org.apache.hudi.client.HoodieWriteClient.commitCompaction(HoodieWriteClient.java:1091)
   	at org.apache.hudi.client.HoodieWriteClient.runCompaction(HoodieWriteClient.java:1074)
   	at org.apache.hudi.client.HoodieWriteClient.compact(HoodieWriteClient.java:1045)
   	at org.apache.hudi.client.HoodieWriteClient.lambda$forceCompact$12(HoodieWriteClient.java:1160)
   	at org.apache.hudi.common.util.Option.ifPresent(Option.java:96)
   	at org.apache.hudi.client.HoodieWriteClient.forceCompact(HoodieWriteClient.java:1157)
   	at org.apache.hudi.client.HoodieWriteClient.postCommit(HoodieWriteClient.java:502)
   	at org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:157)
   	at org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:101)
   	at org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:92)
   	at org.apache.hudi.HoodieSparkSqlWriter$.checkWriteStatus(HoodieSparkSqlWriter.scala:262)
   	at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:184)
   	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:91)
   	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
   	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
   	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
   	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
   	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
   	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
   	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
   	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
   	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
   	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
   	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
   	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
   	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
   	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
   	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
   	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:656)
   	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
   	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
   	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225)
   	at com.paypay.daas.uddhav.CustomPreCombine2$.delayedEndpoint$com$paypay$daas$uddhav$CustomPreCombine2$1(CustomPreCombine2.scala:77)
   	at com.paypay.daas.uddhav.CustomPreCombine2$delayedInit$body.apply(CustomPreCombine2.scala:10)
   	at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
   	at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
   	at scala.App$$anonfun$main$1.apply(App.scala:76)
   	at scala.App$$anonfun$main$1.apply(App.scala:76)
   	at scala.collection.immutable.List.foreach(List.scala:392)
   	at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
   	at scala.App$class.main(App.scala:76)
   	at com.paypay.daas.uddhav.CustomPreCombine2$.main(CustomPreCombine2.scala:10)
   	at com.paypay.daas.uddhav.CustomPreCombine2.main(CustomPreCombine2.scala)
   Caused by: org.apache.hudi.exception.HoodieIOException: IOException when reading log file 
   	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:244)
   	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:81)
   	at org.apache.hudi.table.compact.HoodieMergeOnReadTableCompactor.compact(HoodieMergeOnReadTableCompactor.java:126)
   	at org.apache.hudi.table.compact.HoodieMergeOnReadTableCompactor.lambda$compact$644ebad7$1(HoodieMergeOnReadTableCompactor.java:98)
   	at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
   	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
   	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
   	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
   	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:378)
   	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1111)
   	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1085)
   	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1020)
   	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1085)
   	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:811)
   	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
   	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
   	at org.apache.spark.scheduler.Task.run(Task.scala:109)
   	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   	at java.lang.Thread.run(Thread.java:748)
   
   ```
   
   





[GitHub] [hudi] a-uddhav commented on issue #1669: [SUPPORT] IOException when PAYLOAD_CLASS_OPT_KEY is used to write to MOR tbls

Posted by GitBox <gi...@apache.org>.
a-uddhav commented on issue #1669:
URL: https://github.com/apache/hudi/issues/1669#issuecomment-634449462


   I figured out what I was doing wrong: I had missed a constructor in the payload class.
   ```
   class RecordPayloadClass (val genericRecord: GenericRecord, val comparable: Comparable[Int])
     extends BaseAvroPayload(genericRecord, comparable)
       with HoodieRecordPayload[RecordPayloadClass]{

     // ===== added constructor =====
     // required by SpillableMapUtils.convertToHoodieRecordPayload
     def this(optionalRecord: Option[GenericRecord]) {
       this(optionalRecord.get(), 1)
     }

     override def preCombine(another: RecordPayloadClass): RecordPayloadClass  = {
   ....
   ```
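   For anyone hitting the same error: the write path builds the payload with the two-argument `(GenericRecord, Comparable)` constructor, but on the compaction path Hudi re-creates payloads reflectively from a single `Option`-wrapped Avro record (via `SpillableMapUtils.convertToHoodieRecordPayload`, per the comment above), which is why the same class worked on COW but failed during MOR compaction. A quick self-check one could run (the fully qualified class name below is hypothetical):
   ```
   // Sketch: verify the payload class exposes the single-Option constructor that the
   // compaction path needs; getConstructor throws NoSuchMethodException if it is missing.
   val payloadClass = Class.forName("com.example.RecordPayloadClass") // hypothetical FQCN
   val ctor = payloadClass.getConstructor(classOf[org.apache.hudi.common.util.Option[_]])
   println(s"Found single-arg constructor: $ctor")
   ```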
   I will close this issue.
   
   





[GitHub] [hudi] a-uddhav closed issue #1669: [SUPPORT] IOException when PAYLOAD_CLASS_OPT_KEY is used to write to MOR tbls

Posted by GitBox <gi...@apache.org>.
a-uddhav closed issue #1669:
URL: https://github.com/apache/hudi/issues/1669


   

