Posted to dev@kafka.apache.org by Yui Yoi <sh...@gmail.com> on 2018/09/12 10:57:02 UTC

A question about kafka streams API

TL;DR:
my streams application skips uncommitted messages

Hello,
I'm using the Streams API via the Spring framework and am experiencing weird
behavior that I would like an explanation for.
First of all: the attached zip is my test project; I used the Kafka CLI to
run a localhost broker and ZooKeeper.

What is happening is as follows:
1. I send an invalid message, such as "asd", and my consumer has a lag and
an error message, as expected.
2. I send a valid message, such as "{}", but instead of rereading the first
message, as expected from an "earliest"-configured application, my
application reads the latest message, commits it, and ignores the one in
error, so I have no lag!
3. When I start my application while there are uncommitted messages, my
application reads the FIRST uncommitted message, as if it IS an
"earliest"-configured application!

Your documentation promises "at least once" behavior, but according to
item 2 above, my application does not receive those messages even once (as
I said, those messages are uncommitted).

My guess is that it has something to do with the stream's cache... I would
very much like an explanation, or even a solution.

I'm turning to you as a last resort, after long weeks of research and
experiments

Thanks a lot

Re: A question about kafka streams API

Posted by John Roesler <jo...@confluent.io>.
Hey Yui,

Sorry, I haven't had a chance to respond. I've got a pretty busy couple of
weeks coming up, so I don't know when I'll look at this, but I find this
puzzling. I'll save your email and try what you said to see if I can figure
it out. Thanks for the repro code.

Let me know if you figure it out. Also, if you think you've found a bug,
feel free to file a Jira ticket as well. It might get broader visibility
that way.

Thanks,
-John

Re: A question about kafka streams API

Posted by Yui Yoi <sh...@gmail.com>.
Hi Adam and John, thank you for your effort!
We are implementing full idempotency in our projects, so that's nothing to
worry about.
As to what John said - we only have one partition; I personally verified
that.
So, as I wrote in item 2 of my first message in this conversation, my
stream should have processed the "asd" message again, because it was not
yet committed.
That's why I suspect it has something to do with the stream's cache; maybe
something like this (see the sketch below):
1. "asd" got processed and stored in the cache
2. "{}" got processed and cached too
3. the commit interval makes the stream commit the offset of "{}"
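
If that is what's happening, shortening the commit interval should make it
much easier to observe. A minimal sketch, assuming plain Kafka Streams
properties (Spring may expose these under its own configuration keys):

    import java.util.Properties;

    import org.apache.kafka.streams.StreamsConfig;

    public class CommitIntervalProps {
        static Properties streamsProps() {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-group");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // Under at-least-once, Streams commits offsets on this interval
            // (default 30000 ms), not after every record.
            props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
            return props;
        }
    }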


B.t.w., if you want to run my application you should:
1. open it in a Java editor as a Maven project
2. run it as a normal Java application
3. set up a Kafka server & ZooKeeper on localhost
4. then send the above messages via the CLI

John - even if you send "asd1", "asd2", "asd3", you will see in the logs
that my app takes the latest each time.

Of course that's far beyond what I can ask of you guys; thanks a lot for
your help.

Re: A question about kafka streams API

Posted by John Roesler <jo...@confluent.io>.
Hi!

As Adam said, if you throw an exception during processing, it should cause
Streams to shut itself down and *not* commit that message. Therefore, when
you start up again, it should again attempt to process that same message
(and shut down again).
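
For what it's worth, here is a minimal sketch of that fail-fast shape (the
topic name, broker address, and the JSON check are placeholder assumptions,
not taken from your attached project):

    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
    import org.apache.kafka.streams.kstream.Consumed;

    public class FailFastStream {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-group");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // Fail on records the Serde cannot deserialize. This is the
            // default; LogAndContinueExceptionHandler would skip them instead.
            props.put(
                StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                LogAndFailExceptionHandler.class);

            StreamsBuilder builder = new StreamsBuilder();
            builder.stream("input-topic",
                           Consumed.with(Serdes.String(), Serdes.String()))
                   .foreach((key, value) -> {
                       if (!value.trim().startsWith("{")) {
                           // Throwing kills the stream thread before this
                           // record's offset is committed, so the same record
                           // is re-read on the next start.
                           throw new IllegalStateException("bad message: " + value);
                       }
                   });

            new KafkaStreams(builder.build(), props).start();
        }
    }

One caveat worth checking in your setup: if the bad record fails inside the
Serde during deserialization, rather than in your own processing code, the
default.deserialization.exception.handler config decides the outcome -
LogAndFailExceptionHandler (the default) fails as described above, while
LogAndContinueExceptionHandler logs and skips the record, which would look
exactly like the "skipping" you reported.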

Within a single partition, messages are processed in order, so a bad
message will block the queue, and you should not see subsequent messages
get processed.

However, if your later message "{}" goes to a different partition than the
bad message, then there's no relationship between them, and the later,
good, message might get processed.

Does that help?
-John

Re: A question about kafka streams API

Posted by Adam Bellemare <ad...@gmail.com>.
Hi Yui Yoi


Keep in mind that Kafka consumers don't typically request a single
message at a time, but instead request them in batches. This allows for
much higher throughput, but it does result in "at-least-once"
processing. Generally what will happen in this scenario is the following
(a code sketch follows the list):

1) Client requests the next set of messages from offset (t). For example,
assume it gets 10 messages and message 6 is "bad".
2) The client's processor will then process the messages one at a time.
Note that the offsets are not committed after each message is processed,
but only at the end of the batch.
3) The bad message is hit by the processor. At this point you can decide to
skip the message, throw an exception, etc.
4a) If you decide to skip the message, processing will continue. Once all
10 messages are processed, the new offset (t+10) is committed back
to Kafka.
4b) If you decide to throw an exception and terminate your app, you will
have still processed the messages that came before the bad message. Because
the offset (t+10) is not committed, the next time you start the app it will
consume from offset t, and those messages will be processed again. This is
"at-least-once" processing.

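A sketch of that batch-and-commit pattern with the plain consumer API
(topic name and broker address are placeholders; auto-commit is disabled
so the end-of-batch commit is explicit):

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class BatchCommitExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                      StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                      StringDeserializer.class.getName());
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

            try (KafkaConsumer<String, String> consumer =
                     new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("input-topic"));
                while (true) {
                    // One fetch can return many records, e.g. offsets t..t+9.
                    ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofMillis(500));
                    for (ConsumerRecord<String, String> record : records) {
                        // Throwing here (case 4b) leaves the whole batch
                        // uncommitted, so it is re-read on the next start.
                        System.out.println(record.offset() + ": " + record.value());
                    }
                    // Case 4a: commit offset t+10 only after the whole batch.
                    consumer.commitSync();
                }
            }
        }
    }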

Now, if you need exactly-once processing, you have two choices:
1) Use Kafka Streams with exactly-once semantics, as sketched below
(though, as I am not familiar with your framework, it may support it as
well).
2) Use idempotent practices (i.e., it doesn't matter if the same messages
get processed more than once).
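
Option 1) is a single line of Streams config; a sketch, using the same
assumed broker address and application id as before:

    import java.util.Properties;

    import org.apache.kafka.streams.StreamsConfig;

    public class ExactlyOnceProps {
        static Properties streamsProps() {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-group");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // Turns on transactional processing; needs brokers >= 0.11.
            props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
                      StreamsConfig.EXACTLY_ONCE);
            return props;
        }
    }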


Hope this helps -

Adam


Re: A question about kafka streams API

Posted by Yui Yoi <sh...@gmail.com>.
Hi Adam,
Thanks a lot for the rapid response, it did help!

Let me ask one more simple question, though: can I make a streams
application get stuck on an invalid message, and not consume any further
messages?

Thanks again

Re: A question about kafka streams API

Posted by Adam Bellemare <ad...@gmail.com>.
Hi Yui Yoi

Preface: I am not familiar with the spring framework.

"Earliest" when it comes to consuming from Kafka means, "Start reading from
the first message in the topic, *if there is no offset stored for that
consumer group*". It sounds like you are expecting it to re-read each
message whenever a new message comes in. This is not going to happen, as
there will be a committed offset and "earliest" will no longer be used. If
you were to use "latest" instead, then a consumer started without a valid
offset would use the very latest message in the topic as its starting
offset for consumption.
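
As a concrete sketch of that rule with the plain consumer API (broker
address and group name as elsewhere in this thread; auto.offset.reset is
the config behind "earliest"/"latest"):

    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class OffsetResetExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                      StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                      StringDeserializer.class.getName());
            // Consulted only when "test-group" has no committed offset for a
            // partition; once an offset is committed, the consumer resumes
            // from it and this setting is ignored.
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

            try (KafkaConsumer<String, String> consumer =
                     new KafkaConsumer<>(props)) {
                // subscribe/poll as usual...
            }
        }
    }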

Now, if you are using the same consumer group each time you run the
application (which it seems is true, as you have "test-group" hardwired in
your application.yml), but you do not tear down your local cluster and
clear out its state, you will indeed see the behaviour you describe.
Remember that Kafka is durable, and maintains the offsets when the
individual applications go away. So you are probably seeing this:

1) Start application instance 1. It realizes it has no offset when it tries
to register as a consumer on the input topic, so it creates a new consumer
entry for "earliest" for your consumer group.
2) Send message "asd".
3) Application instance 1 receives "asd", processes it, and updates the
offset (offset head = 1).
4) Terminate instance 1.
5) Start application instance 2. It detects correctly that consumer group
"test-group" is available and reads that offset as its starting point.
6) Send message "{}".
7) Application instance 2 receives "{}", processes it, and updates the
offset (offset head = 2).
*NOTE:* App instance 2 NEVER received "asd", nor should it, as it is
telling the Kafka cluster that it belongs to the same consumer group as
application 1.
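
You can watch this bookkeeping directly by asking the broker for the
group's committed offsets; a sketch with the Java AdminClient (broker
address and group name as above):

    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class ShowCommittedOffsets {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                      "localhost:9092");
            try (AdminClient admin = AdminClient.create(props)) {
                // These offsets live on the broker, which is why they survive
                // application restarts and instance 2 resumes where
                // instance 1 left off.
                Map<TopicPartition, OffsetAndMetadata> offsets =
                    admin.listConsumerGroupOffsets("test-group")
                         .partitionsToOffsetAndMetadata()
                         .get();
                offsets.forEach((tp, om) ->
                    System.out.println(tp + " -> committed offset " + om.offset()));
            }
        }
    }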

Hope this helps,

Adam




