Posted to issues@mesos.apache.org by "Alexander Rojas (JIRA)" <ji...@apache.org> on 2015/10/13 14:34:05 UTC

[jira] [Updated] (MESOS-3705) HTTP Pipelining doesn't keep order of requests

     [ https://issues.apache.org/jira/browse/MESOS-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexander Rojas updated MESOS-3705:
-----------------------------------
    Description: 
[HTTP 1.1 Pipelining|https://en.wikipedia.org/wiki/HTTP_pipelining] describes a mechanism by which multiple HTTP requests can be sent over a single socket without waiting for the corresponding responses. The requirement here is that responses must be sent in the same order in which the requests were made.
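
To see why the ordering requirement matters, consider how a pipelining client typically pairs responses with requests: purely by position. The following is a minimal sketch under that assumption; {{PipelinedClient}} and its methods are hypothetical names used only for illustration and are not part of libprocess.

```cpp
#include <cassert>
#include <deque>
#include <stdexcept>
#include <string>
#include <utility>

// Hypothetical client-side bookkeeping for one pipelined connection.
// Responses carry no request identifier, so the client can only assume
// that the n-th response belongs to the n-th request.
class PipelinedClient
{
public:
  // Records a request as written to the socket.
  void send(const std::string& path) { pending_.push_back(path); }

  // Called for each response read off the socket, in wire order.
  // Returns the (request path, response body) pairing the client assumes.
  std::pair<std::string, std::string> receive(const std::string& body)
  {
    if (pending_.empty()) {
      throw std::logic_error("response received without a pending request");
    }

    std::string path = pending_.front();
    pending_.pop_front();
    return {path, body};
  }

private:
  std::deque<std::string> pending_; // Requests awaiting a response, oldest first.
};
```

If the server writes the response for the second request first, {{receive}} silently attributes it to the first request; the client has no way to detect the swap.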

Libprocess has some mechanisms built in to deal with pipelining when multiple HTTP requests are made. It is still possible, however, to create a situation in which responses are scrambled with respect to the order in which the requests arrived.

Consider the situation in which there are two libprocess processes, {{processA}} and {{processB}}, each running in a different thread, {{thread2}} and {{thread3}} respectively. The [{{ProcessManager}}|https://github.com/apache/mesos/blob/1d68eed9089659b06a1e710f707818dbcafeec52/3rdparty/libprocess/src/process.cpp#L374] runs in {{thread1}}.

{{processA}} is of type {{ProcessA}} which looks roughly as follows:

{code}
class ProcessA : public Process<ProcessA>
{
public:
  ProcessA() {}

  Future<http::Response> foo(const http::Request&) {
    // … Do something …
    return http::OK();
  }

protected:
  virtual void initialize() {
    route("/foo", None(), &ProcessA::foo);
  }
};
{code}

{{processB}} is of type {{ProcessB}}, which is just like {{ProcessA}} but routes {{"/bar"}} instead of {{"/foo"}}.

The situation in which the bug arises is the following:

# Two requests, one for {{"http://server_uri/(1)/foo"}} and one for {{"http://server_uri/(2)/bar"}}, are made over the same socket.
# The first request arrives at [{{ProcessManager::handle}}|https://github.com/apache/mesos/blob/1d68eed9089659b06a1e710f707818dbcafeec52/3rdparty/libprocess/src/process.cpp#L2202], which runs in {{thread1}}. It creates an {{HttpEvent}} and delivers it to the handler, in this case {{processA}}.
# [{{ProcessManager::deliver}}|https://github.com/apache/mesos/blob/1d68eed9089659b06a1e710f707818dbcafeec52/3rdparty/libprocess/src/process.cpp#L2361] enqueues the HTTP event into the {{processA}} queue. This happens in {{thread1}}.
# The second request arrives at [{{ProcessManager::handle}}|https://github.com/apache/mesos/blob/1d68eed9089659b06a1e710f707818dbcafeec52/3rdparty/libprocess/src/process.cpp#L2202], which is still running in {{thread1}}. Another {{HttpEvent}} is created and delivered to the handler, in this case {{processB}}.
# [{{ProcessManager::deliver}}|https://github.com/apache/mesos/blob/1d68eed9089659b06a1e710f707818dbcafeec52/3rdparty/libprocess/src/process.cpp#L2361] enqueues the HTTP event into the {{processB}} queue. This happens in {{thread1}}.
# {{thread2}} is blocked, so {{processA}} cannot handle the first request; it remains stuck in the queue.
# {{thread3}} is idle, so it picks up the request to {{processB}} immediately.
# [{{ProcessBase::visit(HttpEvent)}}|https://github.com/apache/mesos/blob/1d68eed9089659b06a1e710f707818dbcafeec52/3rdparty/libprocess/src/process.cpp#L3073] is called in {{thread3}}, which in turn [dispatches|https://github.com/apache/mesos/blob/1d68eed9089659b06a1e710f707818dbcafeec52/3rdparty/libprocess/src/process.cpp#L3106] the response's future to the {{HttpProxy}} associated with the socket on which the request arrived.

At this last step the bug becomes evident: the response from {{processB}} will be sent before the response from {{processA}}, even if the {{processB}} handler takes a long time and {{ProcessA::foo()}} actually finishes first. The responses are not sent in the order in which the requests were made.
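
One possible way to preserve ordering, sketched here only as an illustration and not as the fix adopted in libprocess, is to assign each request a position at the moment it is read off the socket (in {{thread1}} in the scenario above) rather than when its handler runs. Completed responses are then held back until every earlier response has been written. {{ResponseSequencer}}, {{enqueue}} and {{complete}} are hypothetical names; a real implementation would also need synchronization, which is omitted.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Hypothetical per-socket sequencer (an assumption for illustration; this is
// not libprocess' HttpProxy). Not thread safe.
class ResponseSequencer
{
public:
  // Called when a request is read off the socket. The returned ticket fixes
  // the position of the response, independent of handler scheduling.
  std::size_t enqueue() { return nextTicket_++; }

  // Called when a handler has produced its response; calls may arrive in any
  // order. Returns the responses that can now be written, in request order.
  std::vector<std::string> complete(std::size_t ticket, const std::string& body)
  {
    ready_[ticket] = body;

    std::vector<std::string> flushed;
    auto it = ready_.find(nextToSend_);
    while (it != ready_.end()) {
      flushed.push_back(it->second);
      ready_.erase(it);
      ++nextToSend_;
      it = ready_.find(nextToSend_);
    }

    return flushed;
  }

private:
  std::size_t nextTicket_ = 0;               // Ticket for the next request.
  std::size_t nextToSend_ = 0;               // Ticket whose response is due next.
  std::map<std::size_t, std::string> ready_; // Completed but unsent responses.
};
```

In terms of the scenario, {{processB}} completing first yields nothing to write; once {{processA}} completes, both responses are released with the one for {{processA}} first.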

h1. Reproducer

The following is a test which successfully reproduces the issue:

{code}
class PipelineScramblerProcess : public Process<PipelineScramblerProcess>
{
public:
  PipelineScramblerProcess()
    : ProcessBase(ID::generate("PipelineScramblerProcess")) {}

  void block(const Future<Nothing>& trigger)
  {
    trigger.await();
  }

  Future<http::Response> get(const http::Request& request)
  {
    if (promise_) {
      promise_->set(Nothing());
    }

    return http::OK(self().id);
  }

  void setPromise(std::unique_ptr<Promise<Nothing>>& promise)
  {
    promise_ = std::move(promise);
  }

protected:
  virtual void initialize()
  {
    route("/get", None(), &PipelineScramblerProcess::get);
  }

private:
  std::unique_ptr<Promise<Nothing>> promise_;
};

TEST(HTTPConnectionTest, ComplexPipelining)
{
  PipelineScramblerProcess blocked;
  spawn(blocked);
  PipelineScramblerProcess unblocked;
  spawn(unblocked);

  ASSERT_EQ(blocked.self().address.ip, unblocked.self().address.ip);
  ASSERT_EQ(blocked.self().address.port, unblocked.self().address.port);

  std::unique_ptr<Promise<Nothing>> promise(new Promise<Nothing>());

  // Block the first process so it cannot process the first request until
  // the second request is finished.
  dispatch(blocked, &PipelineScramblerProcess::block, promise->future());

  // The promise will be set once 'unblocked' serves the second request.
  unblocked.setPromise(promise);

  // Get connection for HTTP pipelining.
  Future<http::Connection> connect =
    http::connect(http::URL(
        "http",
        blocked.self().address.ip,
        blocked.self().address.port));

  AWAIT_READY(connect);

  http::Connection connection = connect.get();

  http::Request blockedRequest;
  blockedRequest.method = "GET";
  blockedRequest.url = http::URL(
      "http",
      blocked.self().address.ip,
      blocked.self().address.port,
      blocked.self().id + "/get");
  blockedRequest.keepAlive = true;
  Future<http::Response> blockedResponse = connection.send(blockedRequest);

  http::Request unblockedRequest;
  unblockedRequest.method = "GET";
  unblockedRequest.url = http::URL(
      "http",
      unblocked.self().address.ip,
      unblocked.self().address.port,
      unblocked.self().id + "/get");
  unblockedRequest.keepAlive = true;
  Future<http::Response> unblockedResponse = connection.send(unblockedRequest);

  AWAIT_READY(blockedResponse);
  AWAIT_READY(unblockedResponse);

  EXPECT_EQ(blocked.self().id, blockedResponse->body);
  EXPECT_EQ(unblocked.self().id, unblockedResponse->body);

  AWAIT_READY(connection.disconnect());
  AWAIT_READY(connection.disconnected());

  terminate(blocked);
  wait(blocked);
  terminate(unblocked);
  wait(unblocked);
}
{code}


> HTTP Pipelining doesn't keep order of requests
> ----------------------------------------------
>
>                 Key: MESOS-3705
>                 URL: https://issues.apache.org/jira/browse/MESOS-3705
>             Project: Mesos
>          Issue Type: Bug
>          Components: libprocess
>    Affects Versions: 0.24.0
>            Reporter: Alexander Rojas
>            Assignee: Alexander Rojas
>              Labels: http, libprocess, mesosphere
