You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2015/06/30 15:47:43 UTC

mesos git commit: Support for SSL and non-SSL traffic simultaneously.

Repository: mesos
Updated Branches:
  refs/heads/master a9574b461 -> 37674a667


Support for SSL and non-SSL traffic simultaneously.

Add a flag SSL_SUPPORT_DOWNGRADE which allows:

1. An SSL accepting socket to peek at the incoming data. If the hello
   handshake bits are not set, then accept as a Socket::POLL socket
   instead.

2. When calling Process::link or Process:send(Message), if a new
   connection is required, allow a second attempt using Socket::POLL
   if an SSL connection was first attempted.

Review: https://reviews.apache.org/r/31207


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/37674a66
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/37674a66
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/37674a66

Branch: refs/heads/master
Commit: 37674a667a57880daddb3dc49fedd4930edcf6cd
Parents: a9574b4
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Tue Jun 30 06:37:04 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Tue Jun 30 06:47:32 2015 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/socket.hpp  |   5 +
 3rdparty/libprocess/src/libevent_ssl_socket.cpp |  89 ++++++-
 3rdparty/libprocess/src/libevent_ssl_socket.hpp |  18 +-
 3rdparty/libprocess/src/openssl.cpp             |  14 ++
 3rdparty/libprocess/src/openssl.hpp             |   2 +
 3rdparty/libprocess/src/poll_socket.hpp         |   2 +
 3rdparty/libprocess/src/process.cpp             | 245 +++++++++++++++++--
 3rdparty/libprocess/src/tests/ssl_tests.cpp     | 171 +++++++++++++
 8 files changed, 519 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/37674a66/3rdparty/libprocess/include/process/socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp
index f53d2e1..d79d2e4 100644
--- a/3rdparty/libprocess/include/process/socket.hpp
+++ b/3rdparty/libprocess/include/process/socket.hpp
@@ -37,6 +37,9 @@ public:
   // Returns the default kind of implementation of Socket.
   static const Kind& DEFAULT_KIND();
 
+  // The kind representing the underlying implementation.
+  Kind kind() const { return impl->kind(); }
+
   // Each socket is a reference counted, shared by default, concurrent
   // object. However, since we want to support multiple
   // implementations we use the Pimpl pattern (often called the
@@ -105,6 +108,8 @@ public:
       }
     }
 
+    virtual Socket::Kind kind() const = 0;
+
     // Construct a new Socket from the given impl. This is a proxy
     // function, as Impls derived from this won't have access to the
     // Socket::Socket(...) constructors.

http://git-wip-us.apache.org/repos/asf/mesos/blob/37674a66/3rdparty/libprocess/src/libevent_ssl_socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libevent_ssl_socket.cpp b/3rdparty/libprocess/src/libevent_ssl_socket.cpp
index 2920e0e..583526b 100644
--- a/3rdparty/libprocess/src/libevent_ssl_socket.cpp
+++ b/3rdparty/libprocess/src/libevent_ssl_socket.cpp
@@ -776,7 +776,73 @@ Future<Socket> LibeventSSLSocketImpl::accept()
 }
 
 
-// Only runs in event loop.
+void LibeventSSLSocketImpl::peek_callback(
+    evutil_socket_t fd,
+    short what,
+    void* arg)
+{
+  CHECK(__in_event_loop__);
+
+  CHECK(what & EV_READ);
+  char data[6];
+
+  // Try to peek the first 6 bytes of the message.
+  ssize_t size = ::recv(fd, data, 6, MSG_PEEK);
+
+  // Based on the function 'ssl23_get_client_hello' in openssl, we
+  // test whether to dispatch to the SSL or non-SSL based accept based
+  // on the following rules:
+  //   1. If there are fewer than 3 bytes: non-SSL.
+  //   2. If the 1st bit of the 1st byte is set AND the 3rd byte is
+  //          equal to SSL2_MT_CLIENT_HELLO: SSL.
+  //   3. If the 1st byte is equal to SSL3_RT_HANDSHAKE AND the 2nd
+  //      byte is equal to SSL3_VERSION_MAJOR and the 6th byte is
+  //      equal to SSL3_MT_CLIENT_HELLO: SSL.
+  //   4. Otherwise: non-SSL.
+
+  // For an ascii based protocol to falsely get dispatched to SSL it
+  // needs to:
+  //   1. Start with an invalid ascii character (0x80).
+  //   2. OR have the first 2 characters be a SYN followed by ETX, and
+  //          then the 6th character be SOH.
+  // These conditions clearly do not constitute valid HTTP requests,
+  // and are unlikely to collide with other existing protocols.
+
+  bool ssl = false; // Default to rule 4.
+
+  if (size < 2) { // Rule 1.
+    ssl = false;
+  } else if ((data[0] & 0x80) && data[2] == SSL2_MT_CLIENT_HELLO) { // Rule 2.
+    ssl = true;
+  } else if (data[0] == SSL3_RT_HANDSHAKE &&
+             data[1] == SSL3_VERSION_MAJOR &&
+             data[5] == SSL3_MT_CLIENT_HELLO) { // Rule 3.
+    ssl = true;
+  }
+
+  AcceptRequest* request = reinterpret_cast<AcceptRequest*>(arg);
+
+  // We call 'event_free()' here because it ensures the event is made
+  // non-pending and inactive before it gets deallocated.
+  event_free(request->peek_event);
+  request->peek_event = NULL;
+
+  if (ssl) {
+    accept_SSL_callback(request);
+  } else {
+    // Downgrade to a non-SSL socket.
+    Try<Socket> create = Socket::create(Socket::POLL, fd);
+    if (create.isError()) {
+      request->promise.fail(create.error());
+    } else {
+      request->promise.set(create.get());
+    }
+
+    delete request;
+  }
+}
+
+
 void LibeventSSLSocketImpl::accept_callback(AcceptRequest* request)
 {
   CHECK(__in_event_loop__);
@@ -785,6 +851,27 @@ void LibeventSSLSocketImpl::accept_callback(AcceptRequest* request)
   // verify.
   accept_queue.put(request->promise.future());
 
+  // If we support downgrading the connection, first wait for this
+  // socket to become readable. We will then MSG_PEEK it to test
+  // whether we want to dispatch as SSL or non-SSL.
+  if (openssl::flags().support_downgrade) {
+    request->peek_event = event_new(
+        base,
+        request->socket,
+        EV_READ,
+        &LibeventSSLSocketImpl::peek_callback,
+        request);
+    event_add(request->peek_event, NULL);
+  } else {
+    accept_SSL_callback(request);
+  }
+}
+
+
+void LibeventSSLSocketImpl::accept_SSL_callback(AcceptRequest* request)
+{
+  CHECK(__in_event_loop__);
+
   // Set up SSL object.
   SSL* ssl = SSL_new(openssl::context());
   if (ssl == NULL) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/37674a66/3rdparty/libprocess/src/libevent_ssl_socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libevent_ssl_socket.hpp b/3rdparty/libprocess/src/libevent_ssl_socket.hpp
index 4f2cd35..87c7835 100644
--- a/3rdparty/libprocess/src/libevent_ssl_socket.hpp
+++ b/3rdparty/libprocess/src/libevent_ssl_socket.hpp
@@ -57,9 +57,11 @@ private:
         int _socket,
         evconnlistener* _listener,
         const Option<net::IP>& _ip)
-      : listener(_listener),
+      : peek_event(NULL),
+        listener(_listener),
         socket(_socket),
         ip(_ip) {}
+    event* peek_event;
     Promise<Socket> promise;
     evconnlistener* listener;
     int socket;
@@ -96,9 +98,21 @@ private:
       Option<std::string>&& peer_hostname);
 
   // This is called when the equivalent of 'accept' returns. The role
-  // of this function is to set up the SSL object and bev.
+  // of this function is to set up the SSL object and bev. If we
+  // support both SSL and non-SSL traffic simultaneously then we first
+  // wait for data to be ready and test the hello handshake to
+  // disambiguate between the kinds of traffic.
   void accept_callback(AcceptRequest* request);
 
+  // This is the continuation of 'accept_callback' that handles an SSL
+  // connection.
+  static void accept_SSL_callback(AcceptRequest* request);
+
+  // This function peeks at the data on an accepted socket to see if
+  // there is an SSL handshake or not. It then dispatches to the
+  // SSL handling function or creates a non-SSL socket.
+  static void peek_callback(evutil_socket_t fd, short what, void* arg);
+
   // The following are function pairs of static functions to member
   // functions. The static functions test and hold the weak pointer to
   // the socket before calling the member functions. This protects

http://git-wip-us.apache.org/repos/asf/mesos/blob/37674a66/3rdparty/libprocess/src/openssl.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/openssl.cpp b/3rdparty/libprocess/src/openssl.cpp
index 6ff4adb..8ffc409 100644
--- a/3rdparty/libprocess/src/openssl.cpp
+++ b/3rdparty/libprocess/src/openssl.cpp
@@ -41,6 +41,13 @@ Flags::Flags()
       "Whether SSL is enabled.",
       false);
 
+  add(&Flags::support_downgrade,
+      "support_downgrade",
+      "Enable downgrading SSL accepting sockets to non-SSL traffic. When this "
+      "is enabled, no protocol may be used on non-SSL connections that "
+      "conflics with the protocol headers for SSL.",
+      false);
+
   add(&Flags::cert_file,
       "cert_file",
       "Path to certifcate.");
@@ -323,6 +330,13 @@ void reinitialize()
     LOG(FATAL) << "Session id context size exceeds maximum";
   }
 
+  // Notify users of the 'SSL_SUPPORT_DOWNGRADE' flag that this
+  // setting allows insecure connections.
+  if (ssl_flags->support_downgrade) {
+    LOG(WARNING) <<
+      "Failed SSL connections will be downgraded to a non-SSL socket";
+  }
+
   // Now do some validation of the flags/environment variables.
   if (ssl_flags->key_file.isNone()) {
     EXIT(EXIT_FAILURE) << "SSL requires key! NOTE: Set path with SSL_KEY_FILE";

http://git-wip-us.apache.org/repos/asf/mesos/blob/37674a66/3rdparty/libprocess/src/openssl.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/openssl.hpp b/3rdparty/libprocess/src/openssl.hpp
index 60c7b07..16f3d56 100644
--- a/3rdparty/libprocess/src/openssl.hpp
+++ b/3rdparty/libprocess/src/openssl.hpp
@@ -22,6 +22,7 @@ public:
   Flags();
 
   bool enabled;
+  bool support_downgrade;
   Option<std::string> cert_file;
   Option<std::string> key_file;
   bool verify_cert;
@@ -44,6 +45,7 @@ const Flags& flags();
 // context gets initialized using the environment variables:
 //
 //    SSL_ENABLED=(false|0,true|1)
+//    SSL_SUPPORT_DOWNGRADE=(false|0,true|1)
 //    SSL_CERT_FILE=(path to certificate)
 //    SSL_KEY_FILE=(path to key)
 //    SSL_VERIFY_CERT=(false|0,true|1)

http://git-wip-us.apache.org/repos/asf/mesos/blob/37674a66/3rdparty/libprocess/src/poll_socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/poll_socket.hpp b/3rdparty/libprocess/src/poll_socket.hpp
index 553aa64..a14d63f 100644
--- a/3rdparty/libprocess/src/poll_socket.hpp
+++ b/3rdparty/libprocess/src/poll_socket.hpp
@@ -23,6 +23,8 @@ public:
   virtual Future<size_t> recv(char* data, size_t size);
   virtual Future<size_t> send(const char* data, size_t size);
   virtual Future<size_t> sendfile(int fd, off_t offset, size_t size);
+
+  virtual Socket::Kind kind() const { return Socket::POLL; }
 };
 
 } // namespace network {

http://git-wip-us.apache.org/repos/asf/mesos/blob/37674a66/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 52649fb..d99947c 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -85,6 +85,9 @@
 #include "encoder.hpp"
 #include "event_loop.hpp"
 #include "gate.hpp"
+#ifdef USE_SSL_SOCKET
+#include "openssl.hpp"
+#endif
 #include "process_reference.hpp"
 
 using namespace process::firewall;
@@ -267,7 +270,9 @@ public:
 
   void accepted(const Socket& socket);
 
-  void link(ProcessBase* process, const UPID& to);
+  void link(ProcessBase* process,
+            const UPID& to,
+            const Socket::Kind& kind = Socket::DEFAULT_KIND());
 
   PID<HttpProxy> proxy(const Socket& socket);
 
@@ -275,7 +280,8 @@ public:
   void send(const Response& response,
             const Request& request,
             const Socket& socket);
-  void send(Message* message);
+  void send(Message* message,
+            const Socket::Kind& kind = Socket::DEFAULT_KIND());
 
   Encoder* next(int s);
 
@@ -299,6 +305,24 @@ private:
     hashmap<Address, hashset<UPID>> remotes;
   } links;
 
+  // Switch the underlying socket that a remote end is talking to.
+  // This manipulates the datastructures below by swapping all data
+  // mapped to 'from' to being mapped to 'to'. This is useful for
+  // downgrading a socket from SSL to POLL based.
+  void swap_implementing_socket(const Socket& from, Socket* to);
+
+  // Helper function for link().
+  void link_connect(
+      const Future<Nothing>& future,
+      Socket* socket,
+      const UPID& to);
+
+  // Helper function for send().
+  void send_connect(
+      const Future<Nothing>& future,
+      Socket* socket,
+      Message* message);
+
   // Collection of all actice sockets.
   map<int, Socket*> sockets;
 
@@ -757,6 +781,15 @@ void initialize(const string& delegate)
   signal(SIGPIPE, SIG_IGN);
 #endif // __sun__
 
+#ifdef USE_SSL_SOCKET
+  // Notify users of the 'SSL_SUPPORT_DOWNGRADE' flag that this
+  // setting allows insecure connections.
+  if (network::openssl::flags().support_downgrade) {
+    LOG(WARNING) <<
+      "Failed SSL connections will be downgraded to a non-SSL socket";
+  }
+#endif
+
   // Create a new ProcessManager and SocketManager.
   process_manager = new ProcessManager(delegate);
   socket_manager = new SocketManager();
@@ -1233,14 +1266,71 @@ void ignore_recv_data(
 void send(Encoder* encoder, Socket* socket);
 
 
-void link_connect(const Future<Nothing>& future, Socket* socket)
+} // namespace internal {
+
+
+void SocketManager::link_connect(
+    const Future<Nothing>& future,
+    Socket* socket,
+    const UPID& to)
 {
   if (future.isDiscarded() || future.isFailed()) {
     if (future.isFailed()) {
       VLOG(1) << "Failed to link, connect: " << future.failure();
     }
+
+    // Check if SSL is enabled, and whether we allow a downgrade to
+    // non-SSL traffic.
+#ifdef USE_SSL_SOCKET
+    bool attempt_downgrade =
+      future.isFailed() &&
+      network::openssl::flags().enabled &&
+      network::openssl::flags().support_downgrade &&
+      socket->kind() == Socket::SSL;
+
+    Option<Socket> poll_socket = None();
+
+    // If we allow downgrading from SSL to non-SSL, then retry as a
+    // POLL socket.
+    if (attempt_downgrade) {
+      synchronized (mutex) {
+        Try<Socket> create = Socket::create(Socket::POLL);
+        if (create.isError()) {
+          VLOG(1) << "Failed to link, create socket: " << create.error();
+          socket_manager->close(*socket);
+          delete socket;
+          return;
+        }
+
+        poll_socket = create.get();
+
+        // Update all the datastructures that are mapped to the socket
+        // that just failed to connect. They will now point to the new
+        // POLL socket we are about to try to connect. Even if the
+        // process has exited, persistent links will stay around, and
+        // temporary links will get cleaned up as they would otherwise.
+        swap_implementing_socket(*socket, new Socket(poll_socket.get()));
+      }
+
+      CHECK_SOME(poll_socket);
+      poll_socket.get().connect(to.address)
+        .onAny(lambda::bind(
+            &SocketManager::link_connect,
+            this,
+            lambda::_1,
+            new Socket(poll_socket.get()),
+            to));
+
+      // We don't need to 'shutdown()' the socket as it was never
+      // connected.
+      delete socket;
+      return;
+    }
+#endif
+
     socket_manager->close(*socket);
     delete socket;
+
     return;
   }
 
@@ -1249,7 +1339,7 @@ void link_connect(const Future<Nothing>& future, Socket* socket)
 
   socket->recv(data, size)
     .onAny(lambda::bind(
-        &ignore_recv_data,
+        &internal::ignore_recv_data,
         lambda::_1,
         socket,
         data,
@@ -1269,15 +1359,15 @@ void link_connect(const Future<Nothing>& future, Socket* socket)
   Encoder* encoder = socket_manager->next(*socket);
 
   if (encoder != NULL) {
-    send(encoder, new Socket(*socket));
+    internal::send(encoder, new Socket(*socket));
   }
 }
 
 
-} // namespace internal {
-
-
-void SocketManager::link(ProcessBase* process, const UPID& to)
+void SocketManager::link(
+    ProcessBase* process,
+    const UPID& to,
+    const Socket::Kind& kind)
 {
   // TODO(benh): The semantics we want to support for link are such
   // that if there is nobody to link to (local or remote) then an
@@ -1287,7 +1377,7 @@ void SocketManager::link(ProcessBase* process, const UPID& to)
   // work remotely ... but if there is someone listening remotely just
   // not at that id, then it will silently continue executing.
 
-  CHECK(process != NULL);
+  CHECK_NOTNULL(process);
 
   Option<Socket> socket = None();
   bool connect = false;
@@ -1296,7 +1386,10 @@ void SocketManager::link(ProcessBase* process, const UPID& to)
     // Check if the socket address is remote and there isn't a persistant link.
     if (to.address != __address__  && persists.count(to.address) == 0) {
       // Okay, no link, let's create a socket.
-      Try<Socket> create = Socket::create();
+      // The kind of socket we create is passed in as an argument.
+      // This allows us to support downgrading the connection type
+      // from SSL to POLL if enabled.
+      Try<Socket> create = Socket::create(kind);
       if (create.isError()) {
         VLOG(1) << "Failed to link, create socket: " << create.error();
         return;
@@ -1330,9 +1423,11 @@ void SocketManager::link(ProcessBase* process, const UPID& to)
     CHECK_SOME(socket);
     Socket(socket.get()).connect(to.address) // Copy to drop const.
       .onAny(lambda::bind(
-          &internal::link_connect,
+          &SocketManager::link_connect,
+          this,
           lambda::_1,
-          new Socket(socket.get())));
+          new Socket(socket.get()),
+          to));
   }
 }
 
@@ -1496,9 +1591,7 @@ void SocketManager::send(
 }
 
 
-namespace internal {
-
-void send_connect(
+void SocketManager::send_connect(
     const Future<Nothing>& future,
     Socket* socket,
     Message* message)
@@ -1508,8 +1601,60 @@ void send_connect(
       VLOG(1) << "Failed to send '" << message->name << "' to '"
               << message->to.address << "', connect: " << future.failure();
     }
+
+    // Check if SSL is enabled, and whether we allow a downgrade to
+    // non-SSL traffic.
+#ifdef USE_SSL_SOCKET
+    bool attempt_downgrade =
+      future.isFailed() &&
+      network::openssl::flags().enabled &&
+      network::openssl::flags().support_downgrade &&
+      socket->kind() == Socket::SSL;
+
+    Option<Socket> poll_socket = None();
+
+    // If we allow downgrading from SSL to non-SSL, then retry as a
+    // POLL socket.
+    if (attempt_downgrade) {
+      synchronized (mutex) {
+        Try<Socket> create = Socket::create(Socket::POLL);
+        if (create.isError()) {
+          VLOG(1) << "Failed to link, create socket: " << create.error();
+          socket_manager->close(*socket);
+          delete message;
+          delete socket;
+          return;
+        }
+
+        poll_socket = create.get();
+
+        // Update all the datastructures that are mapped to the socket
+        // that just failed to connect. They will now point to the new
+        // POLL socket we are about to try to connect. Even if the
+        // process has exited, persistent links will stay around, and
+        // temporary links will get cleaned up as they would otherwise.
+        swap_implementing_socket(*socket, new Socket(poll_socket.get()));
+      }
+
+      CHECK_SOME(poll_socket);
+      poll_socket.get().connect(message->to.address)
+        .onAny(lambda::bind(
+            &SocketManager::send_connect,
+            this,
+            lambda::_1,
+            new Socket(poll_socket.get()),
+            message));
+
+      // We don't need to 'shutdown()' the socket as it was never
+      // connected.
+      delete socket;
+      return;
+    }
+#endif
+
     socket_manager->close(*socket);
     delete socket;
+
     delete message;
     return;
   }
@@ -1524,7 +1669,7 @@ void send_connect(
 
   socket->recv(data, size)
     .onAny(lambda::bind(
-        &ignore_recv_data,
+        &internal::ignore_recv_data,
         lambda::_1,
         new Socket(*socket),
         data,
@@ -1533,10 +1678,8 @@ void send_connect(
   internal::send(encoder, socket);
 }
 
-} // namespace internal {
-
 
-void SocketManager::send(Message* message)
+void SocketManager::send(Message* message, const Socket::Kind& kind)
 {
   CHECK(message != NULL);
 
@@ -1569,9 +1712,12 @@ void SocketManager::send(Message* message)
       }
 
     } else {
-      // No peristent or temporary socket to the socket address
+      // No persistent or temporary socket to the socket address
       // currently exists, so we create a temporary one.
-      Try<Socket> create = Socket::create();
+      // The kind of socket we create is passed in as an argument.
+      // This allows us to support downgrading the connection type
+      // from SSL to POLL if enabled.
+      Try<Socket> create = Socket::create(kind);
       if (create.isError()) {
         VLOG(1) << "Failed to send, create socket: " << create.error();
         delete message;
@@ -1595,9 +1741,10 @@ void SocketManager::send(Message* message)
 
   if (connect) {
     CHECK_SOME(socket);
-    Socket(socket.get()).connect(address) // Copy to drop const.
+    socket.get().connect(address)
       .onAny(lambda::bind(
-          &internal::send_connect,
+          &SocketManager::send_connect,
+          this,
           lambda::_1,
           new Socket(socket.get()),
           message));
@@ -1882,6 +2029,56 @@ void SocketManager::exited(ProcessBase* process)
 }
 
 
+void SocketManager::swap_implementing_socket(const Socket& from, Socket* to)
+{
+  const int from_fd = from.get();
+  const int to_fd = to->get();
+
+  // Make sure the 'from' and 'to' are valid to swap.
+  CHECK(sockets.count(from_fd) > 0);
+  CHECK(sockets.count(to_fd) == 0);
+
+  synchronized (mutex) {
+    sockets.erase(from_fd);
+    sockets[to_fd] = to;
+
+    // Update the dispose set if this is a temporary link.
+    if (dispose.count(from_fd) > 0) {
+      dispose.insert(to_fd);
+      dispose.erase(from_fd);
+    }
+
+    // Update the fd that this address is associated with. Once we've
+    // done this we can update the 'temps' and 'persists'
+    // datastructures using this updated address.
+    addresses[to_fd] = addresses[from_fd];
+    addresses.erase(from_fd);
+
+    // If this address is a temporary link.
+    if (temps.count(addresses[to_fd]) > 0) {
+      temps[addresses[to_fd]] = to_fd;
+      // No need to erase as we're changing the value, not the key.
+    }
+
+    // If this address is a persistent link.
+    if (persists.count(addresses[to_fd]) > 0) {
+      persists[addresses[to_fd]] = to_fd;
+      // No need to erase as we're changing the value, not the key.
+    }
+
+    // Move any encoders queued against this link to the new socket.
+    outgoing[to_fd] = std::move(outgoing[from_fd]);
+    outgoing.erase(from_fd);
+
+    // Update the fd any proxies are associated with.
+    if (proxies.count(from_fd) > 0) {
+      proxies[to_fd] = proxies[from_fd];
+      proxies.erase(from_fd);
+    }
+  }
+}
+
+
 ProcessManager::ProcessManager(const string& _delegate)
   : delegate(_delegate)
 {

http://git-wip-us.apache.org/repos/asf/mesos/blob/37674a66/3rdparty/libprocess/src/tests/ssl_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/ssl_tests.cpp b/3rdparty/libprocess/src/tests/ssl_tests.cpp
index c077aae..e6917c1 100644
--- a/3rdparty/libprocess/src/tests/ssl_tests.cpp
+++ b/3rdparty/libprocess/src/tests/ssl_tests.cpp
@@ -705,4 +705,175 @@ TEST_F(SSLTest, ProtocolMismatch)
   }
 }
 
+
+// Ensure we can communicate between a POLL based socket and an SSL
+// socket if 'SSL_SUPPORT_DOWNGRADE' is enabled.
+TEST_F(SSLTest, ValidDowngrade)
+{
+  Try<Socket> server = setup_server({
+      {"SSL_ENABLED", "true"},
+      {"SSL_SUPPORT_DOWNGRADE", "true"},
+      {"SSL_KEY_FILE", key_path().value},
+      {"SSL_CERT_FILE", certificate_path().value},
+      {"SSL_REQUIRE_CERT", "true"}});
+  ASSERT_SOME(server);
+
+  Try<Subprocess> client = launch_client({
+      {"SSL_ENABLED", "false"}},
+      server.get(),
+      false);
+  ASSERT_SOME(client);
+
+  Future<Socket> socket = server.get().accept();
+  AWAIT_ASSERT_READY(socket);
+
+  // TODO(jmlvanre): Remove const copy.
+  AWAIT_ASSERT_EQ(data, Socket(socket.get()).recv());
+  AWAIT_ASSERT_READY(Socket(socket.get()).send(data));
+
+  AWAIT_ASSERT_READY(await_subprocess(client.get(), 0));
+}
+
+
+// Ensure we can NOT communicate between a POLL based socket and an
+// SSL socket if 'SSL_SUPPORT_DOWNGRADE' is not enabled.
+TEST_F(SSLTest, NoValidDowngrade)
+{
+  Try<Socket> server = setup_server({
+      {"SSL_ENABLED", "true"},
+      {"SSL_SUPPORT_DOWNGRADE", "false"},
+      {"SSL_KEY_FILE", key_path().value},
+      {"SSL_CERT_FILE", certificate_path().value},
+      {"SSL_REQUIRE_CERT", "true"}});
+  ASSERT_SOME(server);
+
+  Try<Subprocess> client = launch_client({
+      {"SSL_ENABLED", "false"}},
+      server.get(),
+      false);
+  ASSERT_SOME(client);
+
+  Future<Socket> socket = server.get().accept();
+  AWAIT_ASSERT_FAILED(socket);
+
+  AWAIT_ASSERT_READY(await_subprocess(client.get(), None()));
+}
+
+
+// For each protocol: ensure we can communicate between a POLL based
+// socket and an SSL socket if 'SSL_SUPPORT_DOWNGRADE' is enabled.
+TEST_F(SSLTest, ValidDowngradeEachProtocol)
+{
+  const vector<string> protocols = {
+    // Openssl can be compiled with SSLV2 and/or SSLV3 disabled
+    // completely, so we conditionally test these protocol.
+#ifndef OPENSSL_NO_SSL2
+    "SSL_ENABLE_SSL_V2",
+#endif
+#ifndef OPENSSL_NO_SSL3
+    "SSL_ENABLE_SSL_V3",
+#endif
+    "SSL_ENABLE_TLS_V1_0",
+    "SSL_ENABLE_TLS_V1_1",
+    "SSL_ENABLE_TLS_V1_2"
+  };
+
+  // For each protocol.
+  foreach (const string& server_protocol, protocols) {
+    LOG(INFO) << "Testing server protocol '" << server_protocol << "'\n";
+
+    // Set up the default server environment variables.
+    map<string, string> server_environment = {
+      {"SSL_ENABLED", "true"},
+      {"SSL_SUPPORT_DOWNGRADE", "true"},
+      {"SSL_KEY_FILE", key_path().value},
+      {"SSL_CERT_FILE", certificate_path().value}
+    };
+
+    // Disable all protocols except for the one we're testing.
+    foreach (const string& protocol, protocols) {
+      server_environment.emplace(
+          protocol,
+          stringify(protocol == server_protocol));
+    }
+
+    // Set up the server.
+    Try<Socket> server = setup_server(server_environment);
+    ASSERT_SOME(server);
+
+    // Launch the client with a POLL socket.
+    Try<Subprocess> client = launch_client({
+        {"SSL_ENABLED", "false"}},
+        server.get(),
+        false);
+    ASSERT_SOME(client);
+
+    Future<Socket> socket = server.get().accept();
+    AWAIT_ASSERT_READY(socket);
+
+    // TODO(jmlvanre): Remove const copy.
+    AWAIT_ASSERT_EQ(data, Socket(socket.get()).recv());
+    AWAIT_ASSERT_READY(Socket(socket.get()).send(data));
+
+    AWAIT_ASSERT_READY(await_subprocess(client.get(), 0));
+  }
+}
+
+
+// For each protocol: ensure we can NOT communicate between a POLL
+// based socket and an SSL socket if 'SSL_SUPPORT_DOWNGRADE' is not
+// enabled.
+TEST_F(SSLTest, NoValidDowngradeEachProtocol)
+{
+  const vector<string> protocols = {
+    // Openssl can be compiled with SSLV2 and/or SSLV3 disabled
+    // completely, so we conditionally test these protocol.
+#ifndef OPENSSL_NO_SSL2
+    "SSL_ENABLE_SSL_V2",
+#endif
+#ifndef OPENSSL_NO_SSL3
+    "SSL_ENABLE_SSL_V3",
+#endif
+    "SSL_ENABLE_TLS_V1_0",
+    "SSL_ENABLE_TLS_V1_1",
+    "SSL_ENABLE_TLS_V1_2"
+  };
+
+  // For each protocol.
+  foreach (const string& server_protocol, protocols) {
+    LOG(INFO) << "Testing server protocol '" << server_protocol << "'\n";
+
+    // Set up the default server environment variables.
+    map<string, string> server_environment = {
+      {"SSL_ENABLED", "true"},
+      {"SSL_SUPPORT_DOWNGRADE", "false"},
+      {"SSL_KEY_FILE", key_path().value},
+      {"SSL_CERT_FILE", certificate_path().value}
+    };
+
+    // Disable all protocols except for the one we're testing.
+    foreach (const string& protocol, protocols) {
+      server_environment.emplace(
+          protocol,
+          stringify(protocol == server_protocol));
+    }
+
+    // Set up the server.
+    Try<Socket> server = setup_server(server_environment);
+    ASSERT_SOME(server);
+
+    // Launch the client with a POLL socket.
+    Try<Subprocess> client = launch_client({
+        {"SSL_ENABLED", "false"}},
+        server.get(),
+        false);
+    ASSERT_SOME(client);
+
+    Future<Socket> socket = server.get().accept();
+    AWAIT_ASSERT_FAILED(socket);
+
+    AWAIT_ASSERT_READY(await_subprocess(client.get(), None()));
+  }
+}
+
 #endif // USE_SSL_SOCKET