Posted to users@nifi.apache.org by Nick Carenza <ni...@thecontrolgroup.com> on 2017/02/09 02:54:46 UTC

ConsumeKafka processor erroring when held up by full queue

Hey team, I have a ConsumeKafka_0_10 running which normally operates
without problems. I had a queue back up due to a downstream processor and I
started getting these bulletins.

01:16:01 UTC WARNING a46d13dd-3231-1bff-1a99-1eaf5f37e1d2
ConsumeKafka_0_10[id=a46d13dd-3231-1bff-1a99-1eaf5f37e1d2] Duplicates are
likely as we were able to commit the process session but received an
exception from Kafka while committing offsets.

01:16:01 UTC ERROR a46d13dd-3231-1bff-1a99-1eaf5f37e1d2
ConsumeKafka_0_10[id=a46d13dd-3231-1bff-1a99-1eaf5f37e1d2] Exception while
interacting with Kafka so will close the lease
org.apache.nifi.processors.kafka.pubsub.ConsumerPool$SimpleConsumerLease@87d2ac1
due to org.apache.kafka.clients.consumer.CommitFailedException: Commit
cannot be completed since the group has already rebalanced and assigned the
partitions to another member. This means that the time between subsequent
calls to poll() was longer than the configured session.timeout.ms, which
typically implies that the poll loop is spending too much time message
processing. You can address this either by increasing the session timeout
or by reducing the maximum size of batches returned in poll() with
max.poll.records.

My max.poll.records is set to 10000 on my consumer and session.timeout.ms
is the default 10000 on the server.

Since there is no such thing as a coincidence, I believe this is caused by
the processor not being able to push received messages to the downstream
queue.

If my flow is backed up, I would expect the ConsumeKafka processor not to
throw errors, but to keep heartbeating with the Kafka server and resume
consuming once it can commit to the downstream queue.

Do I have the server or consumer misconfigured for this scenario, or should
the consumer not be throwing this error?

Thanks,
- Nick
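
To illustrate the failure mode in the bulletins above, here is a minimal
Python sketch of a group coordinator that drops a member once its heartbeats
stop for longer than the session timeout. It is a toy model, not the Kafka
client: ToyCoordinator, heartbeat, commit and run are hypothetical names, time
is passed in explicitly, and the model assumes (as the error text suggests)
that the consumer only stays in the group while it keeps calling poll().

```python
class CommitFailed(Exception):
    """Raised when a member that was evicted from the group tries to commit."""


class ToyCoordinator:
    """Toy stand-in for a consumer-group coordinator (hypothetical, not Kafka)."""

    def __init__(self, session_timeout_ms):
        self.session_timeout_ms = session_timeout_ms
        self.last_heartbeat_ms = {}  # member id -> time of last heartbeat

    def heartbeat(self, member, now_ms):
        # In this model every poll() counts as a heartbeat.
        self.last_heartbeat_ms[member] = now_ms

    def is_member(self, member, now_ms):
        last = self.last_heartbeat_ms.get(member)
        return last is not None and now_ms - last <= self.session_timeout_ms

    def commit(self, member, now_ms):
        # A member whose session expired has lost its partitions to a rebalance.
        if not self.is_member(member, now_ms):
            raise CommitFailed(f"{member}: group has already rebalanced")
        return "committed"


def run(idle_ms, session_timeout_ms=10_000):
    """Poll at t=0, sit idle (e.g. under back-pressure), then try to commit."""
    coordinator = ToyCoordinator(session_timeout_ms)
    coordinator.heartbeat("nifi-consumer", now_ms=0)
    try:
        return coordinator.commit("nifi-consumer", now_ms=idle_ms)
    except CommitFailed:
        return "commit failed"


print(run(idle_ms=2_000))   # idle for less than the session timeout
print(run(idle_ms=60_000))  # idle for longer than the session timeout
```

With the 10-second timeout from the post, the short idle period commits and
the long one fails, which matches the pattern of a processor that sat behind a
full queue for longer than the session timeout.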

Re: ConsumeKafka processor erroring when held up by full queue

Posted by Koji Kawamura <ij...@gmail.com>.
Hi all,

I just submitted PR 1527, which adds logic for retaining the consumer
connection to ConsumeKafka_0_10.
https://github.com/apache/nifi/pull/1527

Any feedback would be appreciated. Nick, it would be especially great if
you could confirm that this fix works as you expect!

Thanks,
Koji

On Wed, Feb 22, 2017 at 8:47 AM, Koji Kawamura <ij...@gmail.com> wrote:
> Hi Nick,
>
> I understand that. I will continue adding more code to iron out the
> implementation. Please let me know if you find anything by looking at the
> code. I'd like you to review the branch in detail once the PR is ready.
>
> Thanks!
> Koji
>
> On Tue, Feb 21, 2017 at 9:06 AM, Nick Carenza
> <ni...@thecontrolgroup.com> wrote:
>> Hey Koji, thanks for putting in the time.
>>
>> I have not had a chance to start working on this myself and I certainly
>> support any effort to resolve it. I'll take a look at your branch and play
>> around with it.
>>
>> Thanks!,
>> Nick
>>
>> On Mon, Feb 20, 2017 at 2:30 AM, Koji Kawamura <ij...@gmail.com>
>> wrote:
>>>
>>> Hi Nick, Joe, Bryan,
>>>
>>> I confirmed that this is easily reproducible and I got exactly the
>>> same stacktrace.
>>>
>>> Also I was curious about how the Kafka consumer's pause/resume API
>>> works, so I went further and ran a simple experiment to see whether it
>>> helps retain the consumer connection in this situation.
>>>
>>> The experimental code is in my remote branch here:
>>>
>>> https://github.com/ijokarumawak/nifi/commit/28ba134771ec7a7e810924f655662a29662ba9bf
>>>
>>> It seems to be working as expected. After back-pressure is engaged,
>>> ConsumeKafka is not triggered for a while; then, when downstream
>>> recovers from back-pressure, ConsumeKafka resumes consuming messages
>>> using the same consumer instance, without any loss or warning messages.
>>>
>>> Nick, have you already started working on fixing this?
>>> If you haven't, and the approach using pause/resume sounds reasonable,
>>> I am going to add more synchronization and start/stop processor
>>> handling, and make it work properly with onTrigger so that it only
>>> tries to retain the connection when there hasn't been any polling
>>> activity for some period of time.
>>>
>>> What do you think?
>>> Any feedback would be appreciated. Thanks!
>>>
>>> Koji
>>>
>>> On Fri, Feb 10, 2017 at 7:22 AM, Nick Carenza
>>> <ni...@thecontrolgroup.com> wrote:
>>> > Thank you guys, I will look to see what I can do to contribute.
>>> >
>>> > On Thu, Feb 9, 2017 at 1:19 PM, Joe Witt <jo...@gmail.com> wrote:
>>> >>
>>> >> That said I think we can improve our handling of the consumer (kafka
>>> >> client) and session (nifi transactional logic) and solve the problem.
>>> >> It is related to our backpressure/consumer handling so we can fix
>>> >> that.
>>> >>
>>> >> Thanks
>>> >> Joe
>>> >>
>>> >> On Thu, Feb 9, 2017 at 1:38 PM, Bryan Bende <bb...@gmail.com> wrote:
>>> >> > No data loss, but you may process the same message twice in NiFi.
>>> >> >
>>> >> > The ordering of operations is:
>>> >> >
>>> >> > 1) poll Kafka
>>> >> > 2) write received data to flow file
>>> >> > 3) commit NiFi session so data in flow file cannot be lost
>>> >> > 4) commit offsets to Kafka
>>> >> >
>>> >> > Doing it this way achieves at-least-once processing, which means you
>>> >> > can't ever lose data, but you can process data twice.
>>> >> >
>>> >> > If we committed the offsets before committing the flow file you would
>>> >> > never get duplicates, but you could lose a message if a crash happened
>>> >> > between committing the offset and committing the NiFi session
>>> >> > (at-most-once processing).
>>> >> >
>>> >> > So the error is happening on #4: NiFi has already produced a flow
>>> >> > file with the message, but then Kafka says it can't update the offset,
>>> >> > so another consumer will likely pull that same message again and
>>> >> > produce another flow file with the same message.
>>> >> >
>>> >> >
>>> >> > On Thu, Feb 9, 2017 at 1:19 PM, Nick Carenza
>>> >> > <ni...@thecontrolgroup.com> wrote:
>>> >> >> That makes perfect sense. To be clear, is there any potential to lose
>>> >> >> messages in this scenario?
>>> >> >>
>>> >> >> On Thu, Feb 9, 2017 at 7:16 AM, Joe Witt <jo...@gmail.com> wrote:
>>> >> >>>
>>> >> >>> yeah this is probably a good case/cause for use of the pause concept
>>> >> >>> in kafka consumers.
>>> >> >>>
>>> >> >>> On Thu, Feb 9, 2017 at 9:49 AM, Bryan Bende <bb...@gmail.com> wrote:
>>> >> >>> > I believe you are running into this issue:
>>> >> >>> >
>>> >> >>> > https://issues.apache.org/jira/browse/NIFI-3189
>>> >> >>> >
>>> >> >>> > When back-pressure happens on the queue coming out of ConsumeKafka,
>>> >> >>> > it can last longer than session.timeout.ms, and when the
>>> >> >>> > processor resumes executing it receives this error on the first
>>> >> >>> > execution. We should be able to implement some type of keep-alive so
>>> >> >>> > that even when the processor is not executing, there is a background
>>> >> >>> > thread, or some other way of keeping the connections alive.
>>> >> >>> >
>>> >> >>> > I believe any user-defined properties in the processor get passed to
>>> >> >>> > the Kafka consumer, so I believe you could add "session.timeout.ms"
>>> >> >>> > and set a much higher value as a possible workaround.
>>> >> >>> >
>>> >> >>> > Thanks,
>>> >> >>> >
>>> >> >>> > Bryan
>>> >> >>> >
>>> >> >>> > On Thu, Feb 9, 2017 at 8:42 AM, Koji Kawamura <ij...@gmail.com> wrote:
>>> >> >>> >> Hello Nick,
>>> >> >>> >>
>>> >> >>> >> First, I assume "had a queue back up" means that a queue was under
>>> >> >>> >> back-pressure. Sorry if you meant something different.
>>> >> >>> >>
>>> >> >>> >> I tried to reproduce it with the following flow:
>>> >> >>> >> ConsumeKafka_0_10
>>> >> >>> >>   -- success: Back Pressure Object Threshold = 10
>>> >> >>> >>     -- UpdateAttribute (Stopped)
>>> >> >>> >>
>>> >> >>> >> Then I used ./bin/kafka-console-producer.sh to send 11 messages.
>>> >> >>> >> When NiFi received the 10th message, back-pressure was enabled on
>>> >> >>> >> the success relationship.
>>> >> >>> >> When I published the 11th message, NiFi didn't do anything.
>>> >> >>> >> This is expected behavior: because the downstream connection is
>>> >> >>> >> back-pressured, the processor won't be scheduled.
>>> >> >>> >>
>>> >> >>> >> After I started UpdateAttribute and the queued flow files went
>>> >> >>> >> through, ConsumeKafka was executed again and received the 11th
>>> >> >>> >> message.
>>> >> >>> >>
>>> >> >>> >> Also, I checked the ConsumerLease and ConsumeKafka_0_10 source code.
>>> >> >>> >> Those warning and error messages are logged because NiFi received a
>>> >> >>> >> KafkaException when it tried to commit offsets to Kafka.
>>> >> >>> >>
>>> >> >>> >> Was there anything in the Kafka server logs? I suspect something
>>> >> >>> >> happened on the Kafka server side.
>>> >> >>> >>
>>> >> >>> >> Thanks,
>>> >> >>> >> Koji
>>> >> >>> >>
>>
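
The ordering of operations that Bryan lists in the quoted thread above (poll,
write the flow file, commit the NiFi session, commit offsets to Kafka) can be
sketched as a small Python simulation. This is only an illustration of the
at-least-once versus at-most-once trade-off, not NiFi or Kafka code: consume,
flow_files and committed_offset are hypothetical names, and a "crash" is
modelled simply as returning between the two commits.

```python
def consume(records, commit_offsets_first, crash_between_commits):
    """Process one batch, optionally crashing between the two commits,
    then restart from the last committed offset and process the rest."""
    flow_files = []        # stands in for data committed to the NiFi session
    committed_offset = 0   # stands in for the offset stored in Kafka

    def attempt(crash):
        nonlocal committed_offset
        batch = records[committed_offset:]            # 1) poll Kafka
        new_offset = committed_offset + len(batch)
        if commit_offsets_first:
            committed_offset = new_offset             # offsets first (at-most-once)
            if crash:
                return                                # data never reached NiFi
            flow_files.extend(batch)
        else:
            flow_files.extend(batch)                  # 2-3) write and commit session
            if crash:
                return                                # offsets never reached Kafka
            committed_offset = new_offset             # 4) commit offsets

    attempt(crash=crash_between_commits)
    attempt(crash=False)   # restart, or another consumer taking over
    return flow_files


messages = ["a", "b", "c"]
print(consume(messages, commit_offsets_first=False, crash_between_commits=True))
print(consume(messages, commit_offsets_first=True, crash_between_commits=True))
```

Committing the session first yields every message twice after the failure,
while committing offsets first yields nothing at all. That is the duplicate
versus data-loss trade-off described in the thread.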

Re: ConsumeKafka processor erroring when held up by full queue

Posted by Koji Kawamura <ij...@gmail.com>.
Hi Nick,

I understand that. I will continue adding more code to iron out the
implementation. Please let me know if you find anything by looking at the
code. I'd like you to review the branch in detail once the PR is ready.

Thanks!
Koji


Re: ConsumeKafka processor erroring when held up by full queue

Posted by Nick Carenza <ni...@thecontrolgroup.com>.
Hey Koji, thanks for putting in the time.

I have not had a chance to start working on this myself and I certainly
support any effort to resolve it. I'll take a look at your branch and play
around with it.

Thanks!,
Nick


Re: ConsumeKafka processor erroring when held up by full queue

Posted by Koji Kawamura <ij...@gmail.com>.
Hi Nick, Joe, Bryan,

I confirmed that this is easily reproducible and I got exactly the
same stacktrace.

Also I was curious about how the Kafka consumer's pause/resume API works,
so I went further and ran a simple experiment to see whether it helps
retain the consumer connection in this situation.

The experimental code is in my remote branch here:
https://github.com/ijokarumawak/nifi/commit/28ba134771ec7a7e810924f655662a29662ba9bf

It seems to be working as expected. After back-pressure is engaged,
ConsumeKafka is not triggered for a while; then, when downstream
recovers from back-pressure, ConsumeKafka resumes consuming messages
using the same consumer instance, without any loss or warning messages.

Nick, have you already started working on fixing this?
If you haven't, and the approach using pause/resume sounds reasonable,
I am going to add more synchronization and start/stop processor handling,
and make it work properly with onTrigger so that it only tries to
retain the connection when there hasn't been any polling activity for
some period of time.

What do you think?
Any feedback would be appreciated. Thanks!

Koji
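
The pause/resume idea described above can be sketched with a toy consumer in
Python. This is not the code from the linked branch and not the Kafka client
API: ToyConsumer and consume_with_backpressure are hypothetical names, and the
sketch assumes that a poll() on a paused consumer returns no records but still
counts as a keep-alive for the session.

```python
class ToyConsumer:
    """Toy model of a consumer with pause/resume (hypothetical, not the Kafka client)."""

    def __init__(self, records, session_timeout_ms=10_000):
        self.records = list(records)
        self.position = 0            # index of the next record to return
        self.paused = False
        self.session_timeout_ms = session_timeout_ms
        self.last_poll_ms = 0

    def alive(self, now_ms):
        # The session survives only if poll() was called recently enough.
        return now_ms - self.last_poll_ms <= self.session_timeout_ms

    def pause(self):
        self.paused = True

    def resume(self):
        self.paused = False

    def poll(self, now_ms):
        if not self.alive(now_ms):
            raise RuntimeError("session expired; partitions were reassigned")
        self.last_poll_ms = now_ms   # polling doubles as the keep-alive
        if self.paused or self.position >= len(self.records):
            return []                # paused: stay in the group, fetch nothing
        record = self.records[self.position]
        self.position += 1
        return [record]


def consume_with_backpressure(pause_while_blocked):
    consumer = ToyConsumer(["m1", "m2"])
    received = consumer.poll(now_ms=0)            # first record arrives
    if pause_while_blocked:
        consumer.pause()
        # Keep-alive polls every 5 s while downstream is full (t = 5 s .. 60 s).
        for now_ms in range(5_000, 60_001, 5_000):
            received += consumer.poll(now_ms)
        consumer.resume()
    try:
        received += consumer.poll(now_ms=61_000)  # downstream has recovered
    except RuntimeError:
        return received, "expired"
    return received, "ok"


print(consume_with_backpressure(pause_while_blocked=True))
print(consume_with_backpressure(pause_while_blocked=False))
```

In this model the paused consumer keeps its session for the whole minute of
back-pressure and picks up the second message with the same instance, while
the consumer that simply stops polling loses its session.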

On Fri, Feb 10, 2017 at 7:22 AM, Nick Carenza
<ni...@thecontrolgroup.com> wrote:
> Thank you guys, I will look to see what I can do to contribute.

Re: ConsumeKafka processor erroring when held up by full queue

Posted by Nick Carenza <ni...@thecontrolgroup.com>.
Thank you guys, I will look to see what I can do to contribute.

On Thu, Feb 9, 2017 at 1:19 PM, Joe Witt <jo...@gmail.com> wrote:

> That said I think we can improve our handling of the consumer (kafka
> client) and session (nifi transactional logic) and solve the problem.
> It is related to our backpressure/consumer handling so we can fix
> that.
>
> Thanks
> Joe

Re: ConsumeKafka processor erroring when held up by full queue

Posted by Joe Witt <jo...@gmail.com>.
That said, I think we can improve our handling of the consumer (Kafka
client) and session (NiFi transactional logic) and solve the problem.
It is related to our back-pressure/consumer handling, so we can fix
that.

Thanks
Joe

On Thu, Feb 9, 2017 at 1:38 PM, Bryan Bende <bb...@gmail.com> wrote:
> No data loss, but you may process the same message twice in NiFi.
>
> The ordering of operations is:
>
> 1) poll Kafka
> 2) write received data to flow file
> 3) commit NiFi session so data in flow file cannot be lost
> 4) commit offsets to Kafka
>
> Doing it this way achieves at-least-once processing, which means you
> can't ever lose data, but you can process data twice.
>
> If we committed the offsets before committing the flow file, you would
> never get duplicates, but you could lose a message if a crash happened
> between committing the offsets and committing the NiFi session
> (at-most-once processing).
>
> So the error is happening on #4 and NiFi has already produced a flow
> file with the message, but then Kafka says it can't update the offset,
> and then another consumer will likely pull that same message again and
> produce another flow file with the same message.

Re: ConsumeKafka processor erroring when held up by full queue

Posted by Bryan Bende <bb...@gmail.com>.
No data loss, but you may process the same message twice in NiFi.

The ordering of operations is:

1) poll Kafka
2) write received data to flow file
3) commit NiFi session so data in flow file cannot be lost
4) commit offsets to Kafka

Doing it this way achieves at-least-once processing, which means you
can't ever lose data, but you can process data twice.

If we committed the offsets before committing the flow file, you would
never get duplicates, but you could lose a message if a crash happened
between committing the offsets and committing the NiFi session
(at-most-once processing).

So the error is happening on #4: NiFi has already produced a flow
file with the message, but then Kafka says it can't update the offset,
so another consumer will likely pull that same message again and
produce another flow file with the same message.
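
The two orderings described in this message can be illustrated with a toy
simulation. This is only a sketch, not NiFi or Kafka code: CommitOrderDemo,
State, and run are hypothetical names, and a "crash" is modeled as abandoning
one attempt between the two commits and then retrying from the last committed
offset.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of session-first versus offset-first commit ordering. */
class CommitOrderDemo {

    /** Hypothetical stand-in for Kafka's committed offset and NiFi's stored flow files. */
    static final class State {
        int committedOffset = 0;                           // next offset Kafka will hand out
        final List<String> flowFiles = new ArrayList<>();  // data durably stored in NiFi
    }

    /**
     * Consumes the topic one message per attempt. On the attempt numbered
     * crashOnAttempt, the consumer dies between the two commits.
     */
    static State run(List<String> topic, boolean sessionFirst, int crashOnAttempt) {
        State s = new State();
        int attempt = 0;
        while (s.committedOffset < topic.size()) {
            attempt++;
            int offset = s.committedOffset;        // 1) poll from the last committed offset
            String msg = topic.get(offset);        // 2) received data for the flow file
            boolean crash = (attempt == crashOnAttempt);
            if (sessionFirst) {
                s.flowFiles.add(msg);              // 3) commit NiFi session
                if (crash) continue;               //    crash before the offset commit
                s.committedOffset = offset + 1;    // 4) commit offset to Kafka
            } else {
                s.committedOffset = offset + 1;    // offset committed first
                if (crash) continue;               // crash before the session commit
                s.flowFiles.add(msg);
            }
        }
        return s;
    }

    public static void main(String[] args) {
        List<String> topic = Arrays.asList("a", "b", "c");
        // Session first (at-least-once): "b" is duplicated, nothing is lost.
        System.out.println(run(topic, true, 2).flowFiles);   // prints [a, b, b, c]
        // Offset first (at-most-once): "b" is lost, nothing is duplicated.
        System.out.println(run(topic, false, 2).flowFiles);  // prints [a, c]
    }
}
```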


On Thu, Feb 9, 2017 at 1:19 PM, Nick Carenza
<ni...@thecontrolgroup.com> wrote:
> That makes perfect sense. To be clear, is there any potential to lose
> messages in this scenario?

Re: ConsumeKafka processor erroring when held up by full queue

Posted by Nick Carenza <ni...@thecontrolgroup.com>.
That makes perfect sense. To be clear, is there any potential to lose
messages in this scenario?

On Thu, Feb 9, 2017 at 7:16 AM, Joe Witt <jo...@gmail.com> wrote:

> yeah this is probably a good case/cause for use of the pause concept
> in kafka consumers.

Re: ConsumeKafka processor erroring when held up by full queue

Posted by Joe Witt <jo...@gmail.com>.
Yeah, this is probably a good case for using the pause concept
in Kafka consumers.

On Thu, Feb 9, 2017 at 9:49 AM, Bryan Bende <bb...@gmail.com> wrote:
> I believe you are running into this issue:
>
> https://issues.apache.org/jira/browse/NIFI-3189
>
> When back-pressure happens on the queue coming out of ConsumeKafka,
> this can last for longer than session.timeout.ms, and when the
> processor resumes executing it receives this error on the first
> execution. We should be able to implement some type of keep-alive so
> that even when the processor is not executing, there is a background
> thread, or some way of keeping the connections alive.
>
> I believe any user-defined properties in the processor get passed to
> the Kafka consumer, so I believe you could add "session.timeout.ms"
> and set a much higher value as a possible workaround.
>
> Thanks,
>
> Bryan

Re: ConsumeKafka processor erroring when held up by full queue

Posted by Bryan Bende <bb...@gmail.com>.
I believe you are running into this issue:

https://issues.apache.org/jira/browse/NIFI-3189

When back-pressure is applied to the queue coming out of ConsumeKafka,
it can last longer than session.timeout.ms, and when the processor
resumes executing it receives this error on the first execution. We
should be able to implement some type of keep-alive, such as a
background thread, so that the connections stay alive even when the
processor is not executing.
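
To make the timing issue concrete, here is a minimal toy model of it. It is
only a sketch: ToyGroupCoordinator, CommitFailed and run are hypothetical
names, not Kafka or NiFi APIs, and the model assumes that the only contact
with the group happens in poll(), as the error message in this thread
suggests.

```python
class CommitFailed(Exception):
    """Stand-in for Kafka's CommitFailedException (toy model only)."""


class ToyGroupCoordinator:
    """Hypothetical, simplified model of a broker-side group coordinator."""

    def __init__(self, session_timeout_ms):
        self.session_timeout_ms = session_timeout_ms
        self.last_contact_ms = 0

    def poll(self, now_ms):
        # Assumption: in this model every poll doubles as the heartbeat.
        self.last_contact_ms = now_ms

    def commit(self, now_ms):
        # A member that has been silent for longer than the session timeout
        # is treated as gone, so its commit is rejected.
        if now_ms - self.last_contact_ms > self.session_timeout_ms:
            raise CommitFailed("group has already rebalanced")
        return "committed"


def run(pause_ms, session_timeout_ms):
    """Poll at t=0, sit idle for pause_ms (back-pressure), then commit."""
    coordinator = ToyGroupCoordinator(session_timeout_ms)
    coordinator.poll(now_ms=0)
    try:
        return coordinator.commit(now_ms=pause_ms)
    except CommitFailed as exc:
        return "failed: " + str(exc)


print(run(pause_ms=5_000, session_timeout_ms=10_000))
print(run(pause_ms=60_000, session_timeout_ms=10_000))
print(run(pause_ms=60_000, session_timeout_ms=120_000))
```

In this model a 60 second pause fails with a 10 second session timeout and
succeeds with a 120 second one. A background keep-alive would amount to
calling something like poll() periodically during the pause.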

I believe any user-defined properties in the processor get passed to
the Kafka consumer, so you could add "session.timeout.ms" and set a
much higher value as a possible workaround.
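
As a rough illustration of how such an override would take effect, the sketch
below merges user-defined properties over a baseline. The function name
build_consumer_props and the 60000 ms value are assumptions made for this
example; the baseline values are the ones reported earlier in this thread.
This is not NiFi's actual code.

```python
# Baseline taken from the values reported in this thread (not verified defaults).
BASELINE_CONSUMER_PROPS = {
    "session.timeout.ms": "10000",
    "max.poll.records": "10000",
}


def build_consumer_props(baseline, user_defined):
    """Return consumer properties with user-defined entries taking precedence."""
    props = dict(baseline)      # copy so the baseline is left untouched
    props.update(user_defined)  # user-defined properties win on conflict
    return props


# Hypothetical user-defined property added on the processor.
user_defined = {"session.timeout.ms": "60000"}

props = build_consumer_props(BASELINE_CONSUMER_PROPS, user_defined)
print(props["session.timeout.ms"])
print(props["max.poll.records"])
```

When raising the value, it is worth checking that the broker allows it
(group.max.session.timeout.ms) and that the consumer's request.timeout.ms
is still larger than the new session timeout.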

Thanks,

Bryan


Re: ConsumeKafka processor erroring when held up by full queue

Posted by Koji Kawamura <ij...@gmail.com>.
Hello Nick,

First, I assume "had a queue back up" means that a queue was under
back-pressure. Sorry if you meant something different.

I tried to reproduce this with the following flow:
ConsumeKafka_0_10
  -- success: Back Pressure Object Threshold = 10
    -- UpdateAttribute (Stopped)

Then I used ./bin/kafka-console-producer.sh to send 11 messages.
When NiFi received the 10th message, back-pressure was enabled on the
success relationship.
When I published the 11th message, NiFi didn't do anything.
This is expected behavior: because the downstream connection is
back-pressured, the processor won't be scheduled.

After I started UpdateAttribute and the queued flow files went
through, ConsumeKafka was executed again and received the 11th
message.
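
The behavior observed in this test can be sketched with a small simulation.
ToyConnection and run_source are hypothetical names, and the model assumes
one message is moved per scheduling attempt; it is not how NiFi is
implemented internally.

```python
from collections import deque


class ToyConnection:
    """Hypothetical queue with a back-pressure object threshold."""

    def __init__(self, object_threshold):
        self.object_threshold = object_threshold
        self.flow_files = deque()

    def is_back_pressured(self):
        return len(self.flow_files) >= self.object_threshold


def run_source(topic, connection):
    """Move one message per scheduling attempt from the topic to the
    connection, and stop as soon as the connection is back-pressured."""
    consumed = 0
    while topic and not connection.is_back_pressured():
        connection.flow_files.append(topic.popleft())
        consumed += 1
    return consumed


topic = deque(f"msg-{i}" for i in range(1, 12))  # 11 published messages
success = ToyConnection(object_threshold=10)

first = run_source(topic, success)   # downstream processor is stopped
print(first, len(topic))

success.flow_files.clear()           # downstream started, queue drains
second = run_source(topic, success)
print(second, len(topic))
```

The first run stops after 10 messages with 1 left on the topic; once the
queue drains, the second run picks up the remaining message.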

Also, I checked the ConsumerLease and ConsumeKafka_0_10 source code.
Those warning and error messages are logged because NiFi received a
KafkaException when it tried to commit offsets to Kafka.
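
The "Duplicates are likely" warning follows from the order of the two
commits. Here is a minimal sketch of that ordering, using made-up names
(consume_once, OffsetCommitError) rather than the real ConsumerLease code:

```python
class OffsetCommitError(Exception):
    """Stand-in for a KafkaException raised while committing offsets."""


def consume_once(log, committed_offset, output, fail_offset_commit):
    """Deliver every record from committed_offset onward, then try to
    commit the new offset. Returns the offset to resume from next time."""
    batch = log[committed_offset:]
    output.extend(batch)  # step 1: session commit, records are now downstream
    try:
        if fail_offset_commit:
            raise OffsetCommitError("commit rejected")
        return committed_offset + len(batch)  # step 2: offset commit succeeded
    except OffsetCommitError:
        return committed_offset  # offset not advanced, batch will be re-read


log = ["a", "b", "c"]
output = []
offset = consume_once(log, 0, output, fail_offset_commit=True)
offset = consume_once(log, offset, output, fail_offset_commit=False)
print(output)
print(offset)
```

Because the records were already handed downstream before the offset commit
failed, the second pass delivers them again, which is the at-least-once
behavior the warning describes.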

Was there anything in the Kafka server logs? I suspect something
happened on the Kafka server side.

Thanks,
Koji
