You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by "sivabalan narayanan (Jira)" <ji...@apache.org> on 2022/04/10 22:28:00 UTC

[jira] [Updated] (HUDI-3846) Clustering with allow update strategy could fail w/ async clustering

     [ https://issues.apache.org/jira/browse/HUDI-3846?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

sivabalan narayanan updated HUDI-3846:
--------------------------------------
    Fix Version/s: 0.12.0

> Clustering with allow update strategy could fail w/ async clustering
> --------------------------------------------------------------------
>
>                 Key: HUDI-3846
>                 URL: https://issues.apache.org/jira/browse/HUDI-3846
>             Project: Apache Hudi
>          Issue Type: Task
>          Components: clustering
>            Reporter: sivabalan narayanan
>            Priority: Major
>             Fix For: 0.12.0
>
>
> with async clustering and allow update strategy, there could be failures in the pipeline. You can check out the below stacktrace. 
>  
> here is the condition when this is possible. When we are looking to support updates when clustering is in progress, we might have to consider this scenario. 
>  
> A file group FG1 was chosen to be replaced by clustering plan. And clustering is in progress. At this juncture, if we get a new batch of upserts and after index look up, if records are tagged w/ FG1, and before merge handle for upsert is called, lets say clustering is completed. So, when getting latest base file for FG1, FileSystemView might return none since the FG has been replaced (since clustering is completed). So, when are looking to add update support w/ async clustering, we might have to support this use-case as well. But its a low priority ticket for now, since with allow update strategy we do claim that there could be data loss and on one is supposed to try out in production setting for now. 
>  
> {code:java}
> 22/04/09 19:26:16 ERROR HoodieStreamingSink: Micro batch id=268 threw following exception: 
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 17111.0 failed 1 times, most recent failure: Lost task 6.0 in stage 17111.0 (TID 727932, localhost, executor driver): org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :6
> 	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329)
> 	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244)
> 	at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
> 	at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:875)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:875)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
> 	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
> 	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
> 	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
> 	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
> 	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
> 	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
> 	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
> 	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:123)
> 	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
> 	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.NoSuchElementException: No value present in Option
> 	at org.apache.hudi.common.util.Option.get(Option.java:89)
> 	at org.apache.hudi.io.HoodieMergeHandle.<init>(HoodieMergeHandle.java:116)
> 	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.getUpdateHandle(BaseSparkCommitActionExecutor.java:377)
> 	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:348)
> 	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)
> 	... 30 more
> Driver stacktrace:
> 	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1925)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1913)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1912)
> 	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> 	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> 	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1912)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
> 	at scala.Option.foreach(Option.scala:257)
> 	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:948)
> 	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2146)
> 	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2095)
> 	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2084)
> 	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
> 	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:759)
> 	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
> 	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
> 	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
> 	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
> 	at org.apache.spark.rdd.RDD.count(RDD.scala:1213)
> 	at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:642)
> 	at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:309)
> 	at org.apache.hudi.HoodieStreamingSink$$anonfun$1$$anonfun$2.apply(HoodieStreamingSink.scala:88)
> 	at org.apache.hudi.HoodieStreamingSink$$anonfun$1$$anonfun$2.apply(HoodieStreamingSink.scala:88)
> 	at scala.util.Try$.apply(Try.scala:192)
> 	at org.apache.hudi.HoodieStreamingSink$$anonfun$1.apply(HoodieStreamingSink.scala:87)
> 	at org.apache.hudi.HoodieStreamingSink$$anonfun$1.apply(HoodieStreamingSink.scala:87)
> 	at org.apache.hudi.HoodieStreamingSink.retry(HoodieStreamingSink.scala:164)
> 	at org.apache.hudi.HoodieStreamingSink.addBatch(HoodieStreamingSink.scala:86)
> 	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:537)
> 	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
> 	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
> 	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
> 	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:535)
> 	at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
> 	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
> 	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:534)
> 	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
> 	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
> 	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
> 	at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
> 	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
> 	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
> 	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
> 	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
> 	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
> 	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
> Caused by: org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :6
> 	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329)
> 	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244)
> 	at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
> 	at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:875)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:875)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
> 	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
> 	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
> 	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
> 	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
> 	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
> 	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
> 	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
> 	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:123)
> 	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
> 	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.NoSuchElementException: No value present in Option
> 	at org.apache.hudi.common.util.Option.get(Option.java:89)
> 	at org.apache.hudi.io.HoodieMergeHandle.<init>(HoodieMergeHandle.java:116)
> 	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.getUpdateHandle(BaseSparkCommitActionExecutor.java:377)
> 	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:348)
> 	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322) {code}
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)