Posted to dev@samza.apache.org by "Bae, Jae Hyeon" <me...@gmail.com> on 2015/10/02 22:43:27 UTC

How to synchronize KeyValueStore and Kafka cleanup

Hi Samza devs and users

This is my first try with KeyValueStore and I am really excited!

I glanced through the TaskStorageManager source code; it looks like it
creates consumers for the stores, and I am wondering how Kafka cleanup
will be propagated to the KeyValueStore.

My KeyValueStore usage is a little different from the usual cases because
I have to cache all unique ids from the past six hours, and the retention
period should be configurable. The ids won't be repeated, much like
timestamps. In this case, log.cleanup.policy=compact will keep growing
the KeyValueStore size, right?

Can I use the Samza KeyValueStore for topics with
log.cleanup.policy=delete? If not, what's your recommended way to manage
state for a non-changelog Kafka topic? If it is possible, how does Kafka
cleanup remove outdated records from the KeyValueStore?

Thank you
Best, Jae

Re: How to synchronize KeyValueStore and Kafka cleanup

Posted by "Bae, Jae Hyeon" <me...@gmail.com>.
Hi Chinmay

> Why wouldn't you want to use a changelog?

Because log compaction won't help here: I want to cache unique ids that
will never be repeated, much like timestamps. But for restoration I have
to use a changelog. Also, my StreamTask should consume that topic to add
new unique ids to the KV store and delete outdated ones.

My concern is: if my StreamTask consumes the topic and calls put() as
follows:

if (envelope.getSystemStreamPartition().getStream()
        .equals("changelog_topic")) {
    store.put((String) envelope.getKey(), (String) envelope.getMessage());
} else {
    ...
}

then the store will send the message back to Kafka for the changelog,
which just creates a duplicate.

So, should I create a separate topic, different from the source, for the
changelog? For example, if the unique ids are coming from topicA, do I
need to create something like changelog_topicA?
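
In config terms, what I mean is the difference between reusing the source
topic as the store's changelog, e.g. (the store name "id-store" is just a
placeholder, and I haven't double-checked the exact property names):

task.inputs=kafka.topicA
stores.id-store.changelog=kafka.topicA

versus pointing the store at a dedicated changelog topic:

stores.id-store.changelog=kafka.changelog_topicA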

On Fri, Oct 2, 2015 at 5:11 PM, Chinmay Soman <ch...@gmail.com>
wrote:

> > Does KV-store consume automatically from a Kafka topic?
> Yes, if you've configured a changelog stream for your store.
>
> >  Does it consume only on restore()?
> It consumes only during container initialization (again, assuming you
> have a changelog configured).
>
> > implement the StreamTask job to consume a Kafka topic and call add()
> method?
> Why wouldn't you want to use a changelog?
>
>
> On Fri, Oct 2, 2015 at 3:09 PM, Bae, Jae Hyeon <me...@gmail.com> wrote:
>
> > Thanks Yi Pan, I have one more question.
> >
> > Does KV-store consume automatically from a Kafka topic? Does it consume
> > only on restore()? If so, do I have to implement the StreamTask job to
> > consume a Kafka topic and call add() method?
> >
> > On Fri, Oct 2, 2015 at 2:01 PM, Yi Pan <ni...@gmail.com> wrote:
> >
> > > Hi, Jae Hyeon,
> > >
> > > Good to see you back on the mailing list again! Regarding your
> > > questions, please see the answers below:
> > > >
> > > > > My KeyValueStore usage is a little different from the usual cases
> > > > > because I have to cache all unique ids from the past six hours,
> > > > > and the retention period should be configurable. The ids won't be
> > > > > repeated, much like timestamps. In this case,
> > > > > log.cleanup.policy=compact will keep growing the KeyValueStore
> > > > > size, right?
> > > >
> > >
> > > It will grow as large as the cumulative size of your unique ids.
> > >
> > >
> > > > >
> > > > > Can I use the Samza KeyValueStore for topics with
> > > > > log.cleanup.policy=delete? If not, what's your recommended way to
> > > > > manage state for a non-changelog Kafka topic? If it is possible,
> > > > > how does Kafka cleanup remove outdated records from the
> > > > > KeyValueStore?
> > > >
> > >
> > > I am not quite sure what you mean by "non-changelog" Kafka topics. If
> > > you want to retire some of the old records in a KV-store
> > > periodically, you will have to run the pruning manually in the
> > > window() method in the current release. In the upcoming 0.10 release,
> > > we have incorporated RocksDB's TTL feature into the KV-store, which
> > > automatically prunes old entries from the RocksDB store. That said,
> > > the TTL feature is not yet fully synchronized with the Kafka cleanup;
> > > that is ongoing work. The recommendation is to use the TTL feature
> > > and set the Kafka changelog to be time-retention based, with a
> > > retention time longer than the RocksDB TTL to ensure no data loss.
> > >
> > > Hope the above answered your questions.
> > >
> > > Cheers!
> > >
> > > -Yi
> > >
> >
>
>
>
> --
> Thanks and regards
>
> Chinmay Soman
>

Re: How to synchronize KeyValueStore and Kafka cleanup

Posted by Chinmay Soman <ch...@gmail.com>.
> Does KV-store consume automatically from a Kafka topic?
Yes, if you've configured a changelog stream for your store.

>  Does it consume only on restore()?
It consumes only during container initialization (again, assuming you
have a changelog configured).

> implement the StreamTask job to consume a Kafka topic and call add()
method?
Why wouldn't you want to use a changelog?
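
Roughly, something along these lines in the job config (store and topic
names here are placeholders, and it's worth double-checking the exact
property names against the Samza docs):

stores.id-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
stores.id-store.changelog=kafka.id-store-changelog
stores.id-store.key.serde=string
stores.id-store.msg.serde=string
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory

With that, every write to the store is also sent to the changelog topic,
and the store is restored from that topic when the container starts.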


On Fri, Oct 2, 2015 at 3:09 PM, Bae, Jae Hyeon <me...@gmail.com> wrote:

> Thanks Yi Pan, I have one more question.
>
> Does KV-store consume automatically from a Kafka topic? Does it consume
> only on restore()? If so, do I have to implement the StreamTask job to
> consume a Kafka topic and call add() method?
>
> On Fri, Oct 2, 2015 at 2:01 PM, Yi Pan <ni...@gmail.com> wrote:
>
> > Hi, Jae Hyeon,
> >
> > Good to see you back on the mailing list again! Regarding your
> > questions, please see the answers below:
> >
> > > > My KeyValueStore usage is a little different from the usual cases
> > > > because I have to cache all unique ids from the past six hours, and
> > > > the retention period should be configurable. The ids won't be
> > > > repeated, much like timestamps. In this case,
> > > > log.cleanup.policy=compact will keep growing the KeyValueStore
> > > > size, right?
> > >
> >
> > It will grow as large as the cumulative size of your unique ids.
> >
> >
> > > >
> > > > Can I use the Samza KeyValueStore for topics with
> > > > log.cleanup.policy=delete? If not, what's your recommended way to
> > > > manage state for a non-changelog Kafka topic? If it is possible, how
> > > > does Kafka cleanup remove outdated records from the KeyValueStore?
> > >
> >
> > I am not quite sure what you mean by "non-changelog" Kafka topics. If
> > you want to retire some of the old records in a KV-store periodically,
> > you will have to run the pruning manually in the window() method in the
> > current release. In the upcoming 0.10 release, we have incorporated
> > RocksDB's TTL feature into the KV-store, which automatically prunes old
> > entries from the RocksDB store. That said, the TTL feature is not yet
> > fully synchronized with the Kafka cleanup; that is ongoing work. The
> > recommendation is to use the TTL feature and set the Kafka changelog to
> > be time-retention based, with a retention time longer than the RocksDB
> > TTL to ensure no data loss.
> >
> > Hope the above answered your questions.
> >
> > Cheers!
> >
> > -Yi
> >
>



-- 
Thanks and regards

Chinmay Soman

Re: How to synchronize KeyValueStore and Kafka cleanup

Posted by "Bae, Jae Hyeon" <me...@gmail.com>.
Thanks Yi Pan, I have one more question.

Does KV-store consume automatically from a Kafka topic? Does it consume
only on restore()? If so, do I have to implement the StreamTask job to
consume a Kafka topic and call add() method?

On Fri, Oct 2, 2015 at 2:01 PM, Yi Pan <ni...@gmail.com> wrote:

> Hi, Jae Hyeon,
>
> Good to see you back on the mailing list again! Regarding your
> questions, please see the answers below:
>
> > > My KeyValueStore usage is a little different from the usual cases
> > > because I have to cache all unique ids from the past six hours, and
> > > the retention period should be configurable. The ids won't be
> > > repeated, much like timestamps. In this case,
> > > log.cleanup.policy=compact will keep growing the KeyValueStore size,
> > > right?
> >
>
> It will grow as large as the cumulative size of your unique ids.
>
>
> > >
> > > Can I use the Samza KeyValueStore for topics with
> > > log.cleanup.policy=delete? If not, what's your recommended way to
> > > manage state for a non-changelog Kafka topic? If it is possible, how
> > > does Kafka cleanup remove outdated records from the KeyValueStore?
> >
>
> I am not quite sure what you mean by "non-changelog" Kafka topics. If you
> want to retire some of the old records in a KV-store periodically, you
> will have to run the pruning manually in the window() method in the
> current release. In the upcoming 0.10 release, we have incorporated
> RocksDB's TTL feature into the KV-store, which automatically prunes old
> entries from the RocksDB store. That said, the TTL feature is not yet
> fully synchronized with the Kafka cleanup; that is ongoing work. The
> recommendation is to use the TTL feature and set the Kafka changelog to
> be time-retention based, with a retention time longer than the RocksDB
> TTL to ensure no data loss.
>
> Hope the above answered your questions.
>
> Cheers!
>
> -Yi
>

Re: How to synchronize KeyValueStore and Kafka cleanup

Posted by Yi Pan <ni...@gmail.com>.
Hi, Jae Hyeon,

Good to see you back on the mailing list again! Regarding your questions,
please see the answers below:

> > My KeyValueStore usage is a little different from the usual cases
> > because I have to cache all unique ids from the past six hours, and the
> > retention period should be configurable. The ids won't be repeated,
> > much like timestamps. In this case, log.cleanup.policy=compact will
> > keep growing the KeyValueStore size, right?
>

It will grow as large as the cumulative size of your unique ids.


> >
> > Can I use the Samza KeyValueStore for topics with
> > log.cleanup.policy=delete? If not, what's your recommended way to manage
> > state for a non-changelog Kafka topic? If it is possible, how does Kafka
> > cleanup remove outdated records from the KeyValueStore?
>

I am not quite sure what you mean by "non-changelog" Kafka topics. If you
want to retire some of the old records in a KV-store periodically, you will
have to run the pruning manually in the window() method in the current
release. In the upcoming 0.10 release, we have incorporated RocksDB's TTL
feature into the KV-store, which automatically prunes old entries from the
RocksDB store. That said, the TTL feature is not yet fully synchronized
with the Kafka cleanup; that is ongoing work. The recommendation is to use
the TTL feature and set the Kafka changelog to be time-retention based,
with a retention time longer than the RocksDB TTL to ensure no data loss.
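
For the manual approach, the sketch below is roughly what I mean. It is
untested and makes a few assumptions: the store is named "id-store" in the
job config, the task records the time it first saw each id as the store
value, and task.window.ms is set so that window() fires periodically.

import org.apache.samza.config.Config;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.storage.kv.KeyValueIterator;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.*;

import java.util.ArrayList;
import java.util.List;

public class UniqueIdTask implements StreamTask, InitableTask, WindowableTask {
  private static final long RETENTION_MS = 6 * 60 * 60 * 1000L; // six hours

  private KeyValueStore<String, String> store;

  @Override
  @SuppressWarnings("unchecked")
  public void init(Config config, TaskContext context) {
    // "id-store" must match the store name used in the job config.
    store = (KeyValueStore<String, String>) context.getStore("id-store");
  }

  @Override
  public void process(IncomingMessageEnvelope envelope,
      MessageCollector collector, TaskCoordinator coordinator) {
    // Key = unique id, value = the time we first saw it.
    store.put((String) envelope.getKey(),
        String.valueOf(System.currentTimeMillis()));
  }

  @Override
  public void window(MessageCollector collector, TaskCoordinator coordinator) {
    long cutoff = System.currentTimeMillis() - RETENTION_MS;
    List<String> expired = new ArrayList<String>();

    // Collect the expired keys first and delete them after closing the
    // iterator, to avoid mutating the store while iterating over it.
    KeyValueIterator<String, String> it = store.all();
    try {
      while (it.hasNext()) {
        Entry<String, String> entry = it.next();
        if (Long.parseLong(entry.getValue()) < cutoff) {
          expired.add(entry.getKey());
        }
      }
    } finally {
      it.close();
    }

    for (String key : expired) {
      // Each delete is also sent to the changelog as a tombstone.
      store.delete(key);
    }
  }
}

With the 0.10 TTL feature the window() scan goes away; I believe the store
config key will be stores.<store-name>.rocksdb.ttl.ms, but please check the
0.10 docs once it is out.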

Hope the above answered your questions.

Cheers!

-Yi

Re: How to synchronize KeyValueStore and Kafka cleanup

Posted by "Bae, Jae Hyeon" <me...@gmail.com>.
I found the following statement in the Samza documentation:

"Periodically the job scans over both stores and deletes any old events
that were not matched within the time window of the join."

It seems that I have to implement the purging of the KeyValueStore myself.
Did I understand that correctly?
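
If so, since my ids sort like timestamps, I am picturing something like the
following inside window(). This is only a rough idea; it assumes the keys
are zero-padded epoch-millis strings so they sort lexicographically, and it
uses KeyValueStore and KeyValueIterator from org.apache.samza.storage.kv
plus java.util.ArrayList/List.

private void pruneOldIds(KeyValueStore<String, String> store) {
    // Everything with a key older than six hours falls in this range.
    String from = String.format("%020d", 0);
    String to = String.format("%020d",
        System.currentTimeMillis() - 6L * 60 * 60 * 1000);
    List<String> expired = new ArrayList<String>();
    KeyValueIterator<String, String> it = store.range(from, to);
    try {
        while (it.hasNext()) {
            expired.add(it.next().getKey());
        }
    } finally {
        it.close();
    }
    for (String key : expired) {
        store.delete(key);
    }
}

Does that sound like the right direction?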

On Fri, Oct 2, 2015 at 1:43 PM, Bae, Jae Hyeon <me...@gmail.com> wrote:

> Hi Samza devs and users
>
> This is my first try with KeyValueStore and I am really excited!
>
> I glanced through the TaskStorageManager source code; it looks like it
> creates consumers for the stores, and I am wondering how Kafka cleanup
> will be propagated to the KeyValueStore.
>
> My KeyValueStore usage is a little different from the usual cases because
> I have to cache all unique ids from the past six hours, and the retention
> period should be configurable. The ids won't be repeated, much like
> timestamps. In this case, log.cleanup.policy=compact will keep growing
> the KeyValueStore size, right?
>
> Can I use the Samza KeyValueStore for topics with
> log.cleanup.policy=delete? If not, what's your recommended way to manage
> state for a non-changelog Kafka topic? If it is possible, how does Kafka
> cleanup remove outdated records from the KeyValueStore?
>
> Thank you
> Best, Jae
>
>
>