Posted to dev@spark.apache.org by Genmao Yu <hu...@gmail.com> on 2019/09/03 03:39:10 UTC

Re: [SS]Kafka EOS transaction timeout solution

Increasing `transaction.timeout.ms` on both the Kafka client and server side
may not be the best solution, but it is workable. In the design, Spark submits
tasks to all executors to do the Kafka commit (the second phase of 2PC), which
increases the possibility of commit failure. Besides, users should be made
aware that if the Kafka transaction commit times out, exactly-once may fall
back to at-least-once.
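
As a rough illustration of why committing from every executor raises the failure risk: if each of n commit tasks failed independently with probability p (an assumption made only for this sketch, not a measured property of Spark or Kafka), the chance that at least one commit fails is 1 - (1 - p)^n. The function name below is hypothetical.

```python
def prob_any_commit_fails(p_single: float, n_tasks: int) -> float:
    """Probability that at least one of n_tasks commits fails.

    Assumes every commit fails independently with probability p_single,
    which is a simplification for illustration only.
    """
    if not 0.0 <= p_single <= 1.0:
        raise ValueError("p_single must be between 0 and 1")
    if n_tasks < 0:
        raise ValueError("n_tasks must be non-negative")
    # All n commits succeed with probability (1 - p)^n; take the complement.
    return 1.0 - (1.0 - p_single) ** n_tasks
```

Under that assumption, a per-commit failure rate of 0.1% across 100 commit tasks gives roughly a 9.5% chance that some commit in the batch fails, which is why the commit-timeout path deserves attention even though a single commit rarely fails.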

wenxuan Guan <we...@gmail.com> wrote on Sat, Aug 31, 2019 at 3:42 PM:

> Hi all,
>
> I have implemented exactly-once semantics (EOS) for the Structured Streaming
> Kafka sink using the Kafka transactional producer. The design sketch is in
> SPARK-28908 (https://issues.apache.org/jira/browse/SPARK-28908) and the PR
> is 25618 (https://github.com/apache/spark/pull/25618). But now I have run
> into the problem described below.
>
> When the producer fails to commit a transaction after successfully sending
> data to Kafka for some reason, such as a Kafka broker being down, the Spark
> job fails. After the Kafka broker recovers and the job is restarted, the
> transaction resumes. But if the time from the transaction commit failure
> until it is fixed and the job is restarted (by a job attempt or manually)
> exceeds the config `transaction.timeout.ms`, the data sent by the producer
> is discarded by the Kafka broker, leading to data loss.
>
> My solution is:
> 1. Increase the config `transaction.timeout.ms`.
>  If the user has not defined it, raise the config from 60 seconds, the
> default value of `transaction.timeout.ms` in the producer, to 15 minutes,
> the default value of `transaction.max.timeout.ms` in the Kafka broker. We
> do not go higher because the request fails with an
> InvalidTransactionTimeout error if `transaction.timeout.ms` is larger than
> `transaction.max.timeout.ms`. If the user has defined
> `transaction.timeout.ms`, we just check whether it is large enough.
> 2. Notify the user about the config `transaction.timeout.ms` in the
> documentation, and describe ways to avoid data loss, such as increasing
> `transaction.timeout.ms` and `transaction.max.timeout.ms` and making sure
> the job restarts before the timeout is exceeded.
>
> BTW, I just skimmed the Flink code and found that by default Flink sets the
> `transaction.timeout.ms` property in the producer config to 1 hour, and
> tells users in the docs to increase `transaction.max.timeout.ms`.
>
> Any ideas on how to handle this problem?
>
> Many Thanks,
> Wenxuan
>
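
Step 1 of the proposal quoted above could be sketched roughly as follows. This is only an illustration under stated assumptions: the function name, the warning text, and the idea of returning warnings instead of logging them are hypothetical and are not taken from the PR. The two constants are the Kafka defaults mentioned in the message (60 seconds for the producer, 15 minutes for the broker maximum). The sketch cannot see the broker's actual `transaction.max.timeout.ms`, so it only compares against the broker default.

```python
TIMEOUT_KEY = "transaction.timeout.ms"
PRODUCER_DEFAULT_TIMEOUT_MS = 60_000      # producer default: 60 seconds
BROKER_DEFAULT_MAX_TIMEOUT_MS = 900_000   # broker default max: 15 minutes


def resolve_transaction_timeout(user_params, recommended_ms=BROKER_DEFAULT_MAX_TIMEOUT_MS):
    """Return (producer_params, warnings) without mutating user_params.

    Hypothetical helper: if the user did not set the timeout, use the
    broker's default maximum; otherwise keep the user's value and warn
    when it is below the recommended value.
    """
    params = dict(user_params)
    warnings = []
    if TIMEOUT_KEY not in params:
        # Not user-defined: raise from the 60 s producer default to 15 min.
        params[TIMEOUT_KEY] = str(BROKER_DEFAULT_MAX_TIMEOUT_MS)
    elif int(params[TIMEOUT_KEY]) < recommended_ms:
        # User-defined: respect it, but flag that it may be too small.
        warnings.append(
            f"{TIMEOUT_KEY}={params[TIMEOUT_KEY]} is below {recommended_ms} ms; "
            "uncommitted data may be discarded if a restart takes longer"
        )
    return params, warnings
```

Calling `resolve_transaction_timeout({})` yields a timeout of `"900000"` with no warnings, while passing an explicit `"60000"` keeps the user's value and returns one warning.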

Re: [SS]Kafka EOS transaction timeout solution

Posted by wenxuan Guan <we...@gmail.com>.
Thanks for your reply.
1. About submitting tasks to commit the Kafka transaction
Committing a transaction in the Kafka producer is a lightweight action.
Compared with sending messages to Kafka, a commit has little chance of
failing. However, to implement EOS we must consider commit failures, and we
should focus on the transaction commit timeout.
2. About the transaction commit timeout
I agree that we should notify users. In addition to setting the
`transaction.timeout.ms` config and noting it in the docs, maybe we can
resend the data to Kafka, falling back to at-least-once, when we detect a
transaction timeout, right?
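
The fallback idea in point 2 might look something like the sketch below. Everything here is hypothetical: `TransactionTimedOut`, `resume_and_commit`, and `resend_batch` are placeholder names for this illustration, not Kafka or Spark APIs, and how a timed-out transaction is actually detected is deliberately left abstract.

```python
class TransactionTimedOut(Exception):
    """Placeholder for 'the broker no longer holds this transaction'."""


def commit_with_fallback(resume_and_commit, resend_batch):
    """Try to finish the pending commit; resend the data if it timed out.

    resume_and_commit: callable that resumes and commits the pending
        transaction, raising TransactionTimedOut if the broker dropped it.
    resend_batch: callable that writes the batch to Kafka again.
    Returns the delivery guarantee that was actually achieved.
    """
    try:
        resume_and_commit()
        return "exactly-once"
    except TransactionTimedOut:
        # The broker discarded the uncommitted data, so write it again.
        resend_batch()
        return "at-least-once"
```

Only the timeout case triggers a resend in this sketch; any other exception still propagates and fails the job, as it does without the fallback.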
