Posted to dev@kafka.apache.org by Sriharsha Chintalapani <ha...@hortonworks.com> on 2015/03/30 23:47:17 UTC

Review Request 32650: Patch for KAFKA-2000

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32650/
-----------------------------------------------------------

Review request for kafka.


Bugs: KAFKA-2000
    https://issues.apache.org/jira/browse/KAFKA-2000


Repository: kafka


Description
-------

KAFKA-2000. Delete consumer offsets from kafka once the topic is deleted.


Diffs
-----

  core/src/main/scala/kafka/server/OffsetManager.scala 395b1dbe43a5db47151e72a1b588d72f03cef963 

Diff: https://reviews.apache.org/r/32650/diff/


Testing
-------


Thanks,

Sriharsha Chintalapani


Re: Review Request 32650: Patch for KAFKA-2000

Posted by Joel Koshy <jj...@gmail.com>.

> On April 20, 2015, 11:18 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/OffsetManager.scala, line 124
> > <https://reviews.apache.org/r/32650/diff/1/?file=909897#file909897line124>
> >
> >     I think there is an issue in relying on the metadata cache, mainly due to start-up. E.g., when we start up the broker (and offset manager), the metadata cache will actually be empty, so this would delete _all_ the offsets. Unfortunately, even after start-up there is no _guarantee_ that you have the most current information in the cache (say, if the controller failed to send an UpdateMetadataRequest to the broker by the time the compactor task runs).

Actually - I think what you have is correct. The offset cache would be empty at start-up and would only be populated on becoming leader. However, we just need to make sure that we get the complete cluster topic metadata before the compactor thread runs. I'll take another look tomorrow.
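
To make the start-up concern and the follow-up concrete, here is a minimal sketch of the purge rule being discussed. It is a simplified model, not the actual OffsetManager code: GroupTopicPartition, offsetsToPurge and the plain Map/Set caches are all hypothetical stand-ins. It assumes the purge simply drops every cached offset whose topic is missing from the broker's view of the cluster metadata.

```scala
// Hypothetical, simplified model; these names are not the real OffsetManager API.
object PurgeSketch {
  final case class GroupTopicPartition(group: String, topic: String, partition: Int)

  // Returns the cached offset keys whose topic is absent from the metadata view.
  def offsetsToPurge(offsetsCache: Map[GroupTopicPartition, Long],
                     knownTopics: Set[String]): Set[GroupTopicPartition] =
    offsetsCache.keySet.filterNot(key => knownTopics.contains(key.topic))

  def main(args: Array[String]): Unit = {
    val committed = Map(
      GroupTopicPartition("g1", "orders", 0) -> 42L,
      GroupTopicPartition("g1", "clicks", 0) -> 7L)

    // Start-up: the offset cache is still empty, so there is nothing to purge.
    println(offsetsToPurge(Map.empty, Set.empty).size)             // prints 0

    // Offsets loaded but metadata view still empty: everything would be purged.
    println(offsetsToPurge(committed, Set.empty).size)             // prints 2

    // Metadata view complete and "clicks" deleted: only its offsets are purged.
    println(offsetsToPurge(committed, Set("orders")).map(_.topic)) // prints Set(clicks)
  }
}
```

The second case is the one that matters: the rule is only safe if the metadata view is complete before the offset cache is populated and the compactor runs.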


- Joel


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32650/#review80857
-----------------------------------------------------------




Re: Review Request 32650: Patch for KAFKA-2000

Posted by Joel Koshy <jj...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32650/#review80857
-----------------------------------------------------------



core/src/main/scala/kafka/server/OffsetManager.scala
<https://reviews.apache.org/r/32650/#comment130981>

    I think there is an issue in relying on the metadata cache, mainly due to start-up. E.g., when we start up the broker (and offset manager), the metadata cache will actually be empty, so this would delete _all_ the offsets. Unfortunately, even after start-up there is no _guarantee_ that you have the most current information in the cache (say, if the controller failed to send an UpdateMetadataRequest to the broker by the time the compactor task runs).


- Joel Koshy




Re: Review Request 32650: Patch for KAFKA-2000

Posted by Gwen Shapira <gs...@cloudera.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32650/#review78310
-----------------------------------------------------------

Ship it!


Very nice fix :)

- Gwen Shapira




Re: Review Request 32650: Patch for KAFKA-2000

Posted by Onur Karaman <ok...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32650/#review78337
-----------------------------------------------------------


This might be out of scope for this JIRA, but I think if the deleted topic gets recreated before compaction, the offsets corresponding to the older version of the topic won't be deleted.

This usually doesn't matter because auto.offset.reset will be triggered if the new version of the topic is smaller than the old version in terms of offsets. As with topic deletion under ZooKeeper-based offsets, there's an edge case where the consumer skips messages from the new version of the topic if the old version's offsets still fit. This edge case was briefly discussed here: https://issues.apache.org/jira/browse/KAFKA-1787
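
A rough illustration of that edge case, as a sketch only: resumePosition and its parameters are hypothetical, and resetTo stands in for whatever position auto.offset.reset would resolve to. The model assumes the consumer keeps a committed offset whenever it falls inside the new topic's valid offset range.

```scala
// Hypothetical model of where a consumer resumes after a topic is recreated.
object RecreatedTopicSketch {
  // committed: offset left over from the old version of the topic.
  // [logStart, logEnd]: valid offset range of the recreated topic.
  def resumePosition(committed: Long, logStart: Long, logEnd: Long, resetTo: Long): Long =
    if (committed < logStart || committed > logEnd) resetTo // out of range: reset is triggered
    else committed                                          // stale offset still "fits"

  def main(args: Array[String]): Unit = {
    val staleOffset = 500L
    // New topic is shorter than the old offset: the reset policy takes over.
    println(resumePosition(staleOffset, 0L, 100L, resetTo = 0L))  // prints 0
    // New topic has grown past the old offset: messages 0..499 are skipped.
    println(resumePosition(staleOffset, 0L, 2000L, resetTo = 0L)) // prints 500
  }
}
```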

- Onur Karaman




Re: Review Request 32650: Patch for KAFKA-2000

Posted by Sriharsha Chintalapani <ha...@hortonworks.com>.

> On April 23, 2015, 9:51 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/OffsetManager.scala, line 124
> > <https://reviews.apache.org/r/32650/diff/1/?file=909897#file909897line124>
> >
> >     A safer fix is to proactively purge as part of UpdateMetadataRequest - i.e., removePartitionInfo in metadata cache.
> >     
> >     Your fix is nice, but we need to make sure of the following: on a given offset manager (broker) the metadata cache must contain topic X before any consumer of topic X (and whose group is managed by that broker) commits offsets for topic X.
> >     
> >     The original scenario I was concerned about should be fine:
> >     - Suppose broker A (offset manager for G) starts up
> >     - It receives UpdateMetadataRequests from the controller for all topics in the cluster
> >     - It then receives LeaderAndIsrRequest for partitions of the offset topic which make it the offset manager.
> >     - We should be fine _as long as_ the update metadata requests occur first. So if we go with your approach we should at the very least add a unit test to guarantee this.
> >     
> >     There is another scenario. If topic X is a new topic (or has new partitions):
> >     - Broker A is the offset manager for consumer group G
> >     - Broker B leads a new partition of X
> >     - Controller C sends become leader to B and update metadata to A (which will populate its metadata cache)
> >     - B becomes the leader first
> >     - A consumer starts consuming X and commits offsets to A (before it has received the update metadata request)
> >     - Other consumers in the group may rebalance while all this is happening (since new partitions for the topic appeared) and may fetch offsets from A
> >     - But A could have deleted the offset by then.
> >     - This is improbable but not impossible.
> >     
> >     Onur mentioned another corner case:
> >     https://issues.apache.org/jira/browse/KAFKA-1787 
> >     
> >     Both would be solved by having topic generations and incorporating generation information when determining which offsets to purge. I don't think we have a jira open for that but I will follow-up offline with Onur.
> >     
> >     Do you see any other issues?
> >     
> >     So I think the options are:
> >     - Go with your approach + a unit test to ensure that the controller sends update metadata request first.
> >     - Go with the more conservative fix which is to purge on metadataCache.removePartitionInfo
> >     
> >     Also, we should add a unit test to verify offsets are in fact removed after deletion.
> 
> Joel Koshy wrote:
>     Never mind - for the second scenario we are fine. We check in offset manager if the topic exists before committing offsets.
>     
>     So your fix should be fine. Can you add a unit test?

Thanks for the review, Joel. Added a unit test to check that the offsets are deleted after topic deletion.


- Sriharsha


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32650/#review81413
-----------------------------------------------------------


On May 3, 2015, 5:39 p.m., Sriharsha Chintalapani wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/32650/
> -----------------------------------------------------------
> 
> (Updated May 3, 2015, 5:39 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2000
>     https://issues.apache.org/jira/browse/KAFKA-2000
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-2000. Delete consumer offsets from kafka once the topic is deleted.
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/server/OffsetManager.scala 18680ce100f10035175cc0263ba7787ab0f6a17a 
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 652208a70f66045b854549d93cbbc2b77c24b10b 
> 
> Diff: https://reviews.apache.org/r/32650/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Sriharsha Chintalapani
> 
>


Re: Review Request 32650: Patch for KAFKA-2000

Posted by Joel Koshy <jj...@gmail.com>.

> On April 23, 2015, 9:51 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/OffsetManager.scala, line 124
> > <https://reviews.apache.org/r/32650/diff/1/?file=909897#file909897line124>
> >
> >     A safer fix is to proactively purge as part of UpdateMetadataRequest - i.e., removePartitionInfo in metadata cache.
> >     
> >     Your fix is nice, but we need to make sure of the following: on a given offset manager (broker) the metadata cache must contain topic X before any consumer of topic X (and whose group is managed by that broker) commits offsets for topic X.
> >     
> >     The original scenario I was concerned about should be fine:
> >     - Suppose broker A (offset manager for G) starts up
> >     - It receives UpdateMetadataRequests from the controller for all topics in the cluster
> >     - It then receives LeaderAndIsrRequest for partitions of the offset topic which make it the offset manager.
> >     - We should be fine _as long as_ the update metadata requests occur first. So if we go with your approach we should at the very least add a unit test to guarantee this.
> >     
> >     There is another scenario. If topic X is a new topic (or has new partitions):
> >     - Broker A is the offset manager for consumer group G
> >     - Broker B leads a new partition of X
> >     - Controller C sends become leader to B and update metadata to A (which will populate its metadata cache)
> >     - B becomes the leader first
> >     - A consumer starts consuming X and commits offsets to A (before it has received the update metadata request)
> >     - Other consumers in the group may rebalance while all this is happening (since new partitions for the topic appeared) and may fetch offsets from A
> >     - But A could have deleted the offset by then.
> >     - This is improbable but not impossible.
> >     
> >     Onur mentioned another corner case:
> >     https://issues.apache.org/jira/browse/KAFKA-1787 
> >     
> >     Both would be solved by having topic generations and incorporating generation information when determining which offsets to purge. I don't think we have a jira open for that but I will follow-up offline with Onur.
> >     
> >     Do you see any other issues?
> >     
> >     So I think the options are:
> >     - Go with your approach + a unit test to ensure that the controller sends update metadata request first.
> >     - Go with the more conservative fix which is to purge on metadataCache.removePartitionInfo
> >     
> >     Also, we should add a unit test to verify offsets are in fact removed after deletion.

Never mind - for the second scenario we are fine. We check in offset manager if the topic exists before committing offsets.

So your fix should be fine. Can you add a unit test?
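
For readers following along, the guard referred to here can be pictured roughly as below. This is a sketch under assumptions, not the real offset manager: OffsetStore, its commit/fetch methods and the mutable set standing in for the metadata cache are all hypothetical.

```scala
// Hypothetical model of "check that the topic exists before committing offsets".
object CommitGuardSketch {
  import scala.collection.mutable

  final case class TopicAndPartition(topic: String, partition: Int)

  // knownTopics stands in for the broker's metadata cache.
  class OffsetStore(knownTopics: mutable.Set[String]) {
    private val offsets = mutable.Map.empty[(String, TopicAndPartition), Long]

    // Rejects the commit when the topic is not yet in the metadata view, so no
    // offset can be stored for a topic that the purge would treat as deleted.
    def commit(group: String, tp: TopicAndPartition, offset: Long): Either[String, Unit] =
      if (!knownTopics.contains(tp.topic)) Left(s"unknown topic ${tp.topic}")
      else { offsets.put((group, tp), offset); Right(()) }

    def fetch(group: String, tp: TopicAndPartition): Option[Long] =
      offsets.get((group, tp))
  }
}
```

With a guard of this shape, the second scenario cannot leave an offset behind for the purge to delete: the commit to A is refused until A has received the update metadata request for X.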


- Joel


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32650/#review81413
-----------------------------------------------------------




Re: Review Request 32650: Patch for KAFKA-2000

Posted by Joel Koshy <jj...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32650/#review81413
-----------------------------------------------------------



core/src/main/scala/kafka/server/OffsetManager.scala
<https://reviews.apache.org/r/32650/#comment131755>

    A safer fix is to proactively purge as part of UpdateMetadataRequest - i.e., removePartitionInfo in metadata cache.
    
    Your fix is nice, but we need to make sure of the following: on a given offset manager (broker) the metadata cache must contain topic X before any consumer of topic X (and whose group is managed by that broker) commits offsets for topic X.
    
    The original scenario I was concerned about should be fine:
    - Suppose broker A (offset manager for G) starts up
    - It receives UpdateMetadataRequests from the controller for all topics in the cluster
    - It then receives LeaderAndIsrRequest for partitions of the offset topic which make it the offset manager.
    - We should be fine _as long as_ the update metadata requests occur first. So if we go with your approach we should at the very least add a unit test to guarantee this.
    
    There is another scenario. If topic X is a new topic (or has new partitions):
    - Broker A is the offset manager for consumer group G
    - Broker B leads a new partition of X
    - Controller C sends become leader to B and update metadata to A (which will populate its metadata cache)
    - B becomes the leader first
    - A consumer starts consuming X and commits offsets to A (before it has received the update metadata request)
    - Other consumers in the group may rebalance while all this is happening (since new partitions for the topic appeared) and may fetch offsets from A
    - But A could have deleted the offset by then.
    - This is improbable but not impossible.
    
    Onur mentioned another corner case:
    https://issues.apache.org/jira/browse/KAFKA-1787 
    
    Both would be solved by having topic generations and incorporating generation information when determining which offsets to purge. I don't think we have a JIRA open for that, but I will follow up offline with Onur.
    
    Do you see any other issues?
    
    So I think the options are:
    - Go with your approach + a unit test to ensure that the controller sends update metadata request first.
    - Go with the more conservative fix which is to purge on metadataCache.removePartitionInfo
    
    Also, we should add a unit test to verify offsets are in fact removed after deletion.
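
The more conservative option above can be sketched as follows. Everything here is a hypothetical model (MetadataCacheModel, OffsetCacheModel and the onRemove callback are illustrative names, not Kafka classes); the only point is that the purge is driven by an explicit removal rather than by a topic being absent from the cache.

```scala
// Hypothetical model of purging offsets when partition info is removed.
object EagerPurgeSketch {
  import scala.collection.mutable

  final case class TopicAndPartition(topic: String, partition: Int)

  // Stand-in for the metadata cache; onRemove is an assumed hook.
  class MetadataCacheModel(onRemove: TopicAndPartition => Unit) {
    private val partitions = mutable.Set.empty[TopicAndPartition]

    def addPartitionInfo(tp: TopicAndPartition): Unit = { partitions += tp }

    // The purge is triggered here, by the explicit removal.
    def removePartitionInfo(tp: TopicAndPartition): Unit = {
      partitions -= tp
      onRemove(tp)
    }
  }

  // Stand-in for the offset cache, keyed by (group, partition).
  class OffsetCacheModel {
    private val offsets = mutable.Map.empty[(String, TopicAndPartition), Long]

    def put(group: String, tp: TopicAndPartition, offset: Long): Unit = {
      offsets((group, tp)) = offset
    }

    // Drops the offsets of every group for the given partition.
    def purge(tp: TopicAndPartition): Unit = {
      val stale = offsets.keys.filter(_._2 == tp).toList
      stale.foreach(key => offsets.remove(key))
    }

    def size: Int = offsets.size
  }
}
```

In this model an empty metadata cache at start-up deletes nothing, because no removal has been requested yet. That is what makes the option independent of request ordering.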


- Joel Koshy




Re: Review Request 32650: Patch for KAFKA-2000

Posted by Joel Koshy <jj...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32650/#review82775
-----------------------------------------------------------


Thanks for the updated patch.


core/src/main/scala/kafka/server/OffsetManager.scala
<https://reviews.apache.org/r/32650/#comment133593>

    Sorry I didn't notice this earlier. This message is now slightly incorrect. Can we get a breakdown of the number of offsets deleted due to expiration versus topic deletion? BTW I'm touching this in KAFKA-2163 as well (which you may want to check out).
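
One possible shape for the split asked for above, as a sketch only: Entry, purgeCounts and the message wording are hypothetical and not taken from the patch. It assumes an offset whose topic is gone is counted under topic deletion even if it has also expired.

```scala
// Hypothetical sketch of counting purged offsets by reason.
object PurgeCountSketch {
  final case class Entry(topic: String, commitTimestampMs: Long)

  // Returns (expiredCount, topicDeletedCount) for the entries that would be purged.
  def purgeCounts(entries: Seq[Entry], nowMs: Long, retentionMs: Long,
                  knownTopics: Set[String]): (Int, Int) = {
    // Offsets of unknown topics are attributed to topic deletion first.
    val (deletedTopic, remaining) = entries.partition(e => !knownTopics.contains(e.topic))
    val expired = remaining.count(e => nowMs - e.commitTimestampMs > retentionMs)
    (expired, deletedTopic.size)
  }

  def main(args: Array[String]): Unit = {
    val entries = Seq(Entry("orders", 1000L), Entry("orders", 9000L), Entry("clicks", 9500L))
    val (expired, deleted) =
      purgeCounts(entries, nowMs = 10000L, retentionMs = 5000L, knownTopics = Set("orders"))
    println(s"Removed $expired expired offsets and $deleted offsets of deleted topics.")
  }
}
```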



core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
<https://reviews.apache.org/r/32650/#comment133586>

    Pre-existing issue, but could you rename this to OffsetManagementTest?



core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
<https://reviews.apache.org/r/32650/#comment133587>

    testOffsetsDeletedAfterTopicDeletion



core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
<https://reviews.apache.org/r/32650/#comment133588>

    Can you use the more recent commit version (which has an explicit retention time)?



core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
<https://reviews.apache.org/r/32650/#comment133590>

    Can you also commit offsets for some other topic and verify that those offsets are _not_ deleted?
    
    As mentioned in the earlier RB, for the first scenario, we depend on the condition that the UpdateMetadataRequest is sent first. It would be good to have a unit test that explicitly tests this so we never unknowingly break that assumption. I don't have a good way to test this though :( If you have any ideas that would be great. Part of the issue is we have little to no test coverage on the controller.
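
This does not solve the testing problem, but the ordering assumption itself can at least be written down as a checkable property. The sketch below is purely a model: ControllerRequest, its two cases and metadataArrivesFirst are hypothetical names, and a real test would still need some way to capture the requests a broker actually receives.

```scala
// Hypothetical model of the "update metadata arrives first" assumption.
object RequestOrderSketch {
  sealed trait ControllerRequest
  case object UpdateMetadata extends ControllerRequest
  final case class LeaderAndIsr(topic: String) extends ControllerRequest

  // True when there is no LeaderAndIsr for the offsets topic, or when the first
  // one is preceded by at least one UpdateMetadata.
  def metadataArrivesFirst(requests: Seq[ControllerRequest], offsetsTopic: String): Boolean = {
    val firstLeader = requests.indexWhere {
      case LeaderAndIsr(topic) => topic == offsetsTopic
      case _                   => false
    }
    val firstMetadata = requests.indexOf(UpdateMetadata)
    firstLeader == -1 || (firstMetadata != -1 && firstMetadata < firstLeader)
  }
}
```

Given a recorded request sequence, an assertion on metadataArrivesFirst would fail as soon as a broker could become offset manager before its metadata cache is populated.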


- Joel Koshy




Re: Review Request 32650: Patch for KAFKA-2000

Posted by Sriharsha Chintalapani <ha...@hortonworks.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32650/
-----------------------------------------------------------

(Updated May 3, 2015, 5:39 p.m.)


Review request for kafka.


Bugs: KAFKA-2000
    https://issues.apache.org/jira/browse/KAFKA-2000


Repository: kafka


Description
-------

KAFKA-2000. Delete consumer offsets from kafka once the topic is deleted.


Diffs (updated)
-----

  core/src/main/scala/kafka/server/OffsetManager.scala 18680ce100f10035175cc0263ba7787ab0f6a17a 
  core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 652208a70f66045b854549d93cbbc2b77c24b10b 

Diff: https://reviews.apache.org/r/32650/diff/


Testing
-------


Thanks,

Sriharsha Chintalapani