Posted to dev@kafka.apache.org by "Javed, Haseeb" <ja...@buckeyemail.osu.edu> on 2017/09/29 23:49:09 UTC

How is CorrelationId used for matching request and response

The Kafka protocol guide mentions that each request and response contains a correlationId, which is a user-supplied integer used to match requests with their corresponding responses. However, when I look at the code in the class AbstractResponse, we have a method defined as follows:


public Send toSend(String destination, RequestHeader requestHeader) {
    return toSend(destination, requestHeader.apiVersion(), requestHeader.toResponseHeader());
}

So basically we are just using the requestHeader to generate the responseHeader. Doesn't this pretty much guarantee that the correlationId for the request and the response will always be the same, or am I missing something?



Re: How is CorrelationId used for matching request and response

Posted by "Javed, Haseeb" <ja...@buckeyemail.osu.edu>.
I looked into the code for NetworkClient.java and it all makes sense now. The client fetches the next outstanding request sent to the particular server using InFlightRequest.completeNext(String source). The correlationId of this request must match the correlationId in the response header; a mismatch means the responses did not arrive in the expected order.
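
A minimal sketch of the check described above, not the actual NetworkClient code. The class InFlightSketch and its methods are hypothetical names, and the sketch assumes that responses on one connection arrive in the order the requests were sent, so the queue hands back the oldest outstanding request for that server:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for the client's in-flight bookkeeping.
class InFlightSketch {
    // One queue per node: newest request at the head, oldest at the tail.
    private final Map<String, Deque<Integer>> inFlight = new HashMap<>();

    // Record the correlation id of a request that was just sent to `node`.
    void sent(String node, int correlationId) {
        inFlight.computeIfAbsent(node, n -> new ArrayDeque<>()).addFirst(correlationId);
    }

    // Take the oldest outstanding request for `node` (assumes in-order responses).
    int completeNext(String node) {
        Deque<Integer> queue = inFlight.get(node);
        if (queue == null || queue.isEmpty()) {
            throw new IllegalStateException("No in-flight request for node " + node);
        }
        return queue.pollLast();
    }

    // Compare the id in an incoming response header with the request it should answer.
    void onResponse(String node, int responseCorrelationId) {
        int expected = completeNext(node);
        if (expected != responseCorrelationId) {
            throw new IllegalStateException("Correlation id for response ("
                    + responseCorrelationId + ") does not match request (" + expected + ")");
        }
    }
}
```

With requests 41 and 42 outstanding for the same node, a response carrying 41 passes the check, while a following response carrying any id other than 42 raises the exception.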


Thanks all for the help.

Haseeb

________________________________
From: Javed, Haseeb <ja...@buckeyemail.osu.edu>
Sent: Sunday, October 1, 2017 6:39:13 AM
To: dev@kafka.apache.org
Subject: Re: How is CorrelationId used for matching request and response

Thanks all for reaching out.



Ted - I am looking at the 0.11.0 release. Particularly here https://github.com/apache/kafka/blob/0.11.0/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java

In this release, the server uses the following method in almost all cases (ApiVersionResponses.unsupportedVersionSend(...) being the only exception):

public Send toSend(String destination, RequestHeader requestHeader) {
    return toSend(destination, requestHeader.apiVersion(), requestHeader.toResponseHeader());
}


Jay - I understand the purpose of correlationId, but what I don't understand is how the request/response matching logic is implemented. From the code, I see that the server always uses the request header to generate the response header, so in effect both request and response headers end up having the same correlationId. There seems to be no situation where the response and request could possibly have different correlationIds.


Haseeb

________________________________
From: Jay Kreps <ja...@confluent.io>
Sent: Saturday, September 30, 2017 11:43:30 PM
To: dev@kafka.apache.org
Subject: Re: How is CorrelationId used for matching request and response

Yes, the idea of the correlation id is to make it easier for the client to
match a particular response to the request it answers. Kafka’s protocol
allows sending multiple requests without waiting for the response. In
theory you can just rely on ordering, but that can be a bit fragile if the
client has any kind of bug. So this id is an explicit check: a response with
id 42 is the answer to the request you sent with id 42. Hope that helps!

-Jay

On Fri, Sep 29, 2017 at 4:52 PM Ted Yu <yu...@gmail.com> wrote:

> Which release / version are you looking at ?
> In trunk branch, I only see one toSend():
>
>     protected Send toSend(String destination, ResponseHeader header, short apiVersion) {
>         return new NetworkSend(destination, serialize(apiVersion, header));
>     }
>
> On Fri, Sep 29, 2017 at 4:49 PM, Javed, Haseeb <
> javed.19@buckeyemail.osu.edu
> > wrote:
>
> > The Kafka protocol guide mentions that each request and response contains
> > a correlationId which is a user-supplied integer to match requests and
> > corresponding responses. However, when I look at the code in the class
> > AbstractResponse, we have a method defined as follows:
> >
> >
> > public Send toSend(String destination, RequestHeader requestHeader) {
> >     return toSend(destination, requestHeader.apiVersion(), requestHeader.toResponseHeader());
> > }
> >
> > So basically we are just using the requestHeader to generate the
> > responseHeader, so doesn't this pretty much guarantee that the
> > correlationId for the Request and the Response would always be the same,
> > or am I missing something?
> >
> >
> >
>

Re: How is CorrelationId used for matching request and response

Posted by Ismael Juma <is...@juma.me.uk>.
Hi Haseeb,

That is the point: the server should always send a response with the same
correlation id as the request it has received. If there's a bug in the
networking layer where a response is never sent back, or there is reordering
somewhere in the stack, then the client will identify it when the
correlation id does not match. Does this help?
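
To make the failure case concrete, here is a small sketch under stated assumptions. SketchRequestHeader, SketchResponseHeader and DroppedResponseDemo are hypothetical, heavily simplified stand-ins rather than Kafka classes. The "server" copies the id from the request header into the response header, one response is deliberately lost, and the "client" notices on the next response it reads:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, simplified headers; the real classes carry more fields.
final class SketchRequestHeader {
    final int correlationId;

    SketchRequestHeader(int correlationId) {
        this.correlationId = correlationId;
    }

    // The response header simply copies the id from the request header.
    SketchResponseHeader toResponseHeader() {
        return new SketchResponseHeader(correlationId);
    }
}

final class SketchResponseHeader {
    final int correlationId;

    SketchResponseHeader(int correlationId) {
        this.correlationId = correlationId;
    }
}

class DroppedResponseDemo {
    // Returns the 0-based index of the first response whose id differs from the
    // oldest outstanding request, or -1 if every response matches.
    static int firstMismatch(int[] sentIds, int droppedId) {
        Deque<Integer> outstanding = new ArrayDeque<>();
        Deque<SketchResponseHeader> wire = new ArrayDeque<>();
        for (int id : sentIds) {
            outstanding.addLast(id);
            SketchResponseHeader response = new SketchRequestHeader(id).toResponseHeader();
            if (id != droppedId) { // simulate a bug that loses exactly one response
                wire.addLast(response);
            }
        }
        int index = 0;
        while (!wire.isEmpty()) {
            int expected = outstanding.pollFirst();
            if (wire.pollFirst().correlationId != expected) {
                return index;
            }
            index++;
        }
        return -1;
    }
}
```

Because the server side always echoes the id, a mismatch on the client side can only come from a lost or reordered response, which is exactly what the check is meant to surface.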

Ismael

Re: How is CorrelationId used for matching request and response

Posted by "Javed, Haseeb" <ja...@buckeyemail.osu.edu>.
Thanks all for reaching out.



Ted - I am looking at the 0.11.0 release. Particularly here https://github.com/apache/kafka/blob/0.11.0/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java

In this release, the server uses the following method in almost all cases (ApiVersionResponses.unsupportedVersionSend(...) being the only exception):

public Send toSend(String destination, RequestHeader requestHeader) {
    return toSend(destination, requestHeader.apiVersion(), requestHeader.toResponseHeader());
}


Jay - I understand the purpose of correlationId, but what I don't understand is how the request/response matching logic is implemented. From the code, I see that the server always uses the request header to generate the response header, so in effect both request and response headers end up having the same correlationId. There seems to be no situation where the response and request could possibly have different correlationIds.


Haseeb

Re: How is CorrelationId used for matching request and response

Posted by Jay Kreps <ja...@confluent.io>.
Yes, the idea of the correlation id is to make it easier for the client to
match a particular response to the request it answers. Kafka’s protocol
allows sending multiple requests without waiting for the response. In
theory you can just rely on ordering, but that can be a bit fragile if the
client has any kind of bug. So this id is an explicit check: a response with
id 42 is the answer to the request you sent with id 42. Hope that helps!
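
As a generic illustration of "a response with id 42 answers the request with id 42", the sketch below has a client hand out ids from a counter and look up the pending request by the id in the response. PendingRequests is a hypothetical helper. This is a deliberately different technique from an ordering-based client, where the id serves as a consistency check on the order instead of a lookup key:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical client-side helper: hands out ids and remembers what each one was for.
class PendingRequests {
    private int nextCorrelationId = 0;
    private final Map<Integer, String> pending = new HashMap<>();

    // Assign the next id to a request and remember it until the response arrives.
    int send(String description) {
        int id = nextCorrelationId++;
        pending.put(id, description);
        return id;
    }

    // Look up (and forget) the request that a response with this id answers.
    String complete(int responseCorrelationId) {
        String description = pending.remove(responseCorrelationId);
        if (description == null) {
            throw new IllegalStateException(
                    "No pending request with correlation id " + responseCorrelationId);
        }
        return description;
    }
}
```

Several requests can be outstanding at once, and each response is paired with its request no matter which one is completed first.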

-Jay

Re: How is CorrelationId used for matching request and response

Posted by Ted Yu <yu...@gmail.com>.
Which release / version are you looking at ?
In trunk branch, I only see one toSend():

    protected Send toSend(String destination, ResponseHeader header, short apiVersion) {
        return new NetworkSend(destination, serialize(apiVersion, header));
    }
