You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by jo...@apache.org on 2019/12/18 22:34:37 UTC

[mesos] 08/11: SSL Socket: Implemented socket connection and handshake.

This is an automated email from the ASF dual-hosted git repository.

josephwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit df3d8056dcf83ebdd0e0b3e365ab7c0d976ecaca
Author: Joseph Wu <jo...@apache.org>
AuthorDate: Tue Oct 15 10:47:39 2019 -0700

    SSL Socket: Implemented socket connection and handshake.
    
    This fills in some of the SSL socket implementation,
    in particular the constructor, destructor, connect(),
    and accept() methods.
    
    Much of the setup and verification is taken verbatim from the
    libevent socket implementation.
    
    A change to the poll socket was necessary to prevent the SSL
    socket from holding a self-reference indefinitely.
    
    Review: https://reviews.apache.org/r/71665
---
 3rdparty/libprocess/include/process/socket.hpp  |   6 +
 3rdparty/libprocess/src/posix/poll_socket.cpp   |  14 +-
 3rdparty/libprocess/src/ssl/openssl_socket.cpp  | 368 +++++++++++++++++++++++-
 3rdparty/libprocess/src/ssl/openssl_socket.hpp  |  34 +++
 3rdparty/libprocess/src/windows/poll_socket.cpp |  16 +-
 5 files changed, 420 insertions(+), 18 deletions(-)

diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp
index 48860f8..cf594cf 100644
--- a/3rdparty/libprocess/include/process/socket.hpp
+++ b/3rdparty/libprocess/include/process/socket.hpp
@@ -212,6 +212,12 @@ public:
 protected:
   explicit SocketImpl(int_fd _s) : s(_s) { CHECK(s >= 0); }
 
+#if defined(USE_SSL_SOCKET) && !defined(USE_LIBEVENT)
+  // Allows this class access to `release()` other types of sockets,
+  // like the `PollSocketImpl`.
+  friend class OpenSSLSocketImpl;
+#endif // USE_SSL_SOCKET && !USE_LIBEVENT
+
   /**
    * Releases ownership of the file descriptor. Not exposed
    * via the `Socket` interface as this is only intended to
diff --git a/3rdparty/libprocess/src/posix/poll_socket.cpp b/3rdparty/libprocess/src/posix/poll_socket.cpp
index ecc2bd4..1641ed3 100644
--- a/3rdparty/libprocess/src/posix/poll_socket.cpp
+++ b/3rdparty/libprocess/src/posix/poll_socket.cpp
@@ -52,13 +52,17 @@ Try<Nothing> PollSocketImpl::listen(int backlog)
 
 Future<std::shared_ptr<SocketImpl>> PollSocketImpl::accept()
 {
-  // Need to hold a copy of `this` so that the underlying socket
-  // doesn't end up getting reused before we return from the call to
-  // `io::poll` and end up accepting a socket incorrectly.
-  auto self = shared(this);
+  // Need to hold a copy of `this` so that we can detect if the underlying
+  // socket has changed (i.e. closed) before we return from `io::poll`.
+  std::weak_ptr<SocketImpl> weak_self(shared(this));
 
   return io::poll(get(), io::READ)
-    .then([self]() -> Future<std::shared_ptr<SocketImpl>> {
+    .then([weak_self]() -> Future<std::shared_ptr<SocketImpl>> {
+      std::shared_ptr<SocketImpl> self(weak_self.lock());
+      if (self == nullptr) {
+        return Failure("Socket destroyed while accepting");
+      }
+
       Try<int_fd> accepted = network::accept(self->get());
       if (accepted.isError()) {
         return Failure(accepted.error());
diff --git a/3rdparty/libprocess/src/ssl/openssl_socket.cpp b/3rdparty/libprocess/src/ssl/openssl_socket.cpp
index ce1f806..e1d8ad3 100644
--- a/3rdparty/libprocess/src/ssl/openssl_socket.cpp
+++ b/3rdparty/libprocess/src/ssl/openssl_socket.cpp
@@ -30,13 +30,17 @@
 
 #include <process/ssl/flags.hpp>
 
+#include <stout/net.hpp>
 #include <stout/synchronized.hpp>
 #include <stout/unimplemented.hpp>
+#include <stout/unreachable.hpp>
 
 #include "openssl.hpp"
 
 #include "ssl/openssl_socket.hpp"
 
+using process::network::openssl::Mode;
+
 namespace process {
 namespace network {
 namespace internal {
@@ -281,19 +285,29 @@ static BIO* BIO_new_libprocess(int_fd socket)
 }
 
 
-Try<std::shared_ptr<SocketImpl>> OpenSSLSocketImpl::create(int_fd s)
+Try<std::shared_ptr<SocketImpl>> OpenSSLSocketImpl::create(int_fd socket)
 {
-  UNIMPLEMENTED;
+  openssl::initialize();
+
+  if (!openssl::flags().enabled) {
+    return Error("SSL is disabled");
+  }
+
+  return std::make_shared<OpenSSLSocketImpl>(socket);
 }
 
 
-OpenSSLSocketImpl::OpenSSLSocketImpl(int_fd _s)
-  : PollSocketImpl(_s) {}
+OpenSSLSocketImpl::OpenSSLSocketImpl(int_fd socket)
+  : PollSocketImpl(socket),
+    ssl(nullptr) {}
 
 
 OpenSSLSocketImpl::~OpenSSLSocketImpl()
 {
-  UNIMPLEMENTED;
+  if (ssl != nullptr) {
+    SSL_free(ssl);
+    ssl = nullptr;
+  }
 }
 
 
@@ -308,7 +322,91 @@ Future<Nothing> OpenSSLSocketImpl::connect(
     const Address& address,
     const openssl::TLSClientConfig& config)
 {
-  UNIMPLEMENTED;
+  if (client_config.isSome()) {
+    return Failure("Socket is already connecting or connected");
+  }
+
+  if (config.ctx == nullptr) {
+    return Failure("Invalid SSL context");
+  }
+
+  // NOTE: The OpenSSLSocketImpl destructor is responsible for calling
+  // `SSL_free` on this SSL object.
+  ssl = SSL_new(config.ctx);
+  if (ssl == nullptr) {
+    return Failure("Failed to connect: SSL_new");
+  }
+
+  client_config = config;
+
+  if (config.configure_socket) {
+    Try<Nothing> configured = config.configure_socket(
+        ssl, address, config.servername);
+
+    if (configured.isError()) {
+      return Failure("Failed to configure socket: " + configured.error());
+    }
+  }
+
+  // Set the SSL context in client mode.
+  SSL_set_connect_state(ssl);
+
+  if (address.family() == Address::Family::INET4 ||
+      address.family() == Address::Family::INET6) {
+    Try<inet::Address> inet_address = network::convert<inet::Address>(address);
+
+    if (inet_address.isError()) {
+      return Failure("Failed to convert address: " + inet_address.error());
+    }
+
+    // Determine the 'peer_ip' from the address we're connecting to in
+    // order to properly verify the certificate later.
+    peer_ip = inet_address->ip;
+  }
+
+  if (config.servername.isSome()) {
+    VLOG(2) << "Connecting to " << config.servername.get() << " at " << address;
+  } else {
+    VLOG(2) << "Connecting to " << address << " with no hostname specified";
+  }
+
+  // Hold a weak pointer since the connection (plus handshaking)
+  // might never complete.
+  std::weak_ptr<OpenSSLSocketImpl> weak_self(shared(this));
+
+  // Connect like a normal socket, then setup the I/O abstraction with OpenSSL
+  // and perform the TLS handshake.
+  return PollSocketImpl::connect(address)
+    .then([weak_self]() -> Future<size_t> {
+      std::shared_ptr<OpenSSLSocketImpl> self(weak_self.lock());
+      if (self == nullptr) {
+        return Failure("Socket destroyed while connecting");
+      }
+
+      return self->set_ssl_and_do_handshake(self->ssl);
+    })
+    .then([weak_self]() -> Future<Nothing> {
+      std::shared_ptr<OpenSSLSocketImpl> self(weak_self.lock());
+      if (self == nullptr) {
+        return Failure("Socket destroyed while connecting");
+      }
+
+      // Time to do post-verification.
+      CHECK(self->client_config.isSome());
+
+      if (self->client_config->verify) {
+        Try<Nothing> verify = self->client_config->verify(
+            self->ssl, self->client_config->servername, self->peer_ip);
+
+        if (verify.isError()) {
+          VLOG(1) << "Failed connect, verification error: " << verify.error();
+
+          return Failure(verify.error());
+        }
+      }
+
+      return Nothing();
+    });
 }
 
 
@@ -333,7 +431,141 @@ Future<size_t> OpenSSLSocketImpl::sendfile(
 
 Future<std::shared_ptr<SocketImpl>> OpenSSLSocketImpl::accept()
 {
-  UNIMPLEMENTED;
+  if (!accept_loop_started.once()) {
+    // Hold a weak pointer since we do not want this accept loop to extend
+    // the lifetime of `this` unnecessarily.
+    std::weak_ptr<OpenSSLSocketImpl> weak_self(shared(this));
+
+    // We start accepting incoming connections in a loop here because a socket
+    // must complete the SSL handshake (or be downgraded) before the socket is
+    // considered ready. In case the incoming socket never writes any data,
+    // we do not wait for the accept logic to complete before accepting a
+    // new socket.
+    process::loop(
+        None(),
+        [weak_self]() -> Future<std::shared_ptr<SocketImpl>> {
+          std::shared_ptr<OpenSSLSocketImpl> self(weak_self.lock());
+
+          if (self != nullptr) {
+            return self->PollSocketImpl::accept();
+          }
+
+          return Failure("Socket destructed");
+        },
+        [weak_self](const std::shared_ptr<SocketImpl>& socket)
+            -> Future<ControlFlow<Nothing>> {
+          std::shared_ptr<OpenSSLSocketImpl> self(weak_self.lock());
+
+          if (self == nullptr) {
+            return Break();
+          }
+
+          // Wrap this new socket up into our SSL wrapper class by releasing
+          // the FD and creating a new OpenSSLSocketImpl object with the FD.
+          const std::shared_ptr<OpenSSLSocketImpl> ssl_socket =
+            std::make_shared<OpenSSLSocketImpl>(socket->release());
+
+          // Set up SSL object.
+          SSL* accept_ssl = SSL_new(openssl::context());
+          if (accept_ssl == nullptr) {
+            self->accept_queue.put(Failure("Accept failed, SSL_new"));
+            return Continue();
+          }
+
+          Try<Address> peer_address = network::peer(ssl_socket->get());
+          if (!peer_address.isSome()) {
+            SSL_free(accept_ssl);
+            self->accept_queue.put(
+                Failure("Could not determine peer IP for connection"));
+            return Continue();
+          }
+
+          // NOTE: Right now, `openssl::configure_socket` does not do anything
+          // in server mode, but we still pass the correct peer address to
+          // enable modules to implement application-level logic in the future.
+          Try<Nothing> configured = openssl::configure_socket(
+              accept_ssl, Mode::SERVER, peer_address.get(), None());
+
+          if (configured.isError()) {
+            SSL_free(accept_ssl);
+            self->accept_queue.put(
+                Failure("Could not configure socket: " + configured.error()));
+            return Continue();
+          }
+
+          // Set the SSL context in server mode.
+          SSL_set_accept_state(accept_ssl);
+
+          // Pass ownership of `accept_ssl` to the newly accepted socket,
+          // and wtart the SSL handshake. When the SSL handshake completes,
+          // the listening socket will place the result (failure or success)
+          // onto the listening socket's `accept_queue`.
+          //
+          // TODO(josephw): Add a timeout to catch/close incoming sockets which
+          // never finish the SSL handshake.
+          ssl_socket->set_ssl_and_do_handshake(accept_ssl)
+            .onAny([weak_self, ssl_socket](Future<size_t> result) {
+              std::shared_ptr<OpenSSLSocketImpl> self(weak_self.lock());
+
+              if (self == nullptr) {
+                return;
+              }
+
+              if (result.isFailed()) {
+                self->accept_queue.put(Failure(result.failure()));
+                return;
+              }
+
+              // For verification purposes, we need to grab the address (again).
+              Try<Address> address = network::address(ssl_socket->get());
+              if (address.isError()) {
+                self->accept_queue.put(
+                    Failure("Failed to get address: " + address.error()));
+                return;
+              }
+
+              Try<inet::Address> inet_address =
+                network::convert<inet::Address>(address.get());
+
+              Try<Nothing> verify = openssl::verify(
+                  ssl_socket->ssl,
+                  Mode::SERVER,
+                  None(),
+                  inet_address.isSome()
+                    ? Some(inet_address->ip)
+                    : Option<net::IP>::none());
+
+              if (verify.isError()) {
+                VLOG(1) << "Failed accept, verification error: "
+                        << verify.error();
+
+                self->accept_queue.put(Failure(verify.error()));
+                return;
+              }
+
+              self->accept_queue.put(ssl_socket);
+            });
+
+          return Continue();
+        });
+
+    accept_loop_started.done();
+  }
+
+  // NOTE: In order to not deadlock the libprocess socket manager, we must
+  // defer accepted sockets regardless of success or failure. This prevents
+  // the socket manager from recursively calling `on_accept` and deadlocking.
+  return accept_queue.get()
+    .repair(defer(
+        [](const Future<Future<std::shared_ptr<SocketImpl>>>& failure) {
+          return failure;
+        }))
+    .then(defer(
+        [](const Future<std::shared_ptr<SocketImpl>>& impl)
+            -> Future<std::shared_ptr<SocketImpl>> {
+          CHECK(!impl.isPending());
+          return impl;
+        }));
 }
 
 
@@ -342,6 +574,128 @@ Try<Nothing, SocketError> OpenSSLSocketImpl::shutdown(int how)
   UNIMPLEMENTED;
 }
 
+
+Future<size_t> OpenSSLSocketImpl::set_ssl_and_do_handshake(SSL* _ssl)
+{
+  // Save a reference to the SSL object.
+  ssl = _ssl;
+
+  // Construct the BIO wrapper for the underlying socket.
+  //
+  // NOTE: This transfers ownership of the BIO to the SSL object.
+  // The BIO will be freed upon calling `SSL_free(ssl)`.
+  BIO* bio = BIO_new_libprocess(get());
+  SSL_set_bio(ssl, bio, bio);
+
+  // Hold a weak pointer since the handshake may potentially never complete.
+  std::weak_ptr<OpenSSLSocketImpl> weak_self(shared(this));
+
+  return process::loop(
+      None(),
+      [weak_self]() -> Future<int> {
+        std::shared_ptr<OpenSSLSocketImpl> self(weak_self.lock());
+        if (self == nullptr) {
+          return Failure("Socket destroyed while doing handshake");
+        }
+
+        ERR_clear_error();
+        return SSL_do_handshake(self->ssl);
+      },
+      [weak_self](int result) -> Future<ControlFlow<size_t>> {
+        std::shared_ptr<OpenSSLSocketImpl> self(weak_self.lock());
+        if (self == nullptr) {
+          return Failure("Socket destroyed while doing handshake");
+        }
+
+        // Check if EOF has been reached.
+        BIO* bio = SSL_get_rbio(self->ssl);
+        if (BIO_eof(bio) == 1) {
+          return Failure("EOF while doing handshake");
+        }
+
+        return self->handle_ssl_return_result(result, false);
+      });
+}
+
+
+Future<ControlFlow<size_t>> OpenSSLSocketImpl::handle_ssl_return_result(
+    int result,
+    bool handle_as_read)
+{
+  if (result > 0) {
+    // Successful result, potentially meaning a connected/accepted socket
+    // or a completed send/recv request.
+    return Break(result);
+  }
+
+  // Not a success, so we'll need to have the BIO and associated data handy.
+  BIO* bio = SSL_get_rbio(ssl);
+  SocketBIOData* data = reinterpret_cast<SocketBIOData*>(BIO_get_data(bio));
+  CHECK_NOTNULL(data);
+
+  int error = SSL_get_error(ssl, result);
+  switch (error) {
+    case SSL_ERROR_WANT_READ: {
+      synchronized (data->lock) {
+        if (data->recv_request.get() != nullptr) {
+          return data->recv_request->future
+            .then([]() -> Future<ControlFlow<size_t>> {
+              return Continue();
+            });
+        }
+      }
+
+      return Continue();
+    }
+    case SSL_ERROR_WANT_WRITE: {
+      synchronized (data->lock) {
+        if (data->send_request.get() != nullptr) {
+          return data->send_request->future
+            .then([]() -> Future<ControlFlow<size_t>> {
+              return Continue();
+            });
+        }
+      }
+
+      return Continue();
+    }
+    case SSL_ERROR_WANT_CLIENT_HELLO_CB:
+    case SSL_ERROR_WANT_X509_LOOKUP:
+      return Failure("Not implemented");
+    case SSL_ERROR_ZERO_RETURN:
+      if (handle_as_read) {
+        return Break(0u);
+      } else {
+        return Failure("TLS connection has been closed");
+      }
+    case SSL_ERROR_WANT_ASYNC:
+    case SSL_ERROR_WANT_ASYNC_JOB:
+      // We do not use `SSL_MODE_ASYNC`.
+    case SSL_ERROR_WANT_CONNECT:
+    case SSL_ERROR_WANT_ACCEPT:
+      // We make sure the underlying socket is connected prior
+      // to any interaction with the OpenSSL library.
+      UNREACHABLE();
+    case SSL_ERROR_SSL:
+      return Failure("Protocol error");
+    case SSL_ERROR_SYSCALL:
+      // NOTE: If there is an error (`ERR_peek_error() != 0`),
+      // we fall through to the default error handling case.
+      if (ERR_peek_error() == 0) {
+        return Failure("TCP connection closed before SSL termination");
+      }
+    default: {
+      char buffer[1024] = {};
+      std::string error_strings;
+      while ((error = ERR_get_error())) {
+        ERR_error_string_n(error, buffer, sizeof(buffer));
+        error_strings += "\n" + stringify(buffer);
+      }
+      return Failure("Failed with error:" + error_strings);
+    }
+  };
+}
+
 } // namespace internal {
 } // namespace network {
 } // namespace process {
diff --git a/3rdparty/libprocess/src/ssl/openssl_socket.hpp b/3rdparty/libprocess/src/ssl/openssl_socket.hpp
index a71e372..9fafcd7 100644
--- a/3rdparty/libprocess/src/ssl/openssl_socket.hpp
+++ b/3rdparty/libprocess/src/ssl/openssl_socket.hpp
@@ -13,6 +13,9 @@
 #ifndef __SSL_SOCKET_WRAPPER__
 #define __SSL_SOCKET_WRAPPER__
 
+#include <process/loop.hpp>
+#include <process/once.hpp>
+#include <process/queue.hpp>
 #include <process/socket.hpp>
 
 #include "poll_socket.hpp"
@@ -50,6 +53,37 @@ public:
   // of the socket is closed, then the futures of any outstanding read
   // requests will be completed (possibly as failures).
   Try<Nothing, SocketError> shutdown(int how) override;
+
+protected:
+  // Takes ownership of the given SSL object and performs an SSL handshake
+  // with the context of the SSL object. Either `SSL_set_connect_state`
+  // or `SSL_set_accept_state` must be called on the context beforehand,
+  // so that the handshake is done from the correct perspective.
+  Future<size_t> set_ssl_and_do_handshake(SSL* _ssl);
+
+  // Used to check the result of `SSL_do_handshake`, `SSL_read`,
+  // or `SSL_write` in a `process::loop`.
+  // `handle_as_read` should be set to `true` when this helper is called
+  // from `SSL_read` to handle the EOF event differently. Our socket
+  // API expects a return value of `0` when reading EOF, and a failure
+  // otherwise.
+  Future<ControlFlow<size_t>> handle_ssl_return_result(
+      int result,
+      bool handle_as_read);
+
+private:
+  SSL* ssl;
+  Option<net::IP> peer_ip;
+  Option<openssl::TLSClientConfig> client_config;
+
+  Once accept_loop_started;
+
+  // This queue stores accepted sockets that are considered connected
+  // (either the SSL handshake has completed or the socket has been
+  // downgraded). The 'accept()' call returns sockets from this queue.
+  // We wrap the socket in a 'Future' so that we can pass failures or
+  // discards through.
+  Queue<Future<std::shared_ptr<SocketImpl>>> accept_queue;
 };
 
 } // namespace internal {
diff --git a/3rdparty/libprocess/src/windows/poll_socket.cpp b/3rdparty/libprocess/src/windows/poll_socket.cpp
index 9c64886..4ddcb38 100644
--- a/3rdparty/libprocess/src/windows/poll_socket.cpp
+++ b/3rdparty/libprocess/src/windows/poll_socket.cpp
@@ -50,10 +50,9 @@ Try<Nothing> PollSocketImpl::listen(int backlog)
 
 Future<std::shared_ptr<SocketImpl>> PollSocketImpl::accept()
 {
-  // Need to hold a copy of `this` so that the underlying socket
-  // doesn't end up getting reused before we return from the call to
-  // `io::poll` and end up accepting a socket incorrectly.
-  auto self = shared(this);
+  // Need to hold a copy of `this` so that we can detect if the underlying
+  // socket has changed (i.e. closed) before we return from `io::poll`.
+  std::weak_ptr<SocketImpl> weak_self(shared(this));
 
   Try<Address> address = network::address(get());
   if (address.isError()) {
@@ -76,13 +75,18 @@ Future<std::shared_ptr<SocketImpl>> PollSocketImpl::accept()
 
   int_fd accept_socket = accept_socket_.get();
 
-  return windows::accept(self->get(), accept_socket)
+  return windows::accept(get(), accept_socket)
     .onAny([accept_socket](const Future<Nothing> future) {
       if (!future.isReady()) {
         os::close(accept_socket);
       }
     })
-    .then([self, accept_socket]() -> Future<std::shared_ptr<SocketImpl>> {
+    .then([weak_self, accept_socket]() -> Future<std::shared_ptr<SocketImpl>> {
+      std::shared_ptr<SocketImpl> self(weak_self.lock());
+      if (self == nullptr) {
+        return Failure("Socket destroyed while accepting");
+      }
+
       SOCKET listen = self->get();
 
       // Inherit from the listening socket.