Posted to user@flink.apache.org by Salva Alcántara <sa...@gmail.com> on 2022/11/07 07:06:48 UTC

Strange issue with exactly once checkpoints and the kafka sink

I had a Flink 1.15.1 job configured with

```
execution.checkpointing.mode=EXACTLY_ONCE
```

that was failing with the following error
```
Sink: Committer (2/2)#732 (36640a337c6ccdc733d176b18adab979) switched from
INITIALIZING to FAILED with failure cause: java.lang.IllegalStateException:
Failed to commit KafkaCommittable{producerId=4521984, epoch=0,
transactionalId=}
...
Caused by: org.apache.kafka.common.config.ConfigException: Invalid value
 for configuration transactional.id: String must be non-empty
```

that happened after the first checkpoint was triggered. The strange thing
about it is that the `KafkaSinkBuilder` was used without calling
`setDeliverGuarantee`, and hence the default delivery guarantee was
expected to be used, which is `NONE` [1].
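
For context, this is roughly how the sink was built (just a minimal sketch; the bootstrap servers, topic and serialization schema are placeholders, not the actual job's values):

```
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

KafkaSink<String> sink =
        KafkaSink.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("output-topic")
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                // no setDeliverGuarantee(...) call, so the default (NONE) should apply
                .build();
```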

Is that even possible to begin with? Shouldn't Kafka transactions be
involved only when one follows this recipe from the builder's javadoc [2]:

 * <p>One can also configure different {@link DeliveryGuarantee} by using
 * {@link #setDeliverGuarantee(DeliveryGuarantee)} but keep in mind when using
 * {@link DeliveryGuarantee#EXACTLY_ONCE} one must set the transactionalIdPrefix
 * {@link #setTransactionalIdPrefix(String)}.
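
In code, I understand that recipe would look roughly like this (a sketch with the same placeholder values and imports as the snippet above, plus `org.apache.flink.connector.base.DeliveryGuarantee`):

```
KafkaSink<String> sink =
        KafkaSink.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("output-topic")
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                // per the javadoc, required when using EXACTLY_ONCE
                .setTransactionalIdPrefix("my-app-tx")
                .build();
```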

So, in my case, without calling `setDeliverGuarantee` (or
`setTransactionalIdPrefix`), I cannot understand why I was seeing these
errors. To avoid the problem, I temporarily changed the checkpointing
settings to

```
execution.checkpointing.mode=AT_LEAST_ONCE
```

but I'd like to understand what was happening.


  [1]:
https://github.com/apache/flink/blob/f494be6956e850d4d1c9fd50b79e5a8dd5b53e47/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java#L66
  [2]:
https://github.com/apache/flink/blob/f494be6956e850d4d1c9fd50b79e5a8dd5b53e47/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java#L51

FYI, I've also posted this on SO here:
- https://stackoverflow.com/questions/74342971/transactional-id-errors-when-using-kafka-sink-with-exactly-once-checkpoints

Re: Strange issue with exactly once checkpoints and the kafka sink

Posted by Salva Alcántara <sa...@gmail.com>.
As noted in the SO post, it's a bit confusing to me how the `checkpointing.mode`
delivery guarantees interact with the ones for the different sinks, and in
particular with the Kafka one.

Based on the error I had, I understand that if I use `EXACTLY_ONCE` for the
checkpoints and I indicate nothing in the Kafka sink, the default guarantee
for it is overridden and/or transactions are used anyway (???).

Does the `checkpointing.mode` guarantee really override the default one for
the Kafka sink? If so, would something like this

```
// setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE) // don't call this, so that the Kafka sink adapts automatically
setTransactionalIdPrefix("XYZ") // just in case transactions are required
```

make the Kafka sink automatically adapt to the `checkpointing.mode` (that is,
use the same guarantee), or should I instead explicitly set both guarantees?
E.g.,

```
execution.checkpointing.mode=EXACTLY_ONCE
```

plus

```
setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
setTransactionalIdPrefix("XYZ")
```

Or, for `AT_LEAST_ONCE`:

```
execution.checkpointing.mode=AT_LEAST_ONCE
```

plus

```
setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
// setTransactionalIdPrefix("XYZ") // not required in this case
```
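
For reference, in the exactly-once case the "set both explicitly" option I have in mind would look roughly like this end to end (just a sketch; the checkpoint interval, servers, topic and prefix are placeholders):

```
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// same guarantee on the checkpointing side...
env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);

// ...and on the Kafka sink side
KafkaSink<String> sink =
        KafkaSink.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("output-topic")
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("XYZ")
                .build();
```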

Any clarifications on this would be highly appreciated. Maybe you can point
me to the relevant code (or docs) if the interaction between those
guarantees is already well-documented.

Thanks in advance,

Salva

