Posted to commits@hudi.apache.org by "Dave Hagman (Jira)" <ji...@apache.org> on 2021/10/12 17:15:00 UTC

[jira] [Commented] (HUDI-2549) Exceptions when using second writer into Hudi table managed by DeltaStreamer

    [ https://issues.apache.org/jira/browse/HUDI-2549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17427807#comment-17427807 ] 

Dave Hagman commented on HUDI-2549:
-----------------------------------

To test my hypothesis about race conditions, I added a delay to the deltastreamer by setting:
--min-sync-interval-seconds 120
This forces the deltastreamer to sync at most once every 2 minutes. That let my datasource writer make much more progress, but the *spark datasource* still eventually failed with the following error:
{code:java}
21/10/08 15:02:49 INFO Javalin: Javalin started in 6ms \o/
21/10/08 15:02:49 INFO S3NativeFileSystem: Opening 's3://events-realtime-sot-test/zone=mwc/v2/.hoodie/hoodie.properties' for reading
21/10/08 15:02:50 INFO HoodieActiveTimeline: Loaded instants upto : Option{val=[==>20211008150249__commit__REQUESTED]}
21/10/08 15:02:50 INFO MultipartUploadOutputStream: close closed:false s3://events-realtime-sot-test/zone=mwc/v2/.hoodie/.heartbeat/20211008150249
21/10/08 15:02:50 INFO HoodieActiveTimeline: Creating a new instant [==>20211008150249__commit__REQUESTED]
Exception in thread "main" org.apache.hudi.exception.HoodieIOException: Failed to create file s3://events-realtime-sot-test/zone=mwc/v2/.hoodie/20211008150249.commit.requested
        at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.createImmutableFileInPath(HoodieActiveTimeline.java:587)
        at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.createFileInMetaPath(HoodieActiveTimeline.java:548)
        at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.createNewInstant(HoodieActiveTimeline.java:146)
        at org.apache.hudi.client.AbstractHoodieWriteClient.startCommit(AbstractHoodieWriteClient.java:739)
        at org.apache.hudi.client.AbstractHoodieWriteClient.startCommitWithTime(AbstractHoodieWriteClient.java:725)
        at org.apache.hudi.client.AbstractHoodieWriteClient.startCommitWithTime(AbstractHoodieWriteClient.java:716)
        at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:264)
        at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:164)
        at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
        at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
        at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
        at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
        at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
        at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
        at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
        at com.klaviyo.planetevents.DatalakeExcavator.hudiBackedWrite(DatalakeExcavator.java:259)
        at com.klaviyo.planetevents.DatalakeExcavator.processNextPath(DatalakeExcavator.java:162)
        at com.klaviyo.planetevents.DatalakeExcavator.lambda$null$1(DatalakeExcavator.java:116)
        at java.lang.Iterable.forEach(Iterable.java:75)
        at com.klaviyo.planetevents.DatalakeExcavator.lambda$processBatches$2(DatalakeExcavator.java:114)
        at java.util.ArrayList.forEach(ArrayList.java:1259)
        at com.klaviyo.planetevents.DatalakeExcavator.processBatches(DatalakeExcavator.java:114)
        at com.klaviyo.planetevents.DatalakeExcavator.run(DatalakeExcavator.java:79)
        at com.klaviyo.apiarycommon.spark.ApiarySparkJob.execute(ApiarySparkJob.java:61)
        at com.klaviyo.planetevents.DatalakeExcavator.main(DatalakeExcavator.java:349)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:959)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1038)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1047)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: File already exists:s3://events-realtime-sot-test/zone=mwc/v2/.hoodie/20211008150249.commit.requested
        at com.amazon.ws.emr.hadoop.fs.s3.upload.plan.RegularUploadPlanner.checkExistenceIfNotOverwriting(RegularUploadPlanner.java:36)
        at com.amazon.ws.emr.hadoop.fs.s3.upload.plan.RegularUploadPlanner.plan(RegularUploadPlanner.java:30)
        at com.amazon.ws.emr.hadoop.fs.s3.upload.plan.UploadPlannerChain.plan(UploadPlannerChain.java:37)
        at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.create(S3NativeFileSystem.java:339)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1125)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1105)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:994)
        at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.create(EmrFileSystem.java:213)
        at org.apache.hudi.common.fs.HoodieWrapperFileSystem.lambda$create$2(HoodieWrapperFileSystem.java:221)
        at org.apache.hudi.common.fs.HoodieWrapperFileSystem.executeFuncWithTimeMetrics(HoodieWrapperFileSystem.java:100)
        at org.apache.hudi.common.fs.HoodieWrapperFileSystem.create(HoodieWrapperFileSystem.java:220)
        at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.createImmutableFileInPath(HoodieActiveTimeline.java:582)
        ... 56 more
{code}
This error has a somewhat surprising cause: a commit file for the instant _*20211008150249*_ already existed. I believe this means that the deltastreamer created an instant with the same timestamp between the time the spark datasource writer generated its instant time and the time it tried to create the corresponding file.
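
To make the suspected collision concrete, here is a toy sketch in Python. It is not Hudi code and touches only a local temporary directory. It assumes that both writers derive their instant time from the wall clock at one-second granularity (consistent with the 14-digit instant in the log above) and that the ".commit.requested" file is created without overwrite. The names instant_time, create_requested_file and simulate_collision are hypothetical.

```python
import os
import tempfile
from datetime import datetime


def instant_time(now):
    """Format a timestamp like the instants in the log above (yyyyMMddHHmmss)."""
    return now.strftime("%Y%m%d%H%M%S")


def create_requested_file(meta_dir, instant):
    """Create '<instant>.commit.requested', failing if it already exists."""
    path = os.path.join(meta_dir, instant + ".commit.requested")
    # Mode "x" is exclusive creation: it raises FileExistsError if the path
    # exists, standing in for a create call with overwrite disabled.
    with open(path, "x"):
        pass
    return path


def simulate_collision():
    """Two hypothetical writers pick an instant time within the same second."""
    writer_a = instant_time(datetime(2021, 10, 8, 15, 2, 49, 100000))
    writer_b = instant_time(datetime(2021, 10, 8, 15, 2, 49, 500000))
    with tempfile.TemporaryDirectory() as meta_dir:
        # The first writer creates its requested file without trouble.
        create_requested_file(meta_dir, writer_a)
        try:
            # The second writer formatted the same instant, so the file exists.
            create_requested_file(meta_dir, writer_b)
            collided = False
        except FileExistsError:
            collided = True
    return writer_a, writer_b, collided


if __name__ == "__main__":
    print(simulate_collision())
```

In this model the second writer fails only because both timestamps fall within the same second. Whether that is really how the two Hudi writers ended up with the same instant is still my hypothesis, not something the log confirms.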

> Exceptions when using second writer into Hudi table managed by DeltaStreamer
> ----------------------------------------------------------------------------
>
>                 Key: HUDI-2549
>                 URL: https://issues.apache.org/jira/browse/HUDI-2549
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: DeltaStreamer, Spark Integration, Writer Core
>            Reporter: Dave Hagman
>            Assignee: Dave Hagman
>            Priority: Critical
>              Labels: multi-writer
>             Fix For: 0.10.0
>
>
> When running the DeltaStreamer along with a second spark datasource writer (with [ZK-based OCC enabled|https://hudi.apache.org/docs/concurrency_control#enabling-multi-writing]), we receive the following exception (which halts the spark datasource writer). This occurs following warnings of timeline inconsistencies:
>  
> {code:java}
> 21/10/07 17:10:05 INFO TransactionManager: Transaction ending with transaction owner Option{val=[==>20211007170717__commit__INFLIGHT]}
> 21/10/07 17:10:05 INFO ZookeeperBasedLockProvider: RELEASING lock atZkBasePath = /events/test/mwc/v1, lock key = events_mwc_test_v1
> 21/10/07 17:10:05 INFO ZookeeperBasedLockProvider: RELEASED lock atZkBasePath = /events/test/mwc/v1, lock key = events_mwc_test_v1
> 21/10/07 17:10:05 INFO TransactionManager: Transaction ended
> Exception in thread "main" java.lang.IllegalArgumentException
>         at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:31)
>         at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:414)
>         at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:395)
>         at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.saveAsComplete(HoodieActiveTimeline.java:153)
>         at org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:218)
>         at org.apache.hudi.client.AbstractHoodieWriteClient.commitStats(AbstractHoodieWriteClient.java:190)
>         at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:124)
>         at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:617)
>         at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:274)
>         at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:164)
>         at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
>         at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
>         at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
>         at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
>         at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
>         at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
>         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>         at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
>         at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
>         at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
>         at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
>         at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
>         at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
>         at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
>         at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
>         at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
>         at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
>         at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
>         at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
>         at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
>         at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
>         at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
>         at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
>         at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
>         at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
>         at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
>         at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
> {code}
> The validation at _*ValidationUtils.checkArgument*_ fails because the expected commit file was not present on DFS.
>  
>  The test setup is this:
>  * Deltastreamer, continuous mode. Small batch sizes and very fast commit times (high commit rate, every 10-30 seconds)
>  * A spark datasource writer moving flat parquet files from a source bucket into the table maintained by the deltastreamer
>  * The spark datasource is much slower than the deltastreamer so time-to-first-commit is about 2-8 minutes
> What I see is the deltastreamer finalizing many commits while the spark datasource is performing its write. It appears that the timeline changes so much, so fast, that the spark datasource writer becomes "out of sync" in ways that it cannot recover from. I see the {{Exception in thread "main" java.lang.IllegalArgumentException}} error on the first commit of the *datasource writer* every time. This appears to be a race condition that occurs when the rate of change of the hudi timeline is very high (due to a fast deltastreamer process). The spark datasource does not properly sync those changes in a multi-writer configuration, which causes enough inconsistency to crash the job.
>  


