Posted to dev@kafka.apache.org by Jason Gustafson <ja...@confluent.io> on 2016/09/26 18:11:07 UTC
Re: it this a bug? - message disorder in async send mode -- 0.9.0 java client sdk InFlightRequests
Hi there,
The Kafka server implements head-of-line request blocking, which means that
it will only handle one request at a time from a given socket. As a result,
the responses will always be returned in the same order as the requests
were sent.
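
To make the point concrete, here is a minimal Java sketch (not the actual client code) of why a per-connection FIFO queue is enough when responses come back in request order. The class InFlightSketch and its methods send, completeNext, and outstanding are hypothetical names chosen for illustration; only the idea of matching each response against the oldest outstanding correlation id follows the NetworkClient code quoted below.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch; this is not org.apache.kafka.clients.InFlightRequests.
class InFlightSketch {
    // Correlation ids of requests sent on one connection, newest at the head.
    private final Deque<Integer> inFlight = new ArrayDeque<>();

    // Record a request as sent.
    void send(int correlationId) {
        inFlight.addFirst(correlationId);
    }

    // Match a response against the oldest outstanding request.
    int completeNext(int responseCorrelationId) {
        Integer expected = inFlight.pollLast();
        if (expected == null)
            throw new IllegalStateException("No in-flight request for response " + responseCorrelationId);
        if (expected != responseCorrelationId)
            throw new IllegalStateException("Correlation id for response (" + responseCorrelationId
                    + ") does not match request (" + expected + ")");
        return expected;
    }

    int outstanding() {
        return inFlight.size();
    }

    public static void main(String[] args) {
        InFlightSketch client = new InFlightSketch();
        for (int id = 1; id <= 3; id++)
            client.send(id);
        // The broker answers one request at a time per socket, so ids arrive as 1, 2, 3.
        for (int id = 1; id <= 3; id++)
            client.completeNext(id);
        System.out.println("outstanding after in-order responses: " + client.outstanding());

        // An out-of-order response would be treated as a protocol violation.
        client.send(4);
        client.send(5);
        try {
            client.completeNext(5);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Under this assumption a Map keyed by correlation id is not needed: a mismatch signals a broken connection or a bug rather than normal reordering, which is why throwing is a reasonable response.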
-Jason
On Sat, Sep 24, 2016 at 1:19 AM, 一生有你 <tr...@foxmail.com> wrote:
> We know that in async send mode, Kafka does not guarantee message
> order, even within the same partition.
>
>
> That is, if we send 3 requests (same topic, same partition) to a
> Kafka server in async mode,
> the send order is 1, 2, 3 (correlation ids 1, 2, 3), while the Kafka
> server may write the 3 requests to the log in the order 3, 2, 1 and
> return responses to the client in the order 2, 3, 1.
>
>
> This happens because the Kafka server processes requests with multiple
> threads (multiple KafkaRequestHandler instances).
>
>
> If the above is true, the code below in the 0.9.0 Java client SDK may have
> a problem:
>
>
> In the class NetworkClient, there is a collection, inFlightRequests, that
> tracks all in-flight requests:
>
>
>     private final InFlightRequests inFlightRequests;
>
>     final class InFlightRequests {
>         private final int maxInFlightRequestsPerConnection;
>         private final Map<String, Deque<ClientRequest>> requests =
>             new HashMap<String, Deque<ClientRequest>>();
>         ...
>     }
>
> It uses a Deque to hold the in-flight requests whose responses have not
> come back yet.
> Whenever we send a request, we enqueue it, and when the
> response comes back, we dequeue it.
>
>     private void doSend(ClientRequest request, long now) {
>         request.setSendTimeMs(now);
>         this.inFlightRequests.add(request);
>         selector.send(request.request());
>     }
>
>     private void handleCompletedReceives(List<ClientResponse> responses, long now) {
>         for (NetworkReceive receive : this.selector.completedReceives()) {
>             String source = receive.source();
>             ClientRequest req = inFlightRequests.completeNext(source);
>             ResponseHeader header = ResponseHeader.parse(receive.payload());
>             // Always expect the response version id to be the same as the request version id
>             short apiKey = req.request().header().apiKey();
>             short apiVer = req.request().header().apiVersion();
>             Struct body = (Struct) ProtoUtils.responseSchema(apiKey, apiVer).read(receive.payload());
>             correlate(req.request().header(), header);
>             if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
>                 responses.add(new ClientResponse(req, now, false, body));
>         }
>     }
>
> But if the response order does not match the request order, is a
> Deque suitable? Or should a Map be used to track the requests?
> By the way, the code above calls a function correlate(xxx) to check the
> match; if the ids do not match, it throws an exception:
>
>     private void correlate(RequestHeader requestHeader, ResponseHeader responseHeader) {
>         if (requestHeader.correlationId() != responseHeader.correlationId())
>             throw new IllegalStateException("Correlation id for response ("
>                 + responseHeader.correlationId()
>                 + ") does not match request ("
>                 + requestHeader.correlationId() + ")");
>     }
>
> But in async mode, as mentioned above, a mismatch is normal and
> likely to happen.
> So is it enough here to handle the problem by just throwing an exception?
Re: it this a bug? - message disorder in async send mode -- 0.9.0 java client sdk InFlightRequests
Posted by 一生有你 <tr...@foxmail.com>.
Hi Jason,
I understand the server logic now: the server uses the selector's mute/unmute. Whenever the socket receives a request, it is
muted, and it stays muted until the response is returned, at which point it is unmuted.
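
The mute/unmute behaviour described above can be sketched in Java as a toy model, assuming a single connection, one network thread, and a pool of handler threads. MuteSketch, Connection, and run are hypothetical names, not broker classes, and the handler here unmutes the connection directly, whereas (as far as I understand) the broker does so from its network thread once the response has been sent.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical toy model; none of these names are Kafka broker classes.
class MuteSketch {
    static final class Connection {
        // Requests already sent by the client but not yet read by the broker.
        final Queue<Integer> unread = new ArrayDeque<>();
        // Responses in the order they are written back to the client.
        final List<Integer> responses = Collections.synchronizedList(new ArrayList<>());
        // While muted, the network thread does not read from this connection.
        volatile boolean muted = false;
    }

    static List<Integer> run(int requests, int handlerThreads) {
        Connection conn = new Connection();
        for (int id = 1; id <= requests; id++)
            conn.unread.add(id);
        ExecutorService handlers = Executors.newFixedThreadPool(handlerThreads);
        try {
            // This loop plays the role of the network thread.
            while (conn.responses.size() < requests) {
                if (!conn.muted && !conn.unread.isEmpty()) {
                    int id = conn.unread.poll();
                    conn.muted = true; // read exactly one request, then stop reading
                    handlers.submit(() -> {
                        pause(ThreadLocalRandom.current().nextInt(1, 10)); // uneven handling time
                        conn.responses.add(id);
                        conn.muted = false; // response is out, so reading may resume
                    });
                } else {
                    pause(1);
                }
            }
        } finally {
            handlers.shutdown();
        }
        return new ArrayList<>(conn.responses);
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run(5, 4));
    }
}
```

Running main prints [1, 2, 3, 4, 5]: even with four handler threads and random handling times, only one request per connection is ever in the handler pool, so responses cannot overtake each other.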
------------------ Original Message ------------------
From: "一生有你";<tr...@foxmail.com>;
Sent: Tuesday, September 27, 2016, 11:57 AM
To: "dev"<de...@kafka.apache.org>;
Subject: Re: it this a bug? - message disorder in async send mode -- 0.9.0 java client sdk InFlightRequests
Hi Jason,
Can you explain "head-of-line request blocking" in more detail? I am very curious, thanks!
Below is the code:
    class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {
      private var responseListeners: List[(Int) => Unit] = Nil
      private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)
      private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)
      for(i <- 0 until numProcessors)
        responseQueues(i) = new LinkedBlockingQueue[RequestChannel.Response]()
The requestQueue is consumed by multiple threads, so how can it guarantee that the response order is the same as the request order?
Re: it this a bug? - message disorder in async send mode -- 0.9.0 java client sdk InFlightRequests
Posted by 一生有你 <tr...@foxmail.com>.
Hi Jason,
Can you explain "head-of-line request blocking" in more detail? I am very curious, thanks!
Below is the code:
    class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {
      private var responseListeners: List[(Int) => Unit] = Nil
      private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)
      private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)
      for(i <- 0 until numProcessors)
        responseQueues(i) = new LinkedBlockingQueue[RequestChannel.Response]()
The requestQueue is consumed by multiple threads, so how can it guarantee that the response order is the same as the request order?