You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by jo...@apache.org on 2019/12/18 22:34:38 UTC

[mesos] 09/11: SSL Socket: Implemented send/recv and shutdown.

This is an automated email from the ASF dual-hosted git repository.

josephwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit a827b1ee2ec1528e54514b38e01873970e7cfaf6
Author: Joseph Wu <jo...@apache.org>
AuthorDate: Wed Oct 23 18:01:35 2019 -0700

    SSL Socket: Implemented send/recv and shutdown.
    
    This completes a fully functional SSL socket,
    satisfying all the existing SSL socket tests in libprocess.
    
    Review: https://reviews.apache.org/r/71666
---
 3rdparty/libprocess/src/ssl/openssl_socket.cpp | 140 +++++++++++++++++++++++--
 3rdparty/libprocess/src/ssl/openssl_socket.hpp |   9 ++
 2 files changed, 141 insertions(+), 8 deletions(-)

diff --git a/3rdparty/libprocess/src/ssl/openssl_socket.cpp b/3rdparty/libprocess/src/ssl/openssl_socket.cpp
index e1d8ad3..42a1918 100644
--- a/3rdparty/libprocess/src/ssl/openssl_socket.cpp
+++ b/3rdparty/libprocess/src/ssl/openssl_socket.cpp
@@ -299,7 +299,8 @@ Try<std::shared_ptr<SocketImpl>> OpenSSLSocketImpl::create(int_fd socket)
 
 OpenSSLSocketImpl::OpenSSLSocketImpl(int_fd socket)
   : PollSocketImpl(socket),
-    ssl(nullptr) {}
+    ssl(nullptr),
+    dirty_shutdown(false) {}
 
 
 OpenSSLSocketImpl::~OpenSSLSocketImpl()
@@ -308,6 +309,11 @@ OpenSSLSocketImpl::~OpenSSLSocketImpl()
     SSL_free(ssl);
     ssl = nullptr;
   }
+
+  if (compute_thread.isSome()) {
+    process::terminate(compute_thread.get());
+    compute_thread = None();
+  }
 }
 
 
@@ -410,15 +416,74 @@ Future<Nothing> OpenSSLSocketImpl::connect(
 }
 
 
-Future<size_t> OpenSSLSocketImpl::recv(char* data, size_t size)
+Future<size_t> OpenSSLSocketImpl::recv(char* output, size_t size)
 {
-  UNIMPLEMENTED;
+  if (dirty_shutdown) {
+    return Failure("Socket is shutdown");
+  }
+
+  // Hold a weak pointer since the incoming socket is not guaranteed
+  // to terminate before the receiving end does.
+  std::weak_ptr<OpenSSLSocketImpl> weak_self(shared(this));
+
+  return process::loop(
+      compute_thread,
+      [weak_self, output, size]() -> Future<int> {
+        std::shared_ptr<OpenSSLSocketImpl> self(weak_self.lock());
+        if (self == nullptr) {
+          return Failure("Socket destroyed while receiving");
+        }
+
+        ERR_clear_error();
+        return SSL_read(self->ssl, output, size);
+      },
+      [weak_self](int result) -> Future<ControlFlow<size_t>> {
+        std::shared_ptr<OpenSSLSocketImpl> self(weak_self.lock());
+        if (self == nullptr) {
+          return Failure("Socket destroyed while receiving");
+        }
+
+        if (result == 0) {
+          // Check if EOF has been reached.
+          BIO* bio = SSL_get_rbio(self->ssl);
+          if (BIO_eof(bio) == 1) {
+            return Break(0u);
+          }
+        }
+
+        return self->handle_ssl_return_result(result, true);
+      });
 }
 
 
-Future<size_t> OpenSSLSocketImpl::send(const char* data, size_t size)
+Future<size_t> OpenSSLSocketImpl::send(const char* input, size_t size)
 {
-  UNIMPLEMENTED;
+  if (dirty_shutdown) {
+    return Failure("Socket is shutdown");
+  }
+
+  // Hold a weak pointer since a write may become backlogged indefinitely.
+  std::weak_ptr<OpenSSLSocketImpl> weak_self(shared(this));
+
+  return process::loop(
+      compute_thread,
+      [weak_self, input, size]() -> Future<int> {
+        std::shared_ptr<OpenSSLSocketImpl> self(weak_self.lock());
+        if (self == nullptr) {
+          return Failure("Socket destroyed while sending");
+        }
+
+        ERR_clear_error();
+        return SSL_write(self->ssl, input, size);
+      },
+      [weak_self](int result) -> Future<ControlFlow<size_t>> {
+        std::shared_ptr<OpenSSLSocketImpl> self(weak_self.lock());
+        if (self == nullptr) {
+          return Failure("Socket destroyed while sending");
+        }
+
+        return self->handle_ssl_return_result(result, false);
+      });
 }
 
 
@@ -442,7 +507,7 @@ Future<std::shared_ptr<SocketImpl>> OpenSSLSocketImpl::accept()
     // we do not wait for the accept logic to complete before accepting a
     // new socket.
     process::loop(
-        None(),
+        compute_thread,
         [weak_self]() -> Future<std::shared_ptr<SocketImpl>> {
           std::shared_ptr<OpenSSLSocketImpl> self(weak_self.lock());
 
@@ -571,12 +636,68 @@ Future<std::shared_ptr<SocketImpl>> OpenSSLSocketImpl::accept()
 
 Try<Nothing, SocketError> OpenSSLSocketImpl::shutdown(int how)
 {
-  UNIMPLEMENTED;
+  if (dirty_shutdown) {
+    return Nothing();
+  }
+
+  // Treat this as a dirty shutdown (i.e. closing the socket before sending
+  // the SSL close notification) because we are not guaranteed to properly
+  // shut down synchronously.
+  dirty_shutdown = true;
+
+  // Hold a weak pointer since we are ok with closing the socket before
+  // shutdown is completed gracefully.
+  std::weak_ptr<OpenSSLSocketImpl> weak_self(shared(this));
+
+  // The shutdown itself will happen asynchronously.
+  process::loop(
+      compute_thread,
+      [weak_self]() -> Future<int> {
+        std::shared_ptr<OpenSSLSocketImpl> self(weak_self.lock());
+        if (self == nullptr) {
+          return Failure("Socket destroyed while doing shutdown");
+        }
+
+        ERR_clear_error();
+        return SSL_shutdown(self->ssl);
+      },
+      [weak_self](int result) -> Future<ControlFlow<size_t>> {
+        std::shared_ptr<OpenSSLSocketImpl> self(weak_self.lock());
+        if (self == nullptr) {
+          return Failure("Socket destroyed while doing shutdown");
+        }
+
+        // A successful shutdown will return 0 if the close notification
+        // was sent, or 1 if both sides of the connection have closed.
+        // Either case is sufficient for a clean shutdown.
+        if (result >= 0) {
+          return Break(0u);
+        }
+
+        // Check if EOF has been reached.
+        BIO* bio = SSL_get_rbio(self->ssl);
+        if (BIO_eof(bio) == 1) {
+          return Break(0u);
+        }
+
+        return self->handle_ssl_return_result(result, false);
+      });
+
+  return Nothing();
 }
 
 
 Future<size_t> OpenSSLSocketImpl::set_ssl_and_do_handshake(SSL* _ssl)
 {
+  // NOTE: We would normally create this UPID in the socket's constructor.
+  // However, during libprocess initialization, the libprocess listening socket
+  // is constructed before spawning processes is allowed.
+  // This function is guaranteed to be called for any SSL socket that may
+  // transmit encrypted data. Listening sockets will not create a UPID.
+  if (compute_thread.isNone()) {
+    compute_thread = spawn(new ProcessBase(), true);
+  }
+
   // Save a reference to the SSL object.
   ssl = _ssl;
 
@@ -591,7 +712,7 @@ Future<size_t> OpenSSLSocketImpl::set_ssl_and_do_handshake(SSL* _ssl)
   std::weak_ptr<OpenSSLSocketImpl> weak_self(shared(this));
 
   return process::loop(
-      None(),
+      compute_thread,
       [weak_self]() -> Future<int> {
         std::shared_ptr<OpenSSLSocketImpl> self(weak_self.lock());
         if (self == nullptr) {
@@ -677,8 +798,11 @@ Future<ControlFlow<size_t>> OpenSSLSocketImpl::handle_ssl_return_result(
       // to any interaction with the OpenSSL library.
       UNREACHABLE();
     case SSL_ERROR_SSL:
+      dirty_shutdown = true;
       return Failure("Protocol error");
     case SSL_ERROR_SYSCALL:
+      dirty_shutdown = true;
+
       // NOTE: If there is an error (`ERR_peek_error() != 0`),
       // we fall through to the default error handling case.
       if (ERR_peek_error() == 0) {
diff --git a/3rdparty/libprocess/src/ssl/openssl_socket.hpp b/3rdparty/libprocess/src/ssl/openssl_socket.hpp
index 9fafcd7..2d0259d 100644
--- a/3rdparty/libprocess/src/ssl/openssl_socket.hpp
+++ b/3rdparty/libprocess/src/ssl/openssl_socket.hpp
@@ -84,6 +84,15 @@ private:
   // We wrap the socket in a 'Future' so that we can pass failures or
   // discards through.
   Queue<Future<std::shared_ptr<SocketImpl>>> accept_queue;
+
+  // Set to true when the connection is terminated before a proper SSL
+  // shutdown can be sent, or once `shutdown` has been called. While set,
+  // `send` and `recv` fail immediately and `shutdown` is a no-op.
+  bool dirty_shutdown;
+
+  // An actor used to dispatch the compute-heavy work of encryption and
+  // decryption, like `SSL_read` and `SSL_write`.
+  Option<UPID> compute_thread;
 };
 
 } // namespace internal {