Posted to user@flink.apache.org by Kevin Kwon <fs...@gmail.com> on 2020/10/20 20:27:53 UTC

Some questions regarding operator IDs

Hi team

I'm subscribing to two topics with a Kafka consumer, joining them, and
publishing the result back to a new topic via a Kafka producer (with
exactly-once semantics).

Since it's highly recommended to set a uid for each operator, I'm curious
how this works. For example:

val topicASource = env
  .addSource(topicAConsumer)
  .uid("topicAConsumer")

val topicBSource = env
  .addSource(topicBConsumer)
  .uid("topicBConsumer")

val result = joinstream(env, topicASource, topicBSource)
  .uid("transformer")

val topicCSink = result
  .addSink(topicCProducer)
  .uid("topicCProducer")


In this code, is it necessary to set the UID of the transformer? If the
consumer offset is not committed until the record is finally published to
the sink, will the consumers replaying from the offsets of the previous
checkpoint guarantee exactly-once, even though the transformer state is
lost when restarting?

Re: Some questions regarding operator IDs

Posted by Robert Metzger <rm...@apache.org>.
Hey Kevin,

Setting the uid is not needed for exactly-once guarantees. It is used when
you want to restore operator state manually from a savepoint: the uid is
how Flink matches the state stored in the savepoint to the operator it
belongs to.
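
One way to see why the uid does not matter for failure recovery but does matter for savepoints: state is stored per operator ID. When a job recovers from a checkpoint, the job graph is unchanged, so Flink's auto-generated IDs come out the same and the state is found without a uid. After the topology changes, generated IDs can shift and the state no longer matches. The sketch below is a toy model, not the Flink API: `Operator`, `operatorId`, `takeSavepoint`, `restore`, `pipeline` and the extra `filter` operator are hypothetical, and the `generated-<position>-<name>` ID is only a stand-in for Flink's hash of the job graph. Unlike Flink, which by default refuses to restore a savepoint whose state cannot be mapped (unless non-restored state is explicitly allowed), the toy silently drops unmatched entries.

```scala
// Toy model (NOT the Flink API) of how savepoint state is matched to
// operators by ID. The generated ID is a hypothetical stand-in for Flink's
// job-graph hash, chosen because it also changes when operators are added upstream.

final case class Operator(name: String, uid: Option[String])

// An explicit uid wins; otherwise derive an ID from the operator's position.
def operatorId(op: Operator, position: Int): String =
  op.uid.getOrElse(s"generated-$position-${op.name}")

def ids(job: List[Operator]): List[String] =
  job.zipWithIndex.map { case (op, i) => operatorId(op, i) }

// A savepoint stores each operator's state under its ID.
def takeSavepoint(job: List[Operator], states: List[String]): Map[String, String] =
  ids(job).zip(states).toMap

// On restore, each operator looks up its own ID; a miss means empty state.
def restore(job: List[Operator], savepoint: Map[String, String]): Map[String, Option[String]] =
  job.zip(ids(job)).map { case (op, id) => op.name -> savepoint.get(id) }.toMap

// A linear version of the job, optionally with a (hypothetical) new filter.
def pipeline(withUids: Boolean, extraFilter: Boolean): List[Operator] = {
  def op(name: String) = Operator(name, if (withUids) Some(name) else None)
  val filter = if (extraFilter) List(op("filter")) else Nil
  List(op("topicAConsumer")) ++ filter ++ List(op("transformer"), op("topicCProducer"))
}

val states = List("offsets", "join buffers", "open transaction")
for (withUids <- List(false, true)) {
  // Savepoint the original job, then restore into the modified job.
  val savepoint = takeSavepoint(pipeline(withUids, extraFilter = false), states)
  val restored = restore(pipeline(withUids, extraFilter = true), savepoint)
  println(s"uids=$withUids transformer state: ${restored("transformer")}")
}
```

Without uids, inserting the filter shifts the generated ID of `transformer`, so its state is not found (`None`); with uids the state is restored (`Some(join buffers)`). Restoring into the unchanged job works in both cases, which mirrors checkpoint recovery.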

This blog post (there are probably many more explaining this) could help
you understand how checkpointing ensures exactly-once processing despite
failures:
https://www.ververica.com/blog/how-apache-flink-manages-kafka-consumer-offsets#:~:text=The%20Kafka%20consumer%20in%20Apache,offsets%20in%20all%20Kafka%20partitions.&text=Flink's%20checkpoint%20mechanism%20ensures%20that,on%20the%20same%20input%20data
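
The mechanism that post describes can be sketched as a toy model (not the Flink API) that addresses the "transformer state is lost" concern: a completed checkpoint stores the source offset together with the operator state, and the exactly-once sink only makes output visible when a checkpoint completes. On failure, the open transaction is aborted and both offset and state are rewound, so replayed records produce the same output. The names `Checkpoint`, `run`, `failAt` and `restoreState` are hypothetical, and the real transactional Kafka sink commits in two phases (pre-commit when the checkpoint is taken, commit once it completes), which the model collapses into one step.

```scala
// Toy model (NOT the Flink API) of checkpoint-based exactly-once:
// source offset and operator state are snapshotted together, and the sink
// only commits its output when a checkpoint completes.
import scala.collection.mutable

// What a completed checkpoint holds: the source position and the operator state.
final case class Checkpoint(offset: Int, counts: Map[String, Int])

// The input "topic": an immutable log addressed by offset.
val input: Vector[String] = Vector("a", "b", "a", "c", "b", "a")

// Counts each key and emits "key:count". `failAt` crashes once before that
// offset; `restoreState` controls whether state is rolled back with the offset.
def run(checkpointEvery: Int, failAt: Option[Int], restoreState: Boolean): Vector[String] = {
  val committed = mutable.ArrayBuffer.empty[String] // output visible downstream
  val pending = mutable.ArrayBuffer.empty[String]   // open sink transaction
  var checkpoint = Checkpoint(0, Map.empty)
  var offset = 0
  var counts = Map.empty[String, Int]
  var crashed = false
  while (offset < input.length) {
    if (!crashed && failAt.contains(offset)) {
      // Failure: abort the open transaction and rewind to the last checkpoint.
      crashed = true
      pending.clear()
      offset = checkpoint.offset
      counts = if (restoreState) checkpoint.counts else Map.empty
    } else {
      val key = input(offset)
      val count = counts.getOrElse(key, 0) + 1
      counts = counts.updated(key, count)
      pending += s"$key:$count"
      offset += 1
      if (offset % checkpointEvery == 0 || offset == input.length) {
        // Checkpoint completes: snapshot offset and state, then commit output.
        checkpoint = Checkpoint(offset, counts)
        committed ++= pending
        pending.clear()
      }
    }
  }
  committed.toVector
}

val clean = run(checkpointEvery = 2, failAt = None, restoreState = true)
println(clean)
println(run(2, Some(3), restoreState = true) == clean)  // state restored
println(run(2, Some(3), restoreState = false) == clean) // state lost
```

With `restoreState = true`, the run that crashes before offset 3 commits exactly the same output as the clean run (the first comparison prints `true`). With `restoreState = false`, which models the worry that transformer state is lost on restart, the replayed counts are wrong and the comparison prints `false`.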


On Tue, Oct 20, 2020 at 10:28 PM Kevin Kwon <fs...@gmail.com> wrote:

> Hi team
>
> I'm subscribing 2 topics from Kafka Consumer, joining them and publishing
> back to a new topic via KafkaProducer (with Exactly Once semantic)
>
> As it's highly recommended to set uid for each operator, I'm curious how
> this works. For example,
>
> val topicASource = env
>   .addSource(topicAConsumer)
>   .uid("topicAConsumer")
>
> val topicBSource = env
>   .addSource(topicAConsumer)
>   .uid("topicAConsumer")
>
> val result = joinstream(env, topicASource, topicBSource)
>   .uid("transformer")
>
> val topicCSink = result
>   .addSink(topicCProducer)
>   .uid("topicCProducer")
>
>
> in this code, is it necessary to set the UID of the transformer? If the
> consumer offset is not committed until it finally gets published to sink,
> will consumers replaying from offset from previous
> checkpoint guarantee exactly once? even though transformer state is lost
> when restarting?
>