Posted to dev@kafka.apache.org by Tejas Patil <te...@apache.org> on 2013/08/30 23:19:10 UTC

Re: Review Request 13908: Initial patch KAFKA-1012

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/13908/
-----------------------------------------------------------

(Updated Aug. 30, 2013, 9:19 p.m.)


Review request for kafka, Jay Kreps and Neha Narkhede.


Repository: kafka


Description
-------

See https://issues.apache.org/jira/browse/KAFKA-1012 for details.


Diffs
-----

  config/server.properties 7685879c2ab3d2dde1561bd34e6d9c55bc2429e3 
  core/src/main/scala/kafka/api/OffsetFetchRequest.scala a4c5623dbd48d9a0f21b87e39d63cde3604c64a0 
  core/src/main/scala/kafka/common/ErrorMapping.scala 153bc0b078d21200c02c47dd5ad9b7a7e3326ec4 
  core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 59608a34202b4a635582c74c0068f3ae1bde0a13 
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala e3a64204513467cef8917f501c3bc0e5b1db2e3e 
  core/src/main/scala/kafka/producer/DefaultPartitioner.scala 37ddd55b49680262ac348f77bb029dd4e84958cb 
  core/src/main/scala/kafka/server/KafkaApis.scala 0ec031ad9423b82ba9c8a49fe984337620392a8b 
  core/src/main/scala/kafka/server/KafkaConfig.scala ebbbdea8ab8798c95b39be9594b5b805a0f29d29 
  core/src/main/scala/kafka/server/KafkaServer.scala a925ae1a41fcb71f00ddf9e111172ec8a7fca749 
  core/src/main/scala/kafka/server/OffsetManager.scala PRE-CREATION 
  core/src/main/scala/kafka/server/ReplicaManager.scala 73c87c663981002b52a0c4995a6ef96ca24d5ef4 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala bc415e3156810db6c41509d9eb4aed4484496eee 
  core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala c0475d07a778ff957ad266c08a7a81ea500debd2 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala bab436dcef1645b5e327a5e7e68abdbe57604745 

Diff: https://reviews.apache.org/r/13908/diff/


Testing
-------

Manual testing with 3 brokers, 2 producers and 6 consumers. Existing junits pass


Thanks,

Tejas Patil


Re: Review Request 13908: Initial patch KAFKA-1012

Posted by Tejas Patil <te...@apache.org>.

> On Sept. 4, 2013, 11:58 p.m., Neha Narkhede wrote:
> > core/src/main/scala/kafka/common/ErrorMapping.scala, line 45
> > <https://reviews.apache.org/r/13908/diff/1/?file=346515#file346515line45>
> >
> >     Could this error code be renamed to something like OffsetLoadingNotCompleteCode. Arguably this will convey the error code more clearly.

Agreed. Will include this change in the v3 patch.


> On Sept. 4, 2013, 11:58 p.m., Neha Narkhede wrote:
> > core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala, line 280
> > <https://reviews.apache.org/r/13908/diff/1/?file=346517#file346517line280>
> >
> >     It will be good to be specific about which channel the consumer failed to establish. In this case, let's mention "Unable to establish a channel for fetching offsets with any of the live brokers in %s".format(brokers.mkString(','))

Agreed. Will include this change in the v3 patch.
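
For concreteness, a rough sketch of how that message could be assembled (illustrative only, not from the actual patch; `brokers` stands in for whatever collection is in scope at that point, and mkString takes a String separator):

    object OffsetChannelErrorMessage {
      // Illustrative only: a stand-in list of live broker addresses.
      val brokers = Seq("broker-0:9092", "broker-1:9092", "broker-2:9092")

      // The message the consumer would log/throw when no offsets channel can be set up.
      val message: String =
        "Unable to establish a channel for fetching offsets with any of the live brokers in %s"
          .format(brokers.mkString(","))
    }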


> On Sept. 4, 2013, 11:58 p.m., Neha Narkhede wrote:
> > core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala, line 321
> > <https://reviews.apache.org/r/13908/diff/1/?file=346517#file346517line321>
> >
> >     Is it a good idea for commitOffsets() to eat up every error that it encounters ? commitOffsets() is a public API and users want to use it to commit offsets on demand, manually. These users do not use auto commit offsets and use commitOffsets() to checkpoint offsets as often as the application logic dictates. For that use case, if the commitOffsets() has not actually successfully committed the offsets, the user of the API must know about it and retry as required. Thoughts?

Correct me if I am wrong: the Producer API does not expose failures to the outside world. In case of failures, the producer internally retries the failed messages, but that happens under the hood and is not visible to the caller. With the embedded producer, I could not find a way for consumers to learn about failures without modifying the producer code. As the "embedded producer" is a temporary hack, we refrained from modifying the producer code to expose this information. This could be something that is handled in phase #2, i.e. using OffsetCommitRequest.
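
If we do surface failures in phase #2, I imagine it looking roughly like this (purely a sketch; the names and retry policy here are made up, not part of the patch):

    import scala.annotation.tailrec

    trait OffsetCommitter {
      // A single commit attempt; true on success, false on failure.
      def tryCommitOnce(offsets: Map[String, Long]): Boolean
    }

    object ManualCommit {
      // Retries a bounded number of times and reports the final outcome, so a caller
      // doing manual checkpointing can decide whether to retry, back off, or fail.
      @tailrec
      def commitOffsets(committer: OffsetCommitter,
                        offsets: Map[String, Long],
                        retriesLeft: Int = 3): Boolean = {
        if (committer.tryCommitOnce(offsets)) true
        else if (retriesLeft <= 0) false
        else commitOffsets(committer, offsets, retriesLeft - 1)
      }
    }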


> On Sept. 4, 2013, 11:58 p.m., Neha Narkhede wrote:
> > core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala, line 664
> > <https://reviews.apache.org/r/13908/diff/1/?file=346517#file346517line664>
> >
> >     It is probably better to be clearer on this error message as well. Something along the lines of "as offset bootstrap is still in progress on some brokers. This means leadership changed recently for the offsets topic"

This is one of the points that Guozhang raised in his review comment 27.3 and now you; a strong indication that I have to change that sloppy message :) The loading process is triggered by (a) broker startup and (b) leadership change. I tried to capture both of these in the log message, but it looked ugly because it was too big to fit on one line. Technically, 'broker startup' leads to leadership assignment, which can also be seen as a leadership change. With that argument, if we don't need to distinguish between (a) and (b), we could go with the message you suggested. Otherwise, change the last part of your suggestion to "This means leadership changed recently for the offsets topic or the broker is starting up".


> On Sept. 4, 2013, 11:58 p.m., Neha Narkhede wrote:
> > core/src/main/scala/kafka/server/KafkaServer.scala, line 88
> > <https://reviews.apache.org/r/13908/diff/1/?file=346521#file346521line88>
> >
> >     Curious - why do we need to use the singleton pattern here? Shouldn't only one thread invoke KafkaServer.startup?

It is made a singleton so that even if someone carelessly tries to create multiple offset managers on the same server instance, there will still be a single offset manager. I agree that in the current code this will not happen. However, the penalty of having multiple copies of the offset manager is huge in terms of memory and correctness, so I made it a singleton.
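
Roughly what the guard buys us (a sketch only; OffsetManagerHolder and the trait below are stand-ins, not the actual classes in the patch):

    trait OffsetManager {
      def startup(): Unit
    }

    object OffsetManagerHolder {
      private var instance: Option[OffsetManager] = None

      // Even if this is called more than once per server instance, only one offset
      // manager is ever constructed and handed out.
      def getOrCreate(factory: () => OffsetManager): OffsetManager = synchronized {
        instance match {
          case Some(existing) => existing
          case None =>
            val created = factory()
            instance = Some(created)
            created
        }
      }
    }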


> On Sept. 4, 2013, 11:58 p.m., Neha Narkhede wrote:
> > core/src/main/scala/kafka/server/OffsetManager.scala, line 17
> > <https://reviews.apache.org/r/13908/diff/1/?file=346522#file346522line17>
> >
> >     this file has turned into a big blob of code. It will help if you can separate the OffsetManager trait, the DefaultOffsetManager and ZookeeperOffsetManager into separate files

Agree. Would include change in v3 patch.


> On Sept. 4, 2013, 11:58 p.m., Neha Narkhede wrote:
> > core/src/main/scala/kafka/server/OffsetManager.scala, line 51
> > <https://reviews.apache.org/r/13908/diff/1/?file=346522#file346522line51>
> >
> >     I think it is best to not include any parameters to the startup() API as it is difficult to come up with a set of parameters that would work for all possible offset managers. What might work better is to include a generic init API that takes in a Properties object. This API initializes the context required for the offset manager. startup might or might not be useful if we add init(Properties), I'm not so sure.

> 'include a generic init API that takes in a Properties object':
For this, KafkaServer needs to know which offset manager type it needs to spawn, bake the properties relevant to it, and pass them to 'init'. This won't abstract things from KafkaServer, and every time we add a new offset manager, the KafkaServer code must be modified. In the current patch, things are abstracted from KafkaServer. I could not figure out a way to achieve both (a) abstracting the offset manager type from KafkaServer and (b) making the startup arguments implementation agnostic. Any suggestions?
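
To make the trade-off concrete, here is roughly what the init(Properties) route could look like (a sketch only; the config key and factory are made up, only DefaultOffsetManager is a name from this thread):

    import java.util.Properties

    trait OffsetManager {
      def init(props: Properties): Unit
      def startup(): Unit
    }

    object OffsetManagerFactory {
      // KafkaServer would only ever see the trait: the concrete class name comes from
      // config and the whole server config is handed over as raw Properties.
      def create(props: Properties): OffsetManager = {
        val className = props.getProperty("offset.manager.class",
          "kafka.server.DefaultOffsetManager")
        val manager = Class.forName(className).newInstance().asInstanceOf[OffsetManager]
        manager.init(props)
        manager
      }
    }

The reflection step is what keeps KafkaServer ignorant of the concrete type, but it pushes the burden of picking out the relevant properties onto each implementation.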


> On Sept. 4, 2013, 11:58 p.m., Neha Narkhede wrote:
> > core/src/main/scala/kafka/server/OffsetManager.scala, line 70
> > <https://reviews.apache.org/r/13908/diff/1/?file=346522#file346522line70>
> >
> >     load the offsets from the logs is not generic enough. What if the offsets are stored in a database or custom flat files ?

In the current patch, whichever backend is used to store the offsets (ZK, an in-memory hash table, a database, or a custom file), the offsets are always written to the logs of the offsets topic. Load serves as a way to read entries from those logs into the offset manager's storage. This may seem wasteful: if a user just wants to use ZK for offsets, writing to the logs and maintaining replicas is overhead. The one advantage is when switching between offset manager implementations: if you are using the ZK-based offset manager, all offsets are in both the logs and ZK, so switching to the built-in offset manager is just a config change, and the broker starts populating the in-memory hash table from the logs once bounced.
I had raised this point on the Jira (comment #2, point (2)) but have not heard anything about it. Any suggestions?
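
To illustrate the 'always log, replay on load' idea (a sketch only, with made-up names; not the actual patch):

    import scala.collection.mutable

    case class OffsetRecord(group: String, topic: String, partition: Int, offset: Long)

    trait OffsetStore {
      def put(record: OffsetRecord): Unit
    }

    class InMemoryOffsetStore extends OffsetStore {
      private val table = mutable.Map[(String, String, Int), Long]()
      def put(r: OffsetRecord): Unit = {
        table((r.group, r.topic, r.partition)) = r.offset
      }
    }

    object OffsetLogReplay {
      // Replays committed offset records from the offsets topic log into whichever
      // store is configured, so switching stores is a config change plus a bounce.
      def load(logRecords: Iterator[OffsetRecord], store: OffsetStore): Unit =
        logRecords.foreach(store.put)
    }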


> On Sept. 4, 2013, 11:58 p.m., Neha Narkhede wrote:
> > core/src/main/scala/kafka/server/OffsetManager.scala, line 74
> > <https://reviews.apache.org/r/13908/diff/1/?file=346522#file346522line74>
> >
> >     Agree with Sriram that this could be named differently. It will also help if we describe the purpose of each of these APIs clearly. For example, if I want to store offsets in a database, how do I know why triggerLoadOffsets is required? Is it used to bootstrap some sort of offsets cache on startup ? 
> >     
> >     Also try to describe when these APIs will be invoked on the Kafka server side. That will help the user implement a specific offset manager relatively easily

If your concern is just about the naming and description, then that is easily fixable. Does 'syncOffsetsFromLogs' seem okay?
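
For example, the trait itself could carry the when-is-this-called notes (a sketch; the names, including syncOffsetsFromLogs, are just the ones floated in this thread, not necessarily what lands in the patch):

    trait OffsetManager {
      /** Called once from KafkaServer.startup(), before the broker accepts requests. */
      def startup(): Unit

      /** Called when this broker becomes leader for a partition of the offsets topic
        * (and at startup); bootstraps the offsets cache from the offsets topic logs. */
      def syncOffsetsFromLogs(offsetsPartition: Int): Unit

      /** Called for every OffsetCommitRequest routed to this broker. */
      def putOffset(group: String, topic: String, partition: Int, offset: Long): Unit

      /** Called for every OffsetFetchRequest; returns None if the offset is unknown. */
      def getOffset(group: String, topic: String, partition: Int): Option[Long]

      /** Called once from KafkaServer.shutdown(). */
      def shutdown(): Unit
    }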


> On Sept. 4, 2013, 11:58 p.m., Neha Narkhede wrote:
> > core/src/main/scala/kafka/server/OffsetManager.scala, line 257
> > <https://reviews.apache.org/r/13908/diff/1/?file=346522#file346522line257>
> >
> >     There seems to be a race condition that might overwrite a newer offset with a stale one. This can happen when a broker becomes a leader for some partition of the offsets topic. When this happens, partition.makeLeader() exposes the broker as the new leader. At that point, it can start taking in offset commit requests. An offset commit request can come in at the same time that triggerLoadOffsets() is being invoked for the same offsets partition. putOffset() will go through and update the offsets table with the new offset. It does not touch commitsWhileLoading since loading does not have the key in it. Then the 1st statement in triggerLoadOffsets is executed and loading gets the offsets partition added to it. It goes ahead and updates the offsets table with the old offset since commitsWhileLoading was not updated by putOffset.

The loading process has changed, and in the newer patch a timestamp is stored along with each offset. With that, it becomes easy to prevent such overwrites.
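
Roughly how the timestamp guard prevents the overwrite (a sketch only; the structure and names below are illustrative, not the actual patch):

    import scala.collection.mutable

    case class OffsetAndTimestamp(offset: Long, timestamp: Long)

    class TimestampedOffsetTable {
      private val table = mutable.Map[String, OffsetAndTimestamp]()

      // A replayed (possibly stale) entry only wins if it is at least as recent as
      // whatever a concurrent commit may already have written for the same key.
      def putIfNotStale(key: String, incoming: OffsetAndTimestamp): Unit = synchronized {
        table.get(key) match {
          case Some(current) if current.timestamp > incoming.timestamp => // keep the newer entry
          case _ => table(key) = incoming
        }
      }
    }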


- Tejas


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/13908/#review25904
-----------------------------------------------------------


On Aug. 30, 2013, 9:19 p.m., Tejas Patil wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/13908/
> -----------------------------------------------------------
> 
> (Updated Aug. 30, 2013, 9:19 p.m.)
> 
> 
> Review request for kafka, Jay Kreps and Neha Narkhede.
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> See https://issues.apache.org/jira/browse/KAFKA-1012 for details.
> 
> 
> Diffs
> -----
> 
>   config/server.properties 7685879c2ab3d2dde1561bd34e6d9c55bc2429e3 
>   core/src/main/scala/kafka/api/OffsetFetchRequest.scala a4c5623dbd48d9a0f21b87e39d63cde3604c64a0 
>   core/src/main/scala/kafka/common/ErrorMapping.scala 153bc0b078d21200c02c47dd5ad9b7a7e3326ec4 
>   core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 59608a34202b4a635582c74c0068f3ae1bde0a13 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala e3a64204513467cef8917f501c3bc0e5b1db2e3e 
>   core/src/main/scala/kafka/producer/DefaultPartitioner.scala 37ddd55b49680262ac348f77bb029dd4e84958cb 
>   core/src/main/scala/kafka/server/KafkaApis.scala 0ec031ad9423b82ba9c8a49fe984337620392a8b 
>   core/src/main/scala/kafka/server/KafkaConfig.scala ebbbdea8ab8798c95b39be9594b5b805a0f29d29 
>   core/src/main/scala/kafka/server/KafkaServer.scala a925ae1a41fcb71f00ddf9e111172ec8a7fca749 
>   core/src/main/scala/kafka/server/OffsetManager.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 73c87c663981002b52a0c4995a6ef96ca24d5ef4 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala bc415e3156810db6c41509d9eb4aed4484496eee 
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala c0475d07a778ff957ad266c08a7a81ea500debd2 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala bab436dcef1645b5e327a5e7e68abdbe57604745 
> 
> Diff: https://reviews.apache.org/r/13908/diff/
> 
> 
> Testing
> -------
> 
> Manual testing with 3 brokers, 2 producers and 6 consumers. Existing junits pass
> 
> 
> Thanks,
> 
> Tejas Patil
> 
>


Re: Review Request 13908: Initial patch KAFKA-1012

Posted by Neha Narkhede <ne...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/13908/#review25904
-----------------------------------------------------------



core/src/main/scala/kafka/common/ErrorMapping.scala
<https://reviews.apache.org/r/13908/#comment50538>

    Could this error code be renamed to something like OffsetLoadingNotCompleteCode. Arguably this will convey the error code more clearly.



core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
<https://reviews.apache.org/r/13908/#comment50544>

    It will be good to be specific about which channel the consumer failed to establish. In this case, let's mention "Unable to establish a channel for fetching offsets with any of the live brokers in %s".format(brokers.mkString(','))



core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
<https://reviews.apache.org/r/13908/#comment50545>

    Is it a good idea for commitOffsets() to eat up every error that it encounters ? commitOffsets() is a public API and users want to use it to commit offsets on demand, manually. These users do not use auto commit offsets and use commitOffsets() to checkpoint offsets as often as the application logic dictates. For that use case, if the commitOffsets() has not actually successfully committed the offsets, the user of the API must know about it and retry as required. Thoughts?



core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
<https://reviews.apache.org/r/13908/#comment50550>

    It is probably better to be clearer on this error message as well. Something along the lines of "as offset bootstrap is still in progress on some brokers. This means leadership changed recently for the offsets topic"



core/src/main/scala/kafka/server/KafkaServer.scala
<https://reviews.apache.org/r/13908/#comment50556>

    Curious - why do we need to use the singleton pattern here? Shouldn't only one thread invoke KafkaServer.startup?



core/src/main/scala/kafka/server/OffsetManager.scala
<https://reviews.apache.org/r/13908/#comment50557>

    this file has turned into a big blob of code. It will help if you can separate the OffsetManager trait, the DefaultOffsetManager and ZookeeperOffsetManager into separate files



core/src/main/scala/kafka/server/OffsetManager.scala
<https://reviews.apache.org/r/13908/#comment50552>

    I think it is best to not include any parameters to the startup() API as it is difficult to come up with a set of parameters that would work for all possible offset managers. What might work better is to include a generic init API that takes in a Properties object. This API initializes the context required for the offset manager. startup might or might not be useful if we add init(Properties), I'm not so sure.  



core/src/main/scala/kafka/server/OffsetManager.scala
<https://reviews.apache.org/r/13908/#comment50555>

    load the offsets from the logs is not generic enough. What if the offsets are stored in a database or custom flat files ? 



core/src/main/scala/kafka/server/OffsetManager.scala
<https://reviews.apache.org/r/13908/#comment50554>

    Agree with Sriram that this could be named differently. It will also help if we describe the purpose of each of these APIs clearly. For example, if I want to store offsets in a database, how do I know why triggerLoadOffsets is required? Is it used to bootstrap some sort of offsets cache on startup ? 
    
    Also try to describe when these APIs will be invoked on the Kafka server side. That will help the user implement a specific offset manager relatively easily



core/src/main/scala/kafka/server/OffsetManager.scala
<https://reviews.apache.org/r/13908/#comment50577>

    There seems to be a race condition that might overwrite a newer offset with a stale one. This can happen when a broker becomes a leader for some partition of the offsets topic. When this happens, partition.makeLeader() exposes the broker as the new leader. At that point, it can start taking in offset commit requests. An offset commit request can come in at the same time that triggerLoadOffsets() is being invoked for the same offsets partition. putOffset() will go through and update the offsets table with the new offset. It does not touch commitsWhileLoading since loading does not have the key in it. Then the 1st statement in triggerLoadOffsets is executed and loading gets the offsets partition added to it. It goes ahead and updates the offsets table with the old offset since commitsWhileLoading was not updated by putOffset.


- Neha Narkhede


On Aug. 30, 2013, 9:19 p.m., Tejas Patil wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/13908/
> -----------------------------------------------------------
> 
> (Updated Aug. 30, 2013, 9:19 p.m.)
> 
> 
> Review request for kafka, Jay Kreps and Neha Narkhede.
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> See https://issues.apache.org/jira/browse/KAFKA-1012 for details.
> 
> 
> Diffs
> -----
> 
>   config/server.properties 7685879c2ab3d2dde1561bd34e6d9c55bc2429e3 
>   core/src/main/scala/kafka/api/OffsetFetchRequest.scala a4c5623dbd48d9a0f21b87e39d63cde3604c64a0 
>   core/src/main/scala/kafka/common/ErrorMapping.scala 153bc0b078d21200c02c47dd5ad9b7a7e3326ec4 
>   core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 59608a34202b4a635582c74c0068f3ae1bde0a13 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala e3a64204513467cef8917f501c3bc0e5b1db2e3e 
>   core/src/main/scala/kafka/producer/DefaultPartitioner.scala 37ddd55b49680262ac348f77bb029dd4e84958cb 
>   core/src/main/scala/kafka/server/KafkaApis.scala 0ec031ad9423b82ba9c8a49fe984337620392a8b 
>   core/src/main/scala/kafka/server/KafkaConfig.scala ebbbdea8ab8798c95b39be9594b5b805a0f29d29 
>   core/src/main/scala/kafka/server/KafkaServer.scala a925ae1a41fcb71f00ddf9e111172ec8a7fca749 
>   core/src/main/scala/kafka/server/OffsetManager.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 73c87c663981002b52a0c4995a6ef96ca24d5ef4 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala bc415e3156810db6c41509d9eb4aed4484496eee 
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala c0475d07a778ff957ad266c08a7a81ea500debd2 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala bab436dcef1645b5e327a5e7e68abdbe57604745 
> 
> Diff: https://reviews.apache.org/r/13908/diff/
> 
> 
> Testing
> -------
> 
> Manual testing with 3 brokers, 2 producers and 6 consumers. Existing junits pass
> 
> 
> Thanks,
> 
> Tejas Patil
> 
>