Posted to dev@kafka.apache.org by Joris Van Remoortere <jo...@gmail.com> on 2014/02/01 01:20:11 UTC

Re: Review Request 17248: Patch for KAFKA-1215

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17248/
-----------------------------------------------------------

(Updated Feb. 1, 2014, 12:20 a.m.)


Review request for kafka.


Changes
-------

Rebased to trunk:
- Made rack-id optional, defaulting to -1
- Carried max-replication through ZooKeeper in order to support add-partitions
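Here is a minimal sketch of an optional rack-id that defaults to -1. It assumes the broker reads the value from its java.util.Properties under a hypothetical key, broker.rack.id. The property name used by the actual patch is not shown in this thread.

```scala
import java.util.Properties

object RackConfig {
  // Hypothetical property name; the key used by the actual patch is not shown in this thread.
  val RackIdProp = "broker.rack.id"

  // Sentinel for "no rack configured", matching the default of -1 in the change list.
  val NoRack: Int = -1

  // Returns the configured rack id, or NoRack when the property is absent.
  def rackId(props: Properties): Int =
    Option(props.getProperty(RackIdProp)).map(_.trim.toInt).getOrElse(NoRack)
}
```

A non-numeric value throws NumberFormatException, so a misconfigured broker fails at startup instead of silently running without a rack.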

Missing:
- There is currently no warning when changing a broker's rack-id
- There is no warning during manual replica (re)assignment if the assignment is not legal under the given max-rack-replication factor
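The missing warning for manual (re)assignment could be computed roughly as follows. This is a sketch under stated assumptions, not the patch's code. It assumes broker-to-rack information is available as a Map from broker id to rack id, and that the proposed assignment maps each partition to its replica list. The object and method names are hypothetical. Brokers with no known rack are grouped under -1, which is one possible reading of the -1 default.

```scala
object RackAssignmentCheck {
  // Returns rack -> replica count for every rack holding more than maxPerRack replicas of one partition.
  // Brokers absent from brokerRack are treated as rack -1 (so all unracked brokers count as one rack).
  def violations(replicas: Seq[Int], brokerRack: Map[Int, Int], maxPerRack: Int): Map[Int, Int] =
    replicas
      .groupBy(b => brokerRack.getOrElse(b, -1))
      .map { case (rack, rs) => rack -> rs.size }
      .filter { case (_, count) => count > maxPerRack }

  // Checks every partition of a proposed assignment and returns one warning per violating rack.
  def warnings(assignment: Map[Int, Seq[Int]], brokerRack: Map[Int, Int], maxPerRack: Int): Seq[String] =
    assignment.toSeq.sortBy(_._1).flatMap { case (partition, replicas) =>
      violations(replicas, brokerRack, maxPerRack).toSeq.sortBy(_._1).map { case (rack, n) =>
        s"partition $partition: $n replicas on rack $rack exceeds max $maxPerRack"
      }
    }
}
```

Returning warnings instead of throwing keeps the check advisory. An admin tool could print them and still let an operator force the assignment.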


Summary (updated)
-----------------

Patch for KAFKA-1215


Bugs: KAFKA-1215
    https://issues.apache.org/jira/browse/KAFKA-1215


Repository: kafka


Description
-------

KAFKA-1226


Diffs (updated)
-----

  core/src/main/scala/kafka/admin/AdminUtils.scala a167756 
  core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala 2637586 
  core/src/main/scala/kafka/admin/TopicCommand.scala 842c110 
  core/src/main/scala/kafka/client/ClientUtils.scala 1d2f81b 
  core/src/main/scala/kafka/cluster/Broker.scala 9407ed2 
  core/src/main/scala/kafka/controller/KafkaController.scala a0267ae 
  core/src/main/scala/kafka/controller/PartitionStateMachine.scala ac4262a 
  core/src/main/scala/kafka/server/KafkaApis.scala 29abc46 
  core/src/main/scala/kafka/server/KafkaConfig.scala 3c3aafc 
  core/src/main/scala/kafka/server/KafkaHealthcheck.scala 9dca55c 
  core/src/main/scala/kafka/server/KafkaServer.scala 5e34f95 
  core/src/main/scala/kafka/utils/ZkUtils.scala b42e52b 
  core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala 115e203 
  core/src/test/scala/unit/kafka/admin/AdminTest.scala 59de1b4 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 8df0982 
  core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala 9347ea6 
  core/src/test/scala/unit/kafka/integration/FetcherTest.scala 47130d3 
  core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala 9998a11 
  core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala 18e3555 
  core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala 38e3ae7 
  core/src/test/scala/unit/kafka/utils/TestUtils.scala d88b6c3 
  examples/README d33f6c5 

Diff: https://reviews.apache.org/r/17248/diff/


Testing
-------


File Attachments
----------------

rack_aware_replica_assignment_v1.patch
  https://reviews.apache.org/media/uploaded/files/2014/01/23/394cef99-f800-4d94-bc59-fdb6c68b53f5__rack_aware_replica_assignment_v1.patch


Thanks,

Joris Van Remoortere


Re: Review Request 17248: Patch for KAFKA-1215

Posted by Guozhang Wang <gu...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17248/#review35997
-----------------------------------------------------------


Some high-level comments:

1. Instead of exposing the maxRackReplica parameter throughout the calling hierarchy and in the API functions, could we confine it to the following places?

a. assignReplicasToBrokers, the only place where we need to read this parameter. This function is called by:

createTopic

addPartitions

generateAssignment

All three of these have a zkClient, so we can read the cluster and maxRackReplica information right there.

b. createOrUpdateTopicPartitionAssignmentPathInZK, one of the two places where we need to write the value, from the command line tool. Instead of specifying this parameter, we can treat it as part of the general topic configs (i.e., just add this value in parseTopicConfigsToBeAdded).

c. updateAssignedReplicasForPartition, the other place where we need to write the value, from the controller. Currently the logic never changes this value; it just reads the original value and writes it back. We can avoid this entirely by moving the value into the general topic configs, as described in b).

2. Broker.scala is used by the producer, which does not need to know the rack information.
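Point 1 suggests that assignReplicasToBrokers read the limit itself, with the value stored as an ordinary topic config. The sketch below illustrates that shape under several assumptions:

- A java.util.Properties object stands in for the topic config read from ZooKeeper.
- The key max.rack.replication is hypothetical.
- The placement is a simplified round-robin, not Kafka's actual assignReplicasToBrokers logic.

```scala
import java.util.Properties

object RackAwareAssignment {
  // Hypothetical topic-config key, following the suggestion to keep the value in the general topic configs.
  val MaxRackReplicationProp = "max.rack.replication"

  // Reads the per-rack limit from the topic config; if absent, assume no constraint (limit = replication factor).
  def maxPerRack(topicConfig: Properties, replicationFactor: Int): Int =
    Option(topicConfig.getProperty(MaxRackReplicationProp)).map(_.trim.toInt).getOrElse(replicationFactor)

  // Simplified round-robin (not Kafka's real algorithm): partition p starts at broker p % n and
  // walks the broker list, skipping brokers whose rack already holds maxPerRack replicas of p.
  def assign(brokers: IndexedSeq[(Int, Int)], // (brokerId, rackId)
             nPartitions: Int,
             replicationFactor: Int,
             maxPerRack: Int): Map[Int, Seq[Int]] = {
    require(replicationFactor <= brokers.size, "replication factor larger than broker count")
    (0 until nPartitions).map { p =>
      val chosen = scala.collection.mutable.ArrayBuffer.empty[Int]
      val perRack = scala.collection.mutable.Map.empty[Int, Int].withDefaultValue(0)
      var i = 0
      while (chosen.size < replicationFactor && i < brokers.size) {
        val (id, rack) = brokers((p + i) % brokers.size)
        if (perRack(rack) < maxPerRack) {
          chosen += id
          perRack(rack) += 1
        }
        i += 1
      }
      require(chosen.size == replicationFactor, s"cannot place partition $p with at most $maxPerRack replicas per rack")
      p -> chosen.toList
    }.toMap
  }
}
```

Because the limit lives in the topic config, add-partitions and reassignment callers can recover it from ZooKeeper instead of threading a new parameter through every API.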







- Guozhang Wang




Re: Review Request 17248: Patch for KAFKA-1215

Posted by Jun Rao <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17248/#review35293
-----------------------------------------------------------


Sorry for the late review. The high-level comment is that this change needs to be backward compatible, meaning that we must be able to do a rolling upgrade of an existing 0.8 cluster. To do that, we need to (1) consider wire protocol changes (see the comments on Broker) and (2) make sure that the old code can read the new ZK format. Once the following comments are addressed, could you test a rolling upgrade and check whether there are any issues?
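Requirement (2), old code reading the new ZK format, holds as long as the new field is purely additive and readers look up fields by key. The sketch below illustrates this under two assumptions:

- The parsed broker-registration JSON is modeled as a Map[String, Any].
- The "rack" field name and the -1 default are taken as illustrative.

The object and method names are hypothetical.

```scala
object BrokerRegistration {
  // Simplified view of a broker registration; the parsed ZK JSON is modeled as Map[String, Any].
  final case class BrokerInfo(id: Int, host: String, port: Int, rack: Int)

  val NoRack: Int = -1

  // Pre-upgrade reader: looks up only the keys it knows, so an extra "rack" field is ignored.
  def readOld(fields: Map[String, Any]): (String, Int) =
    (fields("host").asInstanceOf[String], fields("port").asInstanceOf[Int])

  // Post-upgrade reader: "rack" is optional, so registrations written by old brokers still parse.
  def readNew(id: Int, fields: Map[String, Any]): BrokerInfo = {
    val (host, port) = readOld(fields)
    val rack = fields.get("rack") match {
      case Some(r: Int) => r
      case _            => NoRack
    }
    BrokerInfo(id, host, port, rack)
  }
}
```

During a rolling upgrade both directions matter. Old brokers must tolerate registrations that include "rack", and new brokers must tolerate registrations that lack it.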


core/src/main/scala/kafka/admin/AdminUtils.scala
<https://reviews.apache.org/r/17248/#comment65764>

    not => Not



core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
<https://reviews.apache.org/r/17248/#comment65766>

    Shouldn't we pass in maxReplicaPerRack?



core/src/main/scala/kafka/admin/TopicCommand.scala
<https://reviews.apache.org/r/17248/#comment65768>

    After you rebase, we need to make sure this option is only available during topic creation.
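Restricting the option to topic creation amounts to an argument check before dispatching on the action. This is a minimal sketch that models the parsed command-line options as a Set of option names. The name max-rack-replication is hypothetical; only "create" and "alter" mirror actions of the topic tool.

```scala
object TopicCommandArgs {
  // Hypothetical option name for the per-rack replica limit.
  val MaxRackReplicationOpt = "max-rack-replication"

  // Returns an error message when the rack option is combined with any action other than create.
  def checkRackOption(opts: Set[String]): Option[String] =
    if (opts.contains(MaxRackReplicationOpt) && !opts.contains("create"))
      Some(s"--$MaxRackReplicationOpt is only valid with --create")
    else None
}
```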



core/src/main/scala/kafka/client/ClientUtils.scala
<https://reviews.apache.org/r/17248/#comment65760>

    The broker list is for clients, and there is no need for them to know which rack a broker is in; host/port is enough.
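For illustration, here is how a client-side broker list could be parsed into rack-free entries. This is a hypothetical helper, not ClientUtils' actual code. It assumes a comma-separated host:port string and positional placeholder ids.

```scala
object BrokerListParser {
  // Client-side view of a bootstrap broker: host/port only, no rack.
  final case class BootstrapBroker(id: Int, host: String, port: Int)

  // Parses "host1:9092,host2:9093"; ids are positional placeholders assigned after dropping empty entries.
  def parse(brokerList: String): Seq[BootstrapBroker] =
    brokerList.split(",").toSeq.map(_.trim).filter(_.nonEmpty).zipWithIndex.map { case (addr, i) =>
      val idx = addr.lastIndexOf(':')
      require(idx > 0, s"expected host:port but got '$addr'")
      BootstrapBroker(i, addr.substring(0, idx), addr.substring(idx + 1).toInt)
    }
}
```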



core/src/main/scala/kafka/cluster/Broker.scala
<https://reviews.apache.org/r/17248/#comment65770>

    Could we preserve the current constructor without a rack and default the rack to 0?



core/src/main/scala/kafka/cluster/Broker.scala
<https://reviews.apache.org/r/17248/#comment65763>

    This actually changes the wire protocol between the controller and the broker, which makes a rolling upgrade difficult. If we really want to change the protocol, we need to do it in two steps: first, upgrade the code without using the new format; second, switch to the new format. That would require a new config option. For this particular case, though, I am not sure that we need to change the protocol at all. We can populate the rack info only when instantiating a broker from ZK. That ensures we have the rack info for replica assignment without changing the wire protocol between the controller and the broker. All brokers instantiated from a ByteBuffer will have the default rack info.
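The suggestion to keep the wire protocol unchanged, together with preserving the old constructor, can be sketched as below. The sketch rests on four assumptions:

- It uses a simplified stand-in for Broker, not the real class.
- The rack is a defaulted fourth field, so existing three-argument call sites still compile.
- The default is -1, as in the rebased patch; Jun's comment proposes 0, and either value only needs to be used consistently.
- The wire layout (int id, length-prefixed host, int port) is assumed.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

object BrokerWire {
  // Simplified stand-in for kafka.cluster.Broker. The defaulted rack keeps the
  // three-argument constructor usable.
  final case class Broker(id: Int, host: String, port: Int, rack: Int = Broker.NoRack) {
    private def hostBytes: Array[Byte] = host.getBytes(StandardCharsets.UTF_8)

    // Assumed wire layout: int id, short-length-prefixed UTF-8 host, int port. The rack is not serialized.
    def sizeInBytes: Int = 4 + 2 + hostBytes.length + 4

    def writeTo(buffer: ByteBuffer): Unit = {
      val bytes = hostBytes
      buffer.putInt(id)
      buffer.putShort(bytes.length.toShort)
      buffer.put(bytes)
      buffer.putInt(port)
    }
  }

  object Broker {
    val NoRack: Int = -1

    // Brokers decoded from the controller-to-broker protocol always get the default rack;
    // only brokers built from their ZK registration would carry a real rack id.
    def readFrom(buffer: ByteBuffer): Broker = {
      val id = buffer.getInt()
      val hostBytes = new Array[Byte](buffer.getShort().toInt)
      buffer.get(hostBytes)
      val port = buffer.getInt()
      Broker(id, new String(hostBytes, StandardCharsets.UTF_8), port)
    }
  }
}
```

The rack is dropped on a round trip through the buffer. That is the intended trade-off: the controller, which builds brokers from ZK, has rack info for assignment, while the byte format seen by old and new brokers stays identical.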



core/src/main/scala/kafka/utils/ZkUtils.scala
<https://reviews.apache.org/r/17248/#comment65769>

    Could this be integrated into getCluster()?


- Jun Rao

