Posted to user@flink.apache.org by Salva Alcántara <sa...@gmail.com> on 2022/11/07 07:06:48 UTC
Strange issue with exactly once checkpoints and the kafka sink
I had a Flink 1.15.1 job configured with
```
execution.checkpointing.mode=EXACTLY_ONCE
```
that was failing with the following error
```
Sink: Committer (2/2)#732 (36640a337c6ccdc733d176b18adab979) switched from
INITIALIZING to FAILED with failure cause: java.lang.IllegalStateException:
Failed to commit KafkaCommittable{producerId=4521984, epoch=0,
transactionalId=}
...
Caused by: org.apache.kafka.common.config.ConfigException: Invalid value
for configuration transactional.id: String must be non-empty
```
that happened after the first checkpoint was triggered. The strange thing
about it is that the `KafkaSinkBuilder` was used without calling
`setDeliverGuarantee`, so the default delivery guarantee, which is
`NONE` [1], was expected to apply.
Is that even possible to begin with? Shouldn't Kafka transactions be
involved only when one follows this recipe [2]:
```
* <p>One can also configure different {@link DeliveryGuarantee} by using
* {@link #setDeliverGuarantee(DeliveryGuarantee)} but keep in mind when using
* {@link DeliveryGuarantee#EXACTLY_ONCE} one must set the transactionalIdPrefix
* {@link #setTransactionalIdPrefix(String)}.
```
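For reference, the quoted recipe would look roughly like the sketch below. This is an untested illustration, not code from my job: the bootstrap servers, topic, and prefix are placeholders, and it assumes the Flink 1.15 `flink-connector-kafka` builder API:

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

// Sketch of the recipe from the javadoc: when EXACTLY_ONCE is requested
// explicitly, a transactional id prefix must be set as well.
KafkaSink<String> sink =
        KafkaSink.<String>builder()
                .setBootstrapServers("localhost:9092") // placeholder address
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("output-topic") // placeholder topic
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("my-app") // placeholder prefix
                .build();
```

(This is a configuration fragment; it needs a surrounding Flink job and the connector dependency to run.)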
So, in my case, without calling `setDeliverGuarantee` (nor
`setTransactionalIdPrefix`), I cannot understand why I was seeing these
errors. To avoid the problem, I temporarily changed the checkpointing
settings to
```
execution.checkpointing.mode=AT_LEAST_ONCE
```
but I'd like to understand what was happening.
[1]:
https://github.com/apache/flink/blob/f494be6956e850d4d1c9fd50b79e5a8dd5b53e47/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java#L66
[2]:
https://github.com/apache/flink/blob/f494be6956e850d4d1c9fd50b79e5a8dd5b53e47/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java#L51
FYI I've also posted this on SO here:
- https://stackoverflow.com/questions/74342971/transactional-id-errors-when-using-kafka-sink-with-exactly-once-checkpoints
Re: Strange issue with exactly once checkpoints and the kafka sink
Posted by Salva Alcántara <sa...@gmail.com>.
As noted in the SO post, it's a bit confusing to me how the
`checkpointing.mode` delivery guarantees interact with the ones for the
different sinks, and in particular with the Kafka one.
Based on the error I got, I understand that if I use `EXACTLY_ONCE` for the
checkpoints and indicate nothing in the Kafka sink, the default guarantee
for the sink is overridden and/or transactions are used anyway (???).
Does the `checkpointing.mode` guarantee really override the default one for
Kafka? If so, would something like this
```
// setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE) // don't call this,
//   in order for the kafka sink to automatically adapt
setTransactionalIdPrefix("XYZ") // just in case transactions are required
```
make the Kafka sink automatically adapt to the `checkpointing.mode` (that is,
use the same guarantee), or should I, on the contrary, explicitly set both
guarantees? E.g.,
```
execution.checkpointing.mode=EXACTLY_ONCE
```
plus
```
setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
setTransactionalIdPrefix("XYZ")
```
Or, for `AT_LEAST_ONCE`:
```
execution.checkpointing.mode=AT_LEAST_ONCE
```
plus
```
setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
// setTransactionalIdPrefix("XYZ") // not required in this case
```
Any clarifications on this would be highly appreciated. Maybe you can point
me to the relevant code (or docs) if the interaction between those
guarantees is already well-documented.
Thanks in advance,
Salva