You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by jo...@apache.org on 2019/12/18 22:34:38 UTC
[mesos] 09/11: SSL Socket: Implemented send/recv and shutdown.
This is an automated email from the ASF dual-hosted git repository.
josephwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
commit a827b1ee2ec1528e54514b38e01873970e7cfaf6
Author: Joseph Wu <jo...@apache.org>
AuthorDate: Wed Oct 23 18:01:35 2019 -0700
SSL Socket: Implemented send/recv and shutdown.
This completes a fully functional SSL socket,
satisfying all the existing SSL socket tests in libprocess.
Review: https://reviews.apache.org/r/71666
---
3rdparty/libprocess/src/ssl/openssl_socket.cpp | 140 +++++++++++++++++++++++--
3rdparty/libprocess/src/ssl/openssl_socket.hpp | 9 ++
2 files changed, 141 insertions(+), 8 deletions(-)
diff --git a/3rdparty/libprocess/src/ssl/openssl_socket.cpp b/3rdparty/libprocess/src/ssl/openssl_socket.cpp
index e1d8ad3..42a1918 100644
--- a/3rdparty/libprocess/src/ssl/openssl_socket.cpp
+++ b/3rdparty/libprocess/src/ssl/openssl_socket.cpp
@@ -299,7 +299,8 @@ Try<std::shared_ptr<SocketImpl>> OpenSSLSocketImpl::create(int_fd socket)
OpenSSLSocketImpl::OpenSSLSocketImpl(int_fd socket)
: PollSocketImpl(socket),
- ssl(nullptr) {}
+ ssl(nullptr), // Assigned later by `set_ssl_and_do_handshake`.
+ dirty_shutdown(false) {} // Set by `shutdown` and on SSL_ERROR_SSL / SSL_ERROR_SYSCALL.
OpenSSLSocketImpl::~OpenSSLSocketImpl()
@@ -308,6 +309,11 @@ OpenSSLSocketImpl::~OpenSSLSocketImpl()
SSL_free(ssl);
ssl = nullptr;
}
+
+ if (compute_thread.isSome()) {
+ process::terminate(compute_thread.get());
+ compute_thread = None();
+ }
}
@@ -410,15 +416,74 @@ Future<Nothing> OpenSSLSocketImpl::connect(
}
-Future<size_t> OpenSSLSocketImpl::recv(char* data, size_t size)
+Future<size_t> OpenSSLSocketImpl::recv(char* output, size_t size)
{
- UNIMPLEMENTED;
+ if (dirty_shutdown) {
+ return Failure("Socket is shutdown");
+ }
+
+ // Hold only a weak pointer so a pending read does not keep the socket alive;
+ // each step of the loop fails if the socket has already been destroyed.
+ std::weak_ptr<OpenSSLSocketImpl> weak_self(shared(this));
+
+ return process::loop(
+ compute_thread,
+ [weak_self, output, size]() -> Future<int> {
+ std::shared_ptr<OpenSSLSocketImpl> self(weak_self.lock());
+ if (self == nullptr) {
+ return Failure("Socket destroyed while receiving");
+ }
+
+ ERR_clear_error();
+ return SSL_read(self->ssl, output, size); // NOTE(review): `size` is size_t but SSL_read takes an int length; confirm callers stay below INT_MAX.
+ },
+ [weak_self](int result) -> Future<ControlFlow<size_t>> {
+ std::shared_ptr<OpenSSLSocketImpl> self(weak_self.lock());
+ if (self == nullptr) {
+ return Failure("Socket destroyed while receiving");
+ }
+
+ if (result == 0) {
+ // SSL_read returned 0: report end-of-stream (0 bytes) only if the read BIO is at EOF.
+ BIO* bio = SSL_get_rbio(self->ssl);
+ if (BIO_eof(bio) == 1) {
+ return Break(0u);
+ }
+ }
+
+ return self->handle_ssl_return_result(result, true);
+ });
}
-Future<size_t> OpenSSLSocketImpl::send(const char* data, size_t size)
+Future<size_t> OpenSSLSocketImpl::send(const char* input, size_t size)
{
- UNIMPLEMENTED;
+ if (dirty_shutdown) {
+ return Failure("Socket is shutdown");
+ }
+
+ // Hold only a weak pointer: a backlogged write must not keep the socket alive indefinitely.
+ std::weak_ptr<OpenSSLSocketImpl> weak_self(shared(this));
+
+ return process::loop(
+ compute_thread,
+ [weak_self, input, size]() -> Future<int> {
+ std::shared_ptr<OpenSSLSocketImpl> self(weak_self.lock());
+ if (self == nullptr) {
+ return Failure("Socket destroyed while sending");
+ }
+
+ ERR_clear_error();
+ return SSL_write(self->ssl, input, size); // NOTE(review): `size` is size_t but SSL_write takes an int length; confirm callers stay below INT_MAX.
+ },
+ [weak_self](int result) -> Future<ControlFlow<size_t>> {
+ std::shared_ptr<OpenSSLSocketImpl> self(weak_self.lock());
+ if (self == nullptr) {
+ return Failure("Socket destroyed while sending");
+ }
+
+ return self->handle_ssl_return_result(result, false);
+ });
}
@@ -442,7 +507,7 @@ Future<std::shared_ptr<SocketImpl>> OpenSSLSocketImpl::accept()
// we do not wait for the accept logic to complete before accepting a
// new socket.
process::loop(
- None(),
+ compute_thread,
[weak_self]() -> Future<std::shared_ptr<SocketImpl>> {
std::shared_ptr<OpenSSLSocketImpl> self(weak_self.lock());
@@ -571,12 +636,68 @@ Future<std::shared_ptr<SocketImpl>> OpenSSLSocketImpl::accept()
Try<Nothing, SocketError> OpenSSLSocketImpl::shutdown(int how)
{
- UNIMPLEMENTED;
+ if (dirty_shutdown) {
+ return Nothing();
+ }
+
+ // Treat this as a dirty shutdown (i.e. closing the socket before sending
+ // the SSL close notification) because we are not guaranteed to shut down
+ // properly in a synchronous manner. NOTE(review): `how` is not used; both send and recv fail afterwards.
+ dirty_shutdown = true; // NOTE(review): plain bool, also written in `handle_ssl_return_result`; confirm there is no cross-thread race.
+
+ // Hold a weak pointer since we are ok with closing the socket before
+ // the graceful shutdown has completed.
+ std::weak_ptr<OpenSSLSocketImpl> weak_self(shared(this));
+
+ // The shutdown itself happens asynchronously; the future returned by the loop is discarded.
+ process::loop(
+ compute_thread,
+ [weak_self]() -> Future<int> {
+ std::shared_ptr<OpenSSLSocketImpl> self(weak_self.lock());
+ if (self == nullptr) {
+ return Failure("Socket destroyed while doing shutdown");
+ }
+
+ ERR_clear_error();
+ return SSL_shutdown(self->ssl);
+ },
+ [weak_self](int result) -> Future<ControlFlow<size_t>> {
+ std::shared_ptr<OpenSSLSocketImpl> self(weak_self.lock());
+ if (self == nullptr) {
+ return Failure("Socket destroyed while doing shutdown");
+ }
+
+ // A successful shutdown will return 0 if the close notification
+ // was sent, or 1 if both sides of the connection have closed.
+ // Either case is sufficient for a clean shutdown, so stop looping.
+ if (result >= 0) {
+ return Break(0u);
+ }
+
+ // On a negative result, also stop if the read BIO reports EOF.
+ BIO* bio = SSL_get_rbio(self->ssl);
+ if (BIO_eof(bio) == 1) {
+ return Break(0u);
+ }
+
+ return self->handle_ssl_return_result(result, false);
+ });
+
+ return Nothing();
}
Future<size_t> OpenSSLSocketImpl::set_ssl_and_do_handshake(SSL* _ssl)
{
+ // NOTE: We would normally create this UPID in the socket's constructor.
+ // However, during libprocess initialization, the libprocess listening socket
+ // is constructed before spawning processes is allowed.
+ // This function is guaranteed to be called for any SSL socket that may
+ // transmit encrypted data. Listening sockets will not create a UPID.
+ if (compute_thread.isNone()) {
+ compute_thread = spawn(new ProcessBase(), true);
+ }
+
// Save a reference to the SSL object.
ssl = _ssl;
@@ -591,7 +712,7 @@ Future<size_t> OpenSSLSocketImpl::set_ssl_and_do_handshake(SSL* _ssl)
std::weak_ptr<OpenSSLSocketImpl> weak_self(shared(this));
return process::loop(
- None(),
+ compute_thread,
[weak_self]() -> Future<int> {
std::shared_ptr<OpenSSLSocketImpl> self(weak_self.lock());
if (self == nullptr) {
@@ -677,8 +798,11 @@ Future<ControlFlow<size_t>> OpenSSLSocketImpl::handle_ssl_return_result(
// to any interaction with the OpenSSL library.
UNREACHABLE();
case SSL_ERROR_SSL:
+ dirty_shutdown = true;
return Failure("Protocol error");
case SSL_ERROR_SYSCALL:
+ dirty_shutdown = true;
+
// NOTE: If there is an error (`ERR_peek_error() != 0`),
// we fall through to the default error handling case.
if (ERR_peek_error() == 0) {
diff --git a/3rdparty/libprocess/src/ssl/openssl_socket.hpp b/3rdparty/libprocess/src/ssl/openssl_socket.hpp
index 9fafcd7..2d0259d 100644
--- a/3rdparty/libprocess/src/ssl/openssl_socket.hpp
+++ b/3rdparty/libprocess/src/ssl/openssl_socket.hpp
@@ -84,6 +84,15 @@ private:
// We wrap the socket in a 'Future' so that we can pass failures or
// discards through.
Queue<Future<std::shared_ptr<SocketImpl>>> accept_queue;
+
+ // Set to true whenever the connection is terminated before a proper
+ // SSL shutdown can be sent. This will also prevent `shutdown` from
+ // doing anything, as the connection will be presumed dead.
+ bool dirty_shutdown;
+
+ // An actor used to dispatch the compute-heavy work of encryption and
+ // decryption, like `SSL_read` and `SSL_write`.
+ Option<UPID> compute_thread;
};
} // namespace internal {