Posted to issues@mesos.apache.org by "Marco Massenzio (JIRA)" <ji...@apache.org> on 2015/11/09 22:25:11 UTC

[jira] [Updated] (MESOS-3705) HTTP Pipelining doesn't keep order of requests

     [ https://issues.apache.org/jira/browse/MESOS-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Marco Massenzio updated MESOS-3705:
-----------------------------------
    Sprint: Mesosphere Sprint 21, Mesosphere Sprint 22  (was: Mesosphere Sprint 21)

> HTTP Pipelining doesn't keep order of requests
> ----------------------------------------------
>
>                 Key: MESOS-3705
>                 URL: https://issues.apache.org/jira/browse/MESOS-3705
>             Project: Mesos
>          Issue Type: Bug
>          Components: libprocess
>    Affects Versions: 0.24.0
>            Reporter: Alexander Rojas
>            Assignee: Alexander Rojas
>              Labels: http, libprocess, mesosphere
>
> [HTTP 1.1 Pipelining|https://en.wikipedia.org/wiki/HTTP_pipelining] describes a mechanism by which multiple HTTP requests can be performed over a single socket. The requirement is that responses must be sent back in the same order in which the requests were made.
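> For concreteness, a pipelined exchange looks roughly like this (the host and paths here are only placeholders). The client writes both requests back to back on one connection:
> {code}
> GET /foo HTTP/1.1
> Host: example.org
> 
> GET /bar HTTP/1.1
> Host: example.org
> {code}
> The server then has to write the response to {{GET /foo}} on the socket before the response to {{GET /bar}}, even if the {{/bar}} handler finishes its work first.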
> Libprocess has some mechanisms built in to deal with pipelining when multiple HTTP requests are made; it is still possible, however, to create a situation in which the responses are scrambled with respect to the order in which the requests arrived.
> Consider the situation in which there are two libprocess processes, {{processA}} and {{processB}}, each running in a different thread, {{thread2}} and {{thread3}} respectively. The [{{ProcessManager}}|https://github.com/apache/mesos/blob/1d68eed9089659b06a1e710f707818dbcafeec52/3rdparty/libprocess/src/process.cpp#L374] runs in {{thread1}}.
> {{processA}} is of type {{ProcessA}}, which looks roughly as follows:
> {code}
> class ProcessA : public Process<ProcessA>
> {
> public:
>   ProcessA() {}
> 
>   Future<http::Response> foo(const http::Request&) {
>     // … Do something …
>     return http::OK();
>   }
> 
> protected:
>   virtual void initialize() {
>     route("/foo", None(), &ProcessA::foo);
>   }
> };
> {code}
> {{processB}} is of type {{ProcessB}}, which is just like {{ProcessA}} but routes {{"/bar"}} instead of {{"/foo"}}.
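> A sketch of {{ProcessB}}, inferred from that description (not taken from actual code):
> {code}
> class ProcessB : public Process<ProcessB>
> {
> public:
>   ProcessB() {}
> 
>   Future<http::Response> bar(const http::Request&) {
>     // … Do something …
>     return http::OK();
>   }
> 
> protected:
>   virtual void initialize() {
>     route("/bar", None(), &ProcessB::bar);
>   }
> };
> {code}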
> The situation in which the bug arises is the following:
> # Two requests, (1) for {{"http://server_uri/<processA>/foo"}} and (2) for {{"http://server_uri/<processB>/bar"}}, are made over the same socket, in that order.
> # The first request arrives at [{{ProcessManager::handle}}|https://github.com/apache/mesos/blob/1d68eed9089659b06a1e710f707818dbcafeec52/3rdparty/libprocess/src/process.cpp#L2202], which runs in {{thread1}}. It creates an {{HttpEvent}} and delivers it to the handler, in this case {{processA}}.
> # [{{ProcessManager::deliver}}|https://github.com/apache/mesos/blob/1d68eed9089659b06a1e710f707818dbcafeec52/3rdparty/libprocess/src/process.cpp#L2361] enqueues the HTTP event into {{processA}}'s queue. This happens in {{thread1}}.
> # The second request arrives at [{{ProcessManager::handle}}|https://github.com/apache/mesos/blob/1d68eed9089659b06a1e710f707818dbcafeec52/3rdparty/libprocess/src/process.cpp#L2202], still in {{thread1}}. Another {{HttpEvent}} is created and delivered to its handler, in this case {{processB}}.
> # [{{ProcessManager::deliver}}|https://github.com/apache/mesos/blob/1d68eed9089659b06a1e710f707818dbcafeec52/3rdparty/libprocess/src/process.cpp#L2361] enqueues the HTTP event into {{processB}}'s queue. This happens in {{thread1}}.
> # {{thread2}} is blocked, so {{processA}} cannot handle the first request; it sits in the queue.
> # {{thread3}} is idle, so it picks up the request to {{processB}} immediately.
> # [{{ProcessBase::visit(HttpEvent)}}|https://github.com/apache/mesos/blob/1d68eed9089659b06a1e710f707818dbcafeec52/3rdparty/libprocess/src/process.cpp#L3073] is called in {{thread3}}; it in turn [dispatches|https://github.com/apache/mesos/blob/1d68eed9089659b06a1e710f707818dbcafeec52/3rdparty/libprocess/src/process.cpp#L3106] the response's future to the {{HttpProxy}} associated with the socket on which the requests arrived.
> At this last point the bug becomes evident: the response to {{processB}}'s request will be sent before the response to {{processA}}'s request, even if {{processB}}'s handler takes a long time and {{ProcessA::foo()}} actually finishes first. The responses are not sent in the order in which the requests were made.
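> To make the race easier to see in isolation, here is a self-contained sketch using plain C++ threads (this is *not* libprocess code, just an illustration of the same pattern): each "handler" runs on its own thread and pushes its response into a shared "proxy" as soon as it is done, so the output order depends on which handler finishes first rather than on the order in which the requests arrived.
> {code}
> #include <condition_variable>
> #include <iostream>
> #include <mutex>
> #include <string>
> #include <thread>
> #include <vector>
> 
> int main()
> {
>   // Stands in for the HttpProxy/socket: responses end up here in the order
>   // the handlers deliver them, not in the order the requests arrived.
>   std::mutex proxyMutex;
>   std::vector<std::string> proxyOutput;
> 
>   // Used to keep "thread2" busy, like the blocked processA above.
>   std::mutex blockMutex;
>   std::condition_variable cv;
>   bool unblock = false;
> 
>   // "thread2" owns the handler for request (1), but is blocked.
>   std::thread thread2([&]() {
>     std::unique_lock<std::mutex> lock(blockMutex);
>     cv.wait(lock, [&]() { return unblock; });
> 
>     std::lock_guard<std::mutex> guard(proxyMutex);
>     proxyOutput.push_back("response to request (1) /foo");
>   });
> 
>   // "thread3" owns the handler for request (2) and is idle, so it responds
>   // immediately and only then unblocks thread2.
>   std::thread thread3([&]() {
>     {
>       std::lock_guard<std::mutex> guard(proxyMutex);
>       proxyOutput.push_back("response to request (2) /bar");
>     }
> 
>     std::lock_guard<std::mutex> lock(blockMutex);
>     unblock = true;
>     cv.notify_one();
>   });
> 
>   thread2.join();
>   thread3.join();
> 
>   // Prints the responses in handler-completion order: (2) before (1), even
>   // though request (1) was written to the socket first.
>   for (const std::string& response : proxyOutput) {
>     std::cout << response << std::endl;
>   }
> 
>   return 0;
> }
> {code}
> Run as-is, this always prints the response to request (2) before the response to request (1), which is exactly the scrambling described above.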
> h1. Reproducer
> The following is a test which successfully reproduces the issue:
> {code}
> class PipelineScramblerProcess : public Process<PipelineScramblerProcess>
> {
> public:
>   PipelineScramblerProcess()
>     : ProcessBase(ID::generate("PipelineScramblerProcess")) {}
>   void block(const Future<Nothing>& trigger)
>   {
>     trigger.await();
>   }
>   Future<http::Response> get(const http::Request& request)
>   {
>     if (promise_) {
>       promise_->set(Nothing());
>     }
>     return http::OK(self().id);
>   }
>   void setPromise(std::unique_ptr<Promise<Nothing>>& promise)
>   {
>     promise_ = std::move(promise);
>   }
> protected:
>   virtual void initialize()
>   {
>     route("/get", None(), &PipelineScramblerProcess::get);
>   }
> private:
>   std::unique_ptr<Promise<Nothing>> promise_;
> };
> TEST(HTTPConnectionTest, ComplexPipelining)
> {
>   PipelineScramblerProcess blocked;
>   spawn(blocked);
>   PipelineScramblerProcess unblocked;
>   spawn(unblocked);
>   ASSERT_EQ(blocked.self().address.ip, unblocked.self().address.ip);
>   ASSERT_EQ(blocked.self().address.port, unblocked.self().address.port);
>   std::unique_ptr<Promise<Nothing>> promise(new Promise<Nothing>());
>   // Block the first process so it cannot process the first request until
>   // the second request is finished.
>   dispatch(blocked, &PipelineScramblerProcess::block, promise->future());
>   // The promise will be set once 'unblocked' serves the second request.
>   unblocked.setPromise(promise);
>   // Get connection for HTTP pipelining.
>   Future<http::Connection> connect =
>     http::connect(http::URL(
>         "http",
>         blocked.self().address.ip,
>         blocked.self().address.port));
>   AWAIT_READY(connect);
>   http::Connection connection = connect.get();
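>   // The first pipelined request goes to the blocked process, so its response
>   // must be the first one written back on this connection.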
>   http::Request blockedRequest;
>   blockedRequest.method = "GET";
>   blockedRequest.url = http::URL(
>       "http",
>       blocked.self().address.ip,
>       blocked.self().address.port,
>       blocked.self().id + "/get");
>   blockedRequest.keepAlive = true;
>   Future<http::Response> blockedResponse = connection.send(blockedRequest);
>   http::Request unblockedRequest;
>   unblockedRequest.method = "GET";
>   unblockedRequest.url = http::URL(
>       "http",
>       unblocked.self().address.ip,
>       unblocked.self().address.port,
>       unblocked.self().id + "/get");
>   unblockedRequest.keepAlive = true;
>   Future<http::Response> unblockedResponse = connection.send(unblockedRequest);
>   AWAIT_READY(blockedResponse);
>   AWAIT_READY(unblockedResponse);
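>   // Each response body carries the id of the process that served it; if the
>   // responses come back scrambled they get matched to the wrong request
>   // futures and these expectations fail.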
>   EXPECT_EQ(blocked.self().id, blockedResponse->body);
>   EXPECT_EQ(unblocked.self().id, unblockedResponse->body);
>   AWAIT_READY(connection.disconnect());
>   AWAIT_READY(connection.disconnected());
>   terminate(blocked);
>   wait(blocked);
>   terminate(unblocked);
>   wait(unblocked);
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)