Posted to dev@kafka.apache.org by Jay Kreps <ja...@gmail.com> on 2015/01/21 02:18:27 UTC

New consumer plans

There is a draft patch for the new consumer up on KAFKA-1760:
  https://issues.apache.org/jira/browse/KAFKA-1760

I chatted with Guozhang earlier today, and here is what we came up with for how
to proceed:
1. There are changes to NetworkClient and Sender that I'll describe below.
These should be reviewed closely because (a) NetworkClient is an important
interface and we want to get it right, and (b) any problem in these changes
could break the new producer.
2. For the rest of the consumer, we will do a couple of rounds of high-level
review, but probably not as deep. We will then check it in and proceed to add
more system and integration tests for consumer functionality.
3. In parallel, a few of the LI folks will take on the server-side
implementation of the consumer co-ordinator.

So right now the most helpful thing would be for people to look at the
NetworkClient and Sender changes. The patch also contains some annoying javadoc
auto-formatting changes, which I'll try to remove, so please ignore those
for now.

Let me try to motivate the new NetworkClient changes so people can
understand them:
1. Added a method to check the number of in-flight requests for a single
node. It matches the existing in-flight method but is scoped to one node.
2. Added completeAll() and completeAll(node) methods that block until all
requests (or all requests for a given node) have completed. These were added
to help implement blocking requests in the co-ordinator. There are
corresponding methods in the Selector that allow muting individual
connections so that you no longer select on them.
3. Split poll into a poll method and a send method. Previously, to
initiate a new request you also had to call poll, which returned responses.
This was great if you were ready to process responses, but the two things
are really somewhat separate. Now you always initiate requests with send(),
and all actual I/O is done by poll(). This makes it possible to initiate
non-blocking requests without having to process responses at the same time.
4. Added a new RequestCompletionHandler callback interface. A handler can
optionally be provided when you initiate a request, and it will be invoked
with the response when the request completes. The rationale is to make
asynchronous processing easier to implement when requests can be initiated
from many places in the code. This makes it much easier to ensure that every
response is handled, and it lets you define the request and its response
handling in the same place.
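For reviewers, a toy in-memory model of how these four changes fit together may help. It is not Kafka's NetworkClient: the class ToyNetworkClient, the String-based requests, the simulated broker replies and all method signatures are hypothetical; only the names send, poll, completeAll, inFlightRequestCount-style counting and RequestCompletionHandler echo the description above. What it illustrates is that send() only queues, poll() is the only place responses appear and handlers fire, and completeAll(node) drains one node while leaving the others untouched (standing in for muting their connections).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy, in-memory model of the send()/poll() split. Not Kafka's NetworkClient:
// every type and signature here is hypothetical and the network is simulated.
class ToyNetworkClient {

    /** Hypothetical callback shape: invoked from inside poll()/completeAll() when a response arrives. */
    interface RequestCompletionHandler {
        void onComplete(String node, String response);
    }

    private static final class Request {
        final String payload;
        final RequestCompletionHandler handler; // optional, may be null

        Request(String payload, RequestCompletionHandler handler) {
            this.payload = payload;
            this.handler = handler;
        }
    }

    // In-flight requests per node, oldest first (one deque per simulated connection).
    private final Map<String, Deque<Request>> inFlight = new LinkedHashMap<>();

    /** Queues a request. Does no I/O and returns no responses. */
    void send(String node, String payload, RequestCompletionHandler handler) {
        inFlight.computeIfAbsent(node, n -> new ArrayDeque<>()).addLast(new Request(payload, handler));
    }

    /** In-flight requests across all nodes. */
    int inFlightRequestCount() {
        int total = 0;
        for (Deque<Request> q : inFlight.values()) {
            total += q.size();
        }
        return total;
    }

    /** In-flight requests for a single node. */
    int inFlightRequestCount(String node) {
        Deque<Request> q = inFlight.get(node);
        return q == null ? 0 : q.size();
    }

    /** The only place "I/O" happens: each node with pending work answers its oldest request. */
    List<String> poll() {
        List<String> responses = new ArrayList<>();
        for (String node : inFlight.keySet()) {
            completeOldest(node, responses);
        }
        return responses;
    }

    /** Loops until every request to one node is done; other nodes are not touched, as if muted. */
    List<String> completeAll(String node) {
        List<String> responses = new ArrayList<>();
        while (inFlightRequestCount(node) > 0) {
            completeOldest(node, responses);
        }
        return responses;
    }

    /** Loops poll() until nothing is in flight on any node. */
    List<String> completeAll() {
        List<String> responses = new ArrayList<>();
        while (inFlightRequestCount() > 0) {
            responses.addAll(poll());
        }
        return responses;
    }

    // Simulates receiving the reply to the node's oldest request, then fires its handler.
    private void completeOldest(String node, List<String> responses) {
        Deque<Request> q = inFlight.get(node);
        if (q == null || q.isEmpty()) {
            return;
        }
        Request r = q.pollFirst();
        String response = node + ":" + r.payload + ":ok"; // fake broker reply
        if (r.handler != null) {
            r.handler.onComplete(node, response);
        }
        responses.add(response);
    }
}
```

For example, sending two requests to "broker-1" and one to "broker-2" leaves three in flight and fires no handlers; one poll() then answers the oldest request on each node, and completeAll("broker-1") drains the remaining broker-1 request without doing I/O for any other node. Keeping the handler next to the send() call is what lets request and response handling live in one place.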

Cheers,

-Jay

Re: New consumer plans

Posted by Guozhang Wang <wa...@gmail.com>.
I have made a pass over the patch; the changes in NetworkClient / Selector
/ Sender look good to me.

But I found one issue in the KafkaConsumer implementation: when a consumer
subscribes to specific topic-partitions, it does not send a join-group
request to the coordinator. This seems to differ from the initial
design:

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design

Originally, we wanted consumers that subscribe to topic-partitions to also
send join-group requests to the coordinator, so that the coordinator could
detect whether some consumers in the same group subscribe to
topic-partitions while others subscribe to topics, or whether the consumers'
partition subscriptions overlap. Otherwise, users are responsible for
handling such cases, and if one ever happens the coordinator will only
rebalance among the consumers with topic subscriptions. But the
JoinGroupRequest format only contains a topics field, which looks like a
mismatch here, and now I am a bit confused about which decision we made
back then for these cases.
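To make the two checks above concrete, here is a hypothetical coordinator-side sketch. It is not code from the patch or from Kafka: GroupSubscriptionCheck, the Subscription shape (a member lists either topics or explicit topic-partitions, written as strings such as "orders-0") and the problem messages are all assumptions. It flags a group that mixes the two subscription styles and any topic-partition claimed by more than one member. Note that it needs each member's partition list, which a join-group request carrying only a topics field could not supply.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical coordinator-side validation of one group's subscriptions.
class GroupSubscriptionCheck {

    /** What one member reported (assumed shape): either topics or explicit topic-partitions. */
    static final class Subscription {
        final Set<String> topics;      // e.g. "orders"
        final Set<String> partitions;  // e.g. "orders-0"

        Subscription(Set<String> topics, Set<String> partitions) {
            this.topics = topics;
            this.partitions = partitions;
        }

        boolean byPartition() {
            return !partitions.isEmpty();
        }
    }

    /** Returns the problems a coordinator could report for this group (empty if none). */
    static List<String> check(Map<String, Subscription> members) {
        List<String> problems = new ArrayList<>();
        boolean anyTopic = false;
        boolean anyPartition = false;
        Map<String, String> owner = new HashMap<>(); // partition -> first member that claimed it

        // Visit members in id order so the reported problems are deterministic.
        for (Map.Entry<String, Subscription> e : new TreeMap<>(members).entrySet()) {
            String member = e.getKey();
            Subscription s = e.getValue();
            if (s.byPartition()) {
                anyPartition = true;
                for (String tp : s.partitions) {
                    String previous = owner.putIfAbsent(tp, member);
                    if (previous != null) {
                        problems.add("overlap on " + tp + ": " + previous + " and " + member);
                    }
                }
            } else {
                anyTopic = true;
            }
        }
        if (anyTopic && anyPartition) {
            problems.add(0, "group mixes topic and topic-partition subscriptions");
        }
        return problems;
    }
}
```

Without some such check on the coordinator, nothing stops two partition-subscribers from consuming the same partition, which is the case that would otherwise be left to users.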

Another thing is that the current KafkaConsumer class is gigantic, just like
the old ZookeeperConsumerConnector class, and I think it would be better
refactored into layered modules. I will think about ways to do it.

Guozhang

