Posted to issues@mesos.apache.org by "Marco Massenzio (JIRA)" <ji...@apache.org> on 2015/11/09 22:25:11 UTC
[jira] [Updated] (MESOS-3705) HTTP Pipelining doesn't keep order of requests
[ https://issues.apache.org/jira/browse/MESOS-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Marco Massenzio updated MESOS-3705:
-----------------------------------
Sprint: Mesosphere Sprint 21, Mesosphere Sprint 22 (was: Mesosphere Sprint 21)
> HTTP Pipelining doesn't keep order of requests
> ----------------------------------------------
>
> Key: MESOS-3705
> URL: https://issues.apache.org/jira/browse/MESOS-3705
> Project: Mesos
> Issue Type: Bug
> Components: libprocess
> Affects Versions: 0.24.0
> Reporter: Alexander Rojas
> Assignee: Alexander Rojas
> Labels: http, libprocess, mesosphere
>
> [HTTP 1.1 Pipelining|https://en.wikipedia.org/wiki/HTTP_pipelining] describes a mechanism by which multiple HTTP requests can be performed over a single socket. The requirement here is that responses must be sent in the same order in which the requests were made.
> Libprocess has some built-in mechanisms to deal with pipelining when multiple HTTP requests are made. It is still possible, however, to create a situation in which the responses are scrambled with respect to the order in which the requests arrived.
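The ordering requirement can be illustrated with a minimal sketch in standard C++. It is not libprocess code: `OrderedProxy` and `pipelineDemo` are hypothetical names, and `std::future` stands in for libprocess futures. The idea shown is that a per-socket proxy keeps one pending future per request, queued in arrival order, and only writes a response once every earlier response has been written.

```cpp
#include <cassert>
#include <deque>
#include <future>
#include <string>
#include <utility>
#include <vector>

// Hypothetical per-socket proxy: holds one pending response per request,
// in the order the requests arrived.
class OrderedProxy
{
public:
  // Called once per request, in arrival order.
  void enqueue(std::future<std::string> response)
  {
    pending_.push_back(std::move(response));
  }

  // Waits on the pending responses front to back and returns them in the
  // order they would be written to the socket.
  std::vector<std::string> flush()
  {
    std::vector<std::string> sent;
    while (!pending_.empty()) {
      sent.push_back(pending_.front().get());
      pending_.pop_front();
    }
    return sent;
  }

private:
  std::deque<std::future<std::string>> pending_;
};

// Two pipelined requests; the second handler finishes before the first.
std::vector<std::string> pipelineDemo()
{
  std::promise<std::string> foo;
  std::promise<std::string> bar;

  OrderedProxy proxy;
  proxy.enqueue(foo.get_future());  // First request.
  proxy.enqueue(bar.get_future());  // Second request.

  bar.set_value("bar");  // Completes first.
  foo.set_value("foo");  // Completes second.

  return proxy.flush();
}
```

In this sketch the response order depends only on the order of the `enqueue` calls, not on the order in which the handlers complete. The scenario described below is one where the enqueue order itself no longer matches the arrival order.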
> Consider the situation in which there are two libprocess processes, {{processA}} and {{processB}}, each running in a different thread, {{thread2}} and {{thread3}} respectively. The [{{ProcessManager}}|https://github.com/apache/mesos/blob/1d68eed9089659b06a1e710f707818dbcafeec52/3rdparty/libprocess/src/process.cpp#L374] runs in {{thread1}}.
> {{processA}} is of type {{ProcessA}} which looks roughly as follows:
> {code}
> class ProcessA : public Process<ProcessA>
> {
> public:
>   ProcessA() {}
>
>   Future<http::Response> foo(const http::Request&)
>   {
>     // … Do something …
>     return http::OK();
>   }
>
> protected:
>   virtual void initialize()
>   {
>     route("/foo", None(), &ProcessA::foo);
>   }
> };
> {code}
> {{processB}} is of type {{ProcessB}}, which is just like {{ProcessA}} but routes {{"bar"}} instead of {{"foo"}}.
> The situation in which the bug arises is the following:
> # Two requests, one for {{"http://server_uri/(1)/foo"}} and one for {{"http://server_uri/(2)/bar"}}, are made over the same socket.
> # The first request arrives at [{{ProcessManager::handle}}|https://github.com/apache/mesos/blob/1d68eed9089659b06a1e710f707818dbcafeec52/3rdparty/libprocess/src/process.cpp#L2202], which runs in {{thread1}}. It creates an {{HttpEvent}} and delivers it to the handler, in this case {{processA}}.
> # [{{ProcessManager::deliver}}|https://github.com/apache/mesos/blob/1d68eed9089659b06a1e710f707818dbcafeec52/3rdparty/libprocess/src/process.cpp#L2361] enqueues the HTTP event into the {{processA}} queue. This happens in {{thread1}}.
> # The second request arrives at [{{ProcessManager::handle}}|https://github.com/apache/mesos/blob/1d68eed9089659b06a1e710f707818dbcafeec52/3rdparty/libprocess/src/process.cpp#L2202], which is still running in {{thread1}}. Another {{HttpEvent}} is created and delivered to the handler, in this case {{processB}}.
> # [{{ProcessManager::deliver}}|https://github.com/apache/mesos/blob/1d68eed9089659b06a1e710f707818dbcafeec52/3rdparty/libprocess/src/process.cpp#L2361] enqueues the HTTP event into the {{processB}} queue. This happens in {{thread1}}.
> # {{thread2}} is blocked, so {{processA}} cannot handle the first request; it is stuck in the queue.
> # {{thread3}} is idle, so it picks up the request to {{processB}} immediately.
> # [{{ProcessBase::visit(HttpEvent)}}|https://github.com/apache/mesos/blob/1d68eed9089659b06a1e710f707818dbcafeec52/3rdparty/libprocess/src/process.cpp#L3073] is called in {{thread3}}. It in turn [dispatches|https://github.com/apache/mesos/blob/1d68eed9089659b06a1e710f707818dbcafeec52/3rdparty/libprocess/src/process.cpp#L3106] the response's future to the {{HttpProxy}} associated with the socket on which the request arrived.
> At the last step the bug is evident: the response's future from {{processB}} reaches the {{HttpProxy}} first, so the response to the second request will be sent before the response to the first. This holds even if the {{processB}} handler takes a long time and {{ProcessA::foo()}} actually finishes earlier. The responses are not sent in the order in which the requests were made.
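The steps above can be modeled with a short, single-threaded sketch in standard C++. It is a simplification under stated assumptions, not libprocess code: `Mailbox`, `scrambledOrder`, and `arrivalOrder` are hypothetical names, each mailbox stands in for a process's event queue, and draining the {{processB}} mailbox first stands in for {{thread2}} being blocked while {{thread3}} is idle. The first function registers a response with the proxy only when the event is visited, as in the scenario described. The second reserves a slot when the request arrives, which is one possible remedy and not necessarily the fix adopted for this issue.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a process's event queue.
struct Mailbox
{
  std::deque<std::function<void()>> events;

  // Runs every queued event, oldest first.
  void run()
  {
    while (!events.empty()) {
      std::function<void()> event = std::move(events.front());
      events.pop_front();
      event();
    }
  }
};

// The proxy learns about a response only when the owning process visits
// the event, so the proxy order follows the processing order.
std::vector<std::string> scrambledOrder()
{
  std::vector<std::string> proxy;
  Mailbox processA;
  Mailbox processB;

  // "thread1": requests arrive in the order /foo, /bar.
  processA.events.push_back([&proxy] { proxy.push_back("foo"); });
  processB.events.push_back([&proxy] { proxy.push_back("bar"); });

  // "thread2" is blocked and "thread3" is idle: processB runs first.
  processB.run();
  processA.run();

  return proxy;
}

// Assumed remedy: reserve a slot per request at arrival time and let the
// handler fill in its own slot later.
std::vector<std::string> arrivalOrder()
{
  std::vector<std::string> slots;
  Mailbox processA;
  Mailbox processB;

  slots.emplace_back();  // Slot 0 reserved when /foo arrives.
  processA.events.push_back([&slots] { slots[0] = "foo"; });
  slots.emplace_back();  // Slot 1 reserved when /bar arrives.
  processB.events.push_back([&slots] { slots[1] = "bar"; });

  // Same scheduling as before: processB runs first.
  processB.run();
  processA.run();

  return slots;
}
```

With identical scheduling, the first function yields the responses in processing order and the second in arrival order. The only difference is the moment at which the position in the pipeline is fixed.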
> h1. Reproducer
> The following is a test which successfully reproduces the issue:
> {code}
> class PipelineScramblerProcess : public Process<PipelineScramblerProcess>
> {
> public:
>   PipelineScramblerProcess()
>     : ProcessBase(ID::generate("PipelineScramblerProcess")) {}
>
>   void block(const Future<Nothing>& trigger)
>   {
>     trigger.await();
>   }
>
>   Future<http::Response> get(const http::Request& request)
>   {
>     if (promise_) {
>       promise_->set(Nothing());
>     }
>     return http::OK(self().id);
>   }
>
>   void setPromise(std::unique_ptr<Promise<Nothing>>& promise)
>   {
>     promise_ = std::move(promise);
>   }
>
> protected:
>   virtual void initialize()
>   {
>     route("/get", None(), &PipelineScramblerProcess::get);
>   }
>
> private:
>   std::unique_ptr<Promise<Nothing>> promise_;
> };
>
> TEST(HTTPConnectionTest, ComplexPipelining)
> {
>   PipelineScramblerProcess blocked;
>   spawn(blocked);
>   PipelineScramblerProcess unblocked;
>   spawn(unblocked);
>
>   ASSERT_EQ(blocked.self().address.ip, unblocked.self().address.ip);
>   ASSERT_EQ(blocked.self().address.port, unblocked.self().address.port);
>
>   std::unique_ptr<Promise<Nothing>> promise(new Promise<Nothing>());
>
>   // Block the first process so it cannot process the first request until
>   // the second request is finished.
>   dispatch(blocked, &PipelineScramblerProcess::block, promise->future());
>
>   // Promise will be set once 'unblocked' serves the second request.
>   unblocked.setPromise(promise);
>
>   // Get connection for HTTP pipelining.
>   Future<http::Connection> connect =
>     http::connect(http::URL(
>         "http",
>         blocked.self().address.ip,
>         blocked.self().address.port));
>   AWAIT_READY(connect);
>   http::Connection connection = connect.get();
>
>   http::Request blockedRequest;
>   blockedRequest.method = "GET";
>   blockedRequest.url = http::URL(
>       "http",
>       blocked.self().address.ip,
>       blocked.self().address.port,
>       blocked.self().id + "/get");
>   blockedRequest.keepAlive = true;
>   Future<http::Response> blockedResponse = connection.send(blockedRequest);
>
>   http::Request unblockedRequest;
>   unblockedRequest.method = "GET";
>   unblockedRequest.url = http::URL(
>       "http",
>       unblocked.self().address.ip,
>       unblocked.self().address.port,
>       unblocked.self().id + "/get");
>   unblockedRequest.keepAlive = true;
>   Future<http::Response> unblockedResponse = connection.send(unblockedRequest);
>
>   AWAIT_READY(blockedResponse);
>   AWAIT_READY(unblockedResponse);
>
>   // Each response body must carry the id of the process it was sent to.
>   EXPECT_EQ(blocked.self().id, blockedResponse->body);
>   EXPECT_EQ(unblocked.self().id, unblockedResponse->body);
>
>   AWAIT_READY(connection.disconnect());
>   AWAIT_READY(connection.disconnected());
>
>   terminate(blocked);
>   wait(blocked);
>   terminate(unblocked);
>   wait(unblocked);
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)