You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Dmitriy Vsekhvalnov <dv...@gmail.com> on 2017/07/03 14:56:09 UTC

Consumers re-consuming messages again after re-balance?

Hi all,

looking for some explanations. We running 2 instances of consumer (same
consumer group) and getting little bit weird behavior  after 3 days of
inactivity.

Env:

kafka broker 0.10.2.1
consumer java 0.10.2.1 + spring-kafka + enable.auto.commit = true (all
default settings).

Scenario:

1. running volume testing
2. once all messages are processed by consumers we keep env idle (no new
messages) for several days
3. after ~3 days one of consumer start to receive messages again.

Reproduced twice so far.

I understand it's probably due to re-balance, but
 - why after 3 days?
 - and how can we avoid it as much as we can (i know it is "at-least-once")
? (manual offset commits?)

Below is relevant lines of log from both consumer hosts.

Consumer host #1
[2017-07-02 08:20:05,445] [INFO ]
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
[callingRequestId=] [requestId=schedule-dVZt5etOBt]
 [org.springframework.kafka.listener.KafkaMessageListenerContainer]
[partitions assigned:[test.topic-1, test.topic-2, test.topic-0]]
[2017-07-02 08:20:05,432] [INFO ]
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
[callingRequestId=] [requestId=schedule-dVZt5etOBt]
 [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Setting
newly assigned partitions [test.topic-1, test.topic-2, test.topic-0] for
group test-consumer]
[2017-07-02 08:20:05,432] [INFO ]
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
[callingRequestId=] [requestId=schedule-dVZt5etOBt]
 [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
[Successfully joined group test-consumer with generation 43]
[2017-07-02 08:20:05,396] [INFO ]
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
[callingRequestId=] [requestId=schedule-dVZt5etOBt]
 [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
[(Re-)joining group test-consumer]
[2017-07-02 08:20:05,396] [INFO ]
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
[callingRequestId=] [requestId=schedule-dVZt5etOBt]
 [org.springframework.kafka.listener.KafkaMessageListenerContainer]
[partitions revoked:[test.topic-1, test.topic-0]]
[2017-07-02 08:20:05,127] [INFO ]
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
[callingRequestId=] [requestId=schedule-dVZt5etOBt]
 [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
[Revoking previously assigned partitions [test.topic-1, test.topic-0] for
group test-consumer]
[2017-06-29 07:20:37,172] [INFO ]
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
[callingRequestId=] [requestId=schedule-dVZt5etOBt]
 [org.springframework.kafka.listener.KafkaMessageListenerContainer]
[partitions assigned:[test.topic-1, test.topic-0]]
[2017-06-29 07:20:37,164] [INFO ]
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
[callingRequestId=] [requestId=schedule-dVZt5etOBt]
 [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Setting
newly assigned partitions [test.topic-1, test.topic-0] for group
test-consumer]
[2017-06-29 07:20:37,164] [INFO ]
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
[callingRequestId=] [requestId=schedule-dVZt5etOBt]
 [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
[Successfully joined group test-consumer with generation 42]
[2017-06-29 07:20:34,822] [INFO ]
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
[callingRequestId=] [requestId=schedule-dVZt5etOBt]
 [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
[(Re-)joining group test-consumer]



Consumer host #2
[2017-07-02 08:35:16,844] [INFO ]
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
[callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
 [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
[Discovered coordinator 172.18.3.132:9092 (id: 2147482638 rack: null) for
group test-consumer.]
[2017-07-02 08:22:25,526] [INFO ]
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
[callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
 [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Marking
the coordinator 172.18.3.132:9092 (id: 2147482638 rack: null) dead for
group test-consumer]
[2017-06-29 07:20:37,168] [INFO ]
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
[callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
 [org.springframework.kafka.listener.KafkaMessageListenerContainer]
[partitions assigned:[test.topic-2]]
[2017-06-29 07:20:37,163] [INFO ]
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
[callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
 [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Setting
newly assigned partitions [test.topic-2] for group test-consumer]
[2017-06-29 07:20:37,163] [INFO ]
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
[callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
 [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
[Successfully joined group test-consumer with generation 42]
[2017-06-29 07:20:37,156] [INFO ]
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
[callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
 [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
[(Re-)joining group test-consumer]

Thanks all.

Re: Consumers re-consuming messages again after re-balance?

Posted by Dmitriy Vsekhvalnov <dv...@gmail.com>.
Thanks guys,

was exactly `offsets.retention.minutes`.

Figured out that `enable.auto.commit` was set to false in reality,
somewhere deep in spring properties and that's what have been causing
offsets removal when idle.



On Mon, Jul 3, 2017 at 7:04 PM, Dmitriy Vsekhvalnov <dv...@gmail.com>
wrote:

> Ouch, interesting.
>
> If by chance auto offset commit failed? Is there is way to prove it
> (something to search in a logs)?
>
> On Mon, Jul 3, 2017 at 6:29 PM, Tom Bentley <t....@gmail.com> wrote:
>
>> Hi Dmitriy,
>>
>> FTR, https://issues.apache.org/jira/browse/KAFKA-3806 is the issue Damian
>> is referring to, but it doesn't quite fit what you describe because you
>> said your consumer was configured with enable.auto.commit = true, which
>> should keep committing even if there are no messages being consumed.
>>
>> On 3 July 2017 at 16:25, Damian Guy <da...@gmail.com> wrote:
>>
>> > Hi Dmitriy,
>> >
>> > It is possibly related to the broker setting
>> `offsets.retention.minutes` -
>> > this defaults to 24 hours. If an offset hasn't been updated within that
>> > time it will be removed. So if your env was sitting idle for longer than
>> > this period, then rebalanced, you will likely start consuming the
>> messages
>> > from the earliest offset again. I'd recommend setting this higher than
>> the
>> > default of 24 hours.
>> >
>> > Thanks,
>> > Damian
>> >
>> > On Mon, 3 Jul 2017 at 15:56 Dmitriy Vsekhvalnov <dvsekhvalnov@gmail.com
>> >
>> > wrote:
>> >
>> > > Hi all,
>> > >
>> > > looking for some explanations. We running 2 instances of consumer
>> (same
>> > > consumer group) and getting little bit weird behavior  after 3 days of
>> > > inactivity.
>> > >
>> > > Env:
>> > >
>> > > kafka broker 0.10.2.1
>> > > consumer java 0.10.2.1 + spring-kafka + enable.auto.commit = true (all
>> > > default settings).
>> > >
>> > > Scenario:
>> > >
>> > > 1. running volume testing
>> > > 2. once all messages are processed by consumers we keep env idle (no
>> new
>> > > messages) for several days
>> > > 3. after ~3 days one of consumer start to receive messages again.
>> > >
>> > > Reproduced twice so far.
>> > >
>> > > I understand it's probably due to re-balance, but
>> > >  - why after 3 days?
>> > >  - and how can we avoid it as much as we can (i know it is
>> > "at-least-once")
>> > > ? (manual offset commits?)
>> > >
>> > > Below is relevant lines of log from both consumer hosts.
>> > >
>> > > Consumer host #1
>> > > [2017-07-02 08:20:05,445] [INFO ]
>> > >
>> > > [org.springframework.kafka.KafkaListenerEndpointContainer
>> > #0-0-kafka-consumer-1]
>> > > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
>> > >  [org.springframework.kafka.listener.KafkaMessageListenerContainer]
>> > > [partitions assigned:[test.topic-1, test.topic-2, test.topic-0]]
>> > > [2017-07-02 08:20:05,432] [INFO ]
>> > >
>> > > [org.springframework.kafka.KafkaListenerEndpointContainer
>> > #0-0-kafka-consumer-1]
>> > > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
>> > >  [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
>> > [Setting
>> > > newly assigned partitions [test.topic-1, test.topic-2, test.topic-0]
>> for
>> > > group test-consumer]
>> > > [2017-07-02 08:20:05,432] [INFO ]
>> > >
>> > > [org.springframework.kafka.KafkaListenerEndpointContainer
>> > #0-0-kafka-consumer-1]
>> > > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
>> > >  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
>> > > [Successfully joined group test-consumer with generation 43]
>> > > [2017-07-02 08:20:05,396] [INFO ]
>> > >
>> > > [org.springframework.kafka.KafkaListenerEndpointContainer
>> > #0-0-kafka-consumer-1]
>> > > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
>> > >  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
>> > > [(Re-)joining group test-consumer]
>> > > [2017-07-02 08:20:05,396] [INFO ]
>> > >
>> > > [org.springframework.kafka.KafkaListenerEndpointContainer
>> > #0-0-kafka-consumer-1]
>> > > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
>> > >  [org.springframework.kafka.listener.KafkaMessageListenerContainer]
>> > > [partitions revoked:[test.topic-1, test.topic-0]]
>> > > [2017-07-02 08:20:05,127] [INFO ]
>> > >
>> > > [org.springframework.kafka.KafkaListenerEndpointContainer
>> > #0-0-kafka-consumer-1]
>> > > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
>> > >  [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
>> > > [Revoking previously assigned partitions [test.topic-1, test.topic-0]
>> for
>> > > group test-consumer]
>> > > [2017-06-29 07:20:37,172] [INFO ]
>> > >
>> > > [org.springframework.kafka.KafkaListenerEndpointContainer
>> > #0-0-kafka-consumer-1]
>> > > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
>> > >  [org.springframework.kafka.listener.KafkaMessageListenerContainer]
>> > > [partitions assigned:[test.topic-1, test.topic-0]]
>> > > [2017-06-29 07:20:37,164] [INFO ]
>> > >
>> > > [org.springframework.kafka.KafkaListenerEndpointContainer
>> > #0-0-kafka-consumer-1]
>> > > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
>> > >  [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
>> > [Setting
>> > > newly assigned partitions [test.topic-1, test.topic-0] for group
>> > > test-consumer]
>> > > [2017-06-29 07:20:37,164] [INFO ]
>> > >
>> > > [org.springframework.kafka.KafkaListenerEndpointContainer
>> > #0-0-kafka-consumer-1]
>> > > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
>> > >  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
>> > > [Successfully joined group test-consumer with generation 42]
>> > > [2017-06-29 07:20:34,822] [INFO ]
>> > >
>> > > [org.springframework.kafka.KafkaListenerEndpointContainer
>> > #0-0-kafka-consumer-1]
>> > > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
>> > >  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
>> > > [(Re-)joining group test-consumer]
>> > >
>> > >
>> > >
>> > > Consumer host #2
>> > > [2017-07-02 08:35:16,844] [INFO ]
>> > >
>> > > [org.springframework.kafka.KafkaListenerEndpointContainer
>> > #0-0-kafka-consumer-1]
>> > > [callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
>> > >  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
>> > > [Discovered coordinator 172.18.3.132:9092 (id: 2147482638
>> > > <(214)%20748-2638> rack: null) for
>> > > group test-consumer.]
>> > > [2017-07-02 08:22:25,526] [INFO ]
>> > >
>> > > [org.springframework.kafka.KafkaListenerEndpointContainer
>> > #0-0-kafka-consumer-1]
>> > > [callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
>> > >  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
>> > [Marking
>> > > the coordinator 172.18.3.132:9092 (id: 2147482638 <(214)%20748-2638>
>> > > rack: null) dead for
>> > > group test-consumer]
>> > > [2017-06-29 07:20:37,168] [INFO ]
>> > >
>> > > [org.springframework.kafka.KafkaListenerEndpointContainer
>> > #0-0-kafka-consumer-1]
>> > > [callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
>> > >  [org.springframework.kafka.listener.KafkaMessageListenerContainer]
>> > > [partitions assigned:[test.topic-2]]
>> > > [2017-06-29 07:20:37,163] [INFO ]
>> > >
>> > > [org.springframework.kafka.KafkaListenerEndpointContainer
>> > #0-0-kafka-consumer-1]
>> > > [callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
>> > >  [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
>> > [Setting
>> > > newly assigned partitions [test.topic-2] for group test-consumer]
>> > > [2017-06-29 07:20:37,163] [INFO ]
>> > >
>> > > [org.springframework.kafka.KafkaListenerEndpointContainer
>> > #0-0-kafka-consumer-1]
>> > > [callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
>> > >  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
>> > > [Successfully joined group test-consumer with generation 42]
>> > > [2017-06-29 07:20:37,156] [INFO ]
>> > >
>> > > [org.springframework.kafka.KafkaListenerEndpointContainer
>> > #0-0-kafka-consumer-1]
>> > > [callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
>> > >  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
>> > > [(Re-)joining group test-consumer]
>> > >
>> > > Thanks all.
>> > >
>> >
>>
>
>

Re: Consumers re-consuming messages again after re-balance?

Posted by Dmitriy Vsekhvalnov <dv...@gmail.com>.
Ouch, interesting.

If by chance auto offset commit failed? Is there is way to prove it
(something to search in a logs)?

On Mon, Jul 3, 2017 at 6:29 PM, Tom Bentley <t....@gmail.com> wrote:

> Hi Dmitriy,
>
> FTR, https://issues.apache.org/jira/browse/KAFKA-3806 is the issue Damian
> is referring to, but it doesn't quite fit what you describe because you
> said your consumer was configured with enable.auto.commit = true, which
> should keep committing even if there are no messages being consumed.
>
> On 3 July 2017 at 16:25, Damian Guy <da...@gmail.com> wrote:
>
> > Hi Dmitriy,
> >
> > It is possibly related to the broker setting `offsets.retention.minutes`
> -
> > this defaults to 24 hours. If an offset hasn't been updated within that
> > time it will be removed. So if your env was sitting idle for longer than
> > this period, then rebalanced, you will likely start consuming the
> messages
> > from the earliest offset again. I'd recommend setting this higher than
> the
> > default of 24 hours.
> >
> > Thanks,
> > Damian
> >
> > On Mon, 3 Jul 2017 at 15:56 Dmitriy Vsekhvalnov <dv...@gmail.com>
> > wrote:
> >
> > > Hi all,
> > >
> > > looking for some explanations. We running 2 instances of consumer (same
> > > consumer group) and getting little bit weird behavior  after 3 days of
> > > inactivity.
> > >
> > > Env:
> > >
> > > kafka broker 0.10.2.1
> > > consumer java 0.10.2.1 + spring-kafka + enable.auto.commit = true (all
> > > default settings).
> > >
> > > Scenario:
> > >
> > > 1. running volume testing
> > > 2. once all messages are processed by consumers we keep env idle (no
> new
> > > messages) for several days
> > > 3. after ~3 days one of consumer start to receive messages again.
> > >
> > > Reproduced twice so far.
> > >
> > > I understand it's probably due to re-balance, but
> > >  - why after 3 days?
> > >  - and how can we avoid it as much as we can (i know it is
> > "at-least-once")
> > > ? (manual offset commits?)
> > >
> > > Below is relevant lines of log from both consumer hosts.
> > >
> > > Consumer host #1
> > > [2017-07-02 08:20:05,445] [INFO ]
> > >
> > > [org.springframework.kafka.KafkaListenerEndpointContainer
> > #0-0-kafka-consumer-1]
> > > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
> > >  [org.springframework.kafka.listener.KafkaMessageListenerContainer]
> > > [partitions assigned:[test.topic-1, test.topic-2, test.topic-0]]
> > > [2017-07-02 08:20:05,432] [INFO ]
> > >
> > > [org.springframework.kafka.KafkaListenerEndpointContainer
> > #0-0-kafka-consumer-1]
> > > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
> > >  [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
> > [Setting
> > > newly assigned partitions [test.topic-1, test.topic-2, test.topic-0]
> for
> > > group test-consumer]
> > > [2017-07-02 08:20:05,432] [INFO ]
> > >
> > > [org.springframework.kafka.KafkaListenerEndpointContainer
> > #0-0-kafka-consumer-1]
> > > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
> > >  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
> > > [Successfully joined group test-consumer with generation 43]
> > > [2017-07-02 08:20:05,396] [INFO ]
> > >
> > > [org.springframework.kafka.KafkaListenerEndpointContainer
> > #0-0-kafka-consumer-1]
> > > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
> > >  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
> > > [(Re-)joining group test-consumer]
> > > [2017-07-02 08:20:05,396] [INFO ]
> > >
> > > [org.springframework.kafka.KafkaListenerEndpointContainer
> > #0-0-kafka-consumer-1]
> > > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
> > >  [org.springframework.kafka.listener.KafkaMessageListenerContainer]
> > > [partitions revoked:[test.topic-1, test.topic-0]]
> > > [2017-07-02 08:20:05,127] [INFO ]
> > >
> > > [org.springframework.kafka.KafkaListenerEndpointContainer
> > #0-0-kafka-consumer-1]
> > > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
> > >  [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
> > > [Revoking previously assigned partitions [test.topic-1, test.topic-0]
> for
> > > group test-consumer]
> > > [2017-06-29 07:20:37,172] [INFO ]
> > >
> > > [org.springframework.kafka.KafkaListenerEndpointContainer
> > #0-0-kafka-consumer-1]
> > > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
> > >  [org.springframework.kafka.listener.KafkaMessageListenerContainer]
> > > [partitions assigned:[test.topic-1, test.topic-0]]
> > > [2017-06-29 07:20:37,164] [INFO ]
> > >
> > > [org.springframework.kafka.KafkaListenerEndpointContainer
> > #0-0-kafka-consumer-1]
> > > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
> > >  [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
> > [Setting
> > > newly assigned partitions [test.topic-1, test.topic-0] for group
> > > test-consumer]
> > > [2017-06-29 07:20:37,164] [INFO ]
> > >
> > > [org.springframework.kafka.KafkaListenerEndpointContainer
> > #0-0-kafka-consumer-1]
> > > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
> > >  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
> > > [Successfully joined group test-consumer with generation 42]
> > > [2017-06-29 07:20:34,822] [INFO ]
> > >
> > > [org.springframework.kafka.KafkaListenerEndpointContainer
> > #0-0-kafka-consumer-1]
> > > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
> > >  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
> > > [(Re-)joining group test-consumer]
> > >
> > >
> > >
> > > Consumer host #2
> > > [2017-07-02 08:35:16,844] [INFO ]
> > >
> > > [org.springframework.kafka.KafkaListenerEndpointContainer
> > #0-0-kafka-consumer-1]
> > > [callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
> > >  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
> > > [Discovered coordinator 172.18.3.132:9092 (id: 2147482638
> > > <(214)%20748-2638> rack: null) for
> > > group test-consumer.]
> > > [2017-07-02 08:22:25,526] [INFO ]
> > >
> > > [org.springframework.kafka.KafkaListenerEndpointContainer
> > #0-0-kafka-consumer-1]
> > > [callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
> > >  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
> > [Marking
> > > the coordinator 172.18.3.132:9092 (id: 2147482638 <(214)%20748-2638>
> > > rack: null) dead for
> > > group test-consumer]
> > > [2017-06-29 07:20:37,168] [INFO ]
> > >
> > > [org.springframework.kafka.KafkaListenerEndpointContainer
> > #0-0-kafka-consumer-1]
> > > [callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
> > >  [org.springframework.kafka.listener.KafkaMessageListenerContainer]
> > > [partitions assigned:[test.topic-2]]
> > > [2017-06-29 07:20:37,163] [INFO ]
> > >
> > > [org.springframework.kafka.KafkaListenerEndpointContainer
> > #0-0-kafka-consumer-1]
> > > [callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
> > >  [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
> > [Setting
> > > newly assigned partitions [test.topic-2] for group test-consumer]
> > > [2017-06-29 07:20:37,163] [INFO ]
> > >
> > > [org.springframework.kafka.KafkaListenerEndpointContainer
> > #0-0-kafka-consumer-1]
> > > [callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
> > >  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
> > > [Successfully joined group test-consumer with generation 42]
> > > [2017-06-29 07:20:37,156] [INFO ]
> > >
> > > [org.springframework.kafka.KafkaListenerEndpointContainer
> > #0-0-kafka-consumer-1]
> > > [callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
> > >  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
> > > [(Re-)joining group test-consumer]
> > >
> > > Thanks all.
> > >
> >
>

Re: Consumers re-consuming messages again after re-balance?

Posted by Tom Bentley <t....@gmail.com>.
Hi Dmitriy,

FTR, https://issues.apache.org/jira/browse/KAFKA-3806 is the issue Damian
is referring to, but it doesn't quite fit what you describe because you
said your consumer was configured with enable.auto.commit = true, which
should keep committing even if there are no messages being consumed.

On 3 July 2017 at 16:25, Damian Guy <da...@gmail.com> wrote:

> Hi Dmitriy,
>
> It is possibly related to the broker setting `offsets.retention.minutes` -
> this defaults to 24 hours. If an offset hasn't been updated within that
> time it will be removed. So if your env was sitting idle for longer than
> this period, then rebalanced, you will likely start consuming the messages
> from the earliest offset again. I'd recommend setting this higher than the
> default of 24 hours.
>
> Thanks,
> Damian
>
> On Mon, 3 Jul 2017 at 15:56 Dmitriy Vsekhvalnov <dv...@gmail.com>
> wrote:
>
> > Hi all,
> >
> > looking for some explanations. We running 2 instances of consumer (same
> > consumer group) and getting little bit weird behavior  after 3 days of
> > inactivity.
> >
> > Env:
> >
> > kafka broker 0.10.2.1
> > consumer java 0.10.2.1 + spring-kafka + enable.auto.commit = true (all
> > default settings).
> >
> > Scenario:
> >
> > 1. running volume testing
> > 2. once all messages are processed by consumers we keep env idle (no new
> > messages) for several days
> > 3. after ~3 days one of consumer start to receive messages again.
> >
> > Reproduced twice so far.
> >
> > I understand it's probably due to re-balance, but
> >  - why after 3 days?
> >  - and how can we avoid it as much as we can (i know it is
> "at-least-once")
> > ? (manual offset commits?)
> >
> > Below is relevant lines of log from both consumer hosts.
> >
> > Consumer host #1
> > [2017-07-02 08:20:05,445] [INFO ]
> >
> > [org.springframework.kafka.KafkaListenerEndpointContainer
> #0-0-kafka-consumer-1]
> > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
> >  [org.springframework.kafka.listener.KafkaMessageListenerContainer]
> > [partitions assigned:[test.topic-1, test.topic-2, test.topic-0]]
> > [2017-07-02 08:20:05,432] [INFO ]
> >
> > [org.springframework.kafka.KafkaListenerEndpointContainer
> #0-0-kafka-consumer-1]
> > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
> >  [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
> [Setting
> > newly assigned partitions [test.topic-1, test.topic-2, test.topic-0] for
> > group test-consumer]
> > [2017-07-02 08:20:05,432] [INFO ]
> >
> > [org.springframework.kafka.KafkaListenerEndpointContainer
> #0-0-kafka-consumer-1]
> > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
> >  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
> > [Successfully joined group test-consumer with generation 43]
> > [2017-07-02 08:20:05,396] [INFO ]
> >
> > [org.springframework.kafka.KafkaListenerEndpointContainer
> #0-0-kafka-consumer-1]
> > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
> >  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
> > [(Re-)joining group test-consumer]
> > [2017-07-02 08:20:05,396] [INFO ]
> >
> > [org.springframework.kafka.KafkaListenerEndpointContainer
> #0-0-kafka-consumer-1]
> > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
> >  [org.springframework.kafka.listener.KafkaMessageListenerContainer]
> > [partitions revoked:[test.topic-1, test.topic-0]]
> > [2017-07-02 08:20:05,127] [INFO ]
> >
> > [org.springframework.kafka.KafkaListenerEndpointContainer
> #0-0-kafka-consumer-1]
> > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
> >  [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
> > [Revoking previously assigned partitions [test.topic-1, test.topic-0] for
> > group test-consumer]
> > [2017-06-29 07:20:37,172] [INFO ]
> >
> > [org.springframework.kafka.KafkaListenerEndpointContainer
> #0-0-kafka-consumer-1]
> > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
> >  [org.springframework.kafka.listener.KafkaMessageListenerContainer]
> > [partitions assigned:[test.topic-1, test.topic-0]]
> > [2017-06-29 07:20:37,164] [INFO ]
> >
> > [org.springframework.kafka.KafkaListenerEndpointContainer
> #0-0-kafka-consumer-1]
> > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
> >  [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
> [Setting
> > newly assigned partitions [test.topic-1, test.topic-0] for group
> > test-consumer]
> > [2017-06-29 07:20:37,164] [INFO ]
> >
> > [org.springframework.kafka.KafkaListenerEndpointContainer
> #0-0-kafka-consumer-1]
> > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
> >  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
> > [Successfully joined group test-consumer with generation 42]
> > [2017-06-29 07:20:34,822] [INFO ]
> >
> > [org.springframework.kafka.KafkaListenerEndpointContainer
> #0-0-kafka-consumer-1]
> > [callingRequestId=] [requestId=schedule-dVZt5etOBt]
> >  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
> > [(Re-)joining group test-consumer]
> >
> >
> >
> > Consumer host #2
> > [2017-07-02 08:35:16,844] [INFO ]
> >
> > [org.springframework.kafka.KafkaListenerEndpointContainer
> #0-0-kafka-consumer-1]
> > [callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
> >  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
> > [Discovered coordinator 172.18.3.132:9092 (id: 2147482638
> > <(214)%20748-2638> rack: null) for
> > group test-consumer.]
> > [2017-07-02 08:22:25,526] [INFO ]
> >
> > [org.springframework.kafka.KafkaListenerEndpointContainer
> #0-0-kafka-consumer-1]
> > [callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
> >  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
> [Marking
> > the coordinator 172.18.3.132:9092 (id: 2147482638 <(214)%20748-2638>
> > rack: null) dead for
> > group test-consumer]
> > [2017-06-29 07:20:37,168] [INFO ]
> >
> > [org.springframework.kafka.KafkaListenerEndpointContainer
> #0-0-kafka-consumer-1]
> > [callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
> >  [org.springframework.kafka.listener.KafkaMessageListenerContainer]
> > [partitions assigned:[test.topic-2]]
> > [2017-06-29 07:20:37,163] [INFO ]
> >
> > [org.springframework.kafka.KafkaListenerEndpointContainer
> #0-0-kafka-consumer-1]
> > [callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
> >  [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
> [Setting
> > newly assigned partitions [test.topic-2] for group test-consumer]
> > [2017-06-29 07:20:37,163] [INFO ]
> >
> > [org.springframework.kafka.KafkaListenerEndpointContainer
> #0-0-kafka-consumer-1]
> > [callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
> >  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
> > [Successfully joined group test-consumer with generation 42]
> > [2017-06-29 07:20:37,156] [INFO ]
> >
> > [org.springframework.kafka.KafkaListenerEndpointContainer
> #0-0-kafka-consumer-1]
> > [callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
> >  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
> > [(Re-)joining group test-consumer]
> >
> > Thanks all.
> >
>

Re: Consumers re-consuming messages again after re-balance?

Posted by Damian Guy <da...@gmail.com>.
Hi Dmitriy,

It is possibly related to the broker setting `offsets.retention.minutes` -
this defaults to 24 hours. If an offset hasn't been updated within that
time it will be removed. So if your env was sitting idle for longer than
this period, then rebalanced, you will likely start consuming the messages
from the earliest offset again. I'd recommend setting this higher than the
default of 24 hours.

Thanks,
Damian

On Mon, 3 Jul 2017 at 15:56 Dmitriy Vsekhvalnov <dv...@gmail.com>
wrote:

> Hi all,
>
> looking for some explanations. We running 2 instances of consumer (same
> consumer group) and getting little bit weird behavior  after 3 days of
> inactivity.
>
> Env:
>
> kafka broker 0.10.2.1
> consumer java 0.10.2.1 + spring-kafka + enable.auto.commit = true (all
> default settings).
>
> Scenario:
>
> 1. running volume testing
> 2. once all messages are processed by consumers we keep env idle (no new
> messages) for several days
> 3. after ~3 days one of consumer start to receive messages again.
>
> Reproduced twice so far.
>
> I understand it's probably due to re-balance, but
>  - why after 3 days?
>  - and how can we avoid it as much as we can (i know it is "at-least-once")
> ? (manual offset commits?)
>
> Below is relevant lines of log from both consumer hosts.
>
> Consumer host #1
> [2017-07-02 08:20:05,445] [INFO ]
>
> [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
> [callingRequestId=] [requestId=schedule-dVZt5etOBt]
>  [org.springframework.kafka.listener.KafkaMessageListenerContainer]
> [partitions assigned:[test.topic-1, test.topic-2, test.topic-0]]
> [2017-07-02 08:20:05,432] [INFO ]
>
> [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
> [callingRequestId=] [requestId=schedule-dVZt5etOBt]
>  [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Setting
> newly assigned partitions [test.topic-1, test.topic-2, test.topic-0] for
> group test-consumer]
> [2017-07-02 08:20:05,432] [INFO ]
>
> [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
> [callingRequestId=] [requestId=schedule-dVZt5etOBt]
>  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
> [Successfully joined group test-consumer with generation 43]
> [2017-07-02 08:20:05,396] [INFO ]
>
> [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
> [callingRequestId=] [requestId=schedule-dVZt5etOBt]
>  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
> [(Re-)joining group test-consumer]
> [2017-07-02 08:20:05,396] [INFO ]
>
> [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
> [callingRequestId=] [requestId=schedule-dVZt5etOBt]
>  [org.springframework.kafka.listener.KafkaMessageListenerContainer]
> [partitions revoked:[test.topic-1, test.topic-0]]
> [2017-07-02 08:20:05,127] [INFO ]
>
> [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
> [callingRequestId=] [requestId=schedule-dVZt5etOBt]
>  [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
> [Revoking previously assigned partitions [test.topic-1, test.topic-0] for
> group test-consumer]
> [2017-06-29 07:20:37,172] [INFO ]
>
> [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
> [callingRequestId=] [requestId=schedule-dVZt5etOBt]
>  [org.springframework.kafka.listener.KafkaMessageListenerContainer]
> [partitions assigned:[test.topic-1, test.topic-0]]
> [2017-06-29 07:20:37,164] [INFO ]
>
> [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
> [callingRequestId=] [requestId=schedule-dVZt5etOBt]
>  [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Setting
> newly assigned partitions [test.topic-1, test.topic-0] for group
> test-consumer]
> [2017-06-29 07:20:37,164] [INFO ]
>
> [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
> [callingRequestId=] [requestId=schedule-dVZt5etOBt]
>  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
> [Successfully joined group test-consumer with generation 42]
> [2017-06-29 07:20:34,822] [INFO ]
>
> [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
> [callingRequestId=] [requestId=schedule-dVZt5etOBt]
>  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
> [(Re-)joining group test-consumer]
>
>
>
> Consumer host #2
> [2017-07-02 08:35:16,844] [INFO ]
>
> [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
> [callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
>  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
> [Discovered coordinator 172.18.3.132:9092 (id: 2147482638
> <(214)%20748-2638> rack: null) for
> group test-consumer.]
> [2017-07-02 08:22:25,526] [INFO ]
>
> [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
> [callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
>  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Marking
> the coordinator 172.18.3.132:9092 (id: 2147482638 <(214)%20748-2638>
> rack: null) dead for
> group test-consumer]
> [2017-06-29 07:20:37,168] [INFO ]
>
> [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
> [callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
>  [org.springframework.kafka.listener.KafkaMessageListenerContainer]
> [partitions assigned:[test.topic-2]]
> [2017-06-29 07:20:37,163] [INFO ]
>
> [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
> [callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
>  [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Setting
> newly assigned partitions [test.topic-2] for group test-consumer]
> [2017-06-29 07:20:37,163] [INFO ]
>
> [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
> [callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
>  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
> [Successfully joined group test-consumer with generation 42]
> [2017-06-29 07:20:37,156] [INFO ]
>
> [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1]
> [callingRequestId=] [requestId=schedule-cjlIyDE5Nz]
>  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
> [(Re-)joining group test-consumer]
>
> Thanks all.
>