Posted to dev@kafka.apache.org by "Jun Rao (JIRA)" <ji...@apache.org> on 2012/05/07 19:16:57 UTC

[jira] [Created] (KAFKA-336) add an admin RPC to communicate state changes between the controller and the broker

Jun Rao created KAFKA-336:
-----------------------------

             Summary: add an admin RPC to communicate state changes between the controller and the broker
                 Key: KAFKA-336
                 URL: https://issues.apache.org/jira/browse/KAFKA-336
             Project: Kafka
          Issue Type: Sub-task
          Components: core
    Affects Versions: 0.8
            Reporter: Jun Rao
            Assignee: Yang Ye


Based on the discussion in https://cwiki.apache.org/confluence/display/KAFKA/kafka+Detailed+Replication+Design+V3 , it's more efficient to communicate state change commands between the controller and the broker using a direct RPC than via ZK. This ticket will be implementing an admin RPC client for the controller to send state change commands.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Updated] (KAFKA-336) add an admin RPC to communicate state changes between the controller and the broker

Posted by "Yang Ye (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yang Ye updated KAFKA-336:
--------------------------

    Attachment: controller_broker_RPC.patch
    
> add an admin RPC to communicate state changes between the controller and the broker
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-336
>                 URL: https://issues.apache.org/jira/browse/KAFKA-336
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: core
>    Affects Versions: 0.8
>            Reporter: Jun Rao
>            Assignee: Yang Ye
>             Fix For: 0.8
>
>         Attachments: controller_broker_RPC.patch
>
>   Original Estimate: 252h
>  Remaining Estimate: 252h
>
> Based on the discussion in https://cwiki.apache.org/confluence/display/KAFKA/kafka+Detailed+Replication+Design+V3 , it's more efficient to communicate state change commands between the controller and the broker using a direct RPC than via ZK. This ticket will be implementing an admin RPC client for the controller to send state change commands.


[jira] [Commented] (KAFKA-336) add an admin RPC to communicate state changes between the controller and the broker

Posted by "Jun Rao (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13281133#comment-13281133 ] 

Jun Rao commented on KAFKA-336:
-------------------------------

Thanks for the new patch. Some comments:

From previous review:
#2.2. from my comment and #4 from Jay's comment are still not addressed.

New review comments:

11. It seems that we haven't really documented the exact format of the new requests and the responses. I documented them in https://cwiki.apache.org/confluence/display/KAFKA/kafka+Detailed+Replication+Design+V3 . Does it make sense? If so, this would require some changes in this patch.

12. KafkaZookeeper, KafkaApis: For this patch, we can just add a TODO comment in the handler in KafkaApis and simply send a dummy response (with error_code = 0). We can remove the code in KafkaZookeeper, since the handler probably shouldn't be in KafkaZookeeper when it's actually put in.

13. Are the changes in Partition and Replica still necessary if #12 is resolved?

For Jay's comment, for now, it seems that we just need to add a couple of new types of request. So, we can probably start with customized request objects.


                

[jira] [Commented] (KAFKA-336) add an admin RPC to communicate state changes between the controller and the broker

Posted by "Jay Kreps (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13276885#comment-13276885 ] 

Jay Kreps commented on KAFKA-336:
---------------------------------

Never mind the KafkaConnection comment; we already have a BlockingChannel, which is essentially the same thing.
                

[jira] [Updated] (KAFKA-336) add an admin RPC to communicate state changes between the controller and the broker

Posted by "Yang Ye (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yang Ye updated KAFKA-336:
--------------------------

    Status: Patch Available  (was: Open)

Controller to Broker RPC added with unit test
                

[jira] [Updated] (KAFKA-336) add an admin RPC to communicate state changes between the controller and the broker

Posted by "Yang Ye (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yang Ye updated KAFKA-336:
--------------------------

    Attachment: rpc.patch.v2
    

[jira] [Updated] (KAFKA-336) add an admin RPC to communicate state changes between the controller and the broker

Posted by "Jun Rao (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jun Rao updated KAFKA-336:
--------------------------

    Resolution: Fixed
        Status: Resolved  (was: Patch Available)

Fixed as part of KAFKA-349.
                

[jira] [Updated] (KAFKA-336) add an admin RPC to communicate state changes between the controller and the broker

Posted by "Yang Ye (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yang Ye updated KAFKA-336:
--------------------------

    Attachment: rpc-v3.patch
    

[jira] [Commented] (KAFKA-336) add an admin RPC to communicate state changes between the controller and the broker

Posted by "Jun Rao (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13281729#comment-13281729 ] 

Jun Rao commented on KAFKA-336:
-------------------------------

Updated the new request/response format proposal in https://cwiki.apache.org/confluence/display/KAFKA/kafka+Detailed+Replication+Design+V3, adding version_id, timeout, etc.
                

[jira] [Commented] (KAFKA-336) add an admin RPC to communicate state changes between the controller and the broker

Posted by "Jun Rao (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13276377#comment-13276377 ] 

Jun Rao commented on KAFKA-336:
-------------------------------

Thanks for the patch. Some comments:

1. LeaderAndISRRequest
1.1 isInit just needs 1 byte.
1.2 remove the commented out code

2. PartitionLeaderAndISRRequest
2.1 zkVersion should be long.
2.2 how about we name the class LeaderAndISR?
2.3 remove unused imports

3. ControllerToBorkerRequestTest: remove unused imports

4. We will need a client wrapper like SimpleConsumer that the controller can use to send the commands through RPC.

5. On the server side, we need to add a new handler in KafkaApis that can handle the new requests. We also need to create new response classes for those commands. For this jira, the server can just send a dummy response.

                

[jira] [Commented] (KAFKA-336) add an admin RPC to communicate state changes between the controller and the broker

Posted by "Jun Rao (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13283584#comment-13283584 ] 

Jun Rao commented on KAFKA-336:
-------------------------------

Thanks for patch V3. Some more comments.

21. LeaderAndISRRequest:
21.1 There is no need to put requestTypeId in the constructor. We know the constant of the request Id.
21.2 There is no need to put requestId in write(). This will be done in BoundedByteBufferSend. See how a FetchRequest is serialized and sent to the socket.
21.3 The above comments apply to StopReplicaRequest too.
21.4 In constructor, leaderAndISRInfos should be just a Map, not a mutable.HashMap.
21.5 To be consistent, let's use leaderEpoc, instead of leaderGenId (see the updated protocol in the wiki).

22. LeaderAndISRResponse:
22.1 The patch doesn't follow the protocol design in the wiki. Is there a reason?

23. Currently, for each type of Response, we have a customized ResponseSend object. Those ResponseSend objects all share the same pattern: writing a header and then the content. It would be good if we can consolidate those ResponseSend objects. The only exception is the response for Fetch request, which uses the sendfile API and has to be customized.

24. KafkaApis:
The response shouldn't return dummyTopic. Instead, it should just return NoError for each (topic,partition) in the request.

25. Unit test:
Let's add a unit test that tests the end-to-end RPC protocol for the new requests, similar to BackwardsCompatibilityTest.testProtocolVersion0().

In the new patch, it would be great if you can respond to each of the review comments (whether you agree and have fixed it, or you have a different idea, etc).
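
As a rough illustration of comment 24 (and of taking an immutable Map, as in 21.4), a dummy handler could answer every (topic, partition) in the request with NoError. This is only a minimal sketch, not the code in the patch: PartitionStateRequest, PartitionStateResponse, DummyStateChangeHandler and ErrorCodes are hypothetical stand-ins, and the value 0 for NoError is an assumption based on the error code 0 mentioned in the earlier review.

```scala
// Hypothetical stand-in for the broker's error code constants.
// Assumption: NoError is 0.
object ErrorCodes {
  val NoError: Short = 0
}

// Hypothetical request: (topic, partition) -> ISR broker ids.
// The constructor takes an immutable Map rather than a mutable.HashMap.
case class PartitionStateRequest(partitions: Map[(String, Int), List[Int]])

// Hypothetical response: one error code per (topic, partition).
case class PartitionStateResponse(errors: Map[(String, Int), Short])

object DummyStateChangeHandler {
  // Placeholder handler: performs no state change and reports NoError
  // for exactly the (topic, partition) pairs named in the request.
  def handle(request: PartitionStateRequest): PartitionStateResponse =
    PartitionStateResponse(
      request.partitions.keys.map(tp => tp -> ErrorCodes.NoError).toMap)
}
```

The response is keyed by the request's own partitions, so nothing like a dummy topic has to be invented on the server side.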

                

[jira] [Commented] (KAFKA-336) add an admin RPC to communicate state changes between the controller and the broker

Posted by "Jay Kreps (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13276873#comment-13276873 ] 

Jay Kreps commented on KAFKA-336:
---------------------------------

Awesome. Couple of thoughts:

1. It would be good to separate the tests into a serialization test that just serializes the request and deserializes it, and a separate test that actually sends to the server. We have found the simple in-process test helpful since it is easier to debug.

2. Ideally we should also add a compatibility test that saves out some binary data to ensure we don't break compatibility in the future by accident. Neha has details on this. We could wait and do this later when we have everything working in case we need to make changes.

3. Broker is spelled as Borker

4. Can you change HashMap[Tuple2[String, Int] in the method signature to Map[(String,Int)]? It means the same but is slightly better style.

I recommend we not create another SimpleConsumer wrapper, but instead make a base KafkaConnection that takes any API object, and use that to implement all the clients. These internal things then won't need a special wrapper; they can just use the KafkaConnection directly. This also means that in the future, if the consumer needs to send a produce request (either for offset storage or audit trail), it doesn't need a separate TCP connection.

One meta comment: these precisely versioned, compact, typed request objects are okay for public service APIs because we should make them so rarely and the really important things are compatibility and efficiency. For these internal broker-to-broker APIs, should we instead just create a generic BrokerCommandRequest/Response which just holds a JSON string? This might be easier to evolve. I think Scala has some (fairly junky) JSON parser included. Or maybe having this inconsistency is not worth it. It really depends on how many of these we think we will have.
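
The in-process serialization test from point 1 could look roughly like the round trip below. This is a sketch under stated assumptions rather than the patch's code: StateChangeRequest, its fields, its byte layout and RoundTripCheck are all hypothetical, and it uses Map[(String, Int), Int] only to show the tuple-key style from point 4.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical, simplified request: (topic, partition) -> leader broker id.
// The field names and the byte layout are assumptions made for illustration.
case class StateChangeRequest(isInit: Boolean, leaders: Map[(String, Int), Int]) {
  def writeTo(buffer: ByteBuffer): Unit = {
    val flag: Byte = if (isInit) 1 else 0 // a single byte is enough for the flag
    buffer.put(flag)
    buffer.putInt(leaders.size)
    for (((topic, partition), leader) <- leaders) {
      val topicBytes = topic.getBytes(UTF_8)
      buffer.putShort(topicBytes.length.toShort) // length-prefixed topic name
      buffer.put(topicBytes)
      buffer.putInt(partition)
      buffer.putInt(leader)
    }
  }
}

object StateChangeRequest {
  // Reads the fields back in the same order that writeTo wrote them.
  def readFrom(buffer: ByteBuffer): StateChangeRequest = {
    val isInit = buffer.get() == 1.toByte
    val count = buffer.getInt()
    val leaders = (0 until count).map { _ =>
      val topicBytes = new Array[Byte](buffer.getShort().toInt)
      buffer.get(topicBytes)
      val topic = new String(topicBytes, UTF_8)
      val partition = buffer.getInt()
      val leader = buffer.getInt()
      (topic, partition) -> leader
    }.toMap
    StateChangeRequest(isInit, leaders)
  }
}

object RoundTripCheck {
  // Serializes into an in-memory buffer and reads it straight back;
  // no socket or server is involved.
  def roundTrip(request: StateChangeRequest): StateChangeRequest = {
    val buffer = ByteBuffer.allocate(1024)
    request.writeTo(buffer)
    buffer.flip()
    StateChangeRequest.readFrom(buffer)
  }
}
```

A test then only has to compare RoundTripCheck.roundTrip(request) with request, which keeps serialization failures apart from network failures in the end-to-end test.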
                

[jira] [Commented] (KAFKA-336) add an admin RPC to communicate state changes between the controller and the broker

Posted by "Jun Rao (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13281737#comment-13281737 ] 

Jun Rao commented on KAFKA-336:
-------------------------------

Another comment:

14. To make it easier for the controller to send commands to individual brokers, it would be useful to have some kind of ControllerCommandManager that has the following API:
ControllerCommandManager {

    sendCommand(commandRequest: Request, brokerId: Int)
}

Under the covers, the ControllerCommandManager can potentially maintain a queue per broker and have a separate send thread that takes commands from the queue, sends each request to the right broker using a BlockingChannel, and deserializes and checks the response.
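
The design described above can be sketched roughly as follows. This is a minimal illustration under several assumptions, not the eventual implementation: Request and Response are hypothetical stand-ins for the real types, the transport function stands in for a BlockingChannel send/receive round trip, the onResponse callback stands in for the response check, and one send thread per broker is just one possible reading of "a separate send thread".

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical stand-ins for the real request and response types.
case class Request(name: String)
case class Response(errorCode: Int)

// transport: stands in for a BlockingChannel send/receive to one broker.
// onResponse: stands in for deserializing and checking the response.
class ControllerCommandManager(brokerIds: Seq[Int],
                               transport: (Int, Request) => Response,
                               onResponse: (Int, Request, Response) => Unit) {

  // One queue per broker, so a slow broker does not delay the others.
  private val queues: Map[Int, LinkedBlockingQueue[Request]] =
    brokerIds.map(id => id -> new LinkedBlockingQueue[Request]()).toMap

  @volatile private var running = true

  // One send thread per broker: take a command, send it, check the reply.
  private val threads: Seq[Thread] = brokerIds.map { id =>
    val thread = new Thread(new Runnable {
      def run(): Unit = {
        try {
          while (running) {
            val request = queues(id).take() // blocks until a command arrives
            onResponse(id, request, transport(id, request))
          }
        } catch {
          case _: InterruptedException => // shutdown was requested
        }
      }
    }, "controller-send-thread-" + id)
    thread.setDaemon(true)
    thread.start()
    thread
  }

  // Enqueues the command and returns immediately; the send is asynchronous.
  def sendCommand(commandRequest: Request, brokerId: Int): Unit =
    queues.get(brokerId) match {
      case Some(queue) => queue.put(commandRequest)
      case None => throw new IllegalArgumentException("Unknown broker " + brokerId)
    }

  def shutdown(): Unit = {
    running = false
    threads.foreach(_.interrupt())
  }
}
```

Because each broker has its own queue and thread, commands to one broker are sent in the order they were enqueued, while an unresponsive broker only blocks its own queue.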
                