Posted to commits@hudi.apache.org by "zhangyingjie (Jira)" <ji...@apache.org> on 2022/03/11 13:25:00 UTC

[jira] [Commented] (HUDI-3058) SqlQueryEqualityPreCommitValidator errors with java.util.ConcurrentModificationException

    [ https://issues.apache.org/jira/browse/HUDI-3058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17504918#comment-17504918 ] 

zhangyingjie commented on HUDI-3058:
------------------------------------

When I run two Spark jobs writing to the same Hudi table, I encounter the same error (a configuration sketch follows the trace):

py4j.protocol.Py4JJavaError: An error occurred while calling o92.save.
: org.apache.hudi.exception.HoodieWriteConflictException: java.util.ConcurrentModificationException: Cannot resolve conflicts for overlapping writes
        at org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy.resolveConflict(SimpleConcurrentFileWritesConflictResolutionStrategy.java:102)
        at org.apache.hudi.client.utils.TransactionUtils.lambda$resolveWriteConflictIfAny$0(TransactionUtils.java:73)
        at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
        at java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:742)
        at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
        at org.apache.hudi.client.utils.TransactionUtils.resolveWriteConflictIfAny(TransactionUtils.java:67)
        at org.apache.hudi.client.SparkRDDWriteClient.preCommit(SparkRDDWriteClient.java:502)
        at org.apache.hudi.client.AbstractHoodieWriteClient.commitStats(AbstractHoodieWriteClient.java:196)
        at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:125)
        at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:635)
        at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:286)
        at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:164)
        at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:127)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:126)
        at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:962)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:767)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:962)
        at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:414)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:398)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:287)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.ConcurrentModificationException: Cannot resolve conflicts for overlapping writes
        ... 44 more
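
For context: two concurrent writers on one table also need optimistic concurrency control (OCC) configured. Below is a minimal PySpark sketch of that setup; the table name, base path, and ZooKeeper endpoint are assumed placeholders, not values from this issue.

{code:python}
# Hedged sketch: enabling Hudi optimistic concurrency control for
# multi-writer jobs. All names/endpoints below are illustrative assumptions.
(df.write.format("hudi")
    .option("hoodie.table.name", "my_table")  # assumed table name
    .option("hoodie.datasource.write.operation", "upsert")
    # allow concurrent writers; default is single_writer
    .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
    # required with OCC so failed writes are cleaned lazily
    .option("hoodie.cleaner.policy.failed.writes", "LAZY")
    .option("hoodie.write.lock.provider",
            "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider")
    .option("hoodie.write.lock.zookeeper.url", "zk-host")  # assumed endpoint
    .option("hoodie.write.lock.zookeeper.port", "2181")
    .option("hoodie.write.lock.zookeeper.lock_key", "my_table")
    .option("hoodie.write.lock.zookeeper.base_path", "/hudi/locks")
    .mode("append")
    .save("/tmp/my_table"))  # assumed base path
{code}

Note that even with OCC enabled, SimpleConcurrentFileWritesConflictResolutionStrategy (the class at the top of the trace) deliberately fails one writer with this exception when both writers touch the same file groups, so for truly overlapping writes the error above is expected behavior rather than a bug.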

> SqlQueryEqualityPreCommitValidator errors with java.util.ConcurrentModificationException
> ----------------------------------------------------------------------------------------
>
>                 Key: HUDI-3058
>                 URL: https://issues.apache.org/jira/browse/HUDI-3058
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: Usability
>    Affects Versions: 0.10.0
>            Reporter: sivabalan narayanan
>            Assignee: satish
>            Priority: Major
>              Labels: pull-request-available, sev:high
>             Fix For: 0.11.0
>
>
> Ref issue: [https://github.com/apache/hudi/issues/4109]
>  
> Faced a ConcurrentModificationException when trying to test SqlQueryEqualityPreCommitValidator from the quickstart guide
> *To Reproduce*
> Steps to reproduce the behavior:
>  # Insert data without any pre-commit validations
>  # Update data (ensuring the updates don't touch the fare column in the quickstart example) with the following pre-commit validator props (a fuller sketch follows):
> {{option("hoodie.precommit.validators", "org.apache.hudi.client.validator.SqlQueryEqualityPreCommitValidator").
> option("hoodie.precommit.validators.equality.sql.queries", "select sum(fare) from <TABLE_NAME>").}}
> stacktrace:
> {code:java}
> org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20211124114945342
> at org.apache.hudi.table.action.commit.AbstractWriteHelper.write(AbstractWriteHelper.java:62)
> at org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor.execute(SparkUpsertCommitActionExecutor.java:46)
> at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:111)
> at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:95)
> at org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:174)
> at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:214)
> at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:276)
> at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:164)
> at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
> at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
> at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
> at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
> at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
> at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
> at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
> at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
> at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
> at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
> at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
> at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
> at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
> ... 70 elided
> Caused by: java.util.ConcurrentModificationException
> at java.util.HashMap$ValueSpliterator.forEachRemaining(HashMap.java:1633)
> at java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:743)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
> at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
> at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
> at org.apache.hudi.client.utils.SparkValidatorUtils.getRecordsFromPendingCommits(SparkValidatorUtils.java:159)
> at org.apache.hudi.client.utils.SparkValidatorUtils.runValidators(SparkValidatorUtils.java:78)
> at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.runPrecommitValidators(BaseSparkCommitActionExecutor.java:401)
> at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.commitOnAutoCommit(BaseCommitActionExecutor.java:140)
> at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.updateIndexAndCommitIfNeeded(BaseSparkCommitActionExecutor.java:267)
> at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:182)
> at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:82)
> at org.apache.hudi.table.action.commit.AbstractWriteHelper.write(AbstractWriteHelper.java:55)
> ... 98 more {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)