Posted to users@kafka.apache.org by Garrett Barton <ga...@gmail.com> on 2017/05/02 14:35:11 UTC

Setting up Kafka & Kafka Streams for loading real-time and 'older' data concurrently

Greetings all,

 I have a use case where I want to calculate some metrics against sensor
data that I already have, using event-time semantics (record time is event
time).  I have years of it, but for this POC I'd like to just load the last
few months so that we can start deriving trend lines now rather than waiting
a few months for the real-time feeds to accumulate.

So the question is: what are the steps I need to take to set up Kafka
itself, the topics, and Streams so that I can send it, say, T-90 days of
backlog data as well as real-time data and have it all process correctly?

I have data loading into a Kafka 'feed' topic, and I am setting the record
timestamp to the event timestamp within the data, so event-time semantics
are set up from the start.
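
The producer side is just setting the timestamp explicitly on each record,
roughly like this (a simplified sketch; the broker address, key, payload and
timestamp below are placeholders for my actual sensor data):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class FeedProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");            // placeholder broker
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        String sensorId = "sensor-42";                                // placeholder key
        String payload = "{\"reading\": 17.3}";                       // placeholder value
        long eventTimeMs = 1483228800000L;                            // event time taken from the data

        // ProducerRecord(topic, partition, timestamp, key, value): a null partition
        // lets the default partitioner choose, and the explicit timestamp becomes
        // the record timestamp (assuming the topic's default CreateTime semantics).
        producer.send(new ProducerRecord<String, String>("feed", null, eventTimeMs, sensorId, payload));
        producer.close();
    }
}
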
I was running into data loss when segments were deleted faster than
downstream could process them.  My knee-jerk reaction was to set the broker
configs log.retention.hours=2160 and log.segment.delete.delay.ms=21600000,
and that made the problem go away, but I don't think this is right?

For example's sake, assume a source topic 'feed', and assume a stream that
calculates min/max/avg to start with, using windows of 1 minute and 5
minutes.  I wish to use interactive queries against the window stores, and
I wish to retain 90 days of window data to query.

So I need advice for configuration of kafka, the 'feed' topic, the store
topics, and the stores themselves.

Thanks in advance!

Re: Setting up Kafka & Kafka Streams for loading real-time and 'older' data concurrently

Posted by Damian Guy <da...@gmail.com>.
If you use the versions of the methods that take the store name, they will
all be backed by RocksDB.
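
e.g. something like this (a rough sketch against the 0.10.2-era DSL; exact
overloads can differ a little between versions, and the application id,
bootstrap servers and store names are just placeholders):

import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

public class NamedStoresSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "feed-metrics");       // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> feed = builder.stream("feed");

        // Non-windowed: the overload that takes a store name creates a named,
        // persistent (RocksDB) key-value store, queryable under that name.
        KTable<String, Long> totals = feed.groupByKey().count("feed-total-counts");

        // Windowed: same idea, backed by a persistent RocksDB window store.
        KTable<Windowed<String>, Long> perMinute = feed.groupByKey().count(
                TimeWindows.of(TimeUnit.MINUTES.toMillis(1))
                           .until(TimeUnit.DAYS.toMillis(90)),
                "feed-counts-1m");

        new KafkaStreams(builder, props).start();
    }
}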

On Wed, 3 May 2017 at 15:32 Garrett Barton <ga...@gmail.com> wrote:

> João, yes the stores would hold 90 days and prefer it to be rocksdb backed.
>
> I didn't actually know there wasn't an in memory state store.  And now that
> I think about it, how do I verify (or set) what kind of store streams is
> using for all the tasks? I have a bunch windowed and not windowed and I do
> have memory issues, wondering if the non-windowed ones are defaulting to in
> memory vs rocksdb?  I use the named versions of reduce/agg/count/etc right
> now, would I have to build a state store and pass that in?
>
> Thanks all!
>
> On Wed, May 3, 2017 at 9:25 AM, Eno Thereska <en...@gmail.com>
> wrote:
>
> > Just to add to this, there is a JIRA that tracks the fact that we don’t
> > have an in-memory windowed store.
> > https://issues.apache.org/jira/browse/KAFKA-4730
> >
> >
> > Eno
> > > On May 3, 2017, at 12:42 PM, Damian Guy <da...@gmail.com> wrote:
> > >
> > > The windowed state store is only RocksDB at this point, so it isn't
> going
> > > to all be in memory. If you chose to implement your own Windowed Store,
> > > then you could hold it in memory if it would fit.
> > >
> > > On Wed, 3 May 2017 at 04:37 João Peixoto <jo...@gmail.com>
> > wrote:
> > >
> > >> Out of curiosity, would this mean that a state store for such a window
> > >> could hold 90 days worth of data in memory?
> > >>
> > >> Or filesystem if we're talking about Rocksdb
> > >> On Tue, May 2, 2017 at 10:08 AM Damian Guy <da...@gmail.com>
> > wrote:
> > >>
> > >>> Hi Garret,
> > >>>
> > >>> No, log.retention.hours doesn't impact compacted topics.
> > >>>
> > >>> Thanks,
> > >>> Damian
> > >>>
> > >>> On Tue, 2 May 2017 at 18:06 Garrett Barton <garrett.barton@gmail.com
> >
> > >>> wrote:
> > >>>
> > >>>> Thanks Damian,
> > >>>>
> > >>>> Does setting log.retention.hours have anything to do with compacted
> > >>>> topics?  Meaning would a topic not compact now for 90 days? I am
> > >> thinking
> > >>>> all the internal topics that streams creates in the flow.  Having
> > >>> recovery
> > >>>> through 90 days of logs would take a good while I'd imagine.
> > >>>>
> > >>>> Thanks for clarifying that the until() does in fact set properties
> > >>> against
> > >>>> the internal topics created.  That makes sense.
> > >>>>
> > >>>> On Tue, May 2, 2017 at 11:44 AM, Damian Guy <da...@gmail.com>
> > >>> wrote:
> > >>>>
> > >>>>> Hi Garret,
> > >>>>>
> > >>>>>
> > >>>>>> I was running into data loss when segments are deleted faster than
> > >>>>>> downstream can process.  My knee jerk reaction was to set the
> > >> broker
> > >>>>>> configs log.retention.hours=2160 and log.segment.delete.delay.ms=
> > >>>>> 21600000
> > >>>>>> and that made it go away, but I do not think this is right?
> > >>>>>>
> > >>>>>>
> > >>>>> I think setting log.retention.hours to 2160 is correct (not sure
> > >> about
> > >>>>> log.segment.delete.delay.ms) as segment retention is based on the
> > >>> record
> > >>>>> timestamps. So if you have 90 day old data you want to process then
> > >> you
> > >>>>> should set it to at least 90 days.
> > >>>>>
> > >>>>>
> > >>>>>> For examples sake, assume a source topic 'feed', assume a stream
> to
> > >>>>>> calculate min/max/avg to start with, using windows of 1 minute and
> > >> 5
> > >>>>>> minutes.  I wish to use the interactive queries against the window
> > >>>>> stores,
> > >>>>>> and I wish to retain 90 days of window data to query.
> > >>>>>>
> > >>>>> So I need advice for configuration of kafka, the 'feed' topic, the
> > >>> store
> > >>>>>> topics, and the stores themselves.
> > >>>>>>
> > >>>>>>
> > >>>>> When you create the Windows as part of the streams app you should
> > >>> specify
> > >>>>> them something like so: TimeWindows.of(1minute).until(90days) - in
> > >> this
> > >>>>> way
> > >>>>> the stores and underling changelog topics will be configured with
> the
> > >>>>> correct retention periods.
> > >>>>>
> > >>>>> Thanks,
> > >>>>> Damian
> > >>>>>
> > >>>>
> > >>>
> > >>
> >
> >
>

Re: Setting up Kafka & Kafka Streams for loading real-time and 'older' data concurrently

Posted by Garrett Barton <ga...@gmail.com>.
João, yes, the stores would hold 90 days, and I'd prefer them to be
RocksDB backed.

I didn't actually know there wasn't an in-memory state store.  And now that
I think about it, how do I verify (or set) what kind of store Streams is
using for all the tasks?  I have a bunch of windowed and non-windowed stores
and I do have memory issues, so I'm wondering if the non-windowed ones are
defaulting to in-memory vs RocksDB.  I use the named versions of
reduce/agg/count/etc right now; would I have to build a state store and pass
that in?

Thanks all!

On Wed, May 3, 2017 at 9:25 AM, Eno Thereska <en...@gmail.com> wrote:

> Just to add to this, there is a JIRA that tracks the fact that we don’t
> have an in-memory windowed store.
> https://issues.apache.org/jira/browse/KAFKA-4730
>
> Eno
> > On May 3, 2017, at 12:42 PM, Damian Guy <da...@gmail.com> wrote:
> >
> > The windowed state store is only RocksDB at this point, so it isn't going
> > to all be in memory. If you chose to implement your own Windowed Store,
> > then you could hold it in memory if it would fit.
> >
> > On Wed, 3 May 2017 at 04:37 João Peixoto <jo...@gmail.com>
> wrote:
> >
> >> Out of curiosity, would this mean that a state store for such a window
> >> could hold 90 days worth of data in memory?
> >>
> >> Or filesystem if we're talking about Rocksdb
> >> On Tue, May 2, 2017 at 10:08 AM Damian Guy <da...@gmail.com>
> wrote:
> >>
> >>> Hi Garret,
> >>>
> >>> No, log.retention.hours doesn't impact compacted topics.
> >>>
> >>> Thanks,
> >>> Damian
> >>>
> >>> On Tue, 2 May 2017 at 18:06 Garrett Barton <ga...@gmail.com>
> >>> wrote:
> >>>
> >>>> Thanks Damian,
> >>>>
> >>>> Does setting log.retention.hours have anything to do with compacted
> >>>> topics?  Meaning would a topic not compact now for 90 days? I am
> >> thinking
> >>>> all the internal topics that streams creates in the flow.  Having
> >>> recovery
> >>>> through 90 days of logs would take a good while I'd imagine.
> >>>>
> >>>> Thanks for clarifying that the until() does in fact set properties
> >>> against
> >>>> the internal topics created.  That makes sense.
> >>>>
> >>>> On Tue, May 2, 2017 at 11:44 AM, Damian Guy <da...@gmail.com>
> >>> wrote:
> >>>>
> >>>>> Hi Garret,
> >>>>>
> >>>>>
> >>>>>> I was running into data loss when segments are deleted faster than
> >>>>>> downstream can process.  My knee jerk reaction was to set the
> >> broker
> >>>>>> configs log.retention.hours=2160 and log.segment.delete.delay.ms=
> >>>>> 21600000
> >>>>>> and that made it go away, but I do not think this is right?
> >>>>>>
> >>>>>>
> >>>>> I think setting log.retention.hours to 2160 is correct (not sure
> >> about
> >>>>> log.segment.delete.delay.ms) as segment retention is based on the
> >>> record
> >>>>> timestamps. So if you have 90 day old data you want to process then
> >> you
> >>>>> should set it to at least 90 days.
> >>>>>
> >>>>>
> >>>>>> For examples sake, assume a source topic 'feed', assume a stream to
> >>>>>> calculate min/max/avg to start with, using windows of 1 minute and
> >> 5
> >>>>>> minutes.  I wish to use the interactive queries against the window
> >>>>> stores,
> >>>>>> and I wish to retain 90 days of window data to query.
> >>>>>>
> >>>>> So I need advice for configuration of kafka, the 'feed' topic, the
> >>> store
> >>>>>> topics, and the stores themselves.
> >>>>>>
> >>>>>>
> >>>>> When you create the Windows as part of the streams app you should
> >>> specify
> >>>>> them something like so: TimeWindows.of(1minute).until(90days) - in
> >> this
> >>>>> way
> >>>>> the stores and underling changelog topics will be configured with the
> >>>>> correct retention periods.
> >>>>>
> >>>>> Thanks,
> >>>>> Damian
> >>>>>
> >>>>
> >>>
> >>
>
>

Re: Setting up Kafka & Kafka Streams for loading real-time and 'older' data concurrently

Posted by Eno Thereska <en...@gmail.com>.
Just to add to this, there is a JIRA that tracks the fact that we don’t have an in-memory windowed store: https://issues.apache.org/jira/browse/KAFKA-4730

Eno
> On May 3, 2017, at 12:42 PM, Damian Guy <da...@gmail.com> wrote:
> 
> The windowed state store is only RocksDB at this point, so it isn't going
> to all be in memory. If you chose to implement your own Windowed Store,
> then you could hold it in memory if it would fit.
> 
> On Wed, 3 May 2017 at 04:37 João Peixoto <jo...@gmail.com> wrote:
> 
>> Out of curiosity, would this mean that a state store for such a window
>> could hold 90 days worth of data in memory?
>> 
>> Or filesystem if we're talking about Rocksdb
>> On Tue, May 2, 2017 at 10:08 AM Damian Guy <da...@gmail.com> wrote:
>> 
>>> Hi Garret,
>>> 
>>> No, log.retention.hours doesn't impact compacted topics.
>>> 
>>> Thanks,
>>> Damian
>>> 
>>> On Tue, 2 May 2017 at 18:06 Garrett Barton <ga...@gmail.com>
>>> wrote:
>>> 
>>>> Thanks Damian,
>>>> 
>>>> Does setting log.retention.hours have anything to do with compacted
>>>> topics?  Meaning would a topic not compact now for 90 days? I am
>> thinking
>>>> all the internal topics that streams creates in the flow.  Having
>>> recovery
>>>> through 90 days of logs would take a good while I'd imagine.
>>>> 
>>>> Thanks for clarifying that the until() does in fact set properties
>>> against
>>>> the internal topics created.  That makes sense.
>>>> 
>>>> On Tue, May 2, 2017 at 11:44 AM, Damian Guy <da...@gmail.com>
>>> wrote:
>>>> 
>>>>> Hi Garret,
>>>>> 
>>>>> 
>>>>>> I was running into data loss when segments are deleted faster than
>>>>>> downstream can process.  My knee jerk reaction was to set the
>> broker
>>>>>> configs log.retention.hours=2160 and log.segment.delete.delay.ms=
>>>>> 21600000
>>>>>> and that made it go away, but I do not think this is right?
>>>>>> 
>>>>>> 
>>>>> I think setting log.retention.hours to 2160 is correct (not sure
>> about
>>>>> log.segment.delete.delay.ms) as segment retention is based on the
>>> record
>>>>> timestamps. So if you have 90 day old data you want to process then
>> you
>>>>> should set it to at least 90 days.
>>>>> 
>>>>> 
>>>>>> For examples sake, assume a source topic 'feed', assume a stream to
>>>>>> calculate min/max/avg to start with, using windows of 1 minute and
>> 5
>>>>>> minutes.  I wish to use the interactive queries against the window
>>>>> stores,
>>>>>> and I wish to retain 90 days of window data to query.
>>>>>> 
>>>>> So I need advice for configuration of kafka, the 'feed' topic, the
>>> store
>>>>>> topics, and the stores themselves.
>>>>>> 
>>>>>> 
>>>>> When you create the Windows as part of the streams app you should
>>> specify
>>>>> them something like so: TimeWindows.of(1minute).until(90days) - in
>> this
>>>>> way
>>>>> the stores and underling changelog topics will be configured with the
>>>>> correct retention periods.
>>>>> 
>>>>> Thanks,
>>>>> Damian
>>>>> 
>>>> 
>>> 
>> 


Re: Setting up Kafka & Kafka Streams for loading real-time and 'older' data concurrently

Posted by Damian Guy <da...@gmail.com>.
The windowed state store is only RocksDB at this point, so it isn't going
to all be in memory. If you chose to implement your own Windowed Store,
then you could hold it in memory if it would fit.

On Wed, 3 May 2017 at 04:37 João Peixoto <jo...@gmail.com> wrote:

> Out of curiosity, would this mean that a state store for such a window
> could hold 90 days worth of data in memory?
>
> Or filesystem if we're talking about Rocksdb
> On Tue, May 2, 2017 at 10:08 AM Damian Guy <da...@gmail.com> wrote:
>
> > Hi Garret,
> >
> > No, log.retention.hours doesn't impact compacted topics.
> >
> > Thanks,
> > Damian
> >
> > On Tue, 2 May 2017 at 18:06 Garrett Barton <ga...@gmail.com>
> > wrote:
> >
> > > Thanks Damian,
> > >
> > > Does setting log.retention.hours have anything to do with compacted
> > > topics?  Meaning would a topic not compact now for 90 days? I am
> thinking
> > > all the internal topics that streams creates in the flow.  Having
> > recovery
> > > through 90 days of logs would take a good while I'd imagine.
> > >
> > > Thanks for clarifying that the until() does in fact set properties
> > against
> > > the internal topics created.  That makes sense.
> > >
> > > On Tue, May 2, 2017 at 11:44 AM, Damian Guy <da...@gmail.com>
> > wrote:
> > >
> > > > Hi Garret,
> > > >
> > > >
> > > > > I was running into data loss when segments are deleted faster than
> > > > > downstream can process.  My knee jerk reaction was to set the
> broker
> > > > > configs log.retention.hours=2160 and log.segment.delete.delay.ms=
> > > > 21600000
> > > > > and that made it go away, but I do not think this is right?
> > > > >
> > > > >
> > > > I think setting log.retention.hours to 2160 is correct (not sure
> about
> > > > log.segment.delete.delay.ms) as segment retention is based on the
> > record
> > > > timestamps. So if you have 90 day old data you want to process then
> you
> > > > should set it to at least 90 days.
> > > >
> > > >
> > > > > For examples sake, assume a source topic 'feed', assume a stream to
> > > > > calculate min/max/avg to start with, using windows of 1 minute and
> 5
> > > > > minutes.  I wish to use the interactive queries against the window
> > > > stores,
> > > > > and I wish to retain 90 days of window data to query.
> > > > >
> > > > So I need advice for configuration of kafka, the 'feed' topic, the
> > store
> > > > > topics, and the stores themselves.
> > > > >
> > > > >
> > > > When you create the Windows as part of the streams app you should
> > specify
> > > > them something like so: TimeWindows.of(1minute).until(90days) - in
> this
> > > > way
> > > > the stores and underling changelog topics will be configured with the
> > > > correct retention periods.
> > > >
> > > > Thanks,
> > > > Damian
> > > >
> > >
> >
>

Re: Setting up Kafka & Kafka Streams for loading real-time and 'older' data concurrently

Posted by João Peixoto <jo...@gmail.com>.
Out of curiosity, would this mean that a state store for such a window
could hold 90 days' worth of data in memory?

Or on the filesystem, if we're talking about RocksDB?
On Tue, May 2, 2017 at 10:08 AM Damian Guy <da...@gmail.com> wrote:

> Hi Garret,
>
> No, log.retention.hours doesn't impact compacted topics.
>
> Thanks,
> Damian
>
> On Tue, 2 May 2017 at 18:06 Garrett Barton <ga...@gmail.com>
> wrote:
>
> > Thanks Damian,
> >
> > Does setting log.retention.hours have anything to do with compacted
> > topics?  Meaning would a topic not compact now for 90 days? I am thinking
> > all the internal topics that streams creates in the flow.  Having
> recovery
> > through 90 days of logs would take a good while I'd imagine.
> >
> > Thanks for clarifying that the until() does in fact set properties
> against
> > the internal topics created.  That makes sense.
> >
> > On Tue, May 2, 2017 at 11:44 AM, Damian Guy <da...@gmail.com>
> wrote:
> >
> > > Hi Garret,
> > >
> > >
> > > > I was running into data loss when segments are deleted faster than
> > > > downstream can process.  My knee jerk reaction was to set the broker
> > > > configs log.retention.hours=2160 and log.segment.delete.delay.ms=
> > > 21600000
> > > > and that made it go away, but I do not think this is right?
> > > >
> > > >
> > > I think setting log.retention.hours to 2160 is correct (not sure about
> > > log.segment.delete.delay.ms) as segment retention is based on the
> record
> > > timestamps. So if you have 90 day old data you want to process then you
> > > should set it to at least 90 days.
> > >
> > >
> > > > For examples sake, assume a source topic 'feed', assume a stream to
> > > > calculate min/max/avg to start with, using windows of 1 minute and 5
> > > > minutes.  I wish to use the interactive queries against the window
> > > stores,
> > > > and I wish to retain 90 days of window data to query.
> > > >
> > > So I need advice for configuration of kafka, the 'feed' topic, the
> store
> > > > topics, and the stores themselves.
> > > >
> > > >
> > > When you create the Windows as part of the streams app you should
> specify
> > > them something like so: TimeWindows.of(1minute).until(90days) - in this
> > > way
> > > the stores and underling changelog topics will be configured with the
> > > correct retention periods.
> > >
> > > Thanks,
> > > Damian
> > >
> >
>

Re: Setting up Kafka & Kafka Streams for loading real-time and 'older' data concurrently

Posted by Damian Guy <da...@gmail.com>.
Hi Garret,

No, log.retention.hours doesn't impact compacted topics.

Thanks,
Damian

On Tue, 2 May 2017 at 18:06 Garrett Barton <ga...@gmail.com> wrote:

> Thanks Damian,
>
> Does setting log.retention.hours have anything to do with compacted
> topics?  Meaning would a topic not compact now for 90 days? I am thinking
> all the internal topics that streams creates in the flow.  Having recovery
> through 90 days of logs would take a good while I'd imagine.
>
> Thanks for clarifying that the until() does in fact set properties against
> the internal topics created.  That makes sense.
>
> On Tue, May 2, 2017 at 11:44 AM, Damian Guy <da...@gmail.com> wrote:
>
> > Hi Garret,
> >
> >
> > > I was running into data loss when segments are deleted faster than
> > > downstream can process.  My knee jerk reaction was to set the broker
> > > configs log.retention.hours=2160 and log.segment.delete.delay.ms=
> > 21600000
> > > and that made it go away, but I do not think this is right?
> > >
> > >
> > I think setting log.retention.hours to 2160 is correct (not sure about
> > log.segment.delete.delay.ms) as segment retention is based on the record
> > timestamps. So if you have 90 day old data you want to process then you
> > should set it to at least 90 days.
> >
> >
> > > For examples sake, assume a source topic 'feed', assume a stream to
> > > calculate min/max/avg to start with, using windows of 1 minute and 5
> > > minutes.  I wish to use the interactive queries against the window
> > stores,
> > > and I wish to retain 90 days of window data to query.
> > >
> > So I need advice for configuration of kafka, the 'feed' topic, the store
> > > topics, and the stores themselves.
> > >
> > >
> > When you create the Windows as part of the streams app you should specify
> > them something like so: TimeWindows.of(1minute).until(90days) - in this
> > way
> > the stores and underling changelog topics will be configured with the
> > correct retention periods.
> >
> > Thanks,
> > Damian
> >
>

Re: Setting up Kafka & Kafka Streams for loading real-time and 'older' data concurrently

Posted by Garrett Barton <ga...@gmail.com>.
Thanks Damian,

Does setting log.retention.hours have anything to do with compacted
topics?  Meaning, would a topic now not compact for 90 days?  I am thinking
of all the internal topics that Streams creates in the flow.  Having to
recover through 90 days of logs would take a good while, I'd imagine.

Thanks for clarifying that the until() does in fact set properties against
the internal topics created.  That makes sense.

On Tue, May 2, 2017 at 11:44 AM, Damian Guy <da...@gmail.com> wrote:

> Hi Garret,
>
>
> > I was running into data loss when segments are deleted faster than
> > downstream can process.  My knee jerk reaction was to set the broker
> > configs log.retention.hours=2160 and log.segment.delete.delay.ms=
> 21600000
> > and that made it go away, but I do not think this is right?
> >
> >
> I think setting log.retention.hours to 2160 is correct (not sure about
> log.segment.delete.delay.ms) as segment retention is based on the record
> timestamps. So if you have 90 day old data you want to process then you
> should set it to at least 90 days.
>
>
> > For examples sake, assume a source topic 'feed', assume a stream to
> > calculate min/max/avg to start with, using windows of 1 minute and 5
> > minutes.  I wish to use the interactive queries against the window
> stores,
> > and I wish to retain 90 days of window data to query.
> >
> So I need advice for configuration of kafka, the 'feed' topic, the store
> > topics, and the stores themselves.
> >
> >
> When you create the Windows as part of the streams app you should specify
> them something like so: TimeWindows.of(1minute).until(90days) - in this
> way
> the stores and underling changelog topics will be configured with the
> correct retention periods.
>
> Thanks,
> Damian
>

Re: Setting up Kafka & Kafka Streams for loading real-time and 'older' data concurrently

Posted by Damian Guy <da...@gmail.com>.
Hi Garret,


> I was running into data loss when segments are deleted faster than
> downstream can process.  My knee jerk reaction was to set the broker
> configs log.retention.hours=2160 and log.segment.delete.delay.ms=21600000
> and that made it go away, but I do not think this is right?
>
>
I think setting log.retention.hours to 2160 is correct (not sure about
log.segment.delete.delay.ms), as segment retention is based on the record
timestamps.  So if you have 90-day-old data you want to process, then you
should set it to at least 90 days.


> For examples sake, assume a source topic 'feed', assume a stream to
> calculate min/max/avg to start with, using windows of 1 minute and 5
> minutes.  I wish to use the interactive queries against the window stores,
> and I wish to retain 90 days of window data to query.
>
So I need advice for configuration of kafka, the 'feed' topic, the store
> topics, and the stores themselves.
>
>
When you create the windows as part of the Streams app, you should specify
them something like: TimeWindows.of(1minute).until(90days).  That way the
stores and underlying changelog topics will be configured with the correct
retention periods.
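
Putting that together with the interactive-query side, a rough sketch
(0.10.2-era API; the serdes, store name, sensor key and the max-per-window
reduce are illustrative placeholders rather than your actual min/max/avg
logic):

import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

public class WindowedRetentionSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "feed-metrics");       // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder

        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, Double> feed =
                builder.stream(Serdes.String(), Serdes.Double(), "feed");

        // 1-minute windows retained for 90 days: until() sizes both the local
        // window store and the retention of its backing changelog topic.
        feed.groupByKey(Serdes.String(), Serdes.Double())
            .reduce((a, b) -> Math.max(a, b),
                    TimeWindows.of(TimeUnit.MINUTES.toMillis(1))
                               .until(TimeUnit.DAYS.toMillis(90)),
                    "feed-max-1m");                                           // placeholder store name

        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();

        // Once the instance is up (in practice, retry until the store is ready),
        // the same 90 days are queryable interactively:
        ReadOnlyWindowStore<String, Double> store =
                streams.store("feed-max-1m", QueryableStoreTypes.<String, Double>windowStore());
        long now = System.currentTimeMillis();
        WindowStoreIterator<Double> windows =
                store.fetch("sensor-42", now - TimeUnit.DAYS.toMillis(90), now);  // placeholder key
        while (windows.hasNext()) {
            KeyValue<Long, Double> next = windows.next();
            System.out.println("window start " + next.key + " max " + next.value);
        }
        windows.close();
    }
}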

Thanks,
Damian