Posted to dev@kafka.apache.org by Guozhang Wang <wa...@gmail.com> on 2015/02/01 02:38:26 UTC

Review Request 30482: Add the coordinator to server

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30482/
-----------------------------------------------------------

Review request for kafka.


Bugs: KAFKA-1633
    https://issues.apache.org/jira/browse/KAFKA-1633


Repository: kafka


Description
-------

TBD


Diffs
-----

  core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala PRE-CREATION 
  core/src/main/scala/kafka/coordinator/GroupRegistry.scala PRE-CREATION 
  core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala PRE-CREATION 
  core/src/main/scala/kafka/server/DelayedOperationKey.scala fb7e9ed5c16dd15b71e1b1ac12948641185871db 
  core/src/main/scala/kafka/server/KafkaApis.scala f2b027bf944e735fd52cc282690ec1b8395f9290 
  core/src/main/scala/kafka/server/KafkaServer.scala 89200da30a04943f0b9befe84ab17e62b747c8c4 

Diff: https://reviews.apache.org/r/30482/diff/


Testing
-------


Thanks,

Guozhang Wang


Re: Review Request 30482: Add the coordinator to server

Posted by Guozhang Wang <wa...@gmail.com>.

On Feb. 1, 2015, 8:46 p.m., Guozhang Wang wrote:
> > Not sure this stuff is actually here for review...may still be a work in progress. Overall this structure of code makes a ton of sense to me. Left some minor comments.

Yes, this is more of a WIP patch, but the scope of this JIRA does not include fully implemented failure detection / rebalance logic either. It just adds the coordinator module with simple start-up / shut-down functions, which unblocks further development so that it can proceed in parallel.


- Guozhang


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30482/#review70533
-----------------------------------------------------------


On Feb. 1, 2015, 2:45 a.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30482/
> -----------------------------------------------------------
> 
> (Updated Feb. 1, 2015, 2:45 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1333
>     https://issues.apache.org/jira/browse/KAFKA-1333
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> 1. Add ConsumerCoordinator with GroupRegistry and ConsumerRegistry metadata, and ZK listeners.
> 2. Add a delayed heartbeat purgatory based on HeartbeatBucket to expire heartbeat requests.
> 3. Add a delayed rebalance purgatory for preparing rebalance.
> 4. Add a join-group purgatory for sending back responses with assigned partitions.
> 5. Add TimeMsKey / ConsumerKey and ConsumerGroupKey for delayed heartbeat / join-group / rebalance purgatories.
> 6. Refactor KafkaApis for handling JoinGroup / Heartbeat requests with coordinator, and sending responses via callbacks.
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala PRE-CREATION 
>   core/src/main/scala/kafka/coordinator/GroupRegistry.scala PRE-CREATION 
>   core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/DelayedOperationKey.scala fb7e9ed5c16dd15b71e1b1ac12948641185871db 
>   core/src/main/scala/kafka/server/KafkaApis.scala f2b027bf944e735fd52cc282690ec1b8395f9290 
>   core/src/main/scala/kafka/server/KafkaServer.scala 89200da30a04943f0b9befe84ab17e62b747c8c4 
> 
> Diff: https://reviews.apache.org/r/30482/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>


Re: Review Request 30482: Add the coordinator to server

Posted by Jay Kreps <bo...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30482/#review70533
-----------------------------------------------------------



core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/30482/#comment115743>

    Can we refactor the consumerGroupRegistries into its own class that does locking internally? I really feel these free-standing locks inside big classes are an anti-pattern. It is impossible to verify correctness without basically searching through all the code in the class to determine where the lock is used.
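
A minimal sketch of what such a class could look like, assuming the registry is a map from group id to per-group metadata. `GroupRegistries`, `GroupMetadata` and every method name below are hypothetical and are not taken from the patch; the point is only that the lock is private and every use of it is visible in one small class.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the per-group metadata; not the class from the patch.
final case class GroupMetadata(groupId: String, generation: Int)

// All access to the underlying map goes through this class, so the lock
// never escapes and each use of it can be checked in one place.
final class GroupRegistries {
  private val lock = new Object
  private val groups = mutable.Map.empty[String, GroupMetadata]

  // Returns the existing entry for groupId, or creates and stores a new one.
  def getOrCreate(groupId: String): GroupMetadata = lock.synchronized {
    groups.getOrElseUpdate(groupId, GroupMetadata(groupId, generation = 0))
  }

  // Atomically bumps the generation of an existing group, if there is one.
  def bumpGeneration(groupId: String): Option[GroupMetadata] = lock.synchronized {
    groups.get(groupId).map { g =>
      val updated = g.copy(generation = g.generation + 1)
      groups.update(groupId, updated)
      updated
    }
  }

  // Removes the group and returns its last known metadata, if any.
  def remove(groupId: String): Option[GroupMetadata] = lock.synchronized {
    groups.remove(groupId)
  }

  def size: Int = lock.synchronized(groups.size)
}
```

With this shape the coordinator would only call methods such as `getOrCreate` and never see the lock itself.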



core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/30482/#comment115741>

    It would be nice to remove the handle prefix from the method names unless it has special significance. I used it in KafkaApis as a way to differentiate the actual API implementations from helper methods. Seems cleaner as
      joinGroup, heartbeat, etc.



core/src/main/scala/kafka/coordinator/GroupRegistry.scala
<https://reviews.apache.org/r/30482/#comment115740>

    If these will be the strings the client specifies, might be nice to have them be simpler. E.g. "range" and "round-robin"



core/src/main/scala/kafka/server/KafkaApis.scala
<https://reviews.apache.org/r/30482/#comment115738>

    "...for sending *a* produce response"



core/src/main/scala/kafka/server/KafkaApis.scala
<https://reviews.apache.org/r/30482/#comment115739>

    ditto


Not sure this stuff is actually here for review...may still be a work in progress. Overall this structure of code makes a ton of sense to me. Left some minor comments.

- Jay Kreps


Re: Review Request 30482: Add the coordinator to server

Posted by Onur Karaman <ok...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30482/#review70535
-----------------------------------------------------------



core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/30482/#comment115777>

    Was it intentional to have this as a member variable?



core/src/main/scala/kafka/server/DelayedOperationKey.scala
<https://reviews.apache.org/r/30482/#comment115746>

    This gave me:
    java.util.UnknownFormatConversionException: Conversion = 'l'
    
    The docs on formatting longs are a bit misleading, but just a "%d".format(time) should work.
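
For reference, a small sketch of the difference. It assumes the failing line used a C-style `%ld` conversion, which matches the reported exception but is not shown in this thread; `FormatLongDemo`, `label` and `cStyleLabel` are illustrative names only.

```scala
import java.util.UnknownFormatConversionException
import scala.util.{Failure, Try}

object FormatLongDemo {
  // "%d" accepts any integral argument, including a Long.
  def label(time: Long): String = "%d".format(time)

  // "%ld" is a C printf idiom; java.util.Formatter treats 'l' as the
  // conversion character and rejects it.
  def cStyleLabel(time: Long): Try[String] = Try("%ld".format(time))

  def main(args: Array[String]): Unit = {
    println(label(1422758306000L))
    cStyleLabel(1422758306000L) match {
      case Failure(e: UnknownFormatConversionException) =>
        println("rejected: " + e.getMessage)
      case other =>
        println("unexpected: " + other)
    }
  }
}
```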



core/src/main/scala/kafka/server/DelayedOperationKey.scala
<https://reviews.apache.org/r/30482/#comment115744>

    This gave me java.util.IllegalFormatConversionException: d != java.lang.String
    
    consumerId is a string, so maybe "%s-%s".format(groupId, consumerId)



core/src/main/scala/kafka/server/DelayedOperationKey.scala
<https://reviews.apache.org/r/30482/#comment115745>

    groupId is already a string, so this can be simplified to:
    override def keyLabel = groupId
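
Taken together, the three suggestions on this file could look roughly like the sketch below. Only the names TimeMsKey, ConsumerKey and ConsumerGroupKey come from the review description; the shape of the trait and the constructor parameters are assumptions made for illustration.

```scala
// Hypothetical sketch; the trait and the case-class fields are assumed,
// not copied from the patch.
trait DelayedOperationKey {
  def keyLabel: String
}

// A Long formats fine with "%d".
case class TimeMsKey(time: Long) extends DelayedOperationKey {
  override def keyLabel = "%d".format(time)
}

// Both fields are strings, so both placeholders are "%s".
case class ConsumerKey(groupId: String, consumerId: String) extends DelayedOperationKey {
  override def keyLabel = "%s-%s".format(groupId, consumerId)
}

// groupId is already a String, so no formatting is needed.
case class ConsumerGroupKey(groupId: String) extends DelayedOperationKey {
  override def keyLabel = groupId
}
```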


- Onur Karaman


Re: Review Request 30482: Add the coordinator to server

Posted by Jay Kreps <bo...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30482/#review71562
-----------------------------------------------------------

Ship it!

- Jay Kreps


On Feb. 6, 2015, 11:02 p.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30482/
> -----------------------------------------------------------
> 
> (Updated Feb. 6, 2015, 11:02 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1333
>     https://issues.apache.org/jira/browse/KAFKA-1333
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> dummy 2
> 
> 
> minor
> 
> 
> three new files
> 
> 
> Address Jay and Onur's comments
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala PRE-CREATION 
>   core/src/main/scala/kafka/coordinator/ConsumerRegistry.scala PRE-CREATION 
>   core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala PRE-CREATION 
>   core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala PRE-CREATION 
>   core/src/main/scala/kafka/coordinator/DelayedRebalance.scala PRE-CREATION 
>   core/src/main/scala/kafka/coordinator/GroupRegistry.scala PRE-CREATION 
>   core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala PRE-CREATION 
>   core/src/main/scala/kafka/network/SocketServer.scala 39b1651b680b2995cedfde95d74c086d9c6219ef 
>   core/src/main/scala/kafka/server/DelayedOperationKey.scala fb7e9ed5c16dd15b71e1b1ac12948641185871db 
>   core/src/main/scala/kafka/server/KafkaApis.scala f2b027bf944e735fd52cc282690ec1b8395f9290 
>   core/src/main/scala/kafka/server/KafkaServer.scala 89200da30a04943f0b9befe84ab17e62b747c8c4 
>   core/src/main/scala/kafka/server/MetadataCache.scala bf81a1ab88c14be8697b441eedbeb28fa0112643 
>   core/src/main/scala/kafka/server/OffsetManager.scala 0bdd42fea931cddd072c0fff765b10526db6840a 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala 81ae205ef7b2050d0152f29f8da7dd91b17b8b00 
>   core/src/test/resources/log4j.properties 1b7d5d8f7d5fae7d272849715714781cad05d77b 
>   core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 90c0b7a19c7af8e5416e4bdba62b9824f1abd5ab 
>   core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ba1e48e4300c9fb32e36e7266cb05294f2a481e5 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 54755e8dd3f23ced313067566cd4ea867f8a496e 
> 
> Diff: https://reviews.apache.org/r/30482/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>


Re: Review Request 30482: Add the coordinator to server

Posted by Guozhang Wang <wa...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30482/
-----------------------------------------------------------

(Updated Feb. 6, 2015, 11:02 p.m.)


Review request for kafka.


Bugs: KAFKA-1333
    https://issues.apache.org/jira/browse/KAFKA-1333


Repository: kafka


Description (updated)
-------

dummy 2


minor


three new files


Address Jay and Onur's comments


Diffs (updated)
-----

  core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala PRE-CREATION 
  core/src/main/scala/kafka/coordinator/ConsumerRegistry.scala PRE-CREATION 
  core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala PRE-CREATION 
  core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala PRE-CREATION 
  core/src/main/scala/kafka/coordinator/DelayedRebalance.scala PRE-CREATION 
  core/src/main/scala/kafka/coordinator/GroupRegistry.scala PRE-CREATION 
  core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala PRE-CREATION 
  core/src/main/scala/kafka/network/SocketServer.scala 39b1651b680b2995cedfde95d74c086d9c6219ef 
  core/src/main/scala/kafka/server/DelayedOperationKey.scala fb7e9ed5c16dd15b71e1b1ac12948641185871db 
  core/src/main/scala/kafka/server/KafkaApis.scala f2b027bf944e735fd52cc282690ec1b8395f9290 
  core/src/main/scala/kafka/server/KafkaServer.scala 89200da30a04943f0b9befe84ab17e62b747c8c4 
  core/src/main/scala/kafka/server/MetadataCache.scala bf81a1ab88c14be8697b441eedbeb28fa0112643 
  core/src/main/scala/kafka/server/OffsetManager.scala 0bdd42fea931cddd072c0fff765b10526db6840a 
  core/src/main/scala/kafka/tools/MirrorMaker.scala 81ae205ef7b2050d0152f29f8da7dd91b17b8b00 
  core/src/test/resources/log4j.properties 1b7d5d8f7d5fae7d272849715714781cad05d77b 
  core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 90c0b7a19c7af8e5416e4bdba62b9824f1abd5ab 
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ba1e48e4300c9fb32e36e7266cb05294f2a481e5 
  core/src/test/scala/unit/kafka/utils/TestUtils.scala 54755e8dd3f23ced313067566cd4ea867f8a496e 

Diff: https://reviews.apache.org/r/30482/diff/


Testing
-------


Thanks,

Guozhang Wang


Re: Review Request 30482: Add the coordinator to server

Posted by Guozhang Wang <wa...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30482/
-----------------------------------------------------------

(Updated Feb. 1, 2015, 2:45 a.m.)


Review request for kafka.


Bugs: KAFKA-1333
    https://issues.apache.org/jira/browse/KAFKA-1333


Repository: kafka


Description
-------

1. Add ConsumerCoordinator with GroupRegistry and ConsumerRegistry metadata, and ZK listeners.
2. Add a delayed heartbeat purgatory based on HeartbeatBucket to expire heartbeat requests.
3. Add a delayed rebalance purgatory for preparing rebalance.
4. Add a join-group purgatory for sending back responses with assigned partitions.
5. Add TimeMsKey / ConsumerKey and ConsumerGroupKey for delayed heartbeat / join-group / rebalance purgatories.
6. Refactor KafkaApis for handling JoinGroup / Heartbeat requests with coordinator, and sending responses via callbacks.
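
As a rough illustration of item 6 only, the sketch below shows the callback style in isolation: the coordinator computes a result and hands it to a function supplied by the request handler, without knowing how the response is sent. `SketchCoordinator`, `HeartbeatResult`, the method signatures and the error codes are all hypothetical and do not reflect the actual classes in the diff.

```scala
import scala.collection.mutable

// Hypothetical result type; the real response classes are not shown in this thread.
final case class HeartbeatResult(errorCode: Int)

// Hypothetical coordinator: it never touches the network layer, it only
// invokes whatever callback the request handler passed in.
class SketchCoordinator {
  private val knownGroups = mutable.Set.empty[String]

  def addGroup(groupId: String): Unit = synchronized { knownGroups += groupId }

  def heartbeat(groupId: String, responseCallback: HeartbeatResult => Unit): Unit = {
    val known = synchronized(knownGroups.contains(groupId))
    // 0 = no error, 1 = unknown group (illustrative codes only).
    val code = if (known) 0 else 1
    responseCallback(HeartbeatResult(code))
  }
}

object CallbackDemo {
  def main(args: Array[String]): Unit = {
    val coordinator = new SketchCoordinator
    coordinator.addGroup("g1")
    // The handler decides how a result becomes a response; here it just prints.
    coordinator.heartbeat("g1", result => println("g1 -> " + result.errorCode))
    coordinator.heartbeat("g2", result => println("g2 -> " + result.errorCode))
  }
}
```

One appeal of this style is that the same callback can be invoked immediately or later, for example when a delayed operation completes or expires.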


Diffs
-----

  core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala PRE-CREATION 
  core/src/main/scala/kafka/coordinator/GroupRegistry.scala PRE-CREATION 
  core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala PRE-CREATION 
  core/src/main/scala/kafka/server/DelayedOperationKey.scala fb7e9ed5c16dd15b71e1b1ac12948641185871db 
  core/src/main/scala/kafka/server/KafkaApis.scala f2b027bf944e735fd52cc282690ec1b8395f9290 
  core/src/main/scala/kafka/server/KafkaServer.scala 89200da30a04943f0b9befe84ab17e62b747c8c4 

Diff: https://reviews.apache.org/r/30482/diff/


Testing
-------


Thanks,

Guozhang Wang


Re: Review Request 30482: Add the coordinator to server

Posted by Guozhang Wang <wa...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30482/
-----------------------------------------------------------

(Updated Feb. 1, 2015, 1:47 a.m.)


Review request for kafka.


Bugs: KAFKA-1333 and KAFKA-1633
    https://issues.apache.org/jira/browse/KAFKA-1333
    https://issues.apache.org/jira/browse/KAFKA-1633


Repository: kafka


Description (updated)
-------

1. Add ConsumerCoordinator with GroupRegistry and ConsumerRegistry metadata, and ZK listeners.
2. Add a delayed heartbeat purgatory based on HeartbeatBucket to expire heartbeat requests.
3. Add a delayed rebalance purgatory for preparing rebalance.
4. Add a join-group purgatory for sending back responses with assigned partitions.
5. Add TimeMsKey / ConsumerKey and ConsumerGroupKey for delayed heartbeat / join-group / rebalance purgatories.
6. Refactor KafkaApis for handling JoinGroup / Heartbeat requests with coordinator, and sending reponses via callbacks.


Diffs
-----

  core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala PRE-CREATION 
  core/src/main/scala/kafka/coordinator/GroupRegistry.scala PRE-CREATION 
  core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala PRE-CREATION 
  core/src/main/scala/kafka/server/DelayedOperationKey.scala fb7e9ed5c16dd15b71e1b1ac12948641185871db 
  core/src/main/scala/kafka/server/KafkaApis.scala f2b027bf944e735fd52cc282690ec1b8395f9290 
  core/src/main/scala/kafka/server/KafkaServer.scala 89200da30a04943f0b9befe84ab17e62b747c8c4 

Diff: https://reviews.apache.org/r/30482/diff/


Testing
-------


Thanks,

Guozhang Wang


Re: Review Request 30482: Add the coordinator to server

Posted by Guozhang Wang <wa...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30482/
-----------------------------------------------------------

(Updated Feb. 1, 2015, 1:40 a.m.)


Review request for kafka.


Bugs: KAFKA-1333 and KAFKA-1633
    https://issues.apache.org/jira/browse/KAFKA-1333
    https://issues.apache.org/jira/browse/KAFKA-1633


Repository: kafka


Description
-------

TBD


Diffs (updated)
-----

  core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala PRE-CREATION 
  core/src/main/scala/kafka/coordinator/GroupRegistry.scala PRE-CREATION 
  core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala PRE-CREATION 
  core/src/main/scala/kafka/server/DelayedOperationKey.scala fb7e9ed5c16dd15b71e1b1ac12948641185871db 
  core/src/main/scala/kafka/server/KafkaApis.scala f2b027bf944e735fd52cc282690ec1b8395f9290 
  core/src/main/scala/kafka/server/KafkaServer.scala 89200da30a04943f0b9befe84ab17e62b747c8c4 

Diff: https://reviews.apache.org/r/30482/diff/


Testing
-------


Thanks,

Guozhang Wang