You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@samza.apache.org by Chen Song <ch...@gmail.com> on 2015/07/22 18:00:27 UTC

question on commit on changelog

We are trying to understand the order of commits when processing each
message in a Samza job.

T1: input offset commit
T2: changelog commit
T3: output commit

By looking at the code snippet in
https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala#L155-L171,
my understanding is that for each input message, Samza always send update
message on changelog, send the output message and then commit the input
offset. It makes sense to me at the high level in terms of at least once
processing.

Specifically, we have two dumb questions:

1. When implementing our Samza task, does each call of process method
triggers a call to TaskInstance.commit?
2. Is there a way to buffer these commit activities in memory and flush
periodically? Our job is joining >1mm messages per second using a KV store
and we have a lot of concern for the changelog size, as in the worst case,
the change log will grow as fast as the input log.

Chen

-- 
Chen Song

Re: question on commit on changelog

Posted by Yi Pan <ni...@gmail.com>.
Hi, Chen,

The at-least-once semantics is always guaranteed by committing the offsets
at the last step in the commit. Hence, flushing to local disk, changelog
and output topics always need to succeed before the offsets are committed
to checkpoint. If anything fails in-between, the offset will not be
committed and the replay will happen, as per definition of at-least-once
semantics.

Best,

-Yi

On Tue, Aug 25, 2015 at 9:04 AM, Chen Song <ch...@gmail.com> wrote:

> Thanks Yi.
>
> Another follow up question. If 'checkpoint' and 'changelog' is managed
> separately (according to your explanation above), how do we support at
> least once processing?
>
> For example, what if the task commits offsets to checkpoint topic but the
> producer doesn't send all data in its buffer to the changelog topic and the
> task crashed?
>
> Chen
>
> On Tue, Aug 4, 2015 at 12:54 PM, Yi Pan <ni...@gmail.com> wrote:
>
> > Hi, Chen,
> >
> > So, is your goal to improve the throughput to the changelog topic or
> reduce
> > the size of the changelog topic? If you are targeting for later and your
> > KV-store truly is of the size of the input log, I don't see how it is
> > possible. In a lot of use cases, users will only need to retain the
> > *recent* certain time period of input log. In that case, you can choose
> to
> > periodically purge the expired records in KV-store to reduce the size
> (both
> > for the KV-store and the changelog).
> >
> > Regards,
> > -Yi
> >
> > On Tue, Aug 4, 2015 at 7:25 AM, Chen Song <ch...@gmail.com>
> wrote:
> >
> > > Thanks Yan.
> > >
> > > Very good explanation on 1).
> > >
> > > For 2), I understand that users can tune the size of the batch for
> Kafka
> > > producer. However, that doesn't change the number of messages sent to
> the
> > > changelog topic. In our case, we process a high volume log  (1.5MM
> > > records/second) will update kv store for each message and this will
> > result
> > > the changelog to grow to the same size of input log. Even with
> compaction
> > > turned on changelog, it is not very scalable. I am wondering if there
> is
> > a
> > > way to mitigate this problem.
> > >
> > >
> > > On Wed, Jul 22, 2015 at 2:12 PM, Yan Fang <ya...@gmail.com>
> wrote:
> > >
> > > > Hi Chen Song,
> > > >
> > > > There are two different concepts: *checkpoint* and *changelog*.
> > > Checkpoint
> > > > is for the offset of the messages, while the changelog is for the
> > > kv-store.
> > > > The code snippet you show is for the checkpoint , not for the
> > changelog.
> > > >
> > > > {quote}
> > > > 1. When implementing our Samza task, does each call of process method
> > > > triggers a call to TaskInstance.commit?
> > > > {quote}
> > > >
> > > > TaskInstance.commit triggers the *checkpoint* . It is triggered every
> > > > task.commit.ms , (default is 60000ms). The code is here
> > > > <
> > > >
> > >
> >
> https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala#L149-166
> > > > >
> > > > . Basically, the RunLoop class calls the commit method, but only
> > trigger
> > > > the commit behavior every configured time.
> > > >
> > > > If you are talking about the *changelog*, it's not controlled by the
> > > commit
> > > > method. Instead, every put/delete calls the "send
> > > > <
> > > >
> > >
> >
> https://github.com/apache/samza/blob/master/samza-api/src/main/java/org/apache/samza/system/SystemProducer.java#L51
> > > > >"
> > > > of the system Producer. (code is here
> > > > <
> > > >
> > >
> >
> https://github.com/apache/samza/blob/master/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala#L62-L66
> > > > >).
> > > > In terms of how often the "send" really *send *to the broker (e.g.
> > > kafka),
> > > > it depends on your producer's configuration. For example, in Kafka,
> you
> > > can
> > > > have the producer send a batch (setting async), or send one msg a
> time
> > > > (setting sync). What it means is that, it leaves the System to decide
> > how
> > > > to deal with the "send" method.
> > > >
> > > >
> > > > {quote}
> > > > 2. Is there a way to buffer these commit activities in memory and
> flush
> > > > periodically? Our job is joining >1mm messages per second using a KV
> > > store
> > > > and we have a lot of concern for the changelog size, as in the worst
> > > case,
> > > > the change log will grow as fast as the input log.
> > > > {quote}
> > > >
> > > > If you are talking about the checkpoint, you can change the
> > > task.commit.ms
> > > > .
> > > >
> > > > If you are thinking of the changelog (kv-store), you can change the
> > > > producer's config to batch a few changes and send to the broker.
> > > >
> > > > I think the guys in the community with more operational experience
> are
> > > able
> > > > to tell you what is the best practice.
> > > >
> > > > Thanks,
> > > >
> > > > Fang, Yan
> > > > yanfang724@gmail.com
> > > >
> > > > On Wed, Jul 22, 2015 at 9:00 AM, Chen Song <ch...@gmail.com>
> > > wrote:
> > > >
> > > > > We are trying to understand the order of commits when processing
> each
> > > > > message in a Samza job.
> > > > >
> > > > > T1: input offset commit
> > > > > T2: changelog commit
> > > > > T3: output commit
> > > > >
> > > > > By looking at the code snippet in
> > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala#L155-L171
> > > > > ,
> > > > > my understanding is that for each input message, Samza always send
> > > update
> > > > > message on changelog, send the output message and then commit the
> > input
> > > > > offset. It makes sense to me at the high level in terms of at least
> > > once
> > > > > processing.
> > > > >
> > > > > Specifically, we have two dumb questions:
> > > > >
> > > > > 1. When implementing our Samza task, does each call of process
> method
> > > > > triggers a call to TaskInstance.commit?
> > > > > 2. Is there a way to buffer these commit activities in memory and
> > flush
> > > > > periodically? Our job is joining >1mm messages per second using a
> KV
> > > > store
> > > > > and we have a lot of concern for the changelog size, as in the
> worst
> > > > case,
> > > > > the change log will grow as fast as the input log.
> > > > >
> > > > > Chen
> > > > >
> > > > > --
> > > > > Chen Song
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > Chen Song
> > >
> >
>
>
>
> --
> Chen Song
>

Re: question on commit on changelog

Posted by Chen Song <ch...@gmail.com>.
Thanks Yi.

Another follow up question. If 'checkpoint' and 'changelog' is managed
separately (according to your explanation above), how do we support at
least once processing?

For example, what if the task commits offsets to checkpoint topic but the
producer doesn't send all data in its buffer to the changelog topic and the
task crashed?

Chen

On Tue, Aug 4, 2015 at 12:54 PM, Yi Pan <ni...@gmail.com> wrote:

> Hi, Chen,
>
> So, is your goal to improve the throughput to the changelog topic or reduce
> the size of the changelog topic? If you are targeting for later and your
> KV-store truly is of the size of the input log, I don't see how it is
> possible. In a lot of use cases, users will only need to retain the
> *recent* certain time period of input log. In that case, you can choose to
> periodically purge the expired records in KV-store to reduce the size (both
> for the KV-store and the changelog).
>
> Regards,
> -Yi
>
> On Tue, Aug 4, 2015 at 7:25 AM, Chen Song <ch...@gmail.com> wrote:
>
> > Thanks Yan.
> >
> > Very good explanation on 1).
> >
> > For 2), I understand that users can tune the size of the batch for Kafka
> > producer. However, that doesn't change the number of messages sent to the
> > changelog topic. In our case, we process a high volume log  (1.5MM
> > records/second) will update kv store for each message and this will
> result
> > the changelog to grow to the same size of input log. Even with compaction
> > turned on changelog, it is not very scalable. I am wondering if there is
> a
> > way to mitigate this problem.
> >
> >
> > On Wed, Jul 22, 2015 at 2:12 PM, Yan Fang <ya...@gmail.com> wrote:
> >
> > > Hi Chen Song,
> > >
> > > There are two different concepts: *checkpoint* and *changelog*.
> > Checkpoint
> > > is for the offset of the messages, while the changelog is for the
> > kv-store.
> > > The code snippet you show is for the checkpoint , not for the
> changelog.
> > >
> > > {quote}
> > > 1. When implementing our Samza task, does each call of process method
> > > triggers a call to TaskInstance.commit?
> > > {quote}
> > >
> > > TaskInstance.commit triggers the *checkpoint* . It is triggered every
> > > task.commit.ms , (default is 60000ms). The code is here
> > > <
> > >
> >
> https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala#L149-166
> > > >
> > > . Basically, the RunLoop class calls the commit method, but only
> trigger
> > > the commit behavior every configured time.
> > >
> > > If you are talking about the *changelog*, it's not controlled by the
> > commit
> > > method. Instead, every put/delete calls the "send
> > > <
> > >
> >
> https://github.com/apache/samza/blob/master/samza-api/src/main/java/org/apache/samza/system/SystemProducer.java#L51
> > > >"
> > > of the system Producer. (code is here
> > > <
> > >
> >
> https://github.com/apache/samza/blob/master/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala#L62-L66
> > > >).
> > > In terms of how often the "send" really *send *to the broker (e.g.
> > kafka),
> > > it depends on your producer's configuration. For example, in Kafka, you
> > can
> > > have the producer send a batch (setting async), or send one msg a time
> > > (setting sync). What it means is that, it leaves the System to decide
> how
> > > to deal with the "send" method.
> > >
> > >
> > > {quote}
> > > 2. Is there a way to buffer these commit activities in memory and flush
> > > periodically? Our job is joining >1mm messages per second using a KV
> > store
> > > and we have a lot of concern for the changelog size, as in the worst
> > case,
> > > the change log will grow as fast as the input log.
> > > {quote}
> > >
> > > If you are talking about the checkpoint, you can change the
> > task.commit.ms
> > > .
> > >
> > > If you are thinking of the changelog (kv-store), you can change the
> > > producer's config to batch a few changes and send to the broker.
> > >
> > > I think the guys in the community with more operational experience are
> > able
> > > to tell you what is the best practice.
> > >
> > > Thanks,
> > >
> > > Fang, Yan
> > > yanfang724@gmail.com
> > >
> > > On Wed, Jul 22, 2015 at 9:00 AM, Chen Song <ch...@gmail.com>
> > wrote:
> > >
> > > > We are trying to understand the order of commits when processing each
> > > > message in a Samza job.
> > > >
> > > > T1: input offset commit
> > > > T2: changelog commit
> > > > T3: output commit
> > > >
> > > > By looking at the code snippet in
> > > >
> > > >
> > >
> >
> https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala#L155-L171
> > > > ,
> > > > my understanding is that for each input message, Samza always send
> > update
> > > > message on changelog, send the output message and then commit the
> input
> > > > offset. It makes sense to me at the high level in terms of at least
> > once
> > > > processing.
> > > >
> > > > Specifically, we have two dumb questions:
> > > >
> > > > 1. When implementing our Samza task, does each call of process method
> > > > triggers a call to TaskInstance.commit?
> > > > 2. Is there a way to buffer these commit activities in memory and
> flush
> > > > periodically? Our job is joining >1mm messages per second using a KV
> > > store
> > > > and we have a lot of concern for the changelog size, as in the worst
> > > case,
> > > > the change log will grow as fast as the input log.
> > > >
> > > > Chen
> > > >
> > > > --
> > > > Chen Song
> > > >
> > >
> >
> >
> >
> > --
> > Chen Song
> >
>



-- 
Chen Song

Re: question on commit on changelog

Posted by Yi Pan <ni...@gmail.com>.
Hi, Chen,

So, is your goal to improve the throughput to the changelog topic or reduce
the size of the changelog topic? If you are targeting for later and your
KV-store truly is of the size of the input log, I don't see how it is
possible. In a lot of use cases, users will only need to retain the
*recent* certain time period of input log. In that case, you can choose to
periodically purge the expired records in KV-store to reduce the size (both
for the KV-store and the changelog).

Regards,
-Yi

On Tue, Aug 4, 2015 at 7:25 AM, Chen Song <ch...@gmail.com> wrote:

> Thanks Yan.
>
> Very good explanation on 1).
>
> For 2), I understand that users can tune the size of the batch for Kafka
> producer. However, that doesn't change the number of messages sent to the
> changelog topic. In our case, we process a high volume log  (1.5MM
> records/second) will update kv store for each message and this will result
> the changelog to grow to the same size of input log. Even with compaction
> turned on changelog, it is not very scalable. I am wondering if there is a
> way to mitigate this problem.
>
>
> On Wed, Jul 22, 2015 at 2:12 PM, Yan Fang <ya...@gmail.com> wrote:
>
> > Hi Chen Song,
> >
> > There are two different concepts: *checkpoint* and *changelog*.
> Checkpoint
> > is for the offset of the messages, while the changelog is for the
> kv-store.
> > The code snippet you show is for the checkpoint , not for the changelog.
> >
> > {quote}
> > 1. When implementing our Samza task, does each call of process method
> > triggers a call to TaskInstance.commit?
> > {quote}
> >
> > TaskInstance.commit triggers the *checkpoint* . It is triggered every
> > task.commit.ms , (default is 60000ms). The code is here
> > <
> >
> https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala#L149-166
> > >
> > . Basically, the RunLoop class calls the commit method, but only trigger
> > the commit behavior every configured time.
> >
> > If you are talking about the *changelog*, it's not controlled by the
> commit
> > method. Instead, every put/delete calls the "send
> > <
> >
> https://github.com/apache/samza/blob/master/samza-api/src/main/java/org/apache/samza/system/SystemProducer.java#L51
> > >"
> > of the system Producer. (code is here
> > <
> >
> https://github.com/apache/samza/blob/master/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala#L62-L66
> > >).
> > In terms of how often the "send" really *send *to the broker (e.g.
> kafka),
> > it depends on your producer's configuration. For example, in Kafka, you
> can
> > have the producer send a batch (setting async), or send one msg a time
> > (setting sync). What it means is that, it leaves the System to decide how
> > to deal with the "send" method.
> >
> >
> > {quote}
> > 2. Is there a way to buffer these commit activities in memory and flush
> > periodically? Our job is joining >1mm messages per second using a KV
> store
> > and we have a lot of concern for the changelog size, as in the worst
> case,
> > the change log will grow as fast as the input log.
> > {quote}
> >
> > If you are talking about the checkpoint, you can change the
> task.commit.ms
> > .
> >
> > If you are thinking of the changelog (kv-store), you can change the
> > producer's config to batch a few changes and send to the broker.
> >
> > I think the guys in the community with more operational experience are
> able
> > to tell you what is the best practice.
> >
> > Thanks,
> >
> > Fang, Yan
> > yanfang724@gmail.com
> >
> > On Wed, Jul 22, 2015 at 9:00 AM, Chen Song <ch...@gmail.com>
> wrote:
> >
> > > We are trying to understand the order of commits when processing each
> > > message in a Samza job.
> > >
> > > T1: input offset commit
> > > T2: changelog commit
> > > T3: output commit
> > >
> > > By looking at the code snippet in
> > >
> > >
> >
> https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala#L155-L171
> > > ,
> > > my understanding is that for each input message, Samza always send
> update
> > > message on changelog, send the output message and then commit the input
> > > offset. It makes sense to me at the high level in terms of at least
> once
> > > processing.
> > >
> > > Specifically, we have two dumb questions:
> > >
> > > 1. When implementing our Samza task, does each call of process method
> > > triggers a call to TaskInstance.commit?
> > > 2. Is there a way to buffer these commit activities in memory and flush
> > > periodically? Our job is joining >1mm messages per second using a KV
> > store
> > > and we have a lot of concern for the changelog size, as in the worst
> > case,
> > > the change log will grow as fast as the input log.
> > >
> > > Chen
> > >
> > > --
> > > Chen Song
> > >
> >
>
>
>
> --
> Chen Song
>

Re: question on commit on changelog

Posted by Chen Song <ch...@gmail.com>.
Thanks Yan.

Very good explanation on 1).

For 2), I understand that users can tune the size of the batch for Kafka
producer. However, that doesn't change the number of messages sent to the
changelog topic. In our case, we process a high volume log  (1.5MM
records/second) will update kv store for each message and this will result
the changelog to grow to the same size of input log. Even with compaction
turned on changelog, it is not very scalable. I am wondering if there is a
way to mitigate this problem.


On Wed, Jul 22, 2015 at 2:12 PM, Yan Fang <ya...@gmail.com> wrote:

> Hi Chen Song,
>
> There are two different concepts: *checkpoint* and *changelog*. Checkpoint
> is for the offset of the messages, while the changelog is for the kv-store.
> The code snippet you show is for the checkpoint , not for the changelog.
>
> {quote}
> 1. When implementing our Samza task, does each call of process method
> triggers a call to TaskInstance.commit?
> {quote}
>
> TaskInstance.commit triggers the *checkpoint* . It is triggered every
> task.commit.ms , (default is 60000ms). The code is here
> <
> https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala#L149-166
> >
> . Basically, the RunLoop class calls the commit method, but only trigger
> the commit behavior every configured time.
>
> If you are talking about the *changelog*, it's not controlled by the commit
> method. Instead, every put/delete calls the "send
> <
> https://github.com/apache/samza/blob/master/samza-api/src/main/java/org/apache/samza/system/SystemProducer.java#L51
> >"
> of the system Producer. (code is here
> <
> https://github.com/apache/samza/blob/master/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala#L62-L66
> >).
> In terms of how often the "send" really *send *to the broker (e.g. kafka),
> it depends on your producer's configuration. For example, in Kafka, you can
> have the producer send a batch (setting async), or send one msg a time
> (setting sync). What it means is that, it leaves the System to decide how
> to deal with the "send" method.
>
>
> {quote}
> 2. Is there a way to buffer these commit activities in memory and flush
> periodically? Our job is joining >1mm messages per second using a KV store
> and we have a lot of concern for the changelog size, as in the worst case,
> the change log will grow as fast as the input log.
> {quote}
>
> If you are talking about the checkpoint, you can change the task.commit.ms
> .
>
> If you are thinking of the changelog (kv-store), you can change the
> producer's config to batch a few changes and send to the broker.
>
> I think the guys in the community with more operational experience are able
> to tell you what is the best practice.
>
> Thanks,
>
> Fang, Yan
> yanfang724@gmail.com
>
> On Wed, Jul 22, 2015 at 9:00 AM, Chen Song <ch...@gmail.com> wrote:
>
> > We are trying to understand the order of commits when processing each
> > message in a Samza job.
> >
> > T1: input offset commit
> > T2: changelog commit
> > T3: output commit
> >
> > By looking at the code snippet in
> >
> >
> https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala#L155-L171
> > ,
> > my understanding is that for each input message, Samza always send update
> > message on changelog, send the output message and then commit the input
> > offset. It makes sense to me at the high level in terms of at least once
> > processing.
> >
> > Specifically, we have two dumb questions:
> >
> > 1. When implementing our Samza task, does each call of process method
> > triggers a call to TaskInstance.commit?
> > 2. Is there a way to buffer these commit activities in memory and flush
> > periodically? Our job is joining >1mm messages per second using a KV
> store
> > and we have a lot of concern for the changelog size, as in the worst
> case,
> > the change log will grow as fast as the input log.
> >
> > Chen
> >
> > --
> > Chen Song
> >
>



-- 
Chen Song

Re: question on commit on changelog

Posted by Yan Fang <ya...@gmail.com>.
Hi Chen Song,

There are two different concepts: *checkpoint* and *changelog*. Checkpoint
is for the offset of the messages, while the changelog is for the kv-store.
The code snippet you show is for the checkpoint , not for the changelog.

{quote}
1. When implementing our Samza task, does each call of process method
triggers a call to TaskInstance.commit?
{quote}

TaskInstance.commit triggers the *checkpoint* . It is triggered every
task.commit.ms , (default is 60000ms). The code is here
<https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala#L149-166>
. Basically, the RunLoop class calls the commit method, but only trigger
the commit behavior every configured time.

If you are talking about the *changelog*, it's not controlled by the commit
method. Instead, every put/delete calls the "send
<https://github.com/apache/samza/blob/master/samza-api/src/main/java/org/apache/samza/system/SystemProducer.java#L51>"
of the system Producer. (code is here
<https://github.com/apache/samza/blob/master/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala#L62-L66>).
In terms of how often the "send" really *send *to the broker (e.g. kafka),
it depends on your producer's configuration. For example, in Kafka, you can
have the producer send a batch (setting async), or send one msg a time
(setting sync). What it means is that, it leaves the System to decide how
to deal with the "send" method.


{quote}
2. Is there a way to buffer these commit activities in memory and flush
periodically? Our job is joining >1mm messages per second using a KV store
and we have a lot of concern for the changelog size, as in the worst case,
the change log will grow as fast as the input log.
{quote}

If you are talking about the checkpoint, you can change the task.commit.ms .

If you are thinking of the changelog (kv-store), you can change the
producer's config to batch a few changes and send to the broker.

I think the guys in the community with more operational experience are able
to tell you what is the best practice.

Thanks,

Fang, Yan
yanfang724@gmail.com

On Wed, Jul 22, 2015 at 9:00 AM, Chen Song <ch...@gmail.com> wrote:

> We are trying to understand the order of commits when processing each
> message in a Samza job.
>
> T1: input offset commit
> T2: changelog commit
> T3: output commit
>
> By looking at the code snippet in
>
> https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala#L155-L171
> ,
> my understanding is that for each input message, Samza always send update
> message on changelog, send the output message and then commit the input
> offset. It makes sense to me at the high level in terms of at least once
> processing.
>
> Specifically, we have two dumb questions:
>
> 1. When implementing our Samza task, does each call of process method
> triggers a call to TaskInstance.commit?
> 2. Is there a way to buffer these commit activities in memory and flush
> periodically? Our job is joining >1mm messages per second using a KV store
> and we have a lot of concern for the changelog size, as in the worst case,
> the change log will grow as fast as the input log.
>
> Chen
>
> --
> Chen Song
>