You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2015/10/06 02:11:25 UTC
[3/4] mesos git commit: Added an http::Connection for connection
re-use and pipelining.
Added an http::Connection for connection re-use and pipelining.
Review: https://reviews.apache.org/r/38608
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/00645436
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/00645436
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/00645436
Branch: refs/heads/master
Commit: 00645436616ccdc96be9810904fa6b4476a53925
Parents: 5fc2025
Author: Benjamin Mahler <be...@gmail.com>
Authored: Mon Sep 21 18:27:46 2015 -0700
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Mon Oct 5 16:41:15 2015 -0700
----------------------------------------------------------------------
3rdparty/libprocess/include/process/http.hpp | 51 ++++
3rdparty/libprocess/src/http.cpp | 322 +++++++++++++++++++++
3rdparty/libprocess/src/tests/http_tests.cpp | 331 ++++++++++++++++++++++
3 files changed, 704 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/00645436/3rdparty/libprocess/include/process/http.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/http.hpp b/3rdparty/libprocess/include/process/http.hpp
index ba3f0bc..dfcc188 100644
--- a/3rdparty/libprocess/include/process/http.hpp
+++ b/3rdparty/libprocess/include/process/http.hpp
@@ -50,6 +50,10 @@ namespace process {
template <typename T>
class Future;
+namespace network {
+class Socket;
+} // namespace network {
+
namespace http {
// Status code reason strings, from the HTTP1.1 RFC:
@@ -715,6 +719,53 @@ std::string encode(const hashmap<std::string, std::string>& query);
} // namespace query {
+/**
+ * Represents a connection to an HTTP server. Pipelining will be
+ * used when there are multiple requests in-flight.
+ *
+ * A Connection cannot be default-constructed; instances are obtained
+ * from 'connect()'. Copies share the same underlying 'Data' and
+ * therefore compare equal to one another.
+ *
+ * TODO(bmahler): This does not prevent pipelining with HTTP/1.0.
+ */
+class Connection
+{
+public:
+ Connection() = delete;
+
+ /**
+ * Sends a request to the server. If there are additional requests
+ * in flight, pipelining will occur. If 'streamedResponse' is set,
+ * the response body will be of type 'PIPE'. Note that if the
+ * request or response has a 'Connection: close' header, the
+ * connection will close after the response completes.
+ *
+ * The returned future fails if the connection has already
+ * disconnected, or once the connection has been marked to close.
+ */
+ Future<Response> send(const Request& request, bool streamedResponse = false);
+
+ /**
+ * Disconnects from the server. Responses that are still
+ * outstanding in the pipeline are failed.
+ */
+ Future<Nothing> disconnect();
+
+ /**
+ * Returns a future that is satisfied when a disconnection occurs.
+ */
+ Future<Nothing> disconnected();
+
+ bool operator==(const Connection& c) const { return data == c.data; }
+ bool operator!=(const Connection& c) const { return !(*this == c); }
+
+private:
+ Connection(const network::Socket& s);
+ friend Future<Connection> connect(const URL&);
+
+ // Forward declaration; the definition is not part of this header.
+ struct Data;
+
+ std::shared_ptr<Data> data;
+};
+
+
+Future<Connection> connect(const URL& url);
+
+
// TODO(bmahler): Consolidate these functions into a single
// http::request function that takes a 'Request' object.
http://git-wip-us.apache.org/repos/asf/mesos/blob/00645436/3rdparty/libprocess/src/http.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/http.cpp b/3rdparty/libprocess/src/http.cpp
index 3dd7898..d1ff13e 100644
--- a/3rdparty/libprocess/src/http.cpp
+++ b/3rdparty/libprocess/src/http.cpp
@@ -28,11 +28,17 @@
#include <queue>
#include <string>
#include <sstream>
+#include <tuple>
#include <vector>
+#include <process/async.hpp>
+#include <process/defer.hpp>
+#include <process/dispatch.hpp>
#include <process/future.hpp>
#include <process/http.hpp>
+#include <process/id.hpp>
#include <process/owned.hpp>
+#include <process/process.hpp>
#include <process/socket.hpp>
#include <stout/foreach.hpp>
@@ -55,6 +61,7 @@ using std::ostream;
using std::ostringstream;
using std::queue;
using std::string;
+using std::tuple;
using std::vector;
using process::http::Request;
@@ -748,6 +755,321 @@ Response __convert(const Response& pipeResponse, const string& body)
}
+class ConnectionProcess : public Process<ConnectionProcess> // Backs an http::Connection.
+{
+public:
+ ConnectionProcess(const Socket& _socket)
+ : ProcessBase(ID::generate("__http_connection__")),
+ socket(_socket),
+ sendChain(Nothing()),
+ close(false) {}
+
+ Future<Response> send(const Request& request, bool streamedResponse)
+ {
+ if (!disconnection.future().isPending()) { // Already disconnected.
+ return Failure("Disconnected");
+ }
+
+ if (close) { // A closing request or response was already seen.
+ return Failure("Cannot pipeline after 'Connection: close'");
+ }
+
+ if (!request.keepAlive) { // This request closes the connection.
+ close = true;
+ }
+
+ // We must chain the calls to Socket::send as it
+ // otherwise interleaves data across calls.
+ Socket socket_ = socket; // Copy captured by the lambda below.
+
+ sendChain = sendChain
+ .then([socket_, request]() mutable {
+ return socket_.send(encode(request));
+ });
+
+ // If we can't write to the socket, disconnect.
+ sendChain
+ .onFailed(defer(self(), [this](const string& failure) {
+ disconnect(failure);
+ }));
+
+ Promise<Response> promise; // Completed (or failed) later by _read() / disconnect().
+ Future<Response> response = promise.future();
+
+ pipeline.push(std::make_tuple(streamedResponse, std::move(promise)));
+
+ return response;
+ }
+
+ Future<Nothing> disconnect(const Option<std::string>& message = None())
+ {
+ Try<Nothing> shutdown = socket.shutdown();
+
+ disconnection.set(Nothing()); // Satisfies the future returned by disconnected().
+
+ // If a response is still streaming, we send EOF to
+ // the decoder in order to fail the pipe reader.
+ if (decoder.writingBody()) {
+ decoder.decode("", 0);
+ }
+
+ // Fail any remaining pipelined responses, using 'message' if one was given.
+ while (!pipeline.empty()) {
+ std::get<1>(pipeline.front()).fail(
+ message.isSome() ? message.get() : "Disconnected");
+ pipeline.pop();
+ }
+
+ return shutdown; // Outcome of the socket shutdown.
+ }
+
+ Future<Nothing> disconnected()
+ {
+ return disconnection.future();
+ }
+
+protected:
+ virtual void initialize()
+ {
+ // Start the read loop on the socket. We read independently
+ // of the requests being sent in order to detect socket
+ // closure at any time.
+ read();
+ }
+
+ virtual void finalize()
+ {
+ disconnect("Connection object was destructed");
+ }
+
+private:
+ void read()
+ {
+ socket.recv()
+ .onAny(defer(self(), &Self::_read, lambda::_1)); // Result is handled in _read().
+ }
+
+ void _read(const Future<string>& data)
+ {
+ deque<Response*> responses; // Each decoded response is deleted below once handled.
+
+ if (!data.isReady() || data->empty()) {
+ // Process EOF. Also send EOF to the decoder if a failure
+ // or discard is encountered.
+ responses = decoder.decode("", 0);
+ } else {
+ // We should only receive data if we're expecting a response
+ // in the pipeline, or if a response body is still streaming.
+ if (pipeline.empty() && !decoder.writingBody()) {
+ disconnect("Received data when none is expected");
+ return;
+ }
+
+ responses = decoder.decode(data->data(), data->length());
+ }
+
+ // Process any decoded responses.
+ while (!responses.empty()) {
+ // We do not expect any responses when the pipeline is empty.
+ // Note that this may occur when a 'Connection: close' header
+ // prematurely terminates the pipeline.
+ if (pipeline.empty()) {
+ while (!responses.empty()) {
+ delete responses.front();
+ responses.pop_front();
+ }
+
+ disconnect("Received response without a request");
+ return;
+ }
+
+ Response* response = responses.front();
+ responses.pop_front();
+
+ tuple<bool, Promise<Response>> t = std::move(pipeline.front());
+ pipeline.pop();
+
+ bool streamedResponse = std::get<0>(t);
+ Promise<Response> promise = std::move(std::get<1>(t));
+
+ if (streamedResponse) {
+ promise.set(*response);
+ } else {
+ // If the response should not be streamed, we convert
+ // the PIPE response into a BODY response.
+ promise.associate(convert(*response));
+ }
+
+ if (response->headers.contains("Connection") &&
+ response->headers.at("Connection") == "close") {
+ // This is the last response the server will send!
+ close = true; // NOTE(review): value is matched exactly as "close" above; confirm casing.
+
+ // Fail the remainder of the pipeline.
+ while (!pipeline.empty()) {
+ std::get<1>(pipeline.front()).fail(
+ "Received 'Connection: close' from the server");
+ pipeline.pop();
+ }
+ }
+
+ delete response;
+ }
+
+ // We keep reading and feeding data to the decoder until
+ // EOF or a failure is encountered.
+ if (!data.isReady()) {
+ disconnect(data.isFailed() ? data.failure() : "discarded");
+ return;
+ } else if (data->empty()) {
+ disconnect(); // EOF.
+ return;
+ } else if (decoder.failed()) {
+ disconnect("Failed to decode response");
+ return;
+ }
+
+ // Close the connection if a 'Connection: close' header
+ // was found and we're done reading the last response.
+ if (close && pipeline.empty() && !decoder.writingBody()) {
+ disconnect();
+ return;
+ }
+
+ read(); // Continue the read loop.
+ }
+
+ Socket socket;
+ StreamingResponseDecoder decoder; // Decodes the data received from the socket.
+ Future<Nothing> sendChain; // Chains socket sends, see send().
+ Promise<Nothing> disconnection; // Set by disconnect().
+
+ // For each response in the pipeline, we store a bool for
+ // whether the caller wants the response to be streamed, and its promise.
+ queue<tuple<bool, Promise<Response>>> pipeline;
+
+ // Whether the connection should be closed upon the
+ // completion of the last outstanding response.
+ bool close;
+};
+
+} // namespace internal {
+
+
+struct Connection::Data // State shared between copies of a Connection.
+{
+ Data(const Socket& s)
+ : process(new internal::ConnectionProcess(s))
+ {
+ spawn(process.get()); // The process is started as soon as 'Data' exists.
+ }
+
+ ~Data()
+ {
+ // Note that we pass 'false' here to avoid injecting the
+ // termination event at the front of the queue. This is
+ // to ensure we don't drop any queued request dispatches
+ // which would leave the caller with a future stuck in
+ // a pending state.
+ terminate(process.get(), false);
+ wait(process.get()); // Wait for the process before 'process' is destroyed.
+ }
+
+ Owned<internal::ConnectionProcess> process;
+};
+
+
+Connection::Connection(const Socket& s)
+ : data(std::make_shared<Connection::Data>(s)) {} // 'Data' spawns the ConnectionProcess.
+
+
+Future<Response> Connection::send(
+ const http::Request& request,
+ bool streamedResponse)
+{
+ return dispatch( // Forwarded to ConnectionProcess::send.
+ data->process.get(),
+ &internal::ConnectionProcess::send,
+ request,
+ streamedResponse);
+}
+
+
+Future<Nothing> Connection::disconnect()
+{
+ return dispatch( // Forwarded to ConnectionProcess::disconnect.
+ data->process.get(),
+ &internal::ConnectionProcess::disconnect,
+ None()); // No message: pending responses fail with "Disconnected".
+}
+
+
+Future<Nothing> Connection::disconnected()
+{
+ return dispatch( // Forwarded to ConnectionProcess::disconnected.
+ data->process.get(),
+ &internal::ConnectionProcess::disconnected);
+}
+
+
+Future<Connection> connect(const URL& url)
+{
+ // TODO(bmahler): Move address resolution into the URL class?
+ Address address;
+
+ if (url.ip.isNone() && url.domain.isNone()) {
+ return Failure("Expected URL.ip or URL.domain to be set");
+ }
+
+ if (url.ip.isSome()) { // An explicit IP takes precedence over the domain.
+ address.ip = url.ip.get();
+ } else {
+ Try<net::IP> ip = net::getIP(url.domain.get(), AF_INET); // IPv4 lookup only.
+
+ if (ip.isError()) {
+ return Failure("Failed to determine IP of domain '" +
+ url.domain.get() + "': " + ip.error());
+ }
+
+ address.ip = ip.get();
+ }
+
+ if (url.port.isNone()) { // No default port is assumed for the scheme.
+ return Failure("Expecting url.port to be set");
+ }
+
+ address.port = url.port.get();
+
+ Try<Socket> socket = [&url]() -> Try<Socket> {
+ // Default to 'http' if no scheme was specified.
+ if (url.scheme.isNone() || url.scheme == string("http")) {
+ return Socket::create(Socket::POLL);
+ }
+
+ if (url.scheme == string("https")) {
+#ifdef USE_SSL_SOCKET
+ return Socket::create(Socket::SSL);
+#else
+ return Error("'https' scheme requires SSL enabled");
+#endif
+ }
+
+ return Error("Unsupported URL scheme");
+ }();
+
+ if (socket.isError()) {
+ return Failure("Failed to create socket: " + socket.error());
+ }
+
+ return socket->connect(address) // The Connection is created once connected.
+ .then([socket]() {
+ return Connection(socket.get());
+ });
+}
+
+
+namespace internal {
+
// Forward declaration.
void _decode(
Socket socket,
http://git-wip-us.apache.org/repos/asf/mesos/blob/00645436/3rdparty/libprocess/src/tests/http_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/http_tests.cpp b/3rdparty/libprocess/src/tests/http_tests.cpp
index c380f35..38f3ad7 100644
--- a/3rdparty/libprocess/src/tests/http_tests.cpp
+++ b/3rdparty/libprocess/src/tests/http_tests.cpp
@@ -28,6 +28,7 @@
#include <process/gtest.hpp>
#include <process/http.hpp>
#include <process/io.hpp>
+#include <process/owned.hpp>
#include <process/socket.hpp>
#include <stout/base64.hpp>
@@ -41,6 +42,8 @@
using namespace process;
+using process::Owned;
+
using process::http::URL;
using process::network::Socket;
@@ -646,6 +649,334 @@ TEST(HTTPTest, Delete)
}
+TEST(HTTPConnectionTest, Serial)
+{
+ Http http;
+
+ http::URL url = http::URL(
+ "http",
+ http.process->self().address.ip,
+ http.process->self().address.port,
+ http.process->self().id + "/get");
+
+ Future<http::Connection> connect = http::connect(url);
+ AWAIT_READY(connect);
+
+ http::Connection connection = connect.get();
+
+ // First test a regular (non-streaming) request.
+ Promise<http::Response> promise1;
+ Future<http::Request> get1;
+
+ EXPECT_CALL(*http.process, get(_))
+ .WillOnce(DoAll(FutureArg<0>(&get1), Return(promise1.future())));
+
+ http::Request request1;
+ request1.method = "GET";
+ request1.url = url;
+ request1.body = "1";
+ request1.keepAlive = true;
+
+ Future<http::Response> response1 = connection.send(request1);
+
+ AWAIT_READY(get1);
+ EXPECT_EQ("1", get1->body);
+
+ promise1.set(http::OK("1"));
+
+ AWAIT_EXPECT_RESPONSE_BODY_EQ("1", response1);
+
+ // Now test a streaming response on the same connection.
+ Promise<http::Response> promise2;
+ Future<http::Request> get2;
+
+ EXPECT_CALL(*http.process, get(_))
+ .WillOnce(DoAll(FutureArg<0>(&get2), Return(promise2.future())));
+
+ http::Request request2 = request1;
+ request2.body = "2";
+
+ Future<http::Response> response2 = connection.send(request2, true);
+
+ AWAIT_READY(get2);
+ EXPECT_EQ("2", get2->body);
+
+ promise2.set(http::OK("2"));
+
+ AWAIT_READY(response2);
+ ASSERT_SOME(response2->reader);
+
+ http::Pipe::Reader reader = response2->reader.get();
+ AWAIT_EQ("2", reader.read());
+ AWAIT_EQ("", reader.read()); // Nothing further to read after the body.
+
+ // Disconnect, and check that the disconnection is reported.
+ AWAIT_READY(connection.disconnect());
+ AWAIT_READY(connection.disconnected());
+
+ // After disconnection, sends should fail.
+ AWAIT_FAILED(connection.send(request1));
+}
+
+
+TEST(HTTPConnectionTest, Pipeline)
+{
+ Http http;
+
+ http::URL url = http::URL(
+ "http",
+ http.process->self().address.ip,
+ http.process->self().address.port,
+ http.process->self().id + "/get");
+
+ Future<http::Connection> connect = http::connect(url);
+ AWAIT_READY(connect);
+
+ http::Connection connection = connect.get();
+
+ // Send three pipelined requests.
+ Promise<http::Response> promise1, promise2, promise3;
+ Future<http::Request> get1, get2, get3;
+
+ EXPECT_CALL(*http.process, get(_))
+ .WillOnce(DoAll(FutureArg<0>(&get1),
+ Return(promise1.future())))
+ .WillOnce(DoAll(FutureArg<0>(&get2),
+ Return(promise2.future())))
+ .WillOnce(DoAll(FutureArg<0>(&get3),
+ Return(promise3.future())));
+
+ http::Request request1, request2, request3;
+
+ request1.method = "GET";
+ request2.method = "GET";
+ request3.method = "GET";
+
+ request1.url = url;
+ request2.url = url;
+ request3.url = url;
+
+ request1.body = "1";
+ request2.body = "2";
+ request3.body = "3";
+
+ request1.keepAlive = true;
+ request2.keepAlive = true;
+ request3.keepAlive = true;
+
+ Future<http::Response> response1 = connection.send(request1);
+ Future<http::Response> response2 = connection.send(request2, true); // Streamed.
+ Future<http::Response> response3 = connection.send(request3);
+
+ // Ensure the requests are all received before any
+ // responses have been sent.
+ AWAIT_READY(get1);
+ AWAIT_READY(get2);
+ AWAIT_READY(get3);
+
+ EXPECT_EQ("1", get1->body);
+ EXPECT_EQ("2", get2->body);
+ EXPECT_EQ("3", get3->body);
+
+ // Complete the responses in the opposite order, and ensure
+ // that the pipelining in libprocess sends the responses in
+ // the same order as the requests were received.
+ promise3.set(http::OK("3"));
+ promise2.set(http::OK("2"));
+
+ EXPECT_TRUE(response1.isPending());
+ EXPECT_TRUE(response2.isPending());
+ EXPECT_TRUE(response3.isPending());
+
+ promise1.set(http::OK("1"));
+
+ AWAIT_READY(response1);
+ AWAIT_READY(response2);
+ AWAIT_READY(response3);
+
+ EXPECT_EQ("1", response1->body);
+
+ ASSERT_SOME(response2->reader);
+
+ http::Pipe::Reader reader = response2->reader.get();
+ AWAIT_EQ("2", reader.read());
+ AWAIT_EQ("", reader.read()); // Nothing further to read after the body.
+
+ EXPECT_EQ("3", response3->body);
+
+ // Disconnect, and check that the disconnection is reported.
+ AWAIT_READY(connection.disconnect());
+ AWAIT_READY(connection.disconnected());
+
+ // After disconnection, sends should fail.
+ AWAIT_FAILED(connection.send(request1));
+}
+
+
+TEST(HTTPConnectionTest, ClosingRequest)
+{
+ Http http;
+
+ http::URL url = http::URL(
+ "http",
+ http.process->self().address.ip,
+ http.process->self().address.port,
+ http.process->self().id + "/get");
+
+ Future<http::Connection> connect = http::connect(url);
+ AWAIT_READY(connect);
+
+ http::Connection connection = connect.get();
+
+ // Issue two pipelined requests, the second will not have
+ // 'keepAlive' set. This prevents further requests and leads
+ // to a disconnection upon receiving the second response.
+ Promise<http::Response> promise1, promise2;
+ Future<http::Request> get1, get2;
+
+ EXPECT_CALL(*http.process, get(_))
+ .WillOnce(DoAll(FutureArg<0>(&get1),
+ Return(promise1.future())))
+ .WillOnce(DoAll(FutureArg<0>(&get2),
+ Return(promise2.future())));
+
+ http::Request request1, request2;
+
+ request1.method = "GET";
+ request2.method = "GET";
+
+ request1.url = url;
+ request2.url = url;
+
+ request1.keepAlive = true;
+ request2.keepAlive = false; // The closing request.
+
+ Future<http::Response> response1 = connection.send(request1);
+ Future<http::Response> response2 = connection.send(request2);
+
+ // After a closing request, sends should fail.
+ AWAIT_FAILED(connection.send(request1));
+
+ // Complete the responses; both earlier requests should still succeed.
+ promise1.set(http::OK("body"));
+ promise2.set(http::OK("body"));
+
+ AWAIT_READY(response1);
+ AWAIT_READY(response2);
+
+ AWAIT_READY(connection.disconnected()); // No explicit disconnect() was issued.
+}
+
+
+TEST(HTTPConnectionTest, ClosingResponse)
+{
+ Http http;
+
+ http::URL url = http::URL(
+ "http",
+ http.process->self().address.ip,
+ http.process->self().address.port,
+ http.process->self().id + "/get");
+
+ Future<http::Connection> connect = http::connect(url);
+ AWAIT_READY(connect);
+
+ http::Connection connection = connect.get();
+
+ // Issue two pipelined requests, the server will respond
+ // with a 'Connection: close' for the first response, which
+ // will lead to a disconnection.
+ Promise<http::Response> promise1, promise2;
+ Future<http::Request> get1, get2;
+
+ EXPECT_CALL(*http.process, get(_))
+ .WillOnce(DoAll(FutureArg<0>(&get1), Return(promise1.future())))
+ .WillOnce(DoAll(FutureArg<0>(&get2), Return(promise2.future())));
+
+ http::Request request1, request2;
+
+ request1.method = "GET";
+ request2.method = "GET";
+
+ request1.url = url;
+ request2.url = url;
+
+ request1.keepAlive = true;
+ request2.keepAlive = true;
+
+ Future<http::Response> response1 = connection.send(request1);
+ Future<http::Response> response2 = connection.send(request2);
+
+ // The first response will close the connection.
+ http::Response ok = http::OK("body");
+ ok.headers["Connection"] = "close";
+
+ promise1.set(ok);
+
+ AWAIT_READY(response1);
+ AWAIT_FAILED(response2); // The second request never receives a response.
+
+ AWAIT_READY(connection.disconnected());
+}
+
+
+TEST(HTTPConnectionTest, ReferenceCounting)
+{
+ Http http;
+
+ http::URL url = http::URL(
+ "http",
+ http.process->self().address.ip,
+ http.process->self().address.port,
+ http.process->self().id + "/get");
+
+ // Capture the connection as an Owned in order to test that
+ // when the last copy of the Connection is destructed, a
+ // disconnection occurs.
+ auto connect = Owned<Future<http::Connection>>(
+ new Future<http::Connection>(http::connect(url)));
+
+ AWAIT_READY(*connect);
+
+ auto connection = Owned<http::Connection>(
+ new http::Connection(connect->get()));
+
+ connect.reset(); // Drop the copy of the Connection held by the future.
+
+ Future<Nothing> disconnected = connection->disconnected();
+
+ // This should be the last remaining copy of the connection.
+ connection.reset();
+
+ AWAIT_READY(disconnected);
+}
+
+
+TEST(HTTPConnectionTest, Equality)
+{
+ Http http;
+
+ http::URL url = http::URL(
+ "http",
+ http.process->self().address.ip,
+ http.process->self().address.port,
+ http.process->self().id + "/get");
+
+ Future<http::Connection> connect = http::connect(url);
+ AWAIT_READY(connect);
+
+ http::Connection connection1 = connect.get();
+
+ connect = http::connect(url); // A second, separate connection to the same URL.
+ AWAIT_READY(connect);
+
+ http::Connection connection2 = connect.get();
+
+ EXPECT_NE(connection1, connection2); // Separately established connections differ.
+ EXPECT_EQ(connection2, connection2); // A connection equals itself.
+}
+
+
TEST(HTTPTest, QueryEncodeDecode)
{
// If we use Type<a, b> directly inside a macro without surrounding