You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2015/10/06 02:11:25 UTC

[3/4] mesos git commit: Added an http::Connection for connection re-use and pipelining.

Added an http::Connection for connection re-use and pipelining.

Review: https://reviews.apache.org/r/38608


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/00645436
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/00645436
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/00645436

Branch: refs/heads/master
Commit: 00645436616ccdc96be9810904fa6b4476a53925
Parents: 5fc2025
Author: Benjamin Mahler <be...@gmail.com>
Authored: Mon Sep 21 18:27:46 2015 -0700
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Mon Oct 5 16:41:15 2015 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/http.hpp |  51 ++++
 3rdparty/libprocess/src/http.cpp             | 322 +++++++++++++++++++++
 3rdparty/libprocess/src/tests/http_tests.cpp | 331 ++++++++++++++++++++++
 3 files changed, 704 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/00645436/3rdparty/libprocess/include/process/http.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/http.hpp b/3rdparty/libprocess/include/process/http.hpp
index ba3f0bc..dfcc188 100644
--- a/3rdparty/libprocess/include/process/http.hpp
+++ b/3rdparty/libprocess/include/process/http.hpp
@@ -50,6 +50,10 @@ namespace process {
 template <typename T>
 class Future;
 
+namespace network {
+class Socket;
+} // namespace network {
+
 namespace http {
 
 // Status code reason strings, from the HTTP1.1 RFC:
@@ -715,6 +719,53 @@ std::string encode(const hashmap<std::string, std::string>& query);
 } // namespace query {
 
 
+/**
+ * Represents a connection to an HTTP server. Pipelining will be
+ * used when there are multiple requests in-flight.
+ *
+ * TODO(bmahler): This does not prevent pipelining with HTTP/1.0.
+ */
+class Connection
+{
+public:
+  Connection() = delete;
+
+  /**
+   * Sends a request to the server. If there are additional requests
+   * in flight, pipelining will occur. If 'streamedResponse' is set,
+   * the response body will be of type 'PIPE'. Note that if the
+   * request or response has a 'Connection: close' header, the
+   * connection will close after the response completes.
+   */
+  Future<Response> send(const Request& request, bool streamedResponse = false);
+
+  /**
+   * Disconnects from the server.
+   */
+  Future<Nothing> disconnect();
+
+  /**
+   * Returns a future that is satisfied when a disconnection occurs.
+   */
+  Future<Nothing> disconnected();
+
+  bool operator==(const Connection& c) const { return data == c.data; }
+  bool operator!=(const Connection& c) const { return !(*this == c); }
+
+private:
+  Connection(const network::Socket& s);
+  friend Future<Connection> connect(const URL&);
+
+  // Forward declaration.
+  struct Data;
+
+  std::shared_ptr<Data> data;
+};
+
+
+Future<Connection> connect(const URL& url);
+
+
 // TODO(bmahler): Consolidate these functions into a single
 // http::request function that takes a 'Request' object.
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/00645436/3rdparty/libprocess/src/http.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/http.cpp b/3rdparty/libprocess/src/http.cpp
index 3dd7898..d1ff13e 100644
--- a/3rdparty/libprocess/src/http.cpp
+++ b/3rdparty/libprocess/src/http.cpp
@@ -28,11 +28,17 @@
 #include <queue>
 #include <string>
 #include <sstream>
+#include <tuple>
 #include <vector>
 
+#include <process/async.hpp>
+#include <process/defer.hpp>
+#include <process/dispatch.hpp>
 #include <process/future.hpp>
 #include <process/http.hpp>
+#include <process/id.hpp>
 #include <process/owned.hpp>
+#include <process/process.hpp>
 #include <process/socket.hpp>
 
 #include <stout/foreach.hpp>
@@ -55,6 +61,7 @@ using std::ostream;
 using std::ostringstream;
 using std::queue;
 using std::string;
+using std::tuple;
 using std::vector;
 
 using process::http::Request;
@@ -748,6 +755,321 @@ Response __convert(const Response& pipeResponse, const string& body)
 }
 
 
+class ConnectionProcess : public Process<ConnectionProcess>
+{
+public:
+  ConnectionProcess(const Socket& _socket)
+    : ProcessBase(ID::generate("__http_connection__")),
+      socket(_socket),
+      sendChain(Nothing()),
+      close(false) {}
+
+  Future<Response> send(const Request& request, bool streamedResponse)
+  {
+    if (!disconnection.future().isPending()) {
+      return Failure("Disconnected");
+    }
+
+    if (close) {
+      return Failure("Cannot pipeline after 'Connection: close'");
+    }
+
+    if (!request.keepAlive) {
+      close = true;
+    }
+
+    // We must chain the calls to Socket::send as it
+    // otherwise interleaves data across calls.
+    Socket socket_ = socket;
+
+    sendChain = sendChain
+      .then([socket_, request]() mutable {
+        return socket_.send(encode(request));
+      });
+
+    // If we can't write to the socket, disconnect.
+    sendChain
+      .onFailed(defer(self(), [this](const string& failure) {
+        disconnect(failure);
+      }));
+
+    Promise<Response> promise;
+    Future<Response> response = promise.future();
+
+    pipeline.push(std::make_tuple(streamedResponse, std::move(promise)));
+
+    return response;
+  }
+
+  Future<Nothing> disconnect(const Option<std::string>& message = None())
+  {
+    Try<Nothing> shutdown = socket.shutdown();
+
+    disconnection.set(Nothing());
+
+    // If a response is still streaming, we send EOF to
+    // the decoder in order to fail the pipe reader.
+    if (decoder.writingBody()) {
+      decoder.decode("", 0);
+    }
+
+    // Fail any remaining pipelined responses.
+    while (!pipeline.empty()) {
+      std::get<1>(pipeline.front()).fail(
+          message.isSome() ? message.get() : "Disconnected");
+      pipeline.pop();
+    }
+
+    return shutdown;
+  }
+
+  Future<Nothing> disconnected()
+  {
+    return disconnection.future();
+  }
+
+protected:
+  virtual void initialize()
+  {
+    // Start the read loop on the socket. We read independently
+    // of the requests being sent in order to detect socket
+    // closure at any time.
+    read();
+  }
+
+  virtual void finalize()
+  {
+    disconnect("Connection object was destructed");
+  }
+
+private:
+  void read()
+  {
+    socket.recv()
+      .onAny(defer(self(), &Self::_read, lambda::_1));
+  }
+
+  void _read(const Future<string>& data)
+  {
+    deque<Response*> responses;
+
+    if (!data.isReady() || data->empty()) {
+      // Process EOF. Also send EOF to the decoder if a failure
+      // or discard is encountered.
+      responses = decoder.decode("", 0);
+    } else {
+      // We should only receive data if we're expecting a response
+      // in the pipeline, or if a response body is still streaming.
+      if (pipeline.empty() && !decoder.writingBody()) {
+        disconnect("Received data when none is expected");
+        return;
+      }
+
+      responses = decoder.decode(data->data(), data->length());
+    }
+
+    // Process any decoded responses.
+    while (!responses.empty()) {
+      // We do not expect any responses when the pipeline is empty.
+      // Note that this may occur when a 'Connection: close' header
+      // prematurely terminates the pipeline.
+      if (pipeline.empty()) {
+        while (!responses.empty()) {
+          delete responses.front();
+          responses.pop_front();
+        }
+
+        disconnect("Received response without a request");
+        return;
+      }
+
+      Response* response = responses.front();
+      responses.pop_front();
+
+      tuple<bool, Promise<Response>> t = std::move(pipeline.front());
+      pipeline.pop();
+
+      bool streamedResponse = std::get<0>(t);
+      Promise<Response> promise = std::move(std::get<1>(t));
+
+      if (streamedResponse) {
+        promise.set(*response);
+      } else {
+        // If the response should not be streamed, we convert
+        // the PIPE response into a BODY response.
+        promise.associate(convert(*response));
+      }
+
+      if (response->headers.contains("Connection") &&
+          response->headers.at("Connection") == "close") {
+        // This is the last response the server will send!
+        close = true;
+
+        // Fail the remainder of the pipeline.
+        while (!pipeline.empty()) {
+          std::get<1>(pipeline.front()).fail(
+              "Received 'Connection: close' from the server");
+          pipeline.pop();
+        }
+      }
+
+      delete response;
+    }
+
+    // We keep reading and feeding data to the decoder until
+    // EOF or a failure is encountered.
+    if (!data.isReady()) {
+      disconnect(data.isFailed() ? data.failure() : "discarded");
+      return;
+    } else if (data->empty()) {
+      disconnect(); // EOF.
+      return;
+    } else if (decoder.failed()) {
+      disconnect("Failed to decode response");
+      return;
+    }
+
+    // Close the connection if a 'Connection: close' header
+    // was found and we're done reading the last response.
+    if (close && pipeline.empty() && !decoder.writingBody()) {
+      disconnect();
+      return;
+    }
+
+    read();
+  }
+
+  Socket socket;
+  StreamingResponseDecoder decoder;
+  Future<Nothing> sendChain;
+  Promise<Nothing> disconnection;
+
+  // For each response in the pipeline, we store a bool for
+  // whether the caller wants the response to be streamed.
+  queue<tuple<bool, Promise<Response>>> pipeline;
+
+  // Whether the connection should be closed upon the
+  // completion of the last outstanding response.
+  bool close;
+};
+
+} // namespace internal {
+
+
+struct Connection::Data
+{
+  Data(const Socket& s)
+    : process(new internal::ConnectionProcess(s))
+  {
+    spawn(process.get());
+  }
+
+  ~Data()
+  {
+    // Note that we pass 'false' here to avoid injecting the
+    // termination event at the front of the queue. This is
+    // to ensure we don't drop any queued request dispatches
+    // which would leave the caller with a future stuck in
+    // a pending state.
+    terminate(process.get(), false);
+    wait(process.get());
+  }
+
+  Owned<internal::ConnectionProcess> process;
+};
+
+
+Connection::Connection(const Socket& s)
+  : data(std::make_shared<Connection::Data>(s)) {}
+
+
+Future<Response> Connection::send(
+    const http::Request& request,
+    bool streamedResponse)
+{
+  return dispatch(
+      data->process.get(),
+      &internal::ConnectionProcess::send,
+      request,
+      streamedResponse);
+}
+
+
+Future<Nothing> Connection::disconnect()
+{
+  return dispatch(
+      data->process.get(),
+      &internal::ConnectionProcess::disconnect,
+      None());
+}
+
+
+Future<Nothing> Connection::disconnected()
+{
+  return dispatch(
+      data->process.get(),
+      &internal::ConnectionProcess::disconnected);
+}
+
+
+Future<Connection> connect(const URL& url)
+{
+  // TODO(bmahler): Move address resolution into the URL class?
+  Address address;
+
+  if (url.ip.isNone() && url.domain.isNone()) {
+    return Failure("Expected URL.ip or URL.domain to be set");
+  }
+
+  if (url.ip.isSome()) {
+    address.ip = url.ip.get();
+  } else {
+    Try<net::IP> ip = net::getIP(url.domain.get(), AF_INET);
+
+    if (ip.isError()) {
+      return Failure("Failed to determine IP of domain '" +
+                     url.domain.get() + "': " + ip.error());
+    }
+
+    address.ip = ip.get();
+  }
+
+  if (url.port.isNone()) {
+    return Failure("Expecting url.port to be set");
+  }
+
+  address.port = url.port.get();
+
+  Try<Socket> socket = [&url]() -> Try<Socket> {
+    // Default to 'http' if no scheme was specified.
+    if (url.scheme.isNone() || url.scheme == string("http")) {
+      return Socket::create(Socket::POLL);
+    }
+
+    if (url.scheme == string("https")) {
+#ifdef USE_SSL_SOCKET
+      return Socket::create(Socket::SSL);
+#else
+      return Error("'https' scheme requires SSL enabled");
+#endif
+    }
+
+    return Error("Unsupported URL scheme");
+  }();
+
+  if (socket.isError()) {
+    return Failure("Failed to create socket: " + socket.error());
+  }
+
+  return socket->connect(address)
+    .then([socket]() {
+      return Connection(socket.get());
+    });
+}
+
+
+namespace internal {
+
 // Forward declaration.
 void _decode(
     Socket socket,

http://git-wip-us.apache.org/repos/asf/mesos/blob/00645436/3rdparty/libprocess/src/tests/http_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/http_tests.cpp b/3rdparty/libprocess/src/tests/http_tests.cpp
index c380f35..38f3ad7 100644
--- a/3rdparty/libprocess/src/tests/http_tests.cpp
+++ b/3rdparty/libprocess/src/tests/http_tests.cpp
@@ -28,6 +28,7 @@
 #include <process/gtest.hpp>
 #include <process/http.hpp>
 #include <process/io.hpp>
+#include <process/owned.hpp>
 #include <process/socket.hpp>
 
 #include <stout/base64.hpp>
@@ -41,6 +42,8 @@
 
 using namespace process;
 
+using process::Owned;
+
 using process::http::URL;
 
 using process::network::Socket;
@@ -646,6 +649,334 @@ TEST(HTTPTest, Delete)
 }
 
 
+TEST(HTTPConnectionTest, Serial)
+{
+  Http http;
+
+  http::URL url = http::URL(
+      "http",
+      http.process->self().address.ip,
+      http.process->self().address.port,
+      http.process->self().id + "/get");
+
+  Future<http::Connection> connect = http::connect(url);
+  AWAIT_READY(connect);
+
+  http::Connection connection = connect.get();
+
+  // First test a regular (non-streaming) request.
+  Promise<http::Response> promise1;
+  Future<http::Request> get1;
+
+  EXPECT_CALL(*http.process, get(_))
+    .WillOnce(DoAll(FutureArg<0>(&get1), Return(promise1.future())));
+
+  http::Request request1;
+  request1.method = "GET";
+  request1.url = url;
+  request1.body = "1";
+  request1.keepAlive = true;
+
+  Future<http::Response> response1 = connection.send(request1);
+
+  AWAIT_READY(get1);
+  EXPECT_EQ("1", get1->body);
+
+  promise1.set(http::OK("1"));
+
+  AWAIT_EXPECT_RESPONSE_BODY_EQ("1", response1);
+
+  // Now test a streaming response.
+  Promise<http::Response> promise2;
+  Future<http::Request> get2;
+
+  EXPECT_CALL(*http.process, get(_))
+    .WillOnce(DoAll(FutureArg<0>(&get2), Return(promise2.future())));
+
+  http::Request request2 = request1;
+  request2.body = "2";
+
+  Future<http::Response> response2 = connection.send(request2, true);
+
+  AWAIT_READY(get2);
+  EXPECT_EQ("2", get2->body);
+
+  promise2.set(http::OK("2"));
+
+  AWAIT_READY(response2);
+  ASSERT_SOME(response2->reader);
+
+  http::Pipe::Reader reader = response2->reader.get();
+  AWAIT_EQ("2", reader.read());
+  AWAIT_EQ("", reader.read());
+
+  // Disconnect.
+  AWAIT_READY(connection.disconnect());
+  AWAIT_READY(connection.disconnected());
+
+  // After disconnection, sends should fail.
+  AWAIT_FAILED(connection.send(request1));
+}
+
+
+TEST(HTTPConnectionTest, Pipeline)
+{
+  Http http;
+
+  http::URL url = http::URL(
+      "http",
+      http.process->self().address.ip,
+      http.process->self().address.port,
+      http.process->self().id + "/get");
+
+  Future<http::Connection> connect = http::connect(url);
+  AWAIT_READY(connect);
+
+  http::Connection connection = connect.get();
+
+  // Send three pipelined requests.
+  Promise<http::Response> promise1, promise2, promise3;
+  Future<http::Request> get1, get2, get3;
+
+  EXPECT_CALL(*http.process, get(_))
+    .WillOnce(DoAll(FutureArg<0>(&get1),
+                    Return(promise1.future())))
+    .WillOnce(DoAll(FutureArg<0>(&get2),
+                    Return(promise2.future())))
+    .WillOnce(DoAll(FutureArg<0>(&get3),
+                    Return(promise3.future())));
+
+  http::Request request1, request2, request3;
+
+  request1.method = "GET";
+  request2.method = "GET";
+  request3.method = "GET";
+
+  request1.url = url;
+  request2.url = url;
+  request3.url = url;
+
+  request1.body = "1";
+  request2.body = "2";
+  request3.body = "3";
+
+  request1.keepAlive = true;
+  request2.keepAlive = true;
+  request3.keepAlive = true;
+
+  Future<http::Response> response1 = connection.send(request1);
+  Future<http::Response> response2 = connection.send(request2, true);
+  Future<http::Response> response3 = connection.send(request3);
+
+  // Ensure the requests are all received before any
+  // responses have been sent.
+  AWAIT_READY(get1);
+  AWAIT_READY(get2);
+  AWAIT_READY(get3);
+
+  EXPECT_EQ("1", get1->body);
+  EXPECT_EQ("2", get2->body);
+  EXPECT_EQ("3", get3->body);
+
+  // Complete the responses in the opposite order, and ensure
+  // that the pipelining in libprocess sends the responses in
+  // the same order as the requests were received.
+  promise3.set(http::OK("3"));
+  promise2.set(http::OK("2"));
+
+  EXPECT_TRUE(response1.isPending());
+  EXPECT_TRUE(response2.isPending());
+  EXPECT_TRUE(response3.isPending());
+
+  promise1.set(http::OK("1"));
+
+  AWAIT_READY(response1);
+  AWAIT_READY(response2);
+  AWAIT_READY(response3);
+
+  EXPECT_EQ("1", response1->body);
+
+  ASSERT_SOME(response2->reader);
+
+  http::Pipe::Reader reader = response2->reader.get();
+  AWAIT_EQ("2", reader.read());
+  AWAIT_EQ("", reader.read());
+
+  EXPECT_EQ("3", response3->body);
+
+  // Disconnect.
+  AWAIT_READY(connection.disconnect());
+  AWAIT_READY(connection.disconnected());
+
+  // After disconnection, sends should fail.
+  AWAIT_FAILED(connection.send(request1));
+}
+
+
+TEST(HTTPConnectionTest, ClosingRequest)
+{
+  Http http;
+
+  http::URL url = http::URL(
+      "http",
+      http.process->self().address.ip,
+      http.process->self().address.port,
+      http.process->self().id + "/get");
+
+  Future<http::Connection> connect = http::connect(url);
+  AWAIT_READY(connect);
+
+  http::Connection connection = connect.get();
+
+  // Issue two pipelined requests, the second will not have
+  // 'keepAlive' set. This prevents further requests and leads
+  // to a disconnection upon receiving the second response.
+  Promise<http::Response> promise1, promise2;
+  Future<http::Request> get1, get2;
+
+  EXPECT_CALL(*http.process, get(_))
+    .WillOnce(DoAll(FutureArg<0>(&get1),
+                    Return(promise1.future())))
+    .WillOnce(DoAll(FutureArg<0>(&get2),
+                    Return(promise2.future())));
+
+  http::Request request1, request2;
+
+  request1.method = "GET";
+  request2.method = "GET";
+
+  request1.url = url;
+  request2.url = url;
+
+  request1.keepAlive = true;
+  request2.keepAlive = false;
+
+  Future<http::Response> response1 = connection.send(request1);
+  Future<http::Response> response2 = connection.send(request2);
+
+  // After a closing request, sends should fail.
+  AWAIT_FAILED(connection.send(request1));
+
+  // Complete the responses.
+  promise1.set(http::OK("body"));
+  promise2.set(http::OK("body"));
+
+  AWAIT_READY(response1);
+  AWAIT_READY(response2);
+
+  AWAIT_READY(connection.disconnected());
+}
+
+
+TEST(HTTPConnectionTest, ClosingResponse)
+{
+  Http http;
+
+  http::URL url = http::URL(
+      "http",
+      http.process->self().address.ip,
+      http.process->self().address.port,
+      http.process->self().id + "/get");
+
+  Future<http::Connection> connect = http::connect(url);
+  AWAIT_READY(connect);
+
+  http::Connection connection = connect.get();
+
+  // Issue two pipelined requests, the server will respond
+  // with a 'Connection: close' for the first response, which
+  // will lead to a disconnection.
+  Promise<http::Response> promise1, promise2;
+  Future<http::Request> get1, get2;
+
+  EXPECT_CALL(*http.process, get(_))
+    .WillOnce(DoAll(FutureArg<0>(&get1), Return(promise1.future())))
+    .WillOnce(DoAll(FutureArg<0>(&get2), Return(promise2.future())));
+
+  http::Request request1, request2;
+
+  request1.method = "GET";
+  request2.method = "GET";
+
+  request1.url = url;
+  request2.url = url;
+
+  request1.keepAlive = true;
+  request2.keepAlive = true;
+
+  Future<http::Response> response1 = connection.send(request1);
+  Future<http::Response> response2 = connection.send(request2);
+
+  // The first response will close the connection.
+  http::Response ok = http::OK("body");
+  ok.headers["Connection"] = "close";
+
+  promise1.set(ok);
+
+  AWAIT_READY(response1);
+  AWAIT_FAILED(response2);
+
+  AWAIT_READY(connection.disconnected());
+}
+
+
+TEST(HTTPConnectionTest, ReferenceCounting)
+{
+  Http http;
+
+  http::URL url = http::URL(
+      "http",
+      http.process->self().address.ip,
+      http.process->self().address.port,
+      http.process->self().id + "/get");
+
+  // Capture the connection as a Owned in order to test that
+  // when the last copy of the Connection is destructed, a
+  // disconnection occurs.
+  auto connect = Owned<Future<http::Connection>>(
+      new Future<http::Connection>(http::connect(url)));
+
+  AWAIT_READY(*connect);
+
+  auto connection = Owned<http::Connection>(
+      new http::Connection(connect->get()));
+
+  connect.reset();
+
+  Future<Nothing> disconnected = connection->disconnected();
+
+  // This should be the last remaining copy of the connection.
+  connection.reset();
+
+  AWAIT_READY(disconnected);
+}
+
+
+TEST(HTTPConnectionTest, Equality)
+{
+  Http http;
+
+  http::URL url = http::URL(
+      "http",
+      http.process->self().address.ip,
+      http.process->self().address.port,
+      http.process->self().id + "/get");
+
+  Future<http::Connection> connect = http::connect(url);
+  AWAIT_READY(connect);
+
+  http::Connection connection1 = connect.get();
+
+  connect = http::connect(url);
+  AWAIT_READY(connect);
+
+  http::Connection connection2 = connect.get();
+
+  EXPECT_NE(connection1, connection2);
+  EXPECT_EQ(connection2, connection2);
+}
+
+
 TEST(HTTPTest, QueryEncodeDecode)
 {
   // If we use Type<a, b> directly inside a macro without surrounding