Posted to commits@hudi.apache.org by "Charlie Briggs (Jira)" <ji...@apache.org> on 2022/05/17 13:06:00 UTC

[jira] [Comment Edited] (HUDI-3242) Checkpoint 0 is ignored -Partial parquet file discovery after the first commit

    [ https://issues.apache.org/jira/browse/HUDI-3242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17538175#comment-17538175 ] 

Charlie Briggs edited comment on HUDI-3242 at 5/17/22 1:05 PM:
---------------------------------------------------------------

We've also been confused by this behaviour (Hudi 0.10.1, but it also applies to 0.11.0). I previously tried, unsuccessfully, to describe it in https://github.com/apache/hudi/issues/4146.
 * The checkpointing mechanism, including "deltastreamer.checkpoint.reset_key", is not well described in the documentation.
 * We assume that to load a given input using Deltastreamer, we can run with `--checkpoint 0` (I think this is intuitive, but maybe only to me?)
 * When `--checkpoint 0` is set and `deltastreamer.checkpoint.reset_key = 0`, the `--checkpoint` flag is actually ignored. In Hudi 0.10.1 at least, nothing is logged to indicate that this behaviour was chosen.

I would expect that, regardless of past commits or epoch times, if I explicitly specify a checkpoint, Deltastreamer should use it.
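
The observed behaviour can be modelled roughly as below. This is a simplified sketch under stated assumptions, not Hudi's actual code: `resolve_resume_checkpoint` and the plain-dict commit metadata are hypothetical, and `deltastreamer.checkpoint.key` is assumed to be the name under which the previous commit stores its checkpoint.

```python
from typing import Mapping, Optional

CHECKPOINT_KEY = "deltastreamer.checkpoint.key"  # assumed name of the stored checkpoint
CHECKPOINT_RESET_KEY = "deltastreamer.checkpoint.reset_key"


def resolve_resume_checkpoint(cli_checkpoint: Optional[str],
                              last_commit_metadata: Mapping[str, str]) -> Optional[str]:
    """Hypothetical model of how the resume checkpoint appears to be chosen."""
    reset_key = last_commit_metadata.get(CHECKPOINT_RESET_KEY)
    # The CLI value only wins when it differs from the reset key recorded
    # by the previous commit.
    if cli_checkpoint is not None and cli_checkpoint != reset_key:
        return cli_checkpoint
    # Otherwise the checkpoint stored by the previous commit is used and
    # the CLI flag is ignored without any message.
    return last_commit_metadata.get(CHECKPOINT_KEY)


# First run: no previous commit metadata, so the flag is honoured.
first_run = resolve_resume_checkpoint("0", {})
# Later run with the same flag: the reset key already equals "0".
second_run = resolve_resume_checkpoint(
    "0", {CHECKPOINT_KEY: "1642060000000", CHECKPOINT_RESET_KEY: "0"})
```

Under this model the first run starts from `0`, while a later run passing the same `--checkpoint 0` resumes from the stored checkpoint instead (the value `1642060000000` is made up for illustration).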

For reference our use case is this:
 * We have two Deltastreamer jobs, one which loads at an interval from S3, one which loads daily from a separate S3 bucket (both parquet inputs)
 * We use Deltastreamer for both jobs because schema validation fails when it is enabled and Deltastreamer and the Spark writer are used concurrently (https://issues.apache.org/jira/browse/HUDI-2755). We do not currently wish to run without schema validation, as we've had issues breaking tables in the past when disabling it.
 * We can't disable checkpointing on either of the two jobs or copy information from the previous commit (we previously used the support described here for this https://issues.apache.org/jira/browse/HUDI-2719)

As such, we instead try to write first to an ephemeral location, and then load from there using `--checkpoint 0` to force all data in the input to be loaded, regardless of write times.
_Fortunately, the checkpointing behaviour described above is not currently necessary for us, as we only allow one writer at a time, so the ephemeral data is guaranteed to be newer than the previous checkpoint, but it is confusing._
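
Why a checkpoint of 0 should pick up every file, and why a stale stored checkpoint hides older ones, can be sketched with a modification-time filter. This is an assumed simplification of a DFS-style path selector, not the real `DFSPathSelector` (for example, it omits any per-batch size limit): `SourceFile` and `select_new_files` are hypothetical names, and the timestamps are made up.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class SourceFile:
    path: str
    modified_ms: int  # last-modified time in epoch milliseconds


def select_new_files(files: List[SourceFile],
                     checkpoint: str) -> Tuple[List[SourceFile], str]:
    """Keep files modified strictly after the checkpoint; also return the next checkpoint."""
    threshold = int(checkpoint)
    selected = sorted((f for f in files if f.modified_ms > threshold),
                      key=lambda f: f.modified_ms)
    # The next checkpoint is the newest modification time selected,
    # or the old checkpoint if nothing matched.
    next_checkpoint = str(selected[-1].modified_ms) if selected else checkpoint
    return selected, next_checkpoint


files = [SourceFile("a.parquet", 1_000),
         SourceFile("b.parquet", 2_000),
         SourceFile("c.parquet", 3_000)]

# Checkpoint 0: every file is newer, so everything is loaded.
all_files, _ = select_new_files(files, "0")
# A stored checkpoint of 2000: only files written after it are discovered.
partial, next_cp = select_new_files(files, "2000")
```

With a filter like this, input written before the stored checkpoint is skipped whenever the explicit `--checkpoint 0` is not honoured, which matches the reason the ephemeral data has to be newer than the previous checkpoint.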


> Checkpoint 0 is ignored -Partial parquet file discovery after the first commit
> ------------------------------------------------------------------------------
>
>                 Key: HUDI-3242
>                 URL: https://issues.apache.org/jira/browse/HUDI-3242
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: spark, writer-core
>    Affects Versions: 0.10.1
>         Environment: AWS
> EMR 6.4.0
> Spark 3.1.2
> Hudi - 0.10.1-rc
>            Reporter: Harsha Teja Kanna
>            Assignee: sivabalan narayanan
>            Priority: Minor
>              Labels: hudi-on-call, sev:critical, user-support-issues
>         Attachments: Screen Shot 2022-01-13 at 2.40.55 AM.png, Screen Shot 2022-01-13 at 2.55.35 AM.png, Screen Shot 2022-01-20 at 1.36.48 PM.png
>
>   Original Estimate: 3h
>          Time Spent: 3h
>  Remaining Estimate: 0h
>
> Hi, I am testing release branch 0.10.1 as I needed a few bug fixes from it.
> However, for a certain table I see only partial discovery of files after the initial commit of the table.
> But if the second partition is given as input for the first commit, all the files are discovered.
> First partition : 2021/01 has 744 files and all of them are discovered
> Second partition: 2021/02 has 762 files but only 72 are discovered.
> Checkpoint is set to 0. 
> No errors in the logs.
> {code:bash}
> spark-submit \
> --master yarn \
> --deploy-mode cluster \
> --driver-cores 30 \
> --driver-memory 32g \
> --executor-cores 5 \
> --executor-memory 32g \
> --num-executors 120 \
> --jars s3://bucket/apps/datalake/jars/unused-1.0.0.jar,s3://bucket/apps/datalake/jars/spark-avro_2.12-3.1.2.jar \
> --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer s3://bucket/apps/datalake/jars/hudi-0.10.0/hudi-utilities-bundle_2.12-0.10.0.jar \
> --table-type COPY_ON_WRITE \
> --source-ordering-field timestamp \
> --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
> --target-base-path s3a://datalake-hudi/datastream/v1/sessions_by_date \
> --target-table sessions_by_date \
> --transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \
> --op INSERT \
> --checkpoint 0 \
> --hoodie-conf hoodie.clean.automatic=true \
> --hoodie-conf hoodie.cleaner.commits.retained=1 \
> --hoodie-conf hoodie.cleaner.policy=KEEP_LATEST_COMMITS \
> --hoodie-conf hoodie.clustering.inline=false \
> --hoodie-conf hoodie.clustering.inline.max.commits=1 \
> --hoodie-conf hoodie.clustering.plan.strategy.class=org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy \
> --hoodie-conf hoodie.clustering.plan.strategy.max.num.groups=1000000 \
> --hoodie-conf hoodie.clustering.plan.strategy.small.file.limit=250000000 \
> --hoodie-conf hoodie.clustering.plan.strategy.sort.columns=sid,id \
> --hoodie-conf hoodie.clustering.plan.strategy.target.file.max.bytes=268435456 \
> --hoodie-conf hoodie.clustering.preserve.commit.metadata=true \
> --hoodie-conf hoodie.datasource.hive_sync.database=datalake-hudi \
> --hoodie-conf hoodie.datasource.hive_sync.enable=false \
> --hoodie-conf hoodie.datasource.hive_sync.ignore_exceptions=true \
> --hoodie-conf hoodie.datasource.hive_sync.mode=hms \
> --hoodie-conf hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.HiveStylePartitionValueExtractor \
> --hoodie-conf hoodie.datasource.hive_sync.table=sessions_by_date \
> --hoodie-conf hoodie.datasource.hive_sync.use_jdbc=false \
> --hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \
> --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.CustomKeyGenerator \
> --hoodie-conf hoodie.datasource.write.operation=insert \
> --hoodie-conf hoodie.datasource.write.partitionpath.field=date:TIMESTAMP \
> --hoodie-conf hoodie.datasource.write.precombine.field=timestamp \
> --hoodie-conf hoodie.datasource.write.recordkey.field=id,qid,aid \
> --hoodie-conf hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy/MM/dd \
> --hoodie-conf hoodie.deltastreamer.keygen.timebased.input.timezone=GMT \
> --hoodie-conf hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd \
> --hoodie-conf hoodie.deltastreamer.keygen.timebased.output.timezone=GMT \
> --hoodie-conf hoodie.deltastreamer.keygen.timebased.timestamp.type=DATE_STRING \
> --hoodie-conf hoodie.deltastreamer.source.dfs.root=s3://datalake-hudi/history/datastream/v1/sessions/2021/02 \
> --hoodie-conf hoodie.deltastreamer.source.input.selector=org.apache.hudi.utilities.sources.helpers.DFSPathSelector \
> --hoodie-conf "\"hoodie.deltastreamer.transformer.sql=SELECT id, qid, aid, to_timestamp(timestamp) as timestamp, sid, date_format(to_timestamp(timestamp), 'yyyy/MM/dd') AS date FROM <SRC> a \"" \
> --hoodie-conf hoodie.file.listing.parallelism=256 \
> --hoodie-conf hoodie.finalize.write.parallelism=256 \
> --hoodie-conf hoodie.generate.consistent.timestamp.logical.for.key.generator=true \
> --hoodie-conf hoodie.insert.shuffle.parallelism=1000 \
> --hoodie-conf hoodie.metadata.enable=true \
> --hoodie-conf hoodie.metadata.metrics.enable=true \
> --hoodie-conf hoodie.metrics.cloudwatch.metric.prefix=emr.datalake-service.prd.insert.sessions_by_date \
> --hoodie-conf hoodie.metrics.on=true \
> --hoodie-conf hoodie.metrics.reporter.type=CLOUDWATCH \
> --hoodie-conf hoodie.parquet.block.size=268435456 \
> --hoodie-conf hoodie.parquet.compression.codec=snappy \
> --hoodie-conf hoodie.parquet.max.file.size=268435456 \
> --hoodie-conf hoodie.parquet.small.file.limit=250000000 {code}