You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Srikanth <sr...@gmail.com> on 2016/10/09 00:25:04 UTC

Kafka 0.10 integ offset commit

Hello,

Spark streaming kafka 0.10 integ provides an option to commit offset to
kafka using commitAsyn() API.
This only records the offset commit request. The actual commit is performed
in compute() after RDD for next batch is created.
Why is this so? Why not do a commit right when the API is called?
Anyway the commit process itself is async with an option to provide
callback handler.

This adds a window where application does a commit but it is not recorded
in kafka internal topic.
Any failure during that window will cause the last batch to be recomputed.

My app does a sink to external source that can't be idempotent. As such the
operations are assumed to be atleast once.
This seems to be one place where duplicates and be reduced.

Srikanth

Re: Kafka 0.10 integ offset commit

Posted by Cody Koeninger <co...@koeninger.org>.
That's cool, just be aware that all you're affecting is the time
between commits, not overall correctness.

Good call on the iterator not draining the queue, I'll fix that.

On Sun, Oct 9, 2016 at 12:22 PM, Srikanth <sr...@gmail.com> wrote:
> I'll probably add this behavior. It's a good balance between not having to
> rely on another external system just for offset management and reducing
> duplicates.
> I was more worried about the underlying framework using the consumer in
> parallel. Will watch out for concurrent mod exp.
>
> BTW, the commitQueue used to store offset commit request is only appended.
> Elements are never removed. Looks like a bug??
>
>
> On Sun, Oct 9, 2016 at 1:03 AM, Cody Koeninger <co...@koeninger.org> wrote:
>>
>> People may be calling commit from listeners or who knows where.  Point
>> is it's not thread safe.  If it's really important to you, it should
>> be pretty straightforward for you to hack on it to allow it at your
>> own risk.  There is a check for concurrent access in the consumer, so
>> worst case scenario you should get an exception.
>>
>> On Sat, Oct 8, 2016 at 9:18 PM, Srikanth <sr...@gmail.com> wrote:
>> > If I call commit in foreachrdd at the end of a batch, is there still a
>> > possibility of another thread using the same consumer? Assuming I've not
>> > configured scheduler to run parallel jobs.
>> >
>> >
>> > On Oct 8, 2016 8:39 PM, "Cody Koeninger" <co...@koeninger.org> wrote:
>> >>
>> >> The underlying kafka consumer isn't thread safe.  Calling the actual
>> >> commit in compute means it's called in the same thread as the other
>> >> consumer calls.
>> >>
>> >> Using kafka as an offset store only works with correctly with
>> >> idempotent datastore writes anyway, so the question of when the commit
>> >> happens shouldn't be an issue.
>> >>
>> >> On Sat, Oct 8, 2016 at 7:25 PM, Srikanth <sr...@gmail.com> wrote:
>> >> > Hello,
>> >> >
>> >> > Spark streaming kafka 0.10 integ provides an option to commit offset
>> >> > to
>> >> > kafka using commitAsyn() API.
>> >> > This only records the offset commit request. The actual commit is
>> >> > performed
>> >> > in compute() after RDD for next batch is created.
>> >> > Why is this so? Why not do a commit right when the API is called?
>> >> > Anyway the commit process itself is async with an option to provide
>> >> > callback
>> >> > handler.
>> >> >
>> >> > This adds a window where application does a commit but it is not
>> >> > recorded in
>> >> > kafka internal topic.
>> >> > Any failure during that window will cause the last batch to be
>> >> > recomputed.
>> >> >
>> >> > My app does a sink to external source that can't be idempotent. As
>> >> > such
>> >> > the
>> >> > operations are assumed to be atleast once.
>> >> > This seems to be one place where duplicates and be reduced.
>> >> >
>> >> > Srikanth
>
>

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: Kafka 0.10 integ offset commit

Posted by Srikanth <sr...@gmail.com>.
I'll probably add this behavior. It's a good balance between not having to
rely on another external system just for offset management and reducing
duplicates.
I was more worried about the underlying framework using the consumer in
parallel. Will watch out for concurrent mod exp.

BTW, the commitQueue used to store offset commit request is only appended.
Elements are never removed. Looks like a bug??


On Sun, Oct 9, 2016 at 1:03 AM, Cody Koeninger <co...@koeninger.org> wrote:

> People may be calling commit from listeners or who knows where.  Point
> is it's not thread safe.  If it's really important to you, it should
> be pretty straightforward for you to hack on it to allow it at your
> own risk.  There is a check for concurrent access in the consumer, so
> worst case scenario you should get an exception.
>
> On Sat, Oct 8, 2016 at 9:18 PM, Srikanth <sr...@gmail.com> wrote:
> > If I call commit in foreachrdd at the end of a batch, is there still a
> > possibility of another thread using the same consumer? Assuming I've not
> > configured scheduler to run parallel jobs.
> >
> >
> > On Oct 8, 2016 8:39 PM, "Cody Koeninger" <co...@koeninger.org> wrote:
> >>
> >> The underlying kafka consumer isn't thread safe.  Calling the actual
> >> commit in compute means it's called in the same thread as the other
> >> consumer calls.
> >>
> >> Using kafka as an offset store only works with correctly with
> >> idempotent datastore writes anyway, so the question of when the commit
> >> happens shouldn't be an issue.
> >>
> >> On Sat, Oct 8, 2016 at 7:25 PM, Srikanth <sr...@gmail.com> wrote:
> >> > Hello,
> >> >
> >> > Spark streaming kafka 0.10 integ provides an option to commit offset
> to
> >> > kafka using commitAsyn() API.
> >> > This only records the offset commit request. The actual commit is
> >> > performed
> >> > in compute() after RDD for next batch is created.
> >> > Why is this so? Why not do a commit right when the API is called?
> >> > Anyway the commit process itself is async with an option to provide
> >> > callback
> >> > handler.
> >> >
> >> > This adds a window where application does a commit but it is not
> >> > recorded in
> >> > kafka internal topic.
> >> > Any failure during that window will cause the last batch to be
> >> > recomputed.
> >> >
> >> > My app does a sink to external source that can't be idempotent. As
> such
> >> > the
> >> > operations are assumed to be atleast once.
> >> > This seems to be one place where duplicates and be reduced.
> >> >
> >> > Srikanth
>

Re: Kafka 0.10 integ offset commit

Posted by Cody Koeninger <co...@koeninger.org>.
People may be calling commit from listeners or who knows where.  Point
is it's not thread safe.  If it's really important to you, it should
be pretty straightforward for you to hack on it to allow it at your
own risk.  There is a check for concurrent access in the consumer, so
worst case scenario you should get an exception.

On Sat, Oct 8, 2016 at 9:18 PM, Srikanth <sr...@gmail.com> wrote:
> If I call commit in foreachrdd at the end of a batch, is there still a
> possibility of another thread using the same consumer? Assuming I've not
> configured scheduler to run parallel jobs.
>
>
> On Oct 8, 2016 8:39 PM, "Cody Koeninger" <co...@koeninger.org> wrote:
>>
>> The underlying kafka consumer isn't thread safe.  Calling the actual
>> commit in compute means it's called in the same thread as the other
>> consumer calls.
>>
>> Using kafka as an offset store only works with correctly with
>> idempotent datastore writes anyway, so the question of when the commit
>> happens shouldn't be an issue.
>>
>> On Sat, Oct 8, 2016 at 7:25 PM, Srikanth <sr...@gmail.com> wrote:
>> > Hello,
>> >
>> > Spark streaming kafka 0.10 integ provides an option to commit offset to
>> > kafka using commitAsyn() API.
>> > This only records the offset commit request. The actual commit is
>> > performed
>> > in compute() after RDD for next batch is created.
>> > Why is this so? Why not do a commit right when the API is called?
>> > Anyway the commit process itself is async with an option to provide
>> > callback
>> > handler.
>> >
>> > This adds a window where application does a commit but it is not
>> > recorded in
>> > kafka internal topic.
>> > Any failure during that window will cause the last batch to be
>> > recomputed.
>> >
>> > My app does a sink to external source that can't be idempotent. As such
>> > the
>> > operations are assumed to be atleast once.
>> > This seems to be one place where duplicates and be reduced.
>> >
>> > Srikanth

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: Kafka 0.10 integ offset commit

Posted by Srikanth <sr...@gmail.com>.
If I call commit in foreachrdd at the end of a batch, is there still a
possibility of another thread using the same consumer? Assuming I've not
configured scheduler to run parallel jobs.

On Oct 8, 2016 8:39 PM, "Cody Koeninger" <co...@koeninger.org> wrote:

> The underlying kafka consumer isn't thread safe.  Calling the actual
> commit in compute means it's called in the same thread as the other
> consumer calls.
>
> Using kafka as an offset store only works with correctly with
> idempotent datastore writes anyway, so the question of when the commit
> happens shouldn't be an issue.
>
> On Sat, Oct 8, 2016 at 7:25 PM, Srikanth <sr...@gmail.com> wrote:
> > Hello,
> >
> > Spark streaming kafka 0.10 integ provides an option to commit offset to
> > kafka using commitAsyn() API.
> > This only records the offset commit request. The actual commit is
> performed
> > in compute() after RDD for next batch is created.
> > Why is this so? Why not do a commit right when the API is called?
> > Anyway the commit process itself is async with an option to provide
> callback
> > handler.
> >
> > This adds a window where application does a commit but it is not
> recorded in
> > kafka internal topic.
> > Any failure during that window will cause the last batch to be
> recomputed.
> >
> > My app does a sink to external source that can't be idempotent. As such
> the
> > operations are assumed to be atleast once.
> > This seems to be one place where duplicates and be reduced.
> >
> > Srikanth
>

Re: Kafka 0.10 integ offset commit

Posted by Cody Koeninger <co...@koeninger.org>.
The underlying kafka consumer isn't thread safe.  Calling the actual
commit in compute means it's called in the same thread as the other
consumer calls.

Using kafka as an offset store only works with correctly with
idempotent datastore writes anyway, so the question of when the commit
happens shouldn't be an issue.

On Sat, Oct 8, 2016 at 7:25 PM, Srikanth <sr...@gmail.com> wrote:
> Hello,
>
> Spark streaming kafka 0.10 integ provides an option to commit offset to
> kafka using commitAsyn() API.
> This only records the offset commit request. The actual commit is performed
> in compute() after RDD for next batch is created.
> Why is this so? Why not do a commit right when the API is called?
> Anyway the commit process itself is async with an option to provide callback
> handler.
>
> This adds a window where application does a commit but it is not recorded in
> kafka internal topic.
> Any failure during that window will cause the last batch to be recomputed.
>
> My app does a sink to external source that can't be idempotent. As such the
> operations are assumed to be atleast once.
> This seems to be one place where duplicates and be reduced.
>
> Srikanth

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org