Posted to dev@kafka.apache.org by Jason Gustafson <ja...@confluent.io> on 2016/09/26 18:11:07 UTC

Re: is this a bug? - message disorder in async send mode -- 0.9.0 java client sdk InFlightRequests

Hi there,

The Kafka server implements head-of-line request blocking, which means that
it will only handle one request at a time from a given socket. As a result,
the responses will always be returned in the same order as the requests
were sent.

-Jason

On Sat, Sep 24, 2016 at 1:19 AM, 一生有你 <tr...@foxmail.com> wrote:

> We know that in async send mode, Kafka does not guarantee message
> order, even within the same partition.
>
> That is, if we send 3 requests (same topic, same partition) to a
> Kafka server in async mode in the order 1, 2, 3 (correlation ids
> 1, 2, 3), the server may append them to the log in the order 3, 2, 1
> and return the responses to the client in the order 2, 3, 1.
>
> This happens because the Kafka server processes requests with multiple
> threads (multiple KafkaRequestHandler instances).
>
> If the above is true, the code below in the 0.9.0 Java client SDK may
> have a problem.
>
> In the class NetworkClient, there is a collection, inFlightRequests, that
> tracks all in-flight requests:
>
> private final InFlightRequests inFlightRequests;
>
> final class InFlightRequests {
>     private final int maxInFlightRequestsPerConnection;
>     private final Map<String, Deque<ClientRequest>> requests =
>             new HashMap<String, Deque<ClientRequest>>();
>     ...
> }
>
> It uses a Deque to hold the in-flight requests whose responses have not
> yet come back. Whenever we send a request, we enqueue it, and when the
> response comes back, we dequeue it.
>
> private void doSend(ClientRequest request, long now) {
>     request.setSendTimeMs(now);
>     this.inFlightRequests.add(request);
>     selector.send(request.request());
> }
>
> private void handleCompletedReceives(List<ClientResponse> responses, long now) {
>     for (NetworkReceive receive : this.selector.completedReceives()) {
>         String source = receive.source();
>         ClientRequest req = inFlightRequests.completeNext(source);
>         ResponseHeader header = ResponseHeader.parse(receive.payload());
>         // Always expect the response version id to be the same as the request version id
>         short apiKey = req.request().header().apiKey();
>         short apiVer = req.request().header().apiVersion();
>         Struct body = (Struct) ProtoUtils.responseSchema(apiKey, apiVer).read(receive.payload());
>         correlate(req.request().header(), header);
>         if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
>             responses.add(new ClientResponse(req, now, false, body));
>     }
> }
>
> But if the response order does not match the request order, is a Deque
> still suitable? Or should a Map be used to track the requests?
>
> By the way, the code above calls correlate(...) to check that the
> response matches the request; if it does not, an exception is thrown:
>
> private void correlate(RequestHeader requestHeader, ResponseHeader responseHeader) {
>     if (requestHeader.correlationId() != responseHeader.correlationId())
>         throw new IllegalStateException("Correlation id for response (" + responseHeader.correlationId()
>                 + ") does not match request (" + requestHeader.correlationId() + ")");
> }
>
> But in async mode, as mentioned above, such a mismatch would be normal
> and likely to happen. So is it enough to handle the problem by just
> throwing an exception?
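
The FIFO bookkeeping that the quoted client code relies on can be illustrated with a small model. This is only a sketch under stated assumptions, not the actual NetworkClient implementation: the class InFlightSketch and its method signatures are hypothetical, and requests are reduced to bare correlation ids. It shows that as long as the broker answers each connection in order, taking the oldest in-flight entry is enough, and that a correlation id mismatch signals a broken invariant rather than a normal event.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of per-node in-flight tracking.
final class InFlightSketch {
    // One FIFO queue of outstanding correlation ids per destination node.
    private final Map<String, Deque<Integer>> requests = new HashMap<>();

    // Record a request as in flight when it is sent.
    void add(String node, int correlationId) {
        requests.computeIfAbsent(node, k -> new ArrayDeque<>()).addFirst(correlationId);
    }

    // Complete the oldest in-flight request for the node and verify that
    // the response that just arrived really belongs to it.
    int completeNext(String node, int responseCorrelationId) {
        Deque<Integer> queue = requests.get(node);
        if (queue == null || queue.isEmpty())
            throw new IllegalStateException("No in-flight request for node " + node);
        int expected = queue.pollLast(); // oldest entry: added first, removed first
        if (expected != responseCorrelationId)
            throw new IllegalStateException("Correlation id for response (" + responseCorrelationId
                    + ") does not match request (" + expected + ")");
        return expected;
    }
}
```

With requests 1, 2, 3 in flight to one node, completing with response id 1 succeeds, while a response carrying id 3 at that point would throw. Under in-order responses per connection, that exception should never fire in normal operation.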

Re: is this a bug? - message disorder in async send mode -- 0.9.0 java client sdk InFlightRequests

Posted by 一生有你 <tr...@foxmail.com>.
Hi Jason,
I understand the server logic: the server uses selector mute/unmute. Whenever a socket receives a request, the socket is
muted, and it is unmuted only once the response has been returned.
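
The mute/unmute behaviour described above can be modelled in a few lines. The following is a minimal simulation under stated assumptions, not the broker's code: the classes MuteUnmuteSketch, Connection and Request are hypothetical, a single loop stands in for the network processor, and handler work is faked with a short random sleep. It illustrates why a shared request queue drained by several handler threads still yields in-order responses per connection: a muted connection never has more than one request in the queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical model of per-connection mute/unmute with shared handler threads.
final class MuteUnmuteSketch {

    // Stand-in for a client socket; only the processor loop touches its state.
    static final class Connection {
        final String id;
        final Deque<Integer> unread;                      // requests not yet read from the socket
        final List<Integer> responses = new ArrayList<>(); // order in which responses went out
        boolean muted = false;

        Connection(String id, List<Integer> correlationIds) {
            this.id = id;
            this.unread = new ArrayDeque<>(correlationIds);
        }
    }

    static final class Request {
        final Connection connection;
        final int correlationId;

        Request(Connection connection, int correlationId) {
            this.connection = connection;
            this.correlationId = correlationId;
        }
    }

    // Returns, per connection id, the order in which responses were produced.
    static Map<String, List<Integer>> run(Map<String, List<Integer>> sent, int handlerThreads)
            throws InterruptedException {
        BlockingQueue<Request> requestQueue = new LinkedBlockingQueue<>();
        BlockingQueue<Request> responseQueue = new LinkedBlockingQueue<>();
        List<Connection> connections = new ArrayList<>();
        int remaining = 0;
        for (Map.Entry<String, List<Integer>> e : sent.entrySet()) {
            connections.add(new Connection(e.getKey(), e.getValue()));
            remaining += e.getValue().size();
        }

        // Handler threads share one request queue and finish in arbitrary order.
        ExecutorService handlers = Executors.newFixedThreadPool(handlerThreads);
        for (int i = 0; i < handlerThreads; i++) {
            handlers.execute(() -> {
                try {
                    while (true) {
                        Request r = requestQueue.take();
                        Thread.sleep(ThreadLocalRandom.current().nextInt(3)); // fake work
                        responseQueue.put(r);
                    }
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt(); // stop when the pool shuts down
                }
            });
        }

        try {
            while (remaining > 0) {
                // Read at most one request from each unmuted connection, then mute it.
                for (Connection c : connections) {
                    if (!c.muted && !c.unread.isEmpty()) {
                        c.muted = true;
                        requestQueue.put(new Request(c, c.unread.poll()));
                    }
                }
                // Deliver one finished response and unmute its connection.
                Request done = responseQueue.take();
                done.connection.responses.add(done.correlationId);
                done.connection.muted = false;
                remaining--;
            }
        } finally {
            handlers.shutdownNow();
        }

        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (Connection c : connections) {
            result.put(c.id, c.responses);
        }
        return result;
    }
}
```

Calling run with, say, two connections that sent [1, 2, 3] and [10, 11, 12] and four handler threads returns the same lists per connection, even though requests from different connections may be handled in any interleaving.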






------------------ Original Message ------------------
From: "一生有你" <tr...@foxmail.com>
Sent: Tuesday, September 27, 2016, 11:57 AM
To: "dev" <de...@kafka.apache.org>

Subject: Re: is this a bug? - message disorder in async send mode -- 0.9.0 java client sdk InFlightRequests



Hi Jason,
Can you explain "head of line request blocking" in more detail? I am very curious, thanks!

Below is the code:

class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {
  private var responseListeners: List[(Int) => Unit] = Nil
  private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)
  private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)
  for(i <- 0 until numProcessors)
    responseQueues(i) = new LinkedBlockingQueue[RequestChannel.Response]()

The requestQueue is consumed by multiple threads, so how can it guarantee that the response order is the same as the request order?




