You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by David Rosenstrauch <da...@gmail.com> on 2017/04/28 15:29:25 UTC

Exactly-once semantics with kakfa CanCommitOffsets.commitAsync?

 I'm doing a POC to test recovery with spark streaming from Kafka.  I'm
using the technique for storing the offsets in Kafka, as described at:

https://spark.apache.org/docs/2.1.0/streaming-kafka-0-10-integration.html#kafka-itself

I.e., grabbing the list of offsets before I start processing a batch of
RDD's, and then committing them when I'm done.  The process pretty much
works:  when I shut down my streaming process and then start it up again,
it pretty much picks up where it left off.

However, it looks like there's some overlap happening, where a few of the
messages are being processed by both the old and the new streaming job
runs.  I.e., see the following log messages:

End of old job run:
17/04/27 20:04:40 INFO KafkaRecoveryTester$: Committing rdd offsets:
OffsetRange(topic: 'Datalake', partition: 0, range: [250959 ->
250962]);OffsetRange(topic: 'Datalake', partition: 1, range: [15 ->
18]);OffsetRange(topic: 'Datalake', partition: 3, range: [15 ->
18]);OffsetRange(topic: 'Datalake', partition: 2, range: [14 -> 17])

Start of new job run:
17/04/27 20:56:50 INFO KafkaRecoveryTester$: Processing rdd with offsets:
OffsetRange(topic: 'Datalake', partition: 3, range: [15 ->
100]);OffsetRange(topic: 'Datalake', partition: 1, range: [15 ->
100]);OffsetRange(topic: 'Datalake', partition: 2, range: [14 ->
100]);OffsetRange(topic: 'Datalake', partition: 0, range: [250959 ->
251044])


Notice that in partition 0, for example, the 3 messages with offsets 250959
through 250961 are being processed twice - once by the old job, and once by
the new.  I would have expected that in the new run, the offset range for
partition 0 would have been 250962 -> 251044, which would result in
exactly-once semantics.

Am I misunderstanding how this should work?  (I.e., exactly-once semantics
is not possible here?)

Thanks,

DR

Re: Exactly-once semantics with kakfa CanCommitOffsets.commitAsync?

Posted by Cody Koeninger <co...@koeninger.org>.
It's asynchronous.  If your job stopped before the commit happened,
then of course it's not guaranteed to succeed.  But even if those
commits were somehow guaranteed to succeed even if your job stopped...
you still need idempotent output operations.  The point of
transactionality isn't that it's synchronous, it's that offsets and
output are stored in the same place, and guaranteed to either both
succeed or both fail.  Kafka can't give you that guarantee.  Some data
stores can.

With regard to doc clarity, that section starts by saying
"
Kafka delivery semantics in the case of failure depend on how and when
offsets are stored. Spark output operations are at-least-once. So if
you want the equivalent of exactly-once semantics, you must either
store offsets after an idempotent output, or store offsets in an
atomic transaction alongside output.
"

If you think that's still not clear, open a PR to contribute to the docs.



On Fri, Apr 28, 2017 at 10:47 AM, David Rosenstrauch <da...@gmail.com> wrote:
> Yes, I saw that sentence too.  But it's rather short and not very
> explanatory, and there doesn't seem to be any further info available
> anywhere that expands on it.
>
> When I parse out that sentence:
>
> 1) "Kafka is not transactional" - i.e., the commits are done asynchronously,
> not synchronously.
> 2) "so your outputs must still be idempotent" - some of your commits may
> duplicate/overlap, so you need to be able to handle processing the same
> event(s) more than once.
>
> That doesn't quite make sense to me though.  I don't quite understand why #1
> implies #2.  Yes, Kafka isn't transactional - i.e., doesn't process my
> commits synchronously.  But it should be processing my commits *eventually*.
> If you look at my output from the previous message, even though I called
> commitAsync on 250959 -> 250962 in the first job, Kafka never actually
> processed those commits.  That's not an eventual/asynchronous commit; that's
> an optional commit.
>
> Is that in fact the semantics here - i.e., calls to commitAsync are not
> actually guaranteed to succeed?  If that's the case, the docs could really
> be a *lot* clearer about that.
>
> Thanks,
>
> DR
>
> On Fri, Apr 28, 2017 at 11:34 AM, Cody Koeninger <co...@koeninger.org> wrote:
>>
>> From that doc:
>>
>> " However, Kafka is not transactional, so your outputs must still be
>> idempotent. "
>>
>>
>>
>> On Fri, Apr 28, 2017 at 10:29 AM, David Rosenstrauch <da...@gmail.com>
>> wrote:
>> > I'm doing a POC to test recovery with spark streaming from Kafka.  I'm
>> > using
>> > the technique for storing the offsets in Kafka, as described at:
>> >
>> >
>> > https://spark.apache.org/docs/2.1.0/streaming-kafka-0-10-integration.html#kafka-itself
>> >
>> > I.e., grabbing the list of offsets before I start processing a batch of
>> > RDD's, and then committing them when I'm done.  The process pretty much
>> > works:  when I shut down my streaming process and then start it up
>> > again, it
>> > pretty much picks up where it left off.
>> >
>> > However, it looks like there's some overlap happening, where a few of
>> > the
>> > messages are being processed by both the old and the new streaming job
>> > runs.
>> > I.e., see the following log messages:
>> >
>> > End of old job run:
>> > 17/04/27 20:04:40 INFO KafkaRecoveryTester$: Committing rdd offsets:
>> > OffsetRange(topic: 'Datalake', partition: 0, range: [250959 ->
>> > 250962]);OffsetRange(topic: 'Datalake', partition: 1, range: [15 ->
>> > 18]);OffsetRange(topic: 'Datalake', partition: 3, range: [15 ->
>> > 18]);OffsetRange(topic: 'Datalake', partition: 2, range: [14 -> 17])
>> >
>> > Start of new job run:
>> > 17/04/27 20:56:50 INFO KafkaRecoveryTester$: Processing rdd with
>> > offsets:
>> > OffsetRange(topic: 'Datalake', partition: 3, range: [15 ->
>> > 100]);OffsetRange(topic: 'Datalake', partition: 1, range: [15 ->
>> > 100]);OffsetRange(topic: 'Datalake', partition: 2, range: [14 ->
>> > 100]);OffsetRange(topic: 'Datalake', partition: 0, range: [250959 ->
>> > 251044])
>> >
>> >
>> > Notice that in partition 0, for example, the 3 messages with offsets
>> > 250959
>> > through 250961 are being processed twice - once by the old job, and once
>> > by
>> > the new.  I would have expected that in the new run, the offset range
>> > for
>> > partition 0 would have been 250962 -> 251044, which would result in
>> > exactly-once semantics.
>> >
>> > Am I misunderstanding how this should work?  (I.e., exactly-once
>> > semantics
>> > is not possible here?)
>> >
>> > Thanks,
>> >
>> > DR
>
>

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: Exactly-once semantics with kakfa CanCommitOffsets.commitAsync?

Posted by David Rosenstrauch <da...@gmail.com>.
Yes, I saw that sentence too.  But it's rather short and not very
explanatory, and there doesn't seem to be any further info available
anywhere that expands on it.

When I parse out that sentence:

1) "Kafka is not transactional" - i.e., the commits are done
asynchronously, not synchronously.
2) "so your outputs must still be idempotent" - some of your commits may
duplicate/overlap, so you need to be able to handle processing the same
event(s) more than once.

That doesn't quite make sense to me though.  I don't quite understand why
#1 implies #2.  Yes, Kafka isn't transactional - i.e., doesn't process my
commits synchronously.  But it should be processing my commits
*eventually*.  If you look at my output from the previous message, even
though I called commitAsync on 250959 -> 250962 in the first job, Kafka
never actually processed those commits.  That's not an
eventual/asynchronous commit; that's an optional commit.

Is that in fact the semantics here - i.e., calls to commitAsync are not
actually guaranteed to succeed?  If that's the case, the docs could really
be a *lot* clearer about that.

Thanks,

DR

On Fri, Apr 28, 2017 at 11:34 AM, Cody Koeninger <co...@koeninger.org> wrote:

> From that doc:
>
> " However, Kafka is not transactional, so your outputs must still be
> idempotent. "
>
>
>
> On Fri, Apr 28, 2017 at 10:29 AM, David Rosenstrauch <da...@gmail.com>
> wrote:
> > I'm doing a POC to test recovery with spark streaming from Kafka.  I'm
> using
> > the technique for storing the offsets in Kafka, as described at:
> >
> > https://spark.apache.org/docs/2.1.0/streaming-kafka-0-10-
> integration.html#kafka-itself
> >
> > I.e., grabbing the list of offsets before I start processing a batch of
> > RDD's, and then committing them when I'm done.  The process pretty much
> > works:  when I shut down my streaming process and then start it up
> again, it
> > pretty much picks up where it left off.
> >
> > However, it looks like there's some overlap happening, where a few of the
> > messages are being processed by both the old and the new streaming job
> runs.
> > I.e., see the following log messages:
> >
> > End of old job run:
> > 17/04/27 20:04:40 INFO KafkaRecoveryTester$: Committing rdd offsets:
> > OffsetRange(topic: 'Datalake', partition: 0, range: [250959 ->
> > 250962]);OffsetRange(topic: 'Datalake', partition: 1, range: [15 ->
> > 18]);OffsetRange(topic: 'Datalake', partition: 3, range: [15 ->
> > 18]);OffsetRange(topic: 'Datalake', partition: 2, range: [14 -> 17])
> >
> > Start of new job run:
> > 17/04/27 20:56:50 INFO KafkaRecoveryTester$: Processing rdd with offsets:
> > OffsetRange(topic: 'Datalake', partition: 3, range: [15 ->
> > 100]);OffsetRange(topic: 'Datalake', partition: 1, range: [15 ->
> > 100]);OffsetRange(topic: 'Datalake', partition: 2, range: [14 ->
> > 100]);OffsetRange(topic: 'Datalake', partition: 0, range: [250959 ->
> > 251044])
> >
> >
> > Notice that in partition 0, for example, the 3 messages with offsets
> 250959
> > through 250961 are being processed twice - once by the old job, and once
> by
> > the new.  I would have expected that in the new run, the offset range for
> > partition 0 would have been 250962 -> 251044, which would result in
> > exactly-once semantics.
> >
> > Am I misunderstanding how this should work?  (I.e., exactly-once
> semantics
> > is not possible here?)
> >
> > Thanks,
> >
> > DR
>

Re: Exactly-once semantics with kakfa CanCommitOffsets.commitAsync?

Posted by Cody Koeninger <co...@koeninger.org>.
From that doc:

" However, Kafka is not transactional, so your outputs must still be
idempotent. "



On Fri, Apr 28, 2017 at 10:29 AM, David Rosenstrauch <da...@gmail.com> wrote:
> I'm doing a POC to test recovery with spark streaming from Kafka.  I'm using
> the technique for storing the offsets in Kafka, as described at:
>
> https://spark.apache.org/docs/2.1.0/streaming-kafka-0-10-integration.html#kafka-itself
>
> I.e., grabbing the list of offsets before I start processing a batch of
> RDD's, and then committing them when I'm done.  The process pretty much
> works:  when I shut down my streaming process and then start it up again, it
> pretty much picks up where it left off.
>
> However, it looks like there's some overlap happening, where a few of the
> messages are being processed by both the old and the new streaming job runs.
> I.e., see the following log messages:
>
> End of old job run:
> 17/04/27 20:04:40 INFO KafkaRecoveryTester$: Committing rdd offsets:
> OffsetRange(topic: 'Datalake', partition: 0, range: [250959 ->
> 250962]);OffsetRange(topic: 'Datalake', partition: 1, range: [15 ->
> 18]);OffsetRange(topic: 'Datalake', partition: 3, range: [15 ->
> 18]);OffsetRange(topic: 'Datalake', partition: 2, range: [14 -> 17])
>
> Start of new job run:
> 17/04/27 20:56:50 INFO KafkaRecoveryTester$: Processing rdd with offsets:
> OffsetRange(topic: 'Datalake', partition: 3, range: [15 ->
> 100]);OffsetRange(topic: 'Datalake', partition: 1, range: [15 ->
> 100]);OffsetRange(topic: 'Datalake', partition: 2, range: [14 ->
> 100]);OffsetRange(topic: 'Datalake', partition: 0, range: [250959 ->
> 251044])
>
>
> Notice that in partition 0, for example, the 3 messages with offsets 250959
> through 250961 are being processed twice - once by the old job, and once by
> the new.  I would have expected that in the new run, the offset range for
> partition 0 would have been 250962 -> 251044, which would result in
> exactly-once semantics.
>
> Am I misunderstanding how this should work?  (I.e., exactly-once semantics
> is not possible here?)
>
> Thanks,
>
> DR

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org