Posted to users@kafka.apache.org by Raffaele Esposito <ra...@gmail.com> on 2020/05/19 15:21:24 UTC

KTable as a compacted topic, implications

This is the topology of a simple word count:

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [word_count_input])
      --> KSTREAM-FLATMAPVALUES-0000000001
    Processor: KSTREAM-FLATMAPVALUES-0000000001 (stores: [])
      --> KSTREAM-KEY-SELECT-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-KEY-SELECT-0000000002 (stores: [])
      --> KSTREAM-FILTER-0000000005
      <-- KSTREAM-FLATMAPVALUES-0000000001
    Processor: KSTREAM-FILTER-0000000005 (stores: [])
      --> KSTREAM-SINK-0000000004
      <-- KSTREAM-KEY-SELECT-0000000002
    Sink: KSTREAM-SINK-0000000004 (topic: Counts-repartition)
      <-- KSTREAM-FILTER-0000000005

   Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000006 (topics: [Counts-repartition])
      --> KSTREAM-AGGREGATE-0000000003
    Processor: KSTREAM-AGGREGATE-0000000003 (stores: [Counts])
      --> KTABLE-TOSTREAM-0000000007
      <-- KSTREAM-SOURCE-0000000006
    Processor: KTABLE-TOSTREAM-0000000007 (stores: [])
      --> KSTREAM-SINK-0000000008
      <-- KSTREAM-AGGREGATE-0000000003
    Sink: KSTREAM-SINK-0000000008 (topic: word_count_output)
      <-- KTABLE-TOSTREAM-0000000007

I would like to understand better what (stores: [Counts]) means.

The documentation says:

For each state store, Kafka maintains a replicated changelog Kafka topic in
which it tracks any state updates.

As I understand it, a KTable in Kafka Streams is implemented with a local
table (RocksDB, but in theory it could also be a HashMap) backed by a
compacted log, which is one of the internal topics created by my sample
application:

wordcount-application-Counts-changelog

If the above assumption is correct, it means that in that step Kafka Streams
writes back to Kafka and then starts reading from Kafka again.

So basically the log-compacted topic is the sink for one read-process-write
cycle and the source for the next:

   - read from repartition topic -> write on KTable compacted topic
   - read from KTable compacted topic -> process -> write to output topic

Is this correct?
As for fault tolerance and exactly-once semantics, I would also expect Kafka
to use transactions. Once again, are my assumptions correct? Where
could I learn more about these concepts?

Any help is welcome!

Re: KTable as a compacted topic, implications

Posted by Raffaele Esposito <ra...@gmail.com>.
Thanks a lot!


Re: KTable as a compacted topic, implications

Posted by "Matthias J. Sax" <mj...@apache.org>.
Yes, for EOS (exactly-once semantics), writing into changelog topics happens
in the same transaction as writing to the output topic.

You might be interested in this blog post:
https://www.confluent.io/blog/enabling-exactly-once-kafka-streams/
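
For readers who want to try this, exactly-once processing is switched on through the Streams configuration. The following is a minimal sketch, not taken from the thread: the class name is hypothetical, the broker address is an assumption, and the `application.id` is inferred from the changelog topic name `wordcount-application-Counts-changelog`. It uses plain `java.util.Properties` with the configuration key names as strings, so it runs without the Kafka libraries on the classpath.

```java
import java.util.Properties;

// Hypothetical helper class for illustration; only the property keys are Kafka Streams settings.
class ExactlyOnceConfigSketch {
    static Properties streamsProperties() {
        Properties props = new Properties();
        // The application.id prefixes internal topic names, e.g.
        // wordcount-application-Counts-changelog.
        props.put("application.id", "wordcount-application");
        // Assumption: a broker reachable at this address.
        props.put("bootstrap.servers", "localhost:9092");
        // Enables exactly-once processing, so that changelog writes, output
        // writes and offset commits are covered by one transaction.
        props.put("processing.guarantee", "exactly_once");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProperties().getProperty("processing.guarantee"));
    }
}
```

The resulting `Properties` object would be passed to the `KafkaStreams` constructor together with the topology. Newer Kafka releases also offer an `exactly_once_v2` value, so it is worth checking the documentation for the version in use.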


Re: KTable as a compacted topic, implications

Posted by Raffaele Esposito <ra...@gmail.com>.
Thanks a lot, Alex and Matthias.
From Alex's answer, I understand that the record is written to the compacted
topic as part of the transaction, right?


Re: KTable as a compacted topic, implications

Posted by "Matthias J. Sax" <mj...@apache.org>.
What Alex says is correct.

The changelog topic is only written to during processing -- in fact,
you could consider this write a "side effect" of calling `store.put()`.

The changelog topic is only read when recovering from an error, when the
store needs to be rebuilt from it.


-Matthias
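
The write-only-during-processing, read-only-during-recovery behavior described above can be pictured with a small toy model. This is a sketch under stated assumptions, not Kafka Streams code: `ChangelogBackedStore` and its methods are hypothetical names, the changelog topic is stood in for by an in-memory list, and the local store by a `HashMap`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: hypothetical class, not a Kafka Streams API.
class ChangelogBackedStore {
    // Stand-in for the changelog topic: an append-only list of (key, value) updates.
    private final List<Map.Entry<String, Long>> changelog;
    // Stand-in for the local state store.
    private final Map<String, Long> local = new HashMap<>();

    ChangelogBackedStore(List<Map.Entry<String, Long>> changelog) {
        this.changelog = changelog;
    }

    // Normal processing: update the local store and, as a side effect,
    // append the update to the changelog.
    void put(String key, long value) {
        local.put(key, value);
        changelog.add(Map.entry(key, value));
    }

    // Normal processing reads the local store only; the changelog is not consulted.
    Long get(String key) {
        return local.get(key);
    }

    // Recovery: rebuild a fresh store by replaying the changelog from the start.
    static ChangelogBackedStore restore(List<Map.Entry<String, Long>> changelog) {
        ChangelogBackedStore store = new ChangelogBackedStore(changelog);
        for (Map.Entry<String, Long> update : changelog) {
            store.local.put(update.getKey(), update.getValue());
        }
        return store;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> changelog = new ArrayList<>();
        ChangelogBackedStore store = new ChangelogBackedStore(changelog);
        store.put("kafka", 1L);
        store.put("streams", 1L);
        store.put("kafka", 2L);

        // Simulate losing the local state, then rebuilding it from the changelog.
        ChangelogBackedStore recovered = ChangelogBackedStore.restore(changelog);
        System.out.println(recovered.get("kafka"));
        System.out.println(changelog.size());
    }
}
```

Because replay keeps only the last value written for each key, a compacted changelog (which retains at least the latest record per key) restores the same state as the full log would.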


Re: KTable as a compacted topic, implications

Posted by Alex Craig <al...@gmail.com>.
Hi Raffaele, hopefully others more knowledgeable will correct me if I'm
wrong, but I don't believe anything gets read from the changelog topic
(other than at startup, if the state store needs to be restored). So in
your sub-topology 1, the only topic being consumed from is the repartition
topic. After it hits the aggregation (state store), the record is emitted to
the next step, which converts it back to a KStream object, and then finally
it's written to your output topic. If a failure occurred somewhere in that
sub-topology, then the offset for the repartition topic would not have been
committed, which means the record would get processed again on application
startup. Hope that helps,

Alex Craig
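
The flow through sub-topology 1 described above can be sketched as a toy model. Everything here is a hypothetical stand-in rather than a Kafka Streams API: topics are plain lists, the `Counts` store is a `HashMap`, and the offset is committed after every record (a simplification; a real application commits periodically). The simulated crash happens before the record has any effect, so partial effects are ignored in this sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: hypothetical names, not Kafka Streams APIs.
class SubTopologyOneModel {
    final List<String> repartitionTopic = new ArrayList<>(); // the only topic consumed
    final List<String> changelogTopic = new ArrayList<>();   // written to, never read here
    final List<String> outputTopic = new ArrayList<>();      // stand-in for word_count_output
    final Map<String, Long> countsStore = new HashMap<>();   // stand-in for the Counts store
    int committedOffset = 0;                                 // next repartition offset to read

    // Consumes from the repartition topic, starting at the committed offset.
    // A negative crashBeforeOffset means "no crash". Returns true if every
    // available record was processed, false if the simulated crash was hit.
    boolean run(int crashBeforeOffset) {
        for (int offset = committedOffset; offset < repartitionTopic.size(); offset++) {
            if (offset == crashBeforeOffset) {
                return false; // crash: the offset for this record is not committed
            }
            String word = repartitionTopic.get(offset);
            long count = countsStore.merge(word, 1L, Long::sum); // aggregate step
            changelogTopic.add(word + "=" + count); // side effect of the store update
            outputTopic.add(word + "=" + count);    // toStream step, then sink
            committedOffset = offset + 1;           // simplified per-record commit
        }
        return true;
    }

    public static void main(String[] args) {
        SubTopologyOneModel task = new SubTopologyOneModel();
        task.repartitionTopic.addAll(List.of("kafka", "streams", "kafka"));
        task.run(2);   // crash before the third record is processed
        System.out.println(task.committedOffset);
        task.run(-1);  // restart: processing resumes at the uncommitted record
        System.out.println(task.outputTopic);
    }
}
```

Note that `changelogTopic` is only ever appended to in `run`. In a real at-least-once setup, a crash can leave partial effects behind, so some writes may be repeated after a restart; that gap is what the transactional, exactly-once mode is meant to close.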
