Posted to commits@hudi.apache.org by "Dave Hagman (Jira)" <ji...@apache.org> on 2021/08/05 16:38:00 UTC

[jira] [Commented] (HUDI-2275) HoodieDeltaStreamerException when using OCC and a second concurrent writer

    [ https://issues.apache.org/jira/browse/HUDI-2275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17394190#comment-17394190 ] 

Dave Hagman commented on HUDI-2275:
-----------------------------------

Comment in Slack from Shiv Narayan:

 
{quote}
here is my take(based on my minimal understanding):
Don't think this was tested and we might need a fix. deltastreamer always fetches the checkpoint from latest commit metadata. and if spark writer writes midway, those commit metadata may not have checkpoint and hence the issue. But will wait to hear from nishith who is the author of multi-writer.
{quote}
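
To make the failure mode in that comment concrete, here is a minimal sketch. It is not Hudi code: the {{Commit}} class, the metadata key value, the first instant time and the checkpoint string are all assumptions made up for illustration. It only models "read the checkpoint from the latest commit's metadata" and shows the lookup coming back empty once a commit without a checkpoint lands on top.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;

class CheckpointLookupSketch {
    // Assumed key name, used here only as a stand-in for wherever the checkpoint is stored.
    static final String CHECKPOINT_KEY = "deltastreamer.checkpoint.key";

    /** Hypothetical, simplified model of a completed commit on the timeline. */
    static final class Commit {
        final String instantTime;
        final Map<String, String> extraMetadata;

        Commit(String instantTime, Map<String, String> extraMetadata) {
            this.instantTime = instantTime;
            this.extraMetadata = extraMetadata;
        }
    }

    /** Looks for the checkpoint in the latest commit only, as described in the comment above. */
    static Optional<String> checkpointFromLatest(List<Commit> timeline) {
        if (timeline.isEmpty()) {
            return Optional.empty();
        }
        Commit latest = timeline.get(timeline.size() - 1);
        return Optional.ofNullable(latest.extraMetadata.get(CHECKPOINT_KEY));
    }

    public static void main(String[] args) {
        List<Commit> timeline = new ArrayList<>();

        // Commit from the deltastreamer: carries a (made-up) checkpoint value.
        timeline.add(new Commit("20210803165700",
                Collections.singletonMap(CHECKPOINT_KEY, "events,0:100")));
        System.out.println(checkpointFromLatest(timeline).isPresent()); // true

        // Commit from the spark datasource writer: no checkpoint in its metadata.
        timeline.add(new Commit("20210803165741", Collections.<String, String>emptyMap()));
        System.out.println(checkpointFromLatest(timeline).isPresent()); // false
    }
}
```

The second lookup is the situation in which the deltastreamer reports "Unable to find previous checkpoint".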
 

What I have found so far while getting acquainted with the OCC code supports the above statement. There does not appear to be any isolation at the commit level. The _*TransactionManager*_ manages transactions and itself relies on lock providers such as the ZooKeeper-based and Hive-based ones. The only usage of the transaction manager that I can find is here: [https://github.com/apache/hudi/blob/master/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java#L395]

All that code does is acquire a lock before syncing table metadata. There is nothing to prevent two (or more) writers from steamrolling each other's commits.
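
To illustrate the concern, assuming my reading of the lock scope is right, here is a toy model. None of it is Hudi code: {{Timeline}}, {{commitUnchecked}} and {{commitChecked}} are hypothetical names. The first method takes the lock only around a metadata-sync step, so a writer never notices a commit that landed after it started. The second holds the lock across "check the latest commit, then commit", which is roughly what commit-level isolation would need. A real OCC check would compare the files each writer touched rather than reject on any intervening commit; that is simplified away here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.locks.ReentrantLock;

class LockScopeSketch {
    /** Hypothetical in-memory stand-in for a table timeline; not a Hudi class. */
    static final class Timeline {
        private final List<String> commits = new ArrayList<>();
        private final ReentrantLock lock = new ReentrantLock();
        private int metadataSyncs = 0;

        String latest() {
            return commits.isEmpty() ? null : commits.get(commits.size() - 1);
        }

        int size() {
            return commits.size();
        }

        /** The commit is appended outside the lock; only the metadata sync is guarded. */
        void commitUnchecked(String writer) {
            commits.add(writer);
            lock.lock();
            try {
                metadataSyncs++;
            } finally {
                lock.unlock();
            }
        }

        /** Rejects the commit if the timeline moved since the writer read seenLatest. */
        boolean commitChecked(String writer, String seenLatest) {
            lock.lock();
            try {
                if (!Objects.equals(latest(), seenLatest)) {
                    return false; // another writer committed in between
                }
                commits.add(writer);
                metadataSyncs++;
                return true;
            } finally {
                lock.unlock();
            }
        }
    }

    public static void main(String[] args) {
        // Narrow lock scope: both commits land and writer A never learns about B.
        Timeline narrow = new Timeline();
        narrow.commitUnchecked("B");
        narrow.commitUnchecked("A");
        System.out.println(narrow.size()); // 2

        // Lock held across check and commit: A's stale view is detected.
        Timeline guarded = new Timeline();
        String seenByA = guarded.latest();                      // A starts, sees an empty timeline
        boolean bOk = guarded.commitChecked("B", guarded.latest());
        boolean aOk = guarded.commitChecked("A", seenByA);
        System.out.println(bOk + " " + aOk); // true false
    }
}
```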

> HoodieDeltaStreamerException when using OCC and a second concurrent writer
> --------------------------------------------------------------------------
>
>                 Key: HUDI-2275
>                 URL: https://issues.apache.org/jira/browse/HUDI-2275
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: DeltaStreamer, Spark Integration, Writer Core
>    Affects Versions: 0.9.0
>            Reporter: Dave Hagman
>            Priority: Critical
>
>  I am trying to utilize [Optimistic Concurrency Control|https://hudi.apache.org/docs/concurrency_control] in order to allow two writers to update a single table simultaneously. The two writers are:
>  * Writer A: Deltastreamer job consuming continuously from Kafka
>  * Writer B: A spark datasource-based writer that is consuming parquet files out of S3
>  * Table Type: Copy on Write
>  
> After a few commits from each writer the deltastreamer will fail with the following exception:
>  
> {code:java}
> org.apache.hudi.exception.HoodieDeltaStreamerException: Unable to find previous checkpoint. Please double check if this table was indeed built via delta streamer. Last Commit :Option{val=[20210803165741__commit__COMPLETED]}, Instants :[[20210803165741__commit__COMPLETED]], CommitMetadata={
>  "partitionToWriteStats" : {
>  ...{code}
>  
> What appears to be happening is a lack of commit isolation between the two writers.
> Writer B (spark datasource writer) lands commits which are eventually picked up by Writer A (Delta Streamer). This is an issue because the Delta Streamer needs checkpoint information, which the spark datasource of course does not include in its commits. My understanding was that OCC was built for this very purpose (among others).
> OCC config for Delta Streamer:
> {code:java}
> hoodie.write.concurrency.mode=optimistic_concurrency_control
> hoodie.cleaner.policy.failed.writes=LAZY
> hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
> hoodie.write.lock.zookeeper.url=<zk_host>
> hoodie.write.lock.zookeeper.port=2181
> hoodie.write.lock.zookeeper.lock_key=writer_lock
> hoodie.write.lock.zookeeper.base_path=/hudi-write-locks{code}
>  
> OCC config for spark datasource:
> {code:java}
> // Multi-writer concurrency
> .option("hoodie.cleaner.policy.failed.writes", "LAZY")
> .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
> .option(
>     "hoodie.write.lock.provider",
>     org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider.class.getCanonicalName()
> )
> .option("hoodie.write.lock.zookeeper.url", jobArgs.zookeeperHost)
> .option("hoodie.write.lock.zookeeper.port", jobArgs.zookeeperPort)
> .option("hoodie.write.lock.zookeeper.lock_key", "writer_lock")
> .option("hoodie.write.lock.zookeeper.base_path", "/hudi-write-locks"){code}
> h3. Steps to Reproduce:
>  * Start a deltastreamer job against some table Foo
>  * In parallel, start writing to the same table Foo using spark datasource writer
>  * Note that after a few commits from each, the deltastreamer is likely to fail with the above exception when the datasource writer creates non-isolated inflight commits
> NOTE: I have not tested this with two writers of the same kind (e.g. two deltastreamer jobs).
> NOTE 2: Another detail that may be relevant is that the two writers are on completely different spark clusters, but I assumed this shouldn't be an issue since we're locking using Zookeeper.


