Posted to users@kafka.apache.org by Manan G <ma...@gmail.com> on 2017/11/03 20:37:59 UTC

Offset commit for partitions not owned by consumer

Hello,

I am using version 0.11.0.0 of the Kafka broker and the Java client library.
My consumer code tracks offsets for each assigned partition and periodically
commits them manually by passing a partition->offset map.

What I noticed is that after a rebalance, even if the consumer loses some
partitions that were previously assigned to it, an offset commit for those
lost partitions by that same consumer still succeeds! Shouldn't the offset
commit fail in this scenario, since the consumer is trying to commit offsets
for partitions that are not assigned to it?

For clarity, below are the logs I see, with comments:

// The consumer starts on the "test" topic and picks up 3 partitions.
log>> onPartitionsAssigned: partitions=[test-1, test-2, test-0]

// The consumer now processes 3 records from partition 0 and 7 records from
// partition 2 (confirmed with log statements).
log>> ...

// A rebalance happens. Right now my code does not commit any pending
// offsets here; it just prints the log statement.
log>> onPartitionRevoked: partitions=[test-1, test-2, test-0]

// After the rebalance, the consumer loses partitions 0 and 1. Again, my code
// does nothing in this callback except print the log statement.
onPartitionsAssigned: partitions=[test-2]

// Since the code did not commit offsets in the revoke callback, after the
// rebalance poll() returns all records for the assigned partitions since the
// last committed offset, so we re-process the 7 records from partition 2.
// This was confirmed with log statements.
log>> ...

// An offset commit is triggered after some time and, due to a bug in my
// code, it tries to commit offsets for both partition 0 and partition 2.
// There is no failure, however! On the Kafka broker side I can see that the
// offset for partition 0 is updated to 3.
// I made sure that the other consumer, which is actually assigned partition 0
// after the rebalance, had not committed an offset yet.
commitOffsets: {0=3, 2=7}


Thanks,
M
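
A minimal sketch of one way to avoid the stray commit described above, offered as an illustration rather than as the poster's code. As far as I understand, the broker validates an offset commit against the committing member's group membership and generation, not against the partitions currently assigned to it, so the client has to stop committing for partitions it has lost. The class OwnedOffsetTracker and its method names are hypothetical, and partitions are plain ints so the sketch needs no Kafka dependency; real code would key the map by TopicPartition and call these methods from a ConsumerRebalanceListener.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper: tracks the next offset to commit per partition and
 * forgets partitions that a rebalance has taken away.
 */
final class OwnedOffsetTracker {
    private final Map<Integer, Long> pending = new HashMap<>();

    /** Record that everything before nextOffset has been processed. */
    synchronized void record(int partition, long nextOffset) {
        pending.put(partition, nextOffset);
    }

    /**
     * Intended for the revoke callback: returns the tracked offsets of the
     * revoked partitions so they can be committed right away, and removes
     * them from the tracker.
     */
    synchronized Map<Integer, Long> drainRevoked(Collection<Integer> revoked) {
        Map<Integer, Long> toCommit = new HashMap<>();
        for (Integer partition : revoked) {
            Long offset = pending.remove(partition);
            if (offset != null) {
                toCommit.put(partition, offset);
            }
        }
        return toCommit;
    }

    /** Intended for the assign callback: drop anything not in the new assignment. */
    synchronized void retainOnly(Collection<Integer> assigned) {
        pending.keySet().retainAll(assigned);
    }

    /** Copy of the offsets that the periodic commit may safely send. */
    synchronized Map<Integer, Long> snapshot() {
        return new HashMap<>(pending);
    }
}
```

In the scenario above, calling drainRevoked with partitions 1, 2 and 0 in the revoke callback would return the offsets for partitions 0 and 2, which could be committed synchronously before the rebalance completes; that would also avoid re-processing the 7 records from partition 2. If committing on revoke is not wanted, calling retainOnly with the new assignment leaves only partition 2 in the map, so the later periodic commit no longer touches partition 0.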

Re: Offset commit for partitions not owned by consumer

Posted by Ted Yu <yu...@gmail.com>.
Can you share a code snippet for BufferedConsumerClientAdapterImpl?

I took a look at the logs. There were no entries around 19:46:5x in either
the server log or the controller log.

Thanks

On Tue, Nov 7, 2017 at 8:35 AM, Mana M <ma...@gmail.com> wrote:

> Ted, did you get a chance to look at the issue? I am also planning to update
> to the latest version to find out whether we still see the same issue.

Re: Offset commit for partitions not owned by consumer

Posted by Mana M <ma...@gmail.com>.
Ted, did you get a chance to look at the issue? I am also planning to update
to the latest version to find out whether we still see the same issue.

On Fri, Nov 3, 2017 at 8:40 PM, Mana M <ma...@gmail.com> wrote:

> Below are the logs:
>
> Consumer 1 logs, where the issue can be seen: https://pastebin.com/PuQhud91
> Consumer 2 logs: https://pastebin.com/yfJDSGPA
>
> server.log: https://pastebin.com/QKpk0zLn
> controller.log: https://pastebin.com/9T0niwEw
> state-change.log: https://pastebin.com/nrftHPC9

Re: Offset commit for partitions not owned by consumer

Posted by Mana M <ma...@gmail.com>.
Below are the logs:

Consumer 1 logs, where the issue can be seen: https://pastebin.com/PuQhud91
Consumer 2 logs: https://pastebin.com/yfJDSGPA

server.log: https://pastebin.com/QKpk0zLn
controller.log: https://pastebin.com/9T0niwEw
state-change.log: https://pastebin.com/nrftHPC9


On Fri, Nov 3, 2017 at 1:53 PM, Ted Yu <yu...@gmail.com> wrote:

> Can you pastebin the relevant logs from the client and broker?
>
> Thanks

Re: Offset commit for partitions not owned by consumer

Posted by Ted Yu <yu...@gmail.com>.
Can you pastebin the relevant logs from the client and broker?

Thanks
