Posted to dev@kafka.apache.org by Guozhang Wang <wa...@gmail.com> on 2015/06/09 01:11:48 UTC

Review Request 35231: Fix KAFKA-1740

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35231/
-----------------------------------------------------------

Review request for kafka.


Bugs: KAFKA-1740
    https://issues.apache.org/jira/browse/KAFKA-1740


Repository: kafka


Description
-------

Move offset manager to coordinator, add validation logic for offset commit and fetch


Diffs
-----

  clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java c1496a0851526f3c7d3905ce4bdff2129c83a6c1 
  clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 5b898c8f8ad5d0469f469b600c4b2eb13d1fc662 
  clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java 70844d65369f6ff300cbeb513dbb6650050c7eec 
  clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java deec1fa480d5a5c5884a1c007b076aa64e902472 
  clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java 512a0ef7e619d54e74122c38119209f5cf9590e3 
  clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb 
  core/src/main/scala/kafka/admin/TopicCommand.scala dacbdd089f96c67b7bbc1c0cd36490d4fe094c1b 
  core/src/main/scala/kafka/cluster/Partition.scala 730a232482fdf77be5704cdf5941cfab3828db88 
  core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 6b4242c7cd1df9b3465db0fec35a25102c76cd60 
  core/src/main/scala/kafka/common/Topic.scala ad759786d1c22f67c47808c0b8f227eb2b1a9aa8 
  core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala 51e89c87ee2c20fc7f976536f01fa1055fb8e670 
  core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala c39e6de34ee531c6dfa9107b830752bd7f8fbe59 
  core/src/main/scala/kafka/server/KafkaApis.scala d63bc18d795a6f0e6994538ca55a9a46f7fb8ffd 
  core/src/main/scala/kafka/server/KafkaServer.scala b320ce9f6a12c0ee392e91beb82e8804d167f9f4 
  core/src/main/scala/kafka/server/OffsetManager.scala 5cca85cf727975f6d3acb2223fd186753ad761dc 
  core/src/main/scala/kafka/server/ReplicaManager.scala 59c9bc3ac3a8afc07a6f8c88c5871304db588d17 
  core/src/test/scala/integration/kafka/api/ConsumerTest.scala 17b17b9b1520c7cc2e2ba96cdb1f9ff06e47bcad 
  core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala 07b1ff47bfc3cd3f948c9533c8dc977fa36d996f 
  core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala c7136f20972614ac47aa57ab13e3c94ef775a4b7 
  core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala 4f124af5c3e946045a78ad1519c37372a72c8985 
  core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala 08854c5e6ec249368206298b2ac2623df18f266a 
  core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 528525b719ec916e16f8b3ae3715bec4b5dcc47d 

Diff: https://reviews.apache.org/r/35231/diff/


Testing
-------


Thanks,

Guozhang Wang


Re: Review Request 35231: Address Onur and Jason's comments

Posted by Guozhang Wang <wa...@gmail.com>.

> On June 11, 2015, 5:40 p.m., Onur Karaman wrote:
> > core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, lines 229-236
> > <https://reviews.apache.org/r/35231/diff/1/?file=980918#file980918line229>
> >
> >     This can happen in two ways:
> >     1. An automatic group management (subscribes to topics) consumer that sends an OffsetCommitRequest whose groupId hashes to the coordinator but hasn't first done a join group.
> >     2. A manual group management (subscribes to partitions) consumer that sends an OffsetCommitRequest whose groupId hashes to the coordinator.
> >     
> >     Should these be distinguishable? We can do this with an added flag in OffsetCommitRequest.

I think 1) will not happen in the Java consumer:

If a consumer has never called subscribe(), then commit() should fail immediately.
If a consumer has called subscribe(topics), then commit() should cause it to first join the group, get the partitions / starting offsets, and then commit.

But I agree that other implementations can send an OffsetCommitRequest before joining. In this case I think the coordinator should blindly return the results.
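
The client-side behaviour described above can be illustrated with a minimal sketch. Everything in it is hypothetical: SketchConsumer, its fields, and the simulated joinGroup() are stand-ins chosen for illustration and are not the actual Java consumer internals.

```java
// Hypothetical sketch; names are illustrative, not the real Kafka consumer classes.
class SketchConsumer {
    private boolean subscribed = false;   // set once subscribe(topics) has been called
    private boolean joinedGroup = false;  // set after the (simulated) join group completes
    private int generationId = -1;        // -1 until a generation has been assigned

    void subscribe(String... topics) {
        if (topics.length == 0) {
            throw new IllegalArgumentException("no topics given");
        }
        subscribed = true;
    }

    // Returns the generation id the commit would be sent with.
    int commit() {
        if (!subscribed) {
            // Case 1: never subscribed, so fail immediately without contacting the coordinator.
            throw new IllegalStateException("commit() called before subscribe()");
        }
        if (!joinedGroup) {
            // Case 2: subscribed but not yet a group member, so join first.
            joinGroup();
        }
        return generationId;
    }

    private void joinGroup() {
        // Stand-in for the JoinGroupRequest round trip; assumes generation 1 is assigned.
        generationId = 1;
        joinedGroup = true;
    }

    boolean hasJoinedGroup() {
        return joinedGroup;
    }
}
```

With this shape, a commit can only reach the coordinator after a join, which is why case 1) should not arise from this client; other client implementations are not bound by it.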


- Guozhang


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35231/#review87583
-----------------------------------------------------------


On June 30, 2015, 1:44 a.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/35231/
> -----------------------------------------------------------
> 
> (Updated June 30, 2015, 1:44 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1740
>     https://issues.apache.org/jira/browse/KAFKA-1740
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> v2
> 
> 
> minor
> 
> 
> coordinator response test
> 
> 
> comments
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java 6c26667182d7fa8153469a634881a7c34d8a0c91 
>   clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 5b898c8f8ad5d0469f469b600c4b2eb13d1fc662 
>   clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java 70844d65369f6ff300cbeb513dbb6650050c7eec 
>   clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java b5e8a0ff0aaf9df382b91f93e7ad51b1ed5f6209 
>   clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java 512a0ef7e619d54e74122c38119209f5cf9590e3 
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java 613b192ba84b66f79b45f3cd70418c3f503bee9e 
>   core/src/main/scala/kafka/admin/TopicCommand.scala dacbdd089f96c67b7bbc1c0cd36490d4fe094c1b 
>   core/src/main/scala/kafka/cluster/Partition.scala 0990938b33ba7f3bccf373325dbbaee5e45ba8bb 
>   core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 6b4242c7cd1df9b3465db0fec35a25102c76cd60 
>   core/src/main/scala/kafka/common/Topic.scala ad759786d1c22f67c47808c0b8f227eb2b1a9aa8 
>   core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala a385adbd7cb6ed693957df571d175ec36b8eaf94 
>   core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala 0cd5605bcca78dd8a80e8586e58f8b2529da97d7 
>   core/src/main/scala/kafka/server/KafkaApis.scala ad6f05807c61c971e5e60d24bc0c87e989115961 
>   core/src/main/scala/kafka/server/KafkaServer.scala 52dc728bb1ab4b05e94dc528da1006040e2f28c9 
>   core/src/main/scala/kafka/server/OffsetManager.scala 5cca85cf727975f6d3acb2223fd186753ad761dc 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 59c9bc3ac3a8afc07a6f8c88c5871304db588d17 
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 17b17b9b1520c7cc2e2ba96cdb1f9ff06e47bcad 
>   core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala 07b1ff47bfc3cd3f948c9533c8dc977fa36d996f 
>   core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala c7136f20972614ac47aa57ab13e3c94ef775a4b7 
>   core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala 4f124af5c3e946045a78ad1519c37372a72c8985 
>   core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala a44fbd653b53649368db2656c3be3e14e3455163 
>   core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala 08854c5e6ec249368206298b2ac2623df18f266a 
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 528525b719ec916e16f8b3ae3715bec4b5dcc47d 
> 
> Diff: https://reviews.apache.org/r/35231/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>


Re: Review Request 35231: Fix KAFKA-1740

Posted by Onur Karaman <ok...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35231/#review87583
-----------------------------------------------------------



core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/35231/#comment139973>

    This can happen in two ways:
    1. An automatic group management (subscribes to topics) consumer that sends an OffsetCommitRequest whose groupId hashes to the coordinator but hasn't first done a join group.
    2. A manual group management (subscribes to partitions) consumer that sends an OffsetCommitRequest whose groupId hashes to the coordinator.
    
    Should these be distinguishable? We can do this with an added flag in OffsetCommitRequest.
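
As a rough illustration of the flag idea, here is a minimal sketch. The names OffsetCommitRequestSketch, manualAssignment and UnknownGroupPolicy are hypothetical, and rejecting the automatic case is only one possible policy, assumed here for the sake of the example; the thread does not settle what the coordinator should do.

```java
// Hypothetical sketch: none of these names come from the Kafka code base.
enum UnknownGroupDecision { STORE_OFFSETS, REJECT }

class OffsetCommitRequestSketch {
    final String groupId;
    final boolean manualAssignment; // the proposed extra flag (assumed name)

    OffsetCommitRequestSketch(String groupId, boolean manualAssignment) {
        this.groupId = groupId;
        this.manualAssignment = manualAssignment;
    }
}

class UnknownGroupPolicy {
    // Intended to be called only when the coordinator has no metadata for request.groupId.
    static UnknownGroupDecision decide(OffsetCommitRequestSketch request) {
        if (request.manualAssignment) {
            // Case 2: the consumer manages partitions itself, so no group is expected.
            return UnknownGroupDecision.STORE_OFFSETS;
        }
        // Case 1: the consumer relies on group management but the group is unknown here.
        return UnknownGroupDecision.REJECT;
    }
}
```

Without such a flag the coordinator sees the same request in both cases and has to treat them identically.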



core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/35231/#comment139977>

    Same as the earlier comment but for OffsetFetchRequests.
    
    This can happen in two ways:
    1. An automatic group management (subscribes to topics) consumer that sends an OffsetFetchRequest whose groupId hashes to the coordinator but hasn't first done a join group.
    2. A manual group management (subscribes to partitions) consumer that sends an OffsetFetchRequest whose groupId hashes to the coordinator.
    
    Should these be distinguishable? We can do this with an added flag in OffsetFetchRequest.


- Onur Karaman


On June 8, 2015, 11:12 p.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/35231/
> -----------------------------------------------------------
> 
> (Updated June 8, 2015, 11:12 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1740
>     https://issues.apache.org/jira/browse/KAFKA-1740
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Move offset manager to coordinator, add validation logic for offset commit and fetch
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java c1496a0851526f3c7d3905ce4bdff2129c83a6c1 
>   clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 5b898c8f8ad5d0469f469b600c4b2eb13d1fc662 
>   clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java 70844d65369f6ff300cbeb513dbb6650050c7eec 
>   clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java deec1fa480d5a5c5884a1c007b076aa64e902472 
>   clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java 512a0ef7e619d54e74122c38119209f5cf9590e3 
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb 
>   core/src/main/scala/kafka/admin/TopicCommand.scala dacbdd089f96c67b7bbc1c0cd36490d4fe094c1b 
>   core/src/main/scala/kafka/cluster/Partition.scala 730a232482fdf77be5704cdf5941cfab3828db88 
>   core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 6b4242c7cd1df9b3465db0fec35a25102c76cd60 
>   core/src/main/scala/kafka/common/Topic.scala ad759786d1c22f67c47808c0b8f227eb2b1a9aa8 
>   core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala 51e89c87ee2c20fc7f976536f01fa1055fb8e670 
>   core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala c39e6de34ee531c6dfa9107b830752bd7f8fbe59 
>   core/src/main/scala/kafka/server/KafkaApis.scala d63bc18d795a6f0e6994538ca55a9a46f7fb8ffd 
>   core/src/main/scala/kafka/server/KafkaServer.scala b320ce9f6a12c0ee392e91beb82e8804d167f9f4 
>   core/src/main/scala/kafka/server/OffsetManager.scala 5cca85cf727975f6d3acb2223fd186753ad761dc 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 59c9bc3ac3a8afc07a6f8c88c5871304db588d17 
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 17b17b9b1520c7cc2e2ba96cdb1f9ff06e47bcad 
>   core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala 07b1ff47bfc3cd3f948c9533c8dc977fa36d996f 
>   core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala c7136f20972614ac47aa57ab13e3c94ef775a4b7 
>   core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala 4f124af5c3e946045a78ad1519c37372a72c8985 
>   core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala 08854c5e6ec249368206298b2ac2623df18f266a 
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 528525b719ec916e16f8b3ae3715bec4b5dcc47d 
> 
> Diff: https://reviews.apache.org/r/35231/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>


Re: Review Request 35231: Address Onur and Jason's comments

Posted by Jason Gustafson <ja...@confluent.io>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35231/#review90155
-----------------------------------------------------------

Ship it!

- Jason Gustafson


Re: Review Request 35231: Address Onur and Jason's comments

Posted by Guozhang Wang <wa...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35231/
-----------------------------------------------------------

(Updated June 30, 2015, 1:44 a.m.)


Review request for kafka.


Summary (updated)
-----------------

Address Onur and Jason's comments


Bugs: KAFKA-1740
    https://issues.apache.org/jira/browse/KAFKA-1740


Repository: kafka


Description (updated)
-------

v2


minor


coordinator response test


comments


Diffs (updated)
-----

  clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java 6c26667182d7fa8153469a634881a7c34d8a0c91 
  clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 5b898c8f8ad5d0469f469b600c4b2eb13d1fc662 
  clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java 70844d65369f6ff300cbeb513dbb6650050c7eec 
  clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java b5e8a0ff0aaf9df382b91f93e7ad51b1ed5f6209 
  clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java 512a0ef7e619d54e74122c38119209f5cf9590e3 
  clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java 613b192ba84b66f79b45f3cd70418c3f503bee9e 
  core/src/main/scala/kafka/admin/TopicCommand.scala dacbdd089f96c67b7bbc1c0cd36490d4fe094c1b 
  core/src/main/scala/kafka/cluster/Partition.scala 0990938b33ba7f3bccf373325dbbaee5e45ba8bb 
  core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 6b4242c7cd1df9b3465db0fec35a25102c76cd60 
  core/src/main/scala/kafka/common/Topic.scala ad759786d1c22f67c47808c0b8f227eb2b1a9aa8 
  core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala a385adbd7cb6ed693957df571d175ec36b8eaf94 
  core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala 0cd5605bcca78dd8a80e8586e58f8b2529da97d7 
  core/src/main/scala/kafka/server/KafkaApis.scala ad6f05807c61c971e5e60d24bc0c87e989115961 
  core/src/main/scala/kafka/server/KafkaServer.scala 52dc728bb1ab4b05e94dc528da1006040e2f28c9 
  core/src/main/scala/kafka/server/OffsetManager.scala 5cca85cf727975f6d3acb2223fd186753ad761dc 
  core/src/main/scala/kafka/server/ReplicaManager.scala 59c9bc3ac3a8afc07a6f8c88c5871304db588d17 
  core/src/test/scala/integration/kafka/api/ConsumerTest.scala 17b17b9b1520c7cc2e2ba96cdb1f9ff06e47bcad 
  core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala 07b1ff47bfc3cd3f948c9533c8dc977fa36d996f 
  core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala c7136f20972614ac47aa57ab13e3c94ef775a4b7 
  core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala 4f124af5c3e946045a78ad1519c37372a72c8985 
  core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala a44fbd653b53649368db2656c3be3e14e3455163 
  core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala 08854c5e6ec249368206298b2ac2623df18f266a 
  core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 528525b719ec916e16f8b3ae3715bec4b5dcc47d 

Diff: https://reviews.apache.org/r/35231/diff/


Testing
-------


Thanks,

Guozhang Wang


Re: Review Request 35231: Rebased

Posted by Guozhang Wang <wa...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35231/
-----------------------------------------------------------

(Updated June 30, 2015, 1:21 a.m.)


Review request for kafka.


Summary (updated)
-----------------

Rebased


Bugs: KAFKA-1740
    https://issues.apache.org/jira/browse/KAFKA-1740


Repository: kafka


Description (updated)
-------

v2


minor


coordinator response test


Diffs (updated)
-----

  clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java 6c26667182d7fa8153469a634881a7c34d8a0c91 
  clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 5b898c8f8ad5d0469f469b600c4b2eb13d1fc662 
  clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java 70844d65369f6ff300cbeb513dbb6650050c7eec 
  clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java b5e8a0ff0aaf9df382b91f93e7ad51b1ed5f6209 
  clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java 512a0ef7e619d54e74122c38119209f5cf9590e3 
  clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java 613b192ba84b66f79b45f3cd70418c3f503bee9e 
  core/src/main/scala/kafka/admin/TopicCommand.scala dacbdd089f96c67b7bbc1c0cd36490d4fe094c1b 
  core/src/main/scala/kafka/cluster/Partition.scala 0990938b33ba7f3bccf373325dbbaee5e45ba8bb 
  core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 6b4242c7cd1df9b3465db0fec35a25102c76cd60 
  core/src/main/scala/kafka/common/Topic.scala ad759786d1c22f67c47808c0b8f227eb2b1a9aa8 
  core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala a385adbd7cb6ed693957df571d175ec36b8eaf94 
  core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala 0cd5605bcca78dd8a80e8586e58f8b2529da97d7 
  core/src/main/scala/kafka/server/KafkaApis.scala ad6f05807c61c971e5e60d24bc0c87e989115961 
  core/src/main/scala/kafka/server/KafkaServer.scala 52dc728bb1ab4b05e94dc528da1006040e2f28c9 
  core/src/main/scala/kafka/server/OffsetManager.scala 5cca85cf727975f6d3acb2223fd186753ad761dc 
  core/src/main/scala/kafka/server/ReplicaManager.scala 59c9bc3ac3a8afc07a6f8c88c5871304db588d17 
  core/src/test/scala/integration/kafka/api/ConsumerTest.scala 17b17b9b1520c7cc2e2ba96cdb1f9ff06e47bcad 
  core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala 07b1ff47bfc3cd3f948c9533c8dc977fa36d996f 
  core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala c7136f20972614ac47aa57ab13e3c94ef775a4b7 
  core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala 4f124af5c3e946045a78ad1519c37372a72c8985 
  core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala a44fbd653b53649368db2656c3be3e14e3455163 
  core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala 08854c5e6ec249368206298b2ac2623df18f266a 
  core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 528525b719ec916e16f8b3ae3715bec4b5dcc47d 

Diff: https://reviews.apache.org/r/35231/diff/


Testing
-------


Thanks,

Guozhang Wang


Re: Review Request 35231: Fix KAFKA-1740

Posted by Onur Karaman <ok...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35231/#review87107
-----------------------------------------------------------


I'll try to do a more detailed review soon. I just want to get this one comment out of the way.


core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/35231/#comment139433>

    I don't think the ConsumerCoordinator constructor should change.
    
    The coordinator just needs a KafkaConfig, ZkClient, and OffsetManager in order to work. It shouldn't care about ReplicaManagers, KafkaSchedulers, or OffsetManagerConfigs.
    
    Passing in these lower-level dependencies makes testing the ConsumerCoordinator harder. As an example, think about how https://reviews.apache.org/r/35086/diff/ would work with this change. You'd have to mock out dependencies (ReplicaManager) that are a level deeper than what you actually care about (OffsetManager), hoping that OffsetManager reacts nicely to your mock, instead of mocking the OffsetManager directly.
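
The testing argument can be sketched in a few lines. This is an illustration only: OffsetStore stands in for OffsetManager, CoordinatorSketch for ConsumerCoordinator, and RecordingOffsetStore for a hand-written test double; none of these names or signatures are taken from the patch.

```java
// Hypothetical sketch of injecting the direct dependency rather than its internals.
interface OffsetStore { // stands in for OffsetManager
    void storeOffset(String group, String topicPartition, long offset);
    long fetchOffset(String group, String topicPartition);
}

class CoordinatorSketch { // stands in for ConsumerCoordinator
    private final OffsetStore offsets;

    // The coordinator receives the collaborator it actually talks to,
    // not the lower-level pieces needed to build that collaborator.
    CoordinatorSketch(OffsetStore offsets) {
        this.offsets = offsets;
    }

    long handleCommit(String group, String topicPartition, long offset) {
        if (offset < 0) {
            throw new IllegalArgumentException("negative offset: " + offset);
        }
        offsets.storeOffset(group, topicPartition, offset);
        return offsets.fetchOffset(group, topicPartition);
    }
}

// A test double for the direct dependency; no replica-manager stand-in is needed.
class RecordingOffsetStore implements OffsetStore {
    String lastKey;
    long lastOffset = -1L;

    public void storeOffset(String group, String topicPartition, long offset) {
        lastKey = group + "/" + topicPartition;
        lastOffset = offset;
    }

    public long fetchOffset(String group, String topicPartition) {
        return lastOffset;
    }
}
```

A test then builds the coordinator with `new CoordinatorSketch(new RecordingOffsetStore())` and inspects the double, without caring how a real store would reach its log.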


- Onur Karaman


Re: Review Request 35231: Address Onur and Jason's comments

Posted by Guozhang Wang <wa...@gmail.com>.

> On June 18, 2015, 12:50 a.m., Jason Gustafson wrote:
> > core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, lines 229-236
> > <https://reviews.apache.org/r/35231/diff/1/?file=980918#file980918line229>
> >
> >     In manual group management, would we expect consumerId and generationId to be null?
> 
> Guozhang Wang wrote:
>     In that case, the consumerId would be UNKNOWN_CONSUMER_ID = "", and the generationId would be -1. These two values are only used inside OffsetManager.storeOffsets for logging.
> 
> Jason Gustafson wrote:
>     I wonder if it is worthwhile checking that those values are set accordingly? If the generationId were 5, for example, would we want to just commit the offsets blindly? Or would we throw an error?
> 
> Guozhang Wang wrote:
>     As Onur mentioned, when group == null it is also possible that the group has not been created on the coordinator (when coordinator migrated, for example), and in this case the consumerId / generationId would not be ""/-1.
> 
> Jason Gustafson wrote:
>     That makes sense. I was just thinking this might open the door to having commits from old or invalid generations go through. Unless we store group metadata in zookeeper though, perhaps there is no way to prevent it.
> 
> Onur Karaman wrote:
>     So I've been meaning to ask something similar.
>     
>     Guozhang: offline we talked about all offset logic validating generation id before attempting to perform the action. To adjust for this proposed check, at one point we talked about making ConsumerCoordinator more strictly follow the wiki and have the generation id bump happen at the end of rebalance instead of at the beginning so that consumers would be able to commit offsets prior to rebalancing. Given that this rb is about merging in the OffsetManager, should those checks be added here or in a later rb?
> 
> Onur Karaman wrote:
>     My bad. I missed your generation id check in handleCommitOffsets. But I'm still curious about the generation id bump placement with respect to committing offsets before providing a JoinGroupRequest.

That is a good point. I think we should postpone the generation id bump from prepareRebalance() to rebalance(), before the line of group.transitionTo(Rebalancing). Does that sound right to you?
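
To make the effect of the proposed placement concrete, here is a minimal sketch. GroupSketch, GroupState and acceptsCommit() are hypothetical names, and the state machine is reduced to the three transitions discussed; the real coordinator logic is assumed to be more involved.

```java
// Hypothetical sketch of bumping the generation id in rebalance() instead of prepareRebalance().
enum GroupState { STABLE, PREPARING_REBALANCE, REBALANCING }

class GroupSketch {
    GroupState state = GroupState.STABLE;
    int generationId = 1;

    void prepareRebalance() {
        // No bump here: members still hold the current generation id
        // and may want to commit offsets before rejoining.
        state = GroupState.PREPARING_REBALANCE;
    }

    void rebalance() {
        // Bump immediately before the transition to Rebalancing.
        generationId++;
        state = GroupState.REBALANCING;
    }

    void completeRebalance() {
        state = GroupState.STABLE;
    }

    // A commit is accepted only if it carries the group's current generation id.
    boolean acceptsCommit(int requestGenerationId) {
        return requestGenerationId == generationId;
    }
}
```

Under these assumptions, a commit carrying generation 1 is still accepted after prepareRebalance() and is rejected only once rebalance() has run.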


- Guozhang


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35231/#review88301
-----------------------------------------------------------


Re: Review Request 35231: Address Onur and Jason's comments

Posted by Jason Gustafson <ja...@confluent.io>.

> On June 18, 2015, 12:50 a.m., Jason Gustafson wrote:
> > core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, lines 229-236
> > <https://reviews.apache.org/r/35231/diff/1/?file=980918#file980918line229>
> >
> >     In manual group management, would we expect consumerId and generationId to be null?
> 
> Guozhang Wang wrote:
>     In that case, the consumerId would be UNKNOWN_CONSUMER_ID = "", and the generationId would be -1. These two values are only used inside OffsetManager.storeOffsets for logging.

I wonder if it is worthwhile checking that those values are set accordingly? If the generationId were 5, for example, would we want to just commit the offsets blindly? Or would we throw an error?


- Jason


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35231/#review88301
-----------------------------------------------------------




Re: Review Request 35231: Address Onur and Jason's comments

Posted by Onur Karaman <ok...@linkedin.com>.

> On June 18, 2015, 12:50 a.m., Jason Gustafson wrote:
> > core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, lines 229-236
> > <https://reviews.apache.org/r/35231/diff/1/?file=980918#file980918line229>
> >
> >     In manual group management, would we expect consumerId and generationId to be null?
> 
> Guozhang Wang wrote:
>     In that case, the consumerId would be UNKNOWN_CONSUMER_ID = "", and the generationId would be -1. These two values are only used inside OffsetManager.storeOffsets for logging.
> 
> Jason Gustafson wrote:
>     I wonder if it is worthwhile checking that those values are set accordingly? If the generationId were 5, for example, would we want to just commit the offsets blindly? Or would we throw an error?
> 
> Guozhang Wang wrote:
>     As Onur mentioned, when group == null it is also possible that the group has not been created on the coordinator (when coordinator migrated, for example), and in this case the consumerId / generationId would not be ""/-1.
> 
> Jason Gustafson wrote:
>     That makes sense. I was just thinking this might open the door to having commits from old or invalid generations go through. Unless we store group metadata in zookeeper though, perhaps there is no way to prevent it.
> 
> Onur Karaman wrote:
>     So I've been meaning to ask something similar.
>     
>     Guozhang: offline we talked about all offset logic validating generation id before attempting to perform the action. To adjust for this proposed check, at one point we talked about making ConsumerCoordinator more strictly follow the wiki and have the generation id bump happen at the end of rebalance instead of at the beginning so that consumers would be able to commit offsets prior to rebalancing. Given that this rb is about merging in the OffsetManager, should those checks be added here or in a later rb?

My bad. I missed your generation id check in handleCommitOffsets. But I'm still curious about the generation id bump placement with respect to committing offsets before providing a JoinGroupRequest.
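To make the bump-placement question concrete, here is a minimal sketch of the two options: bumping the generation id at the start of a rebalance versus at the end. It is a toy model and not code from this patch; the class, enum, and method names are all hypothetical, and the only rule it assumes is that a commit is accepted when it carries the coordinator's current generation id.

```java
// Toy model of when the coordinator bumps a group's generation id.
// All names here are hypothetical; this is not the ConsumerCoordinator code.
final class GenerationBumpSketch {
    enum BumpPolicy { AT_REBALANCE_START, AT_REBALANCE_END }

    private final BumpPolicy policy;
    private int generationId = 1;

    GenerationBumpSketch(BumpPolicy policy) {
        this.policy = policy;
    }

    int generationId() {
        return generationId;
    }

    // Called when the coordinator decides the group must rebalance.
    void beginRebalance() {
        if (policy == BumpPolicy.AT_REBALANCE_START) {
            generationId++;
        }
    }

    // Called once every member has rejoined and the assignment is final.
    void completeRebalance() {
        if (policy == BumpPolicy.AT_REBALANCE_END) {
            generationId++;
        }
    }

    // Assumed rule: a commit is valid only for the current generation.
    boolean acceptCommit(int commitGenerationId) {
        return commitGenerationId == generationId;
    }
}
```

Under this model, with AT_REBALANCE_START a consumer still holding the old generation id has its commit rejected as soon as the rebalance begins, while with AT_REBALANCE_END the same commit is accepted until the rebalance completes, which is the window needed to commit offsets before sending a JoinGroupRequest.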


- Onur


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35231/#review88301
-----------------------------------------------------------




Re: Review Request 35231: Address Onur and Jason's comments

Posted by Onur Karaman <ok...@linkedin.com>.

> On June 18, 2015, 12:50 a.m., Jason Gustafson wrote:
> > core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, lines 229-236
> > <https://reviews.apache.org/r/35231/diff/1/?file=980918#file980918line229>
> >
> >     In manual group management, would we expect consumerId and generationId to be null?
> 
> Guozhang Wang wrote:
>     In that case, the consumerId would be UNKNOWN_CONSUMER_ID = "", and the generationId would be -1. These two values are only used inside OffsetManager.storeOffsets for logging.
> 
> Jason Gustafson wrote:
>     I wonder if it is worthwhile checking that those values are set accordingly? If the generationId were 5, for example, would we want to just commit the offsets blindly? Or would we throw an error?
> 
> Guozhang Wang wrote:
>     As Onur mentioned, when group == null it is also possible that the group has not been created on the coordinator (when coordinator migrated, for example), and in this case the consumerId / generationId would not be ""/-1.
> 
> Jason Gustafson wrote:
>     That makes sense. I was just thinking this might open the door to having commits from old or invalid generations go through. Unless we store group metadata in zookeeper though, perhaps there is no way to prevent it.

So I've been meaning to ask something similar.

Guozhang: offline we talked about all offset logic validating generation id before attempting to perform the action. To adjust for this proposed check, at one point we talked about making ConsumerCoordinator more strictly follow the wiki and have the generation id bump happen at the end of rebalance instead of at the beginning so that consumers would be able to commit offsets prior to rebalancing. Given that this rb is about merging in the OffsetManager, should those checks be added here or in a later rb?


- Onur


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35231/#review88301
-----------------------------------------------------------




Re: Review Request 35231: Address Onur and Jason's comments

Posted by Jason Gustafson <ja...@confluent.io>.

> On June 18, 2015, 12:50 a.m., Jason Gustafson wrote:
> > core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, lines 229-236
> > <https://reviews.apache.org/r/35231/diff/1/?file=980918#file980918line229>
> >
> >     In manual group management, would we expect consumerId and generationId to be null?
> 
> Guozhang Wang wrote:
>     In that case, the consumerId would be UNKNOWN_CONSUMER_ID = "", and the generationId would be -1. These two values are only used inside OffsetManager.storeOffsets for logging.
> 
> Jason Gustafson wrote:
>     I wonder if it is worthwhile checking that those values are set accordingly? If the generationId were 5, for example, would we want to just commit the offsets blindly? Or would we throw an error?
> 
> Guozhang Wang wrote:
>     As Onur mentioned, when group == null it is also possible that the group has not been created on the coordinator (when coordinator migrated, for example), and in this case the consumerId / generationId would not be ""/-1.

That makes sense. I was just thinking this might open the door to having commits from old or invalid generations go through. Unless we store group metadata in zookeeper though, perhaps there is no way to prevent it.


- Jason


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35231/#review88301
-----------------------------------------------------------




Re: Review Request 35231: Address Onur and Jason's comments

Posted by Guozhang Wang <wa...@gmail.com>.

> On June 18, 2015, 12:50 a.m., Jason Gustafson wrote:
> > core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, lines 229-236
> > <https://reviews.apache.org/r/35231/diff/1/?file=980918#file980918line229>
> >
> >     In manual group management, would we expect consumerId and generationId to be null?

In that case, the consumerId would be UNKNOWN_CONSUMER_ID = "", and the generationId would be -1. These two values are only used inside OffsetManager.storeOffsets for logging.


- Guozhang


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35231/#review88301
-----------------------------------------------------------




Re: Review Request 35231: Address Onur and Jason's comments

Posted by Guozhang Wang <wa...@gmail.com>.

> On June 18, 2015, 12:50 a.m., Jason Gustafson wrote:
> > core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, lines 229-236
> > <https://reviews.apache.org/r/35231/diff/1/?file=980918#file980918line229>
> >
> >     In manual group management, would we expect consumerId and generationId to be null?
> 
> Guozhang Wang wrote:
>     In that case, the consumerId would be UNKNOWN_CONSUMER_ID = "", and the generationId would be -1. These two values are only used inside OffsetManager.storeOffsets for logging.
> 
> Jason Gustafson wrote:
>     I wonder if it is worthwhile checking that those values are set accordingly? If the generationId were 5, for example, would we want to just commit the offsets blindly? Or would we throw an error?

As Onur mentioned, when group == null it is also possible that the group has not yet been created on the coordinator (for example, after the coordinator has migrated), and in this case the consumerId / generationId would not be ""/-1.
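The case analysis can be sketched as follows. This is a rough illustration under stated assumptions, not the logic in the patch: the class, the Result values, the UNKNOWN_GENERATION_ID name, and the order of the checks are hypothetical. Only the sentinel values "" and -1 come from this thread.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model of offset-commit validation on the coordinator side.
// Names and check order are assumptions for illustration only.
final class CommitValidationSketch {
    // Sentinel values mentioned in this thread for manual group management.
    static final String UNKNOWN_CONSUMER_ID = "";
    static final int UNKNOWN_GENERATION_ID = -1; // constant name is hypothetical

    enum Result { ACCEPT, UNKNOWN_CONSUMER, ILLEGAL_GENERATION } // hypothetical outcomes

    // Minimal stand-in for the per-group state kept by the coordinator.
    static final class Group {
        final int generationId;
        final Set<String> members = new HashSet<>();

        Group(int generationId) {
            this.generationId = generationId;
        }

        Group withMember(String consumerId) {
            members.add(consumerId);
            return this;
        }
    }

    private final Map<String, Group> groups = new HashMap<>();

    void addGroup(String groupId, Group group) {
        groups.put(groupId, group);
    }

    Result validateCommit(String groupId, String consumerId, int generationId) {
        Group group = groups.get(groupId);
        if (group == null) {
            // Either manual assignment ("" / -1) or a group this coordinator
            // has not seen yet, e.g. after migration. The two cases cannot be
            // told apart here, so the ids are not checked.
            return Result.ACCEPT;
        }
        if (!group.members.contains(consumerId)) {
            return Result.UNKNOWN_CONSUMER;
        }
        if (group.generationId != generationId) {
            return Result.ILLEGAL_GENERATION;
        }
        return Result.ACCEPT;
    }
}
```

In this model a commit with generationId 5 for a group the coordinator does not know about is accepted, because that request looks the same as one arriving right after a coordinator migration.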


- Guozhang


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35231/#review88301
-----------------------------------------------------------




Re: Review Request 35231: Fix KAFKA-1740

Posted by Jason Gustafson <ja...@confluent.io>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35231/#review88301
-----------------------------------------------------------



clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java (line 558)
<https://reviews.apache.org/r/35231/#comment140747>

    Did you intend to actually rethrow or just log?



core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala (lines 229 - 236)
<https://reviews.apache.org/r/35231/#comment140772>

    In manual group management, would we expect consumerId and generationId to be null?



core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala (line 280)
<https://reviews.apache.org/r/35231/#comment140755>

    This looks like the wrong ticket.



core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala (line 285)
<https://reviews.apache.org/r/35231/#comment140756>

    Same as above, wrong ticket.


- Jason Gustafson




Re: Review Request 35231: Fix KAFKA-1740

Posted by Guozhang Wang <wa...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35231/
-----------------------------------------------------------

(Updated June 8, 2015, 11:12 p.m.)


Review request for kafka.


Bugs: KAFKA-1740
    https://issues.apache.org/jira/browse/KAFKA-1740


Repository: kafka


Description
-------

Move offset manager to coordinator, add validation logic for offset commit and fetch


Diffs
-----

  clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java c1496a0851526f3c7d3905ce4bdff2129c83a6c1 
  clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 5b898c8f8ad5d0469f469b600c4b2eb13d1fc662 
  clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java 70844d65369f6ff300cbeb513dbb6650050c7eec 
  clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java deec1fa480d5a5c5884a1c007b076aa64e902472 
  clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java 512a0ef7e619d54e74122c38119209f5cf9590e3 
  clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb 
  core/src/main/scala/kafka/admin/TopicCommand.scala dacbdd089f96c67b7bbc1c0cd36490d4fe094c1b 
  core/src/main/scala/kafka/cluster/Partition.scala 730a232482fdf77be5704cdf5941cfab3828db88 
  core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 6b4242c7cd1df9b3465db0fec35a25102c76cd60 
  core/src/main/scala/kafka/common/Topic.scala ad759786d1c22f67c47808c0b8f227eb2b1a9aa8 
  core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala 51e89c87ee2c20fc7f976536f01fa1055fb8e670 
  core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala c39e6de34ee531c6dfa9107b830752bd7f8fbe59 
  core/src/main/scala/kafka/server/KafkaApis.scala d63bc18d795a6f0e6994538ca55a9a46f7fb8ffd 
  core/src/main/scala/kafka/server/KafkaServer.scala b320ce9f6a12c0ee392e91beb82e8804d167f9f4 
  core/src/main/scala/kafka/server/OffsetManager.scala 5cca85cf727975f6d3acb2223fd186753ad761dc 
  core/src/main/scala/kafka/server/ReplicaManager.scala 59c9bc3ac3a8afc07a6f8c88c5871304db588d17 
  core/src/test/scala/integration/kafka/api/ConsumerTest.scala 17b17b9b1520c7cc2e2ba96cdb1f9ff06e47bcad 
  core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala 07b1ff47bfc3cd3f948c9533c8dc977fa36d996f 
  core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala c7136f20972614ac47aa57ab13e3c94ef775a4b7 
  core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala 4f124af5c3e946045a78ad1519c37372a72c8985 
  core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala 08854c5e6ec249368206298b2ac2623df18f266a 
  core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 528525b719ec916e16f8b3ae3715bec4b5dcc47d 

Diff: https://reviews.apache.org/r/35231/diff/


Testing
-------


Thanks,

Guozhang Wang