You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by "sivabalan narayanan (Jira)" <ji...@apache.org> on 2022/01/03 20:10:00 UTC

[jira] [Updated] (HUDI-2947) HoodieDeltaStreamer/DeltaSync can improperly pick up the checkpoint config from CLI in continuous mode

     [ https://issues.apache.org/jira/browse/HUDI-2947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

sivabalan narayanan updated HUDI-2947:
--------------------------------------
    Labels: pull-request-available sev:critical  (was: pull-request-available sev:high)

> HoodieDeltaStreamer/DeltaSync can improperly pick up the checkpoint config from CLI in continuous mode
> ------------------------------------------------------------------------------------------------------
>
>                 Key: HUDI-2947
>                 URL: https://issues.apache.org/jira/browse/HUDI-2947
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: Ethan Guo
>            Assignee: sivabalan narayanan
>            Priority: Critical
>              Labels: pull-request-available, sev:critical
>             Fix For: 0.11.0
>
>
> *Problem:*
> When deltastreamer is started with a given checkpoint, e.g., `--checkpoint 0`, in the continuous mode, the deltastreamer job may pick up the wrong checkpoint later on.  The wrong checkpoint (for 20211206203551080 commit) happens after the replacecommit and clean, which is reset to "0", instead of "5" after 20211206202728233.commit.  More details below.
>  
> The bug is due to the check here: [https://github.com/apache/hudi/blob/master/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java#L335]
> {code:java}
> if (cfg.checkpoint != null && (StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))                || !cfg.checkpoint.equals(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY)))) {
>     resumeCheckpointStr = Option.of(cfg.checkpoint);
> } {code}
> In this case of resuming after a clustering commit, "cfg.checkpoint != null" and "StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))"  are both true as "--checkpoint 0" is configured and last commit is replacecommit without checkpoint keys.  This leads to the resume checkpoint string being reset to the configured checkpoint, skipping the timeline walk-back logic below, which is wrong.  
>  
> Timeline:
>  
> {code:java}
>  189069 Dec  6 12:19 20211206201238649.commit
>       0 Dec  6 12:12 20211206201238649.commit.requested
>       0 Dec  6 12:12 20211206201238649.inflight
>  189069 Dec  6 12:27 20211206201959151.commit
>       0 Dec  6 12:20 20211206201959151.commit.requested
>       0 Dec  6 12:20 20211206201959151.inflight
>  189069 Dec  6 12:34 20211206202728233.commit
>       0 Dec  6 12:27 20211206202728233.commit.requested
>       0 Dec  6 12:27 20211206202728233.inflight
>   36662 Dec  6 12:35 20211206203449899.replacecommit
>       0 Dec  6 12:35 20211206203449899.replacecommit.inflight
>   34656 Dec  6 12:35 20211206203449899.replacecommit.requested
>   28013 Dec  6 12:35 20211206203503574.clean
>   19024 Dec  6 12:35 20211206203503574.clean.inflight
>   19024 Dec  6 12:35 20211206203503574.clean.requested
>  189069 Dec  6 12:43 20211206203551080.commit
>       0 Dec  6 12:35 20211206203551080.commit.requested
>       0 Dec  6 12:35 20211206203551080.inflight
>  189069 Dec  6 12:50 20211206204311612.commit
>       0 Dec  6 12:43 20211206204311612.commit.requested
>       0 Dec  6 12:43 20211206204311612.inflight
>       0 Dec  6 12:50 20211206205044595.commit.requested
>       0 Dec  6 12:50 20211206205044595.inflight
>     128 Dec  6 12:56 archived
>     483 Dec  6 11:52 hoodie.properties
>  {code}
>  
> Checkpoints in commits:
>  
> {code:java}
> grep "deltastreamer.checkpoint.key" *
> 20211206201238649.commit:    "deltastreamer.checkpoint.key" : "2"
> 20211206201959151.commit:    "deltastreamer.checkpoint.key" : "3"
> 20211206202728233.commit:    "deltastreamer.checkpoint.key" : "4"
> 20211206203551080.commit:    "deltastreamer.checkpoint.key" : "1"
> 20211206204311612.commit:    "deltastreamer.checkpoint.key" : "2" {code}
>  
> *Steps to reproduce:*
> Run HoodieDeltaStreamer in the continuous mode, by providing both "--checkpoint 0" and "--continuous", with inline clustering and sync clean enabled (some configs are masked).
>  
> {code:java}
> spark-submit \
>   --master yarn \
>   --driver-memory 8g --executor-memory 8g --num-executors 3 --executor-cores 4 \
>   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
>   --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain \
>   --conf spark.speculation=true \
>   --conf spark.speculation.multiplier=1.0 \
>   --conf spark.speculation.quantile=0.5 \
>   --packages org.apache.spark:spark-avro_2.12:3.2.0 \
>   --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
>   file:/home/hadoop/ethan/hudi-utilities-bundle_2.12-0.10.0-rc3.jar \
>   --props file:/home/hadoop/ethan/test.properties \
>   --source-class ... \
>   --source-ordering-field ts \
>   --target-base-path s3a://hudi-testing/test_hoodie_table_11/ \
>   --target-table test_table \
>   --table-type COPY_ON_WRITE \
>   --op BULK_INSERT \
>   --checkpoint 0 \
>   --continuous {code}
> test.properties:
>  
>  
> {code:java}
> hoodie.cleaner.commits.retained=4
> hoodie.keep.min.commits=5
> hoodie.keep.max.commits=7
> hoodie.clean.async=true
> hoodie.clustering.inline=true
> hoodie.clustering.async.max.commits=3
> hoodie.compact.inline.max.delta.commits=3
> hoodie.insert.shuffle.parallelism=10
> hoodie.upsert.shuffle.parallelism=10
> hoodie.bulk_insert.shuffle.parallelism=10
> hoodie.delete.shuffle.parallelism=10
> hoodie.bulkinsert.shuffle.parallelism=10
> hoodie.datasource.write.recordkey.field=key
> hoodie.datasource.write.partitionpath.field=partition
> # turn off any small file handling, for ease of testing
> hoodie.parquet.small.file.limit=1
> benchmark.input.source.path=...{code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)