Posted to commits@hudi.apache.org by "sivabalan narayanan (Jira)" <ji...@apache.org> on 2021/11/23 20:59:00 UTC

[jira] [Comment Edited] (HUDI-2841) Rollback of a failed delta commit w/ multi-writer fails

    [ https://issues.apache.org/jira/browse/HUDI-2841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17448258#comment-17448258 ] 

sivabalan narayanan edited comment on HUDI-2841 at 11/23/21, 8:58 PM:
----------------------------------------------------------------------

There could be an even bigger problem here: rollback also rolls back log files written by other delta commits, including later ones. I retried the same scenario with a non-partitioned dataset so that the log files are part of the commit that failed.

 

partition1/

  logfile1 // by DC1

  logfile2 // by DC2, which failed midway.

 

Restart the delta streamer.

DC3 creates logFile3 and completes.

Then DC2 gets rolled back, but the list of files to be deleted contains logFile1, logFile2 and logFile3.

 

Excerpts from logs:

Listing of the table base path just after the DC2 failure:
{code:java}
ls -ltra /tmp/hudi-deltastreamer-ny/ | grep -v crc
total 920
drwxrwxrwt  52 root  wheel    1664 Nov 23 14:46 ..
-rw-r--r--   1 nsb   wheel      96 Nov 23 14:46 .hoodie_partition_metadata
-rw-r--r--   1 nsb   wheel  439457 Nov 23 14:46 57993dad-822c-4b28-a9da-2ff6d61b10d0-0_0-23-80_20211123144608551.parquet
-rw-r--r--   1 nsb   wheel    2507 Nov 23 14:46 .57993dad-822c-4b28-a9da-2ff6d61b10d0-0_20211123144608551.log.1_0-63-174
drwxr-xr-x  24 nsb   wheel     768 Nov 23 14:47 .hoodie
drwxr-xr-x  11 nsb   wheel     352 Nov 23 14:47 .
-rw-r--r--   1 nsb   wheel    2507 Nov 23 14:47 .57993dad-822c-4b28-a9da-2ff6d61b10d0-0_20211123144608551.log.2_0-103-268 {code}
// log.2_ was written by the failed commit (DC2).
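The log file names in the listing appear to follow the pattern `.<fileId>_<baseCommitTime>.log.<version>_<writeToken>` (inferred from the listing, not from Hudi documentation). The minimal sketch below uses a hypothetical `LogFileKey` parser, not Hudi's own utilities. It shows that the two log files share the same (fileId, baseCommitTime) pair, so anything that selects files by that pair alone cannot tell them apart:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not a Hudi API): parses log file names shaped like the ones in
// the listing, assumed to be .<fileId>_<baseCommitTime>.log.<version>_<writeToken>.
final class LogFileKey {
  private static final Pattern LOG_NAME =
      Pattern.compile("^\\.(.+)_(\\d+)\\.log\\.(\\d+)_(.+)$");

  final String fileId;
  final String baseCommitTime;
  final int version;
  final String writeToken;

  private LogFileKey(String fileId, String baseCommitTime, int version, String writeToken) {
    this.fileId = fileId;
    this.baseCommitTime = baseCommitTime;
    this.version = version;
    this.writeToken = writeToken;
  }

  static LogFileKey parse(String name) {
    Matcher m = LOG_NAME.matcher(name);
    if (!m.matches()) {
      throw new IllegalArgumentException("Not a log file name: " + name);
    }
    return new LogFileKey(m.group(1), m.group(2), Integer.parseInt(m.group(3)), m.group(4));
  }

  // Per the comment, rollback appears to rely on this (fileId, baseCommitTime) pair.
  String sliceKey() {
    return fileId + "_" + baseCommitTime;
  }

  public static void main(String[] args) {
    String[] names = {
      ".57993dad-822c-4b28-a9da-2ff6d61b10d0-0_20211123144608551.log.1_0-63-174",  // earlier log file
      ".57993dad-822c-4b28-a9da-2ff6d61b10d0-0_20211123144608551.log.2_0-103-268"  // written by failed DC2
    };
    for (String name : names) {
      LogFileKey key = LogFileKey.parse(name);
      System.out.println("log." + key.version + " -> " + key.sliceKey());
    }
  }
}
```

Both lines print the same key, `57993dad-822c-4b28-a9da-2ff6d61b10d0-0_20211123144608551`. Only the version and the write token differ, and neither field obviously identifies the instant that wrote the file.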

Rollback commit metadata
{code:java}
21/11/23 14:49:15 WARN BaseRollbackActionExecutor:   BaseRollbackAction Executor. rolling back
 20211123144713094
21/11/23 14:49:15 WARN BaseRollbackActionExecutor:   for partition 
21/11/23 14:49:15 WARN BaseRollbackActionExecutor:     success delete files 0
21/11/23 14:49:15 WARN BaseRollbackActionExecutor:     failed delete files 0
21/11/23 14:49:15 WARN BaseRollbackActionExecutor:     rollback log files 1
21/11/23 14:49:15 WARN BaseRollbackActionExecutor:       file:/tmp/hudi-deltastreamer-ny/.57993dad-822c-4b28-a9da-2ff6d61b10d0-0_20211123144608551.log.4_1-0-1 -> 105
21/11/23 14:49:15 WARN BaseRollbackActionExecutor:     written log files 3
21/11/23 14:49:15 WARN BaseRollbackActionExecutor:       file:/tmp/hudi-deltastreamer-ny/.57993dad-822c-4b28-a9da-2ff6d61b10d0-0_20211123144608551.log.3_0-26-90 -> 2507
21/11/23 14:49:15 WARN BaseRollbackActionExecutor:       file:/tmp/hudi-deltastreamer-ny/.57993dad-822c-4b28-a9da-2ff6d61b10d0-0_20211123144608551.log.1_0-63-174 -> 2507
21/11/23 14:49:15 WARN BaseRollbackActionExecutor:       file:/tmp/hudi-deltastreamer-ny/.57993dad-822c-4b28-a9da-2ff6d61b10d0-0_20211123144608551.log.2_0-103-268 -> 2507 {code}
// log.3_ is written by DC3. 

 

I have yet to look into how rollback works in single-writer mode.

For example:

partition1/

  logfile1 // by DC1

  logfile2 // by DC2, which failed midway.



Restart the delta streamer.

In single-writer mode, we first roll back any failed writes and then proceed.

How do we detect that only logFile2 needs to be rolled back/deleted? As far as I understand, we rely on fileId and baseCommitTime, which logFile1 and logFile2 share when they are in the same file slice.
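One way to frame that question: selecting by (fileId, baseCommitTime) returns every log file in the slice, whereas the failed instant only owns the files it wrote. A minimal sketch, assuming some per-instant record of written log files exists (the `writtenBy` map and the `InstantScopedRollback` class are hypothetical, not Hudi APIs):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class InstantScopedRollback {
  // Keep only the log files that the failed instant itself wrote, according to a
  // hypothetical per-instant record (writtenBy maps log file -> writing instant).
  static List<String> filesToRollBack(List<String> logsInSlice,
                                      Map<String, String> writtenBy,
                                      String failedInstant) {
    return logsInSlice.stream()
        .filter(file -> failedInstant.equals(writtenBy.get(file)))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    // Both log files share one (fileId, baseCommitTime), i.e. one file slice.
    List<String> logsInSlice = Arrays.asList("logFile1", "logFile2");
    Map<String, String> writtenBy = new HashMap<>();
    writtenBy.put("logFile1", "DC1");
    writtenBy.put("logFile2", "DC2"); // failed midway
    // Selecting by slice would return both files; selecting by instant returns only logFile2.
    System.out.println(filesToRollBack(logsInSlice, writtenBy, "DC2"));
  }
}
```

This file-level model ignores the case where several instants append blocks to the same log file; there, choosing whole files would not be enough, and blocks would have to be distinguished by instant.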

 

 



> Rollback of a failed delta commit w/ multi-writer fails
> -------------------------------------------------------
>
>                 Key: HUDI-2841
>                 URL: https://issues.apache.org/jira/browse/HUDI-2841
>             Project: Apache Hudi
>          Issue Type: Bug
>    Affects Versions: 0.10.0
>            Reporter: sivabalan narayanan
>            Priority: Blocker
>             Fix For: 0.10.0
>
>
> In a multi-writer setup, the failed-writes cleaning policy has to be set to LAZY, so failed writes are acted upon (i.e., actually rolled back) only by the cleaner and not eagerly.
> List-based rollback for MOR then fails a validation check because a file to be rolled back has a larger base commit timestamp than the commit being rolled back.
>  
> Let's say the timeline is as follows:
> DC1 and DC2 complete, and DC3 fails midway.
> Partition1/
>    baseFile1 (DC1)
>    baseFile2 (DC2) // due to small file handling
>    baseFile3 (DC3)
>  
> Restart the deltastreamer, which then does DC4:
>  
> Partition1/
>    baseFile1 (DC1)
>    baseFile2 (DC2) // due to small file handling
>    baseFile3 (DC3)
>    baseFile4 (DC4)
>  
> At the end of the commit, the cleaner kicks in and tries to roll back any failed commits. In RollbackUtils, where we fetch the latest file slices and find all files to be deleted, a validation checks that the base commit time of every file (in the latest file slice) is less than or equal to the commit being rolled back. In this case, baseFile4 has a higher timestamp than DC3, so the validation fails.
>  
> {code:java}
> 21/11/23 09:53:49 DEBUG wire: http-outgoing-11 >> "[\r][\n]"
> 21/11/23 09:53:49 ERROR Executor: Exception in task 32.0 in stage 45.0 (TID 3191)
> java.lang.IllegalArgumentException
>         at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:31)
>         at org.apache.hudi.table.action.rollback.RollbackUtils.lambda$generateAppendRollbackBlocksAction$0(RollbackUtils.java:254)
>         at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:174)
>         at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>         at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>         at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>         at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>         at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>         at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>         at org.apache.hudi.table.action.rollback.RollbackUtils.generateAppendRollbackBlocksAction(RollbackUtils.java:266)
>         at org.apache.hudi.table.action.rollback.RollbackUtils.lambda$generateRollbackRequestsUsingFileListingMOR$e97f040e$1(RollbackUtils.java:210)
>         at org.apache.hudi.client.common.HoodieSparkEngineContext.lambda$flatMap$7d470b86$1(HoodieSparkEngineContext.java:134)
>         at org.apache.spark.api.java.JavaRDDLike.$anonfun$flatMap$1(JavaRDDLike.scala:125)
>         at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
>         at scala.collection.Iterator.foreach(Iterator.scala:941)
>         at scala.collection.Iterator.foreach$(Iterator.scala:941)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
>         at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
>         at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
>         at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
>         at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
>         at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
>         at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
>         at scala.collection.AbstractIterator.to(Iterator.scala:1429)
>         at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
>         at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
>         at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1429)
>         at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
>         at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
>         at scala.collection.AbstractIterator.toArray(Iterator.scala:1429)
>         at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1004)
>         at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2139)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>         at org.apache.spark.scheduler.Task.run(Task.scala:127)
>         at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
>         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> 21/11/23 09:53:49 WARN RollbackUtils: Rollback Instant time : 20211123095048787 {code}
>  
> Code of interest
> {code:java}
> private static List<ListingBasedRollbackRequest> generateAppendRollbackBlocksAction(String partitionPath, HoodieInstant rollbackInstant,
>     HoodieCommitMetadata commitMetadata, HoodieTable table) {
>   ValidationUtils.checkArgument(rollbackInstant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION));
>   // wStat.getPrevCommit() might not give the right commit time in the following
>   // scenario : If a compaction was scheduled, the new commitTime associated with the requested compaction will be
>   // used to write the new log files. In this case, the commit time for the log file is the compaction requested time.
>   // But the index (global) might store the baseCommit of the base and not the requested, hence get the
>   // baseCommit always by listing the file slice
>   Map<String, String> fileIdToBaseCommitTimeForLogMap = table.getSliceView().getLatestFileSlices(partitionPath)
>       .collect(Collectors.toMap(FileSlice::getFileId, FileSlice::getBaseInstantTime));
>   return commitMetadata.getPartitionToWriteStats().get(partitionPath).stream().filter(wStat -> {
>     // Filter out stats without prevCommit since they are all inserts
>     boolean validForRollback = (wStat != null) && (wStat.getPrevCommit() != null)
>         && (!wStat.getPrevCommit().equals(HoodieWriteStat.NULL_COMMIT))
>         && fileIdToBaseCommitTimeForLogMap.containsKey(wStat.getFileId());
>     if (validForRollback) {
>       // For sanity, log instant time can never be less than base-commit on which we are rolling back
>       ValidationUtils
>           .checkArgument(HoodieTimeline.compareTimestamps(fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId()),
>               HoodieTimeline.LESSER_THAN_OR_EQUALS, rollbackInstant.getTimestamp()));
>     }
>     // Base Ts should be strictly less. If equal (for inserts-to-logs), the caller employs another option
>     // to delete and we should not step on it
>     return validForRollback && HoodieTimeline.compareTimestamps(fileIdToBaseCommitTimeForLogMap.get(
>         wStat.getFileId()), HoodieTimeline.LESSER_THAN, rollbackInstant.getTimestamp());
>   }).map(wStat -> {
>     String baseCommitTime = fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId());
>     return ListingBasedRollbackRequest.createRollbackRequestWithAppendRollbackBlockAction(partitionPath, wStat.getFileId(),
>         baseCommitTime);
>   }).collect(Collectors.toList());
> } {code}
> I verified with log statements that the DC4 base file is the one that fails the validation.
>  
> The issue could also happen if a compaction had kicked in midway and the cleaner policy was very relaxed, so that the rollback got triggered only after 10 commits or one or two compactions.
>  
> In the code block of interest, we call `table.getSliceView().getLatestFileSlices(partitionPath)`. Instead, I tried to use
> `table.getSliceView().getLatestFileSlicesBeforeOrOn(partitionPath, rollbackInstant.getTimestamp(), false)`,
> but this started adding files written by the next delta commit to the list of files to be deleted as well.
>  
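The multi-writer setup the issue starts from could be written as Java `Properties`, as a minimal sketch. It assumes the Hudi 0.10 config keys `hoodie.write.concurrency.mode` and `hoodie.cleaner.policy.failed.writes` (our understanding of the names; check them against the Hudi configuration reference), and it leaves out the lock provider that multi-writer mode also requires. `MultiWriterConfig` is a hypothetical class:

```java
import java.util.Properties;

// Hypothetical helper: collects the writer settings the issue refers to.
// Config keys are assumed from Hudi 0.10; verify them before use.
final class MultiWriterConfig {
  static Properties multiWriterProps() {
    Properties props = new Properties();
    // Optimistic concurrency control lets several writers commit to the same table.
    props.setProperty("hoodie.write.concurrency.mode", "optimistic_concurrency_control");
    // With several writers, an inflight instant may belong to a writer that is still
    // running, so failed writes are left for the cleaner (LAZY) instead of being
    // rolled back eagerly by the next writer that starts.
    props.setProperty("hoodie.cleaner.policy.failed.writes", "LAZY");
    // A lock provider (hoodie.write.lock.provider) is also required; omitted here.
    return props;
  }
}
```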

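The sanity check quoted in the issue can be modeled in a few lines of plain Java to show why DC4 trips it. This is a sketch under assumptions: instant times compare as fixed-width strings, the instant values and file-group names are made up, and DC4 is assumed to have written a newer base file into a file group that DC3 also touched. `RollbackSanityCheckModel` is hypothetical and does not use Hudi's `HoodieTimeline`:

```java
import java.util.HashMap;
import java.util.Map;

final class RollbackSanityCheckModel {
  // Models the checkArgument in generateAppendRollbackBlocksAction: for a file ID that
  // is considered for rollback, the base instant of its latest file slice must be
  // <= the instant being rolled back. Instant times are compared as strings (assumption).
  static boolean violatesCheck(Map<String, String> fileIdToLatestBaseInstant,
                               String fileId, String instantBeingRolledBack) {
    String baseInstant = fileIdToLatestBaseInstant.get(fileId);
    if (baseInstant == null) {
      return false; // not considered for rollback, so the check never runs
    }
    return baseInstant.compareTo(instantBeingRolledBack) > 0;
  }

  public static void main(String[] args) {
    String dc1 = "20211123090000000"; // hypothetical instant times
    String dc3 = "20211123092000000";
    String dc4 = "20211123093000000";
    Map<String, String> latestBase = new HashMap<>();
    latestBase.put("fileGroup1", dc1); // untouched since DC1
    latestBase.put("fileGroup3", dc4); // DC4 wrote a newer base file here (assumption)
    System.out.println(violatesCheck(latestBase, "fileGroup1", dc3));
    System.out.println(violatesCheck(latestBase, "fileGroup3", dc3));
  }
}
```

Running it prints `false` then `true`: the untouched file group passes, while the one whose latest base instant is DC4 fails, which matches the `IllegalArgumentException` in the stack trace.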

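A simplified model may help explain the last observation in the issue, under an assumption we have not verified against Hudi's file-system view: that "latest file slice before or on T" filters slices by base instant only, and that log files belong to a slice by base instant rather than by the instant that wrote them. If so, the slice chosen for the rolled-back instant still carries log files appended by later delta commits. All names and instants below are hypothetical, and real log file names do not record the writing instant:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

final class BeforeOrOnModel {
  static final class LogFile {
    final String name;
    final String writtenBy; // hypothetical: real log file names do not record this
    LogFile(String name, String writtenBy) {
      this.name = name;
      this.writtenBy = writtenBy;
    }
  }

  static final class Slice {
    final String baseInstant;
    final List<LogFile> logFiles;
    Slice(String baseInstant, List<LogFile> logFiles) {
      this.baseInstant = baseInstant;
      this.logFiles = logFiles;
    }
  }

  // Latest slice whose base instant is <= maxInstant. Log files inside the slice are
  // not filtered by the instant that wrote them (the assumption being illustrated).
  static Optional<Slice> latestSliceBeforeOrOn(List<Slice> slices, String maxInstant) {
    Slice best = null;
    for (Slice s : slices) {
      boolean eligible = s.baseInstant.compareTo(maxInstant) <= 0;
      if (eligible && (best == null || s.baseInstant.compareTo(best.baseInstant) > 0)) {
        best = s;
      }
    }
    return Optional.ofNullable(best);
  }

  public static void main(String[] args) {
    // One file group; hypothetical fixed-width instants, so string order matches time order.
    Slice older = new Slice("001", Arrays.asList(
        new LogFile("log.1", "002"),
        new LogFile("log.2", "003"),   // the failed delta commit being rolled back
        new LogFile("log.3", "004"))); // a later delta commit
    Slice newer = new Slice("005", Collections.<LogFile>emptyList()); // e.g. a newer base file
    Slice chosen = latestSliceBeforeOrOn(Arrays.asList(older, newer), "003").get();
    System.out.println("chosen slice base instant: " + chosen.baseInstant);
    for (LogFile f : chosen.logFiles) {
      if (f.writtenBy.compareTo("003") > 0) {
        System.out.println(f.name + " was written after the rolled-back instant");
      }
    }
  }
}
```

It prints `chosen slice base instant: 001` followed by `log.3 was written after the rolled-back instant`. Skipping the newer slice avoids the validation failure, but under this model later log files come along with the older slice, consistent with what the issue reports.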
