Posted to users@kafka.apache.org by Sachin Mittal <sj...@gmail.com> on 2016/11/09 13:59:52 UTC

Understanding the topology of high level kafka stream

Hi,
I had some basic questions about the sequence of tasks when a streaming application
restarts, in case of failure or otherwise.

Say my stream is structured this way (a rough code sketch follows below):

source-topic
   branched into 2 kstreams
    source-topic-1
    source-topic-2
   each mapped to 2 new kstreams (new key,value pairs) backed by 2 kafka topics
       source-topic-1-new
       source-topic-2-new
       each aggregated to a new ktable backed by internal changelog topics
       source-topic-1-new-table (scource-topic-1-new-changelog)
       source-topic-2-new-table (scource-topic-2-new-changelog)
       table1 left join table2 -> the final stream
Results of the final stream are then persisted into another data storage.
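
Roughly, in code, this is the shape I mean (just a sketch, not my actual
application; the predicates, re-keying and aggregation below are placeholders,
and I am assuming the 0.10.1 Streams DSL):

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class TopologySketch {
    public static void main(String[] args) {
        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> source = builder.stream("source-topic");

        // branched into 2 kstreams
        KStream<String, String>[] branches = source.branch(
                (k, v) -> v.startsWith("1"),   // placeholder predicate for source-topic-1
                (k, v) -> true);               // placeholder predicate for source-topic-2

        // each re-keyed, written through a backing topic, and aggregated into a ktable
        // (the ktable's changelog topic is created internally by Streams)
        KTable<String, String> table1 = branches[0]
                .map((k, v) -> new KeyValue<>(v, v))   // placeholder re-keying
                .through("source-topic-1-new")
                .groupByKey()
                .reduce((agg, v) -> v, "source-topic-1-new-table");
        KTable<String, String> table2 = branches[1]
                .map((k, v) -> new KeyValue<>(v, v))
                .through("source-topic-2-new")
                .groupByKey()
                .reduce((agg, v) -> v, "source-topic-2-new-table");

        // table1 left join table2 -> final stream, persisted to an external store
        table1.leftJoin(table2, (v1, v2) -> v1 + "|" + v2)
                .toStream()
                .foreach((k, v) -> { /* write to the external data storage */ });

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        new KafkaStreams(builder, props).start();
    }
}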

So as you can see, I have the following physical topics or state stores:
source-topic
source-topic-1-new
source-topic-2-new
scource-topic-1-new-changelog
scource-topic-2-new-changelog

Now, if at a given point the streaming application is stopped, there is some
data in all these topics.
Barring the source-topic, all the other topics have data inserted by the
streaming application.

Also, I suppose the streaming application stores, for each of these topics, the
offset where it last left off.

So when I restart the application, how does the processing start again?
Will it pick up the data from the changelog topics where it last left off, process
that first, and then process the source topic data from its last offset?

Or will it start from the source topic? I really don't want it to maintain
offsets into the changelog topics, because any old key's value can be modified
again as part of aggregation.

A bit confused here; any light you can shed would help a lot.

Thanks
Sachin

Re: Understanding the topology of high level kafka stream

Posted by "Matthias J. Sax" <ma...@confluent.io>.

The reset tool does not have a parameter for this. But if you copy and modify
the code, it would be quite simple to change this behavior.

You can also always write your own tool from scratch to modify committed
offsets -- just use a consumer with the corresponding group.id, seek()
to the offsets you are interested in, and commit() them.

Afterwards, when you start up the Streams application, it will pick up
those committed offsets and resume from there. If some data gets
written after your commit and before you start the application, that
data will of course be processed. Thus, if you really want to be at
"end of log" at startup, you would need to commit an invalid offset
(like Long.MAX_VALUE) and specify auto.offset.reset=latest in your
StreamsConfig.
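
For example, something like this (an untested sketch; "source-topic" and
"my-streams-app" are placeholders, and the group.id must match your Streams
application.id):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class CommitOffsetsTool {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-streams-app"); // == application.id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> partitions = new ArrayList<>();
            for (PartitionInfo pi : consumer.partitionsFor("source-topic")) {
                partitions.add(new TopicPartition(pi.topic(), pi.partition()));
            }
            consumer.assign(partitions);

            // Seek to whatever offsets you are interested in; here, the current end of the log.
            // (To strictly skip data that arrives between this commit and startup, you would
            // commit an invalid offset like Long.MAX_VALUE instead, as described above.)
            consumer.seekToEnd(partitions);

            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            for (TopicPartition tp : partitions) {
                offsets.put(tp, new OffsetAndMetadata(consumer.position(tp)));
            }
            consumer.commitSync(offsets);
        }
    }
}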


-Matthias



Re: Understanding the topology of high level kafka stream

Posted by Sachin Mittal <sj...@gmail.com>.
In
https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/
I see the following:
"seek to offset zero for all partitions of all input topics and commit the
offset"

I also see:
"the offsets for kept intermediate topics must be set to the largest value
(i.e., to the current log-size) instead of zero."

Can we do the same for the input topics too, via this reset tool or some manual
command? That way we would set them to the largest value, and when we restart we
process only the latest messages from the input topics as well.
As of now we are OK with some data getting lost while the streaming
application is not running.

Thanks
Sachin




Re: Understanding the topology of high level kafka stream

Posted by "Matthias J. Sax" <ma...@confluent.io>.

The tool resets all offsets of user input and intermediate topics, as
well as for all internal intermediate and changelog topics.

Furthermore, internal topics (intermediate and store changelogs) get
deleted. But NOT local stores -- for local stores you need to leverage
KafkaStreams#cleanUp()
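
For example (a sketch, assuming "builder" and "props" are your existing topology
and Streams configuration):

// Wipe the local state directory for this application.id before (re)starting,
// analogous to what the reset tool does for the topics:
KafkaStreams streams = new KafkaStreams(builder, props);
streams.cleanUp();   // only allowed before start() (or after close())
streams.start();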

See also:
http://docs.confluent.io/current/streams/developer-guide.html#application-reset-tool


-Matthias


Re: Understanding the topology of high level kafka stream

Posted by Damian Guy <da...@gmail.com>.
That is correct


Re: Understanding the topology of high level kafka stream

Posted by Sachin Mittal <sj...@gmail.com>.
Hi,
The reset tool looks like a great feature.

So following this link
https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/

What I understand is that this tool resets the offsets for internal and
intermediate topics and also deletes all the internal local storage and
topics.
Please confirm this.

I was actually doing all this manually.

Thanks
Sachin



Re: Understanding the topology of high level kafka stream

Posted by "Matthias J. Sax" <ma...@confluent.io>.

Hey,

Changelog topics are compacted topics, and no retention time is applied
(one exception is window-changelog topics, which have both compaction
and a retention policy enabled).

If an input message is purged via retention time (and this is your
latest committed offset) and you start your Streams application, it
will resume according to the "auto.offset.reset" policy, which you can
specify in StreamsConfig. So Streams will just run fine, but the data
is of course lost.
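
For example (a sketch; "my-streams-app" is a placeholder, and Streams passes this
consumer setting through to its internal consumers):

// imports: java.util.Properties, org.apache.kafka.streams.StreamsConfig,
//          org.apache.kafka.clients.consumer.ConsumerConfig
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// "earliest" or "latest"; applies when there is no committed offset,
// or when the committed offset is out of range (e.g. the data was purged)
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");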

For repartitioning topics that same argument applies.

> I am asking this because I am planning to keep the retention time
> for internal changelog topics also small so no message gets big
> enough to start getting exceptions.

I don't understand this part though...

To set an arbitrary start offset there is no API or tooling available
at the moment. However, we plan to add some of this in future releases.

For now, you could set start offsets "manually" by writing a small
consumer application that does not process data but only seek()s to
(and commit()s) the start offsets you want to use. This is the same
idea the Streams application reset tool is built on. See this blog
post for details:

https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/

However, you should be careful not to mess with internally kept state
(i.e., make sure it is still semantically meaningful and compatible if
you start to modify offsets...).


Hope this helps.

-Matthias


Re: Understanding the topology of high level kafka stream

Posted by Sachin Mittal <sj...@gmail.com>.
Hi,
What happens when the message itself is purged by Kafka, via the retention time
setting or something else, and that message was later than the last offset stored
by the stream consumer?

I am asking this because I am planning to keep the retention time for the internal
changelog topics small as well, so no message gets big enough to start causing
exceptions.

So if messages from the last offset onwards are deleted, will there be any issues?

Also, is there any way to control or set the offset manually when we restart the
streaming application, so that certain old messages are not consumed at all, as
logic-wise they are not useful to the streaming application any more? For example,
past user sessions created while the streaming application was stopped.


Thanks
Sachin



Re: Understanding the topology of high level kafka stream

Posted by Eno Thereska <en...@gmail.com>.
Hi Sachin,

Kafka Streams is built on top of standard Kafka consumers. For every topic it consumes from (whether a changelog topic or a source topic, it doesn't matter), the consumer stores the offset it last consumed from. Upon restart, by default it starts consuming from where it left off for each of the topics. So you can think of it this way: a restart should be no different than if you had left the application running (i.e., no restart).
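
(If you want to see those per-topic committed offsets yourself, here is a small
sketch using a plain consumer; "source-topic" and "my-streams-app" are
placeholders, and the group.id has to equal your Streams application.id:)

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class ShowCommittedOffsets {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-streams-app"); // == application.id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            for (PartitionInfo pi : consumer.partitionsFor("source-topic")) {
                TopicPartition tp = new TopicPartition(pi.topic(), pi.partition());
                OffsetAndMetadata committed = consumer.committed(tp); // null if nothing committed yet
                System.out.println(tp + " -> " + (committed == null ? "none" : committed.offset()));
            }
        }
    }
}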

Thanks
Eno

