Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/08/31 15:02:51 UTC

[GitHub] [hudi] WTa-hash commented on issue #2057: [SUPPORT] AWSDmsAvroPayload not processing Deletes correctly + IOException when reading log file

WTa-hash commented on issue #2057:
URL: https://github.com/apache/hudi/issues/2057#issuecomment-683835686


   After implementing the custom AWSDmsAvroPayload class referenced in https://issues.apache.org/jira/browse/HUDI-802, processing an insert + delete for a particular record within the same batch works correctly, but deletes arriving in later batches are not applied and the job errors out.
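   For reference, here is a minimal sketch of what such a custom payload can look like (a hedged reconstruction in Java based on the HUDI-802 discussion; the class name CustomAWSDmsAvroPayload and the details below are illustrative, not the exact patch):
   
       import java.io.IOException;
       
       import org.apache.avro.Schema;
       import org.apache.avro.generic.GenericRecord;
       import org.apache.avro.generic.IndexedRecord;
       import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
       import org.apache.hudi.common.util.Option;
       
       // Hypothetical payload along the lines of the HUDI-802 fix: honor the
       // DMS "Op" column on updates as well as inserts, so deletes arriving in
       // later batches are not silently merged as upserts.
       public class CustomAWSDmsAvroPayload extends OverwriteWithLatestAvroPayload {
       
         public static final String OP_FIELD = "Op";
       
         public CustomAWSDmsAvroPayload(GenericRecord record, Comparable orderingVal) {
           super(record, orderingVal);
         }
       
         // Treat the record as a delete when DMS marked it with Op = "D".
         private Option<IndexedRecord> handleDeleteOperation(IndexedRecord record) {
           boolean isDelete = record instanceof GenericRecord
               && ((GenericRecord) record).get(OP_FIELD) != null
               && "D".equalsIgnoreCase(((GenericRecord) record).get(OP_FIELD).toString());
           return isDelete ? Option.empty() : Option.of(record);
         }
       
         @Override
         public Option<IndexedRecord> getInsertValue(Schema schema) throws IOException {
           return handleDeleteOperation(super.getInsertValue(schema).get());
         }
       
         @Override
         public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema)
             throws IOException {
           // The stock payload at the time only checked "Op" on the insert path;
           // overriding the update path as well is the essence of the HUDI-802 change.
           return handleDeleteOperation(super.getInsertValue(schema).get());
         }
       }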
   
   The error is:
   
   20/08/31 13:17:40 WARN TaskSetManager: Lost task 0.0 in stage 184.0 (TID 64740, ip-xxx-xx-x-xxx.ec2.internal, executor 3): org.apache.hudi.exception.HoodieIOException: IOException when reading log file
           at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:244)
           at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:81)
           at org.apache.hudi.table.compact.HoodieMergeOnReadTableCompactor.compact(HoodieMergeOnReadTableCompactor.java:126)
           at org.apache.hudi.table.compact.HoodieMergeOnReadTableCompactor.lambda$compact$644ebad7$1(HoodieMergeOnReadTableCompactor.java:98)
           at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
           at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
           at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
           at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
           at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
           at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349)
           at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1181)
           at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1155)
           at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1090)
           at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1155)
           at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:881)
           at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
           at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
           at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
           at org.apache.spark.scheduler.Task.run(Task.scala:123)
           at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
           at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
           at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at java.lang.Thread.run(Thread.java:748)
   
   20/08/31 13:17:40 ERROR TaskSetManager: Task 0 in stage 184.0 failed 4 times; aborting job
   org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 184.0 failed 4 times, most recent failure: Lost task 0.3 in stage 184.0 (TID 64743, ip-xxx-xx-x-xxx.ec2.internal, executor 4): org.apache.hudi.exception.HoodieIOException: IOException when reading log file
           at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:244)
           at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:81)
           at org.apache.hudi.table.compact.HoodieMergeOnReadTableCompactor.compact(HoodieMergeOnReadTableCompactor.java:126)
           at org.apache.hudi.table.compact.HoodieMergeOnReadTableCompactor.lambda$compact$644ebad7$1(HoodieMergeOnReadTableCompactor.java:98)
           at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
           at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
           at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
           at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
           at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
           at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349)
           at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1181)
           at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1155)
           at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1090)
           at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1155)
           at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:881)
           at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
           at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
           at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
           at org.apache.spark.scheduler.Task.run(Task.scala:123)
           at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
           at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
           at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at java.lang.Thread.run(Thread.java:748)
   
   Driver stacktrace:
     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2043)
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2031)
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2030)
     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2030)
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:967)
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:967)
     at scala.Option.foreach(Option.scala:257)
     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:967)
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2264)
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2213)
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2202)
     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
     at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:778)
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
     at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:990)
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
     at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
     at org.apache.spark.rdd.RDD.collect(RDD.scala:989)
     at org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:361)
     at org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45)
     at org.apache.hudi.client.HoodieWriteClient.doCompactionCommit(HoodieWriteClient.java:1123)
     at org.apache.hudi.client.HoodieWriteClient.commitCompaction(HoodieWriteClient.java:1091)
     at org.apache.hudi.client.HoodieWriteClient.runCompaction(HoodieWriteClient.java:1074)
     at org.apache.hudi.client.HoodieWriteClient.compact(HoodieWriteClient.java:1045)
     at org.apache.hudi.client.HoodieWriteClient.lambda$forceCompact$12(HoodieWriteClient.java:1160)
     at org.apache.hudi.common.util.Option.ifPresent(Option.java:96)
     at org.apache.hudi.client.HoodieWriteClient.forceCompact(HoodieWriteClient.java:1157)
     at org.apache.hudi.client.HoodieWriteClient.postCommit(HoodieWriteClient.java:502)
     at org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:157)
     at org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:101)
     at org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:92)
     at org.apache.hudi.HoodieSparkSqlWriter$.checkWriteStatus(HoodieSparkSqlWriter.scala:262)
     at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:184)
     at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:108)
     at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
     at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
     at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
     at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:173)
     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:169)
     at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:197)
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
     at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:194)
     at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:169)
     at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:114)
     at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:112)
     at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
     at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
     at org.apache.spark.sql.execution.SQLExecution$.org$apache$spark$sql$execution$SQLExecution$$executeQuery$1(SQLExecution.scala:83)
     at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1$$anonfun$apply$1.apply(SQLExecution.scala:94)
     at org.apache.spark.sql.execution.QueryExecutionMetrics$.withMetrics(QueryExecutionMetrics.scala:141)
     at org.apache.spark.sql.execution.SQLExecution$.org$apache$spark$sql$execution$SQLExecution$$withMetrics(SQLExecution.scala:178)
     at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:93)
     at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:200)
     at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:92)
     at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
     at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
     at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
     at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
     ... 51 elided
   Caused by: org.apache.hudi.exception.HoodieIOException: IOException when reading log file
     at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:244)
     at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:81)
     at org.apache.hudi.table.compact.HoodieMergeOnReadTableCompactor.compact(HoodieMergeOnReadTableCompactor.java:126)
     at org.apache.hudi.table.compact.HoodieMergeOnReadTableCompactor.lambda$compact$644ebad7$1(HoodieMergeOnReadTableCompactor.java:98)
     at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
     at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
     at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
     at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
     at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
     at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349)
     at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1181)
     at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1155)
     at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1090)
     at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1155)
     at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:881)
     at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
     at org.apache.spark.scheduler.Task.run(Task.scala:123)
     at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
     at java.lang.Thread.run(Thread.java:748)
   
   
    This can be reproduced by running the example script from the original post with DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY set to the custom AWSDmsAvroPayload class referenced in https://issues.apache.org/jira/browse/HUDI-802. I do not get the error above on Copy-On-Write (COW) tables; with the custom payload class in place, COW tables process later batches correctly.
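    For context, this is roughly how the custom payload is wired into the write in the reproduction (a hedged Java sketch; the record key, precombine field, partition field, table name, and base path are placeholders I have assumed, not values from the original script):
    
        import org.apache.spark.sql.Dataset;
        import org.apache.spark.sql.Row;
        import org.apache.spark.sql.SaveMode;
        
        public class HudiDmsWriteExample {
          // Writes one DMS batch to a MERGE_ON_READ Hudi table using the custom payload.
          public static void writeBatch(Dataset<Row> batch) {
            batch.write()
                .format("org.apache.hudi")
                // DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY
                .option("hoodie.datasource.write.payload.class",
                        "com.example.CustomAWSDmsAvroPayload")
                // "hoodie.datasource.write.table.type" on newer Hudi releases
                .option("hoodie.datasource.write.storage.type", "MERGE_ON_READ")
                .option("hoodie.datasource.write.recordkey.field", "id")            // assumed key column
                .option("hoodie.datasource.write.precombine.field", "ts")           // assumed ordering column
                .option("hoodie.datasource.write.partitionpath.field", "partition") // assumed partition column
                .option("hoodie.table.name", "dms_table")                           // assumed table name
                .mode(SaveMode.Append)
                .save("/tmp/hudi/dms_table");                                       // assumed base path
          }
        }
    
    Switching the storage type above to COPY_ON_WRITE is the variant that does not hit the compaction error.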

