You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by Guozhang Wang <wa...@gmail.com> on 2015/02/01 02:38:26 UTC
Review Request 30482: Add the coordinator to server
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30482/
-----------------------------------------------------------
Review request for kafka.
Bugs: KAFKA-1633
https://issues.apache.org/jira/browse/KAFKA-1633
Repository: kafka
Description
-------
TBD
Diffs
-----
core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala PRE-CREATION
core/src/main/scala/kafka/coordinator/GroupRegistry.scala PRE-CREATION
core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala PRE-CREATION
core/src/main/scala/kafka/server/DelayedOperationKey.scala fb7e9ed5c16dd15b71e1b1ac12948641185871db
core/src/main/scala/kafka/server/KafkaApis.scala f2b027bf944e735fd52cc282690ec1b8395f9290
core/src/main/scala/kafka/server/KafkaServer.scala 89200da30a04943f0b9befe84ab17e62b747c8c4
Diff: https://reviews.apache.org/r/30482/diff/
Testing
-------
Thanks,
Guozhang Wang
Re: Review Request 30482: Add the coordinator to server
Posted by Guozhang Wang <wa...@gmail.com>.
On Feb. 1, 2015, 8:46 p.m., Guozhang Wang wrote:
> > Not sure this stuff is actually here for review...may still be a work in progress. Overall this structure of code makes a ton of sense to me. Left some minor comments.
Yes this is more of a WIP patch, but the scope of this JIRA does not include a fully implemented failure detection / rebalance logic either. As it just addes the coordinator module with simple start-up / shut-down functions, which can unblock further development to be parallelized.
- Guozhang
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30482/#review70533
-----------------------------------------------------------
On Feb. 1, 2015, 2:45 a.m., Guozhang Wang wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30482/
> -----------------------------------------------------------
>
> (Updated Feb. 1, 2015, 2:45 a.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-1333
> https://issues.apache.org/jira/browse/KAFKA-1333
>
>
> Repository: kafka
>
>
> Description
> -------
>
> 1. Add ConsumerCoordinator with GroupRegistry and ConsumerRegistry metadata, and ZK listeners.
> 2. Add a delayed heartbeat purgatory based on HeartbeatBucket to expire heartbeat requests.
> 3. Add a delayed rebalance purgatory for preparing rebalance.
> 4. Add a join-group purgatory for sending back responses with assigned partitions.
> 5. Add TimeMsKey / ConsumerKey and ConsumerGroupKey for delayed heartbeat / join-group / rebalance purgatories.
> 6. Refactor KafkaApis for handling JoinGroup / Heartbeat requests with coordinator, and sending reponses via callbacks.
>
>
> Diffs
> -----
>
> core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala PRE-CREATION
> core/src/main/scala/kafka/coordinator/GroupRegistry.scala PRE-CREATION
> core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala PRE-CREATION
> core/src/main/scala/kafka/server/DelayedOperationKey.scala fb7e9ed5c16dd15b71e1b1ac12948641185871db
> core/src/main/scala/kafka/server/KafkaApis.scala f2b027bf944e735fd52cc282690ec1b8395f9290
> core/src/main/scala/kafka/server/KafkaServer.scala 89200da30a04943f0b9befe84ab17e62b747c8c4
>
> Diff: https://reviews.apache.org/r/30482/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Guozhang Wang
>
>
Re: Review Request 30482: Add the coordinator to server
Posted by Jay Kreps <bo...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30482/#review70533
-----------------------------------------------------------
core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/30482/#comment115743>
Can we refactor the consumerGroupRegistries into it's own class that does locking internally. I really feel these free standing locks inside big classes are an anti pattern. It is impossible to verify correctness without basically searching through all the code in the class to determine where it is used.
core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/30482/#comment115741>
It would be nice to remove the handle prefix from the method names unless it has special significance. I used in KafkaApis as a way to differentiate the actual API implementations from helper methods. Seems cleaner as
joinGroup, heartbeat, etc.
core/src/main/scala/kafka/coordinator/GroupRegistry.scala
<https://reviews.apache.org/r/30482/#comment115740>
If these will be the strings the client specifies, might be nice to have them be simpler. E.g. "range" and "round-robin"
core/src/main/scala/kafka/server/KafkaApis.scala
<https://reviews.apache.org/r/30482/#comment115738>
"...for sending *a* produce response"
core/src/main/scala/kafka/server/KafkaApis.scala
<https://reviews.apache.org/r/30482/#comment115739>
ditto
Not sure this stuff is actually here for review...may still be a work in progress. Overall this structure of code makes a ton of sense to me. Left some minor comments.
- Jay Kreps
On Feb. 1, 2015, 2:45 a.m., Guozhang Wang wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30482/
> -----------------------------------------------------------
>
> (Updated Feb. 1, 2015, 2:45 a.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-1333
> https://issues.apache.org/jira/browse/KAFKA-1333
>
>
> Repository: kafka
>
>
> Description
> -------
>
> 1. Add ConsumerCoordinator with GroupRegistry and ConsumerRegistry metadata, and ZK listeners.
> 2. Add a delayed heartbeat purgatory based on HeartbeatBucket to expire heartbeat requests.
> 3. Add a delayed rebalance purgatory for preparing rebalance.
> 4. Add a join-group purgatory for sending back responses with assigned partitions.
> 5. Add TimeMsKey / ConsumerKey and ConsumerGroupKey for delayed heartbeat / join-group / rebalance purgatories.
> 6. Refactor KafkaApis for handling JoinGroup / Heartbeat requests with coordinator, and sending reponses via callbacks.
>
>
> Diffs
> -----
>
> core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala PRE-CREATION
> core/src/main/scala/kafka/coordinator/GroupRegistry.scala PRE-CREATION
> core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala PRE-CREATION
> core/src/main/scala/kafka/server/DelayedOperationKey.scala fb7e9ed5c16dd15b71e1b1ac12948641185871db
> core/src/main/scala/kafka/server/KafkaApis.scala f2b027bf944e735fd52cc282690ec1b8395f9290
> core/src/main/scala/kafka/server/KafkaServer.scala 89200da30a04943f0b9befe84ab17e62b747c8c4
>
> Diff: https://reviews.apache.org/r/30482/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Guozhang Wang
>
>
Re: Review Request 30482: Add the coordinator to server
Posted by Onur Karaman <ok...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30482/#review70535
-----------------------------------------------------------
core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/30482/#comment115777>
Was it intentional to have this as a member variable?
core/src/main/scala/kafka/server/DelayedOperationKey.scala
<https://reviews.apache.org/r/30482/#comment115746>
This gave me:
java.util.UnknownFormatConversionException: Conversion = 'l'
The docs on formatting longs are a bit misleading, but just a "%d".format(time) should work.
core/src/main/scala/kafka/server/DelayedOperationKey.scala
<https://reviews.apache.org/r/30482/#comment115744>
This gave me java.util.IllegalFormatConversionException: d != java.lang.String
consumerId is a string, so maybe "%s-%s".format(groupId, consumerId)
core/src/main/scala/kafka/server/DelayedOperationKey.scala
<https://reviews.apache.org/r/30482/#comment115745>
groupId is already a string, so this can be simplified to:
override def keyLabel = groupId
- Onur Karaman
On Feb. 1, 2015, 2:45 a.m., Guozhang Wang wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30482/
> -----------------------------------------------------------
>
> (Updated Feb. 1, 2015, 2:45 a.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-1333
> https://issues.apache.org/jira/browse/KAFKA-1333
>
>
> Repository: kafka
>
>
> Description
> -------
>
> 1. Add ConsumerCoordinator with GroupRegistry and ConsumerRegistry metadata, and ZK listeners.
> 2. Add a delayed heartbeat purgatory based on HeartbeatBucket to expire heartbeat requests.
> 3. Add a delayed rebalance purgatory for preparing rebalance.
> 4. Add a join-group purgatory for sending back responses with assigned partitions.
> 5. Add TimeMsKey / ConsumerKey and ConsumerGroupKey for delayed heartbeat / join-group / rebalance purgatories.
> 6. Refactor KafkaApis for handling JoinGroup / Heartbeat requests with coordinator, and sending reponses via callbacks.
>
>
> Diffs
> -----
>
> core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala PRE-CREATION
> core/src/main/scala/kafka/coordinator/GroupRegistry.scala PRE-CREATION
> core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala PRE-CREATION
> core/src/main/scala/kafka/server/DelayedOperationKey.scala fb7e9ed5c16dd15b71e1b1ac12948641185871db
> core/src/main/scala/kafka/server/KafkaApis.scala f2b027bf944e735fd52cc282690ec1b8395f9290
> core/src/main/scala/kafka/server/KafkaServer.scala 89200da30a04943f0b9befe84ab17e62b747c8c4
>
> Diff: https://reviews.apache.org/r/30482/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Guozhang Wang
>
>
Re: Review Request 30482: Add the coordinator to server
Posted by Jay Kreps <bo...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30482/#review71562
-----------------------------------------------------------
Ship it!
Ship It!
- Jay Kreps
On Feb. 6, 2015, 11:02 p.m., Guozhang Wang wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30482/
> -----------------------------------------------------------
>
> (Updated Feb. 6, 2015, 11:02 p.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-1333
> https://issues.apache.org/jira/browse/KAFKA-1333
>
>
> Repository: kafka
>
>
> Description
> -------
>
> dummy 2
>
>
> minor
>
>
> three new file
>
>
> Address Jay and Onur's comments
>
>
> Diffs
> -----
>
> core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala PRE-CREATION
> core/src/main/scala/kafka/coordinator/ConsumerRegistry.scala PRE-CREATION
> core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala PRE-CREATION
> core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala PRE-CREATION
> core/src/main/scala/kafka/coordinator/DelayedRebalance.scala PRE-CREATION
> core/src/main/scala/kafka/coordinator/GroupRegistry.scala PRE-CREATION
> core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala PRE-CREATION
> core/src/main/scala/kafka/network/SocketServer.scala 39b1651b680b2995cedfde95d74c086d9c6219ef
> core/src/main/scala/kafka/server/DelayedOperationKey.scala fb7e9ed5c16dd15b71e1b1ac12948641185871db
> core/src/main/scala/kafka/server/KafkaApis.scala f2b027bf944e735fd52cc282690ec1b8395f9290
> core/src/main/scala/kafka/server/KafkaServer.scala 89200da30a04943f0b9befe84ab17e62b747c8c4
> core/src/main/scala/kafka/server/MetadataCache.scala bf81a1ab88c14be8697b441eedbeb28fa0112643
> core/src/main/scala/kafka/server/OffsetManager.scala 0bdd42fea931cddd072c0fff765b10526db6840a
> core/src/main/scala/kafka/tools/MirrorMaker.scala 81ae205ef7b2050d0152f29f8da7dd91b17b8b00
> core/src/test/resources/log4j.properties 1b7d5d8f7d5fae7d272849715714781cad05d77b
> core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 90c0b7a19c7af8e5416e4bdba62b9824f1abd5ab
> core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ba1e48e4300c9fb32e36e7266cb05294f2a481e5
> core/src/test/scala/unit/kafka/utils/TestUtils.scala 54755e8dd3f23ced313067566cd4ea867f8a496e
>
> Diff: https://reviews.apache.org/r/30482/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Guozhang Wang
>
>
Re: Review Request 30482: Add the coordinator to server
Posted by Guozhang Wang <wa...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30482/
-----------------------------------------------------------
(Updated Feb. 6, 2015, 11:02 p.m.)
Review request for kafka.
Bugs: KAFKA-1333
https://issues.apache.org/jira/browse/KAFKA-1333
Repository: kafka
Description (updated)
-------
dummy 2
minor
three new file
Address Jay and Onur's comments
Diffs (updated)
-----
core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala PRE-CREATION
core/src/main/scala/kafka/coordinator/ConsumerRegistry.scala PRE-CREATION
core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala PRE-CREATION
core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala PRE-CREATION
core/src/main/scala/kafka/coordinator/DelayedRebalance.scala PRE-CREATION
core/src/main/scala/kafka/coordinator/GroupRegistry.scala PRE-CREATION
core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala PRE-CREATION
core/src/main/scala/kafka/network/SocketServer.scala 39b1651b680b2995cedfde95d74c086d9c6219ef
core/src/main/scala/kafka/server/DelayedOperationKey.scala fb7e9ed5c16dd15b71e1b1ac12948641185871db
core/src/main/scala/kafka/server/KafkaApis.scala f2b027bf944e735fd52cc282690ec1b8395f9290
core/src/main/scala/kafka/server/KafkaServer.scala 89200da30a04943f0b9befe84ab17e62b747c8c4
core/src/main/scala/kafka/server/MetadataCache.scala bf81a1ab88c14be8697b441eedbeb28fa0112643
core/src/main/scala/kafka/server/OffsetManager.scala 0bdd42fea931cddd072c0fff765b10526db6840a
core/src/main/scala/kafka/tools/MirrorMaker.scala 81ae205ef7b2050d0152f29f8da7dd91b17b8b00
core/src/test/resources/log4j.properties 1b7d5d8f7d5fae7d272849715714781cad05d77b
core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 90c0b7a19c7af8e5416e4bdba62b9824f1abd5ab
core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ba1e48e4300c9fb32e36e7266cb05294f2a481e5
core/src/test/scala/unit/kafka/utils/TestUtils.scala 54755e8dd3f23ced313067566cd4ea867f8a496e
Diff: https://reviews.apache.org/r/30482/diff/
Testing
-------
Thanks,
Guozhang Wang
Re: Review Request 30482: Add the coordinator to server
Posted by Guozhang Wang <wa...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30482/
-----------------------------------------------------------
(Updated Feb. 1, 2015, 2:45 a.m.)
Review request for kafka.
Bugs: KAFKA-1333
https://issues.apache.org/jira/browse/KAFKA-1333
Repository: kafka
Description
-------
1. Add ConsumerCoordinator with GroupRegistry and ConsumerRegistry metadata, and ZK listeners.
2. Add a delayed heartbeat purgatory based on HeartbeatBucket to expire heartbeat requests.
3. Add a delayed rebalance purgatory for preparing rebalance.
4. Add a join-group purgatory for sending back responses with assigned partitions.
5. Add TimeMsKey / ConsumerKey and ConsumerGroupKey for delayed heartbeat / join-group / rebalance purgatories.
6. Refactor KafkaApis for handling JoinGroup / Heartbeat requests with coordinator, and sending reponses via callbacks.
Diffs
-----
core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala PRE-CREATION
core/src/main/scala/kafka/coordinator/GroupRegistry.scala PRE-CREATION
core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala PRE-CREATION
core/src/main/scala/kafka/server/DelayedOperationKey.scala fb7e9ed5c16dd15b71e1b1ac12948641185871db
core/src/main/scala/kafka/server/KafkaApis.scala f2b027bf944e735fd52cc282690ec1b8395f9290
core/src/main/scala/kafka/server/KafkaServer.scala 89200da30a04943f0b9befe84ab17e62b747c8c4
Diff: https://reviews.apache.org/r/30482/diff/
Testing
-------
Thanks,
Guozhang Wang
Re: Review Request 30482: Add the coordinator to server
Posted by Guozhang Wang <wa...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30482/
-----------------------------------------------------------
(Updated Feb. 1, 2015, 1:47 a.m.)
Review request for kafka.
Bugs: KAFKA-1333 and KAFKA-1633
https://issues.apache.org/jira/browse/KAFKA-1333
https://issues.apache.org/jira/browse/KAFKA-1633
Repository: kafka
Description (updated)
-------
1. Add ConsumerCoordinator with GroupRegistry and ConsumerRegistry metadata, and ZK listeners.
2. Add a delayed heartbeat purgatory based on HeartbeatBucket to expire heartbeat requests.
3. Add a delayed rebalance purgatory for preparing rebalance.
4. Add a join-group purgatory for sending back responses with assigned partitions.
5. Add TimeMsKey / ConsumerKey and ConsumerGroupKey for delayed heartbeat / join-group / rebalance purgatories.
6. Refactor KafkaApis for handling JoinGroup / Heartbeat requests with coordinator, and sending reponses via callbacks.
Diffs
-----
core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala PRE-CREATION
core/src/main/scala/kafka/coordinator/GroupRegistry.scala PRE-CREATION
core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala PRE-CREATION
core/src/main/scala/kafka/server/DelayedOperationKey.scala fb7e9ed5c16dd15b71e1b1ac12948641185871db
core/src/main/scala/kafka/server/KafkaApis.scala f2b027bf944e735fd52cc282690ec1b8395f9290
core/src/main/scala/kafka/server/KafkaServer.scala 89200da30a04943f0b9befe84ab17e62b747c8c4
Diff: https://reviews.apache.org/r/30482/diff/
Testing
-------
Thanks,
Guozhang Wang
Re: Review Request 30482: Add the coordinator to server
Posted by Guozhang Wang <wa...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30482/
-----------------------------------------------------------
(Updated Feb. 1, 2015, 1:40 a.m.)
Review request for kafka.
Bugs: KAFKA-1333 and KAFKA-1633
https://issues.apache.org/jira/browse/KAFKA-1333
https://issues.apache.org/jira/browse/KAFKA-1633
Repository: kafka
Description
-------
TBD
Diffs (updated)
-----
core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala PRE-CREATION
core/src/main/scala/kafka/coordinator/GroupRegistry.scala PRE-CREATION
core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala PRE-CREATION
core/src/main/scala/kafka/server/DelayedOperationKey.scala fb7e9ed5c16dd15b71e1b1ac12948641185871db
core/src/main/scala/kafka/server/KafkaApis.scala f2b027bf944e735fd52cc282690ec1b8395f9290
core/src/main/scala/kafka/server/KafkaServer.scala 89200da30a04943f0b9befe84ab17e62b747c8c4
Diff: https://reviews.apache.org/r/30482/diff/
Testing
-------
Thanks,
Guozhang Wang