You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2016/11/29 21:29:34 UTC

[01/14] mesos git commit: Changes in Mesos to make http::Request::client optional.

Repository: mesos
Updated Branches:
  refs/heads/master a85e28401 -> 913efc85d


Changes in Mesos to make http::Request::client optional.

Review: https://reviews.apache.org/r/54109


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0d0805e9
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0d0805e9
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0d0805e9

Branch: refs/heads/master
Commit: 0d0805e914bc50717237e1246097af1d3b7ba92a
Parents: fcf9f20
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sat Nov 5 00:08:40 2016 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Tue Nov 29 12:06:29 2016 -0800

----------------------------------------------------------------------
 src/master/http.cpp | 4 +++-
 src/slave/http.cpp  | 4 +++-
 2 files changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/0d0805e9/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 90cbed1..ac560d1 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -389,7 +389,9 @@ void Master::Http::log(const Request& request)
   Option<string> forwardedFor = request.headers.get("X-Forwarded-For");
 
   LOG(INFO) << "HTTP " << request.method << " for " << request.url.path
-            << " from " << request.client
+            << (request.client.isSome()
+                ? " from " + stringify(request.client.get())
+                : "")
             << (userAgent.isSome()
                 ? " with User-Agent='" + userAgent.get() + "'"
                 : "")

http://git-wip-us.apache.org/repos/asf/mesos/blob/0d0805e9/src/slave/http.cpp
----------------------------------------------------------------------
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index 91bb882..87189dd 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -283,7 +283,9 @@ void Slave::Http::log(const Request& request)
   Option<string> forwardedFor = request.headers.get("X-Forwarded-For");
 
   LOG(INFO) << "HTTP " << request.method << " for " << request.url.path
-            << " from " << request.client
+            << (request.client.isSome()
+                ? " from " + stringify(request.client.get())
+                : "")
             << (userAgent.isSome()
                 ? " with User-Agent='" + userAgent.get() + "'"
                 : "")


[03/14] mesos git commit: Updated Socket::Impl::accept to return std::shared_ptr<Impl>.

Posted by be...@apache.org.
Updated Socket::Impl::accept to return std::shared_ptr<Impl>.

Review: https://reviews.apache.org/r/53457


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/5c093c8c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/5c093c8c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/5c093c8c

Branch: refs/heads/master
Commit: 5c093c8c3b8e1f47cc501d85170272056010c616
Parents: a85e284
Author: Benjamin Hindman <be...@gmail.com>
Authored: Tue Nov 1 22:41:40 2016 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Tue Nov 29 12:06:29 2016 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/socket.hpp |  8 ++++++--
 3rdparty/libprocess/src/poll_socket.cpp        | 21 ++++++++++++---------
 3rdparty/libprocess/src/poll_socket.hpp        |  2 +-
 3 files changed, 19 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/5c093c8c/3rdparty/libprocess/include/process/socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp
index f798af7..97bdcb9 100644
--- a/3rdparty/libprocess/include/process/socket.hpp
+++ b/3rdparty/libprocess/include/process/socket.hpp
@@ -148,7 +148,7 @@ public:
      * TODO(josephw): MESOS-5729: Consider making the CLOEXEC option
      * configurable by the caller of the interface.
      */
-    virtual Future<Socket> accept() = 0;
+    virtual Future<std::shared_ptr<Impl>> accept() = 0;
 
     virtual Future<Nothing> connect(const Address& address) = 0;
     virtual Future<size_t> recv(char* data, size_t size) = 0;
@@ -296,7 +296,11 @@ public:
 
   Future<Socket> accept()
   {
-    return impl->accept();
+    return impl->accept()
+      // TODO(benh): Use && for `impl` here!
+      .then([](const std::shared_ptr<Impl>& impl) {
+        return Socket(impl);
+      });
   }
 
   Future<Nothing> connect(const Address& address)

http://git-wip-us.apache.org/repos/asf/mesos/blob/5c093c8c/3rdparty/libprocess/src/poll_socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/poll_socket.cpp b/3rdparty/libprocess/src/poll_socket.cpp
index eb7b487..d04f048 100644
--- a/3rdparty/libprocess/src/poll_socket.cpp
+++ b/3rdparty/libprocess/src/poll_socket.cpp
@@ -50,7 +50,7 @@ Try<Nothing> PollSocketImpl::listen(int backlog)
 
 namespace internal {
 
-Future<Socket> accept(int fd)
+Future<int> accept(int fd)
 {
   Try<int> accepted = network::accept(fd);
   if (accepted.isError()) {
@@ -91,21 +91,24 @@ Future<Socket> accept(int fd)
       "Failed to turn off the Nagle algorithm: " + stringify(error));
   }
 
-  Try<Socket> socket = Socket::create(Socket::DEFAULT_KIND(), s);
-  if (socket.isError()) {
-    os::close(s);
-    return Failure("Failed to accept, create socket: " + socket.error());
-  }
-  return socket.get();
+  return s;
 }
 
 } // namespace internal {
 
 
-Future<Socket> PollSocketImpl::accept()
+Future<std::shared_ptr<Socket::Impl>> PollSocketImpl::accept()
 {
   return io::poll(get(), io::READ)
-    .then(lambda::bind(&internal::accept, get()));
+    .then(lambda::bind(&internal::accept, get()))
+    .then([](int s) -> Future<std::shared_ptr<Socket::Impl>> {
+      Try<std::shared_ptr<Socket::Impl>> impl = create(s);
+      if (impl.isError()) {
+        os::close(s);
+        return Failure("Failed to create socket: " + impl.error());
+      }
+      return impl.get();
+    });
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/5c093c8c/3rdparty/libprocess/src/poll_socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/poll_socket.hpp b/3rdparty/libprocess/src/poll_socket.hpp
index d04f3f2..3ba3678 100644
--- a/3rdparty/libprocess/src/poll_socket.hpp
+++ b/3rdparty/libprocess/src/poll_socket.hpp
@@ -30,7 +30,7 @@ public:
 
   // Implementation of the Socket::Impl interface.
   virtual Try<Nothing> listen(int backlog);
-  virtual Future<Socket> accept();
+  virtual Future<std::shared_ptr<Socket::Impl>> accept();
   virtual Future<Nothing> connect(const Address& address);
   virtual Future<size_t> recv(char* data, size_t size);
   virtual Future<size_t> send(const char* data, size_t size);


[14/14] mesos git commit: Updated http::Connection::disconnect to do a complete socket shutdown.

Posted by be...@apache.org.
Updated http::Connection::disconnect to do a complete socket shutdown.

Review: https://reviews.apache.org/r/54114


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/913efc85
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/913efc85
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/913efc85

Branch: refs/heads/master
Commit: 913efc85da4632c56ee376bb403c1761bb4945ee
Parents: 4417a4e
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sun Nov 27 14:25:50 2016 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Tue Nov 29 12:19:17 2016 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/src/http.cpp | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/913efc85/3rdparty/libprocess/src/http.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/http.cpp b/3rdparty/libprocess/src/http.cpp
index 03512c6..8a10dca 100644
--- a/3rdparty/libprocess/src/http.cpp
+++ b/3rdparty/libprocess/src/http.cpp
@@ -1084,9 +1084,8 @@ public:
 
   Future<Nothing> disconnect(const Option<string>& message = None())
   {
-    Try<Nothing> shutdown = socket.shutdown();
-
-    disconnection.set(Nothing());
+    Try<Nothing> shutdown = socket.shutdown(
+        network::Socket::Shutdown::READ_WRITE);
 
     // If a response is still streaming, we send EOF to
     // the decoder in order to fail the pipe reader.
@@ -1101,6 +1100,8 @@ public:
       pipeline.pop();
     }
 
+    disconnection.set(Nothing());
+
     return shutdown;
   }
 


[02/14] mesos git commit: Changes in libprocess to make http::Request::client optional.

Posted by be...@apache.org.
Changes in libprocess to make http::Request::client optional.

Review: https://reviews.apache.org/r/54108


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/fcf9f208
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/fcf9f208
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/fcf9f208

Branch: refs/heads/master
Commit: fcf9f2082fc04323a74472eecfadaec325440af3
Parents: b64c9d4
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sat Nov 5 00:08:22 2016 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Tue Nov 29 12:06:29 2016 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/http.hpp | 2 +-
 3rdparty/libprocess/src/tests/http_tests.cpp | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/fcf9f208/3rdparty/libprocess/include/process/http.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/http.hpp b/3rdparty/libprocess/include/process/http.hpp
index d464b9f..08a8390 100644
--- a/3rdparty/libprocess/include/process/http.hpp
+++ b/3rdparty/libprocess/include/process/http.hpp
@@ -447,7 +447,7 @@ struct Request
 
   // For server requests, this contains the address of the client.
   // Note that this may correspond to a proxy or load balancer address.
-  network::inet::Address client;
+  Option<network::Address> client;
 
   // Clients can choose to provide the entire body at once
   // via BODY or can choose to stream the body over to the

http://git-wip-us.apache.org/repos/asf/mesos/blob/fcf9f208/3rdparty/libprocess/src/tests/http_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/http_tests.cpp b/3rdparty/libprocess/src/tests/http_tests.cpp
index 22ec432..de22d20 100644
--- a/3rdparty/libprocess/src/tests/http_tests.cpp
+++ b/3rdparty/libprocess/src/tests/http_tests.cpp
@@ -549,7 +549,7 @@ TEST(HTTPTest, PathParse)
 
 http::Response validateGetWithoutQuery(const http::Request& request)
 {
-  EXPECT_NE(process::address(), request.client);
+  EXPECT_SOME_NE(process::address(), request.client);
   EXPECT_EQ("GET", request.method);
   EXPECT_THAT(request.url.path, EndsWith("get"));
   EXPECT_EQ("", request.body);
@@ -562,7 +562,7 @@ http::Response validateGetWithoutQuery(const http::Request& request)
 
 http::Response validateGetWithQuery(const http::Request& request)
 {
-  EXPECT_NE(process::address(), request.client);
+  EXPECT_SOME_NE(process::address(), request.client);
   EXPECT_EQ("GET", request.method);
   EXPECT_THAT(request.url.path, EndsWith("get"));
   EXPECT_EQ("", request.body);


[12/14] mesos git commit: Added POSIX socket shutdown types to Windows header.

Posted by be...@apache.org.
Added POSIX socket shutdown types to Windows header.

This patch adds two missing BSD socket shutdown types
to a Windows header which maps these POSIX constants
to their Windows counterparts.

Review: https://reviews.apache.org/r/53990/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b3013ffb
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b3013ffb
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b3013ffb

Branch: refs/heads/master
Commit: b3013ffb0711ad70f91b50e90ea17674137c9c15
Parents: 9a4b5e6
Author: Greg Mann <gr...@mesosphere.io>
Authored: Tue Nov 29 12:18:14 2016 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Tue Nov 29 12:19:17 2016 -0800

----------------------------------------------------------------------
 3rdparty/stout/include/stout/windows.hpp | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b3013ffb/3rdparty/stout/include/stout/windows.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/stout/include/stout/windows.hpp b/3rdparty/stout/include/stout/windows.hpp
index 3782aa0..c384026 100644
--- a/3rdparty/stout/include/stout/windows.hpp
+++ b/3rdparty/stout/include/stout/windows.hpp
@@ -171,6 +171,8 @@ typedef SSIZE_T ssize_t;
 // the Windows versions of these flags to their POSIX equivalents so we don't
 // have to change any socket code.
 constexpr int SHUT_RD = SD_RECEIVE;
+constexpr int SHUT_WR = SD_SEND;
+constexpr int SHUT_RDWR = SD_BOTH;
 constexpr int MSG_NOSIGNAL = 0; // `SIGPIPE` signal does not exist on Windows.
 
 // The following functions are usually macros on POSIX; we provide them here as


[07/14] mesos git commit: Updated usage of network::Address and network::Socket in Mesos.

Posted by be...@apache.org.
Updated usage of network::Address and network::Socket in Mesos.

Review: https://reviews.apache.org/r/53462


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b64c9d42
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b64c9d42
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b64c9d42

Branch: refs/heads/master
Commit: b64c9d42c114ee81e0e83d96206a8ae1a664464c
Parents: 2529d07
Author: Benjamin Hindman <be...@gmail.com>
Authored: Thu Nov 3 23:39:07 2016 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Tue Nov 29 12:06:29 2016 -0800

----------------------------------------------------------------------
 src/tests/fetcher_tests.cpp |  4 ++--
 src/tests/utils.cpp         | 21 ++++++++++-----------
 2 files changed, 12 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b64c9d42/src/tests/fetcher_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fetcher_tests.cpp b/src/tests/fetcher_tests.cpp
index 79526ae..8b8baf8 100644
--- a/src/tests/fetcher_tests.cpp
+++ b/src/tests/fetcher_tests.cpp
@@ -413,7 +413,7 @@ TEST_F(FetcherTest, OSNetUriTest)
 {
   Http http;
 
-  const network::Address& address = http.process->self().address;
+  const network::inet::Address& address = http.process->self().address;
 
   process::http::URL url(
       "http",
@@ -458,7 +458,7 @@ TEST_F(FetcherTest, OSNetUriSpaceTest)
 {
   Http http;
 
-  const network::Address& address = http.process->self().address;
+  const network::inet::Address& address = http.process->self().address;
 
   process::http::URL url(
       "http",

http://git-wip-us.apache.org/repos/asf/mesos/blob/b64c9d42/src/tests/utils.cpp
----------------------------------------------------------------------
diff --git a/src/tests/utils.cpp b/src/tests/utils.cpp
index d36aa9c..0a9e5a8 100644
--- a/src/tests/utils.cpp
+++ b/src/tests/utils.cpp
@@ -32,14 +32,13 @@
 
 #include "tests/flags.hpp"
 
+namespace http = process::http;
+namespace inet = process::network::inet;
+
 using std::string;
 
 using process::Future;
 using process::UPID;
-using process::address;
-
-namespace http = process::http;
-namespace network = process::network;
 
 namespace mesos {
 namespace internal {
@@ -53,7 +52,7 @@ const bool searchInstallationDirectory = false;
 
 JSON::Object Metrics()
 {
-  UPID upid("metrics", address());
+  UPID upid("metrics", process::address());
 
   // TODO(neilc): This request might timeout if the current value of a
   // metric cannot be determined. In tests, a common cause for this is
@@ -73,20 +72,20 @@ JSON::Object Metrics()
 Try<uint16_t> getFreePort()
 {
   // Bind to port=0 to obtain a random unused port.
-  Try<network::Socket> socket = network::Socket::create();
+  Try<inet::Socket> socket = inet::Socket::create();
 
   if (socket.isError()) {
     return Error(socket.error());
   }
 
-  Try<network::Address> result = socket->bind(network::Address());
+  Try<inet::Address> address = socket->bind(inet::Address::ANY_ANY());
 
-  if (result.isSome()) {
-    return result->port;
-  } else {
-    return Error(result.error());
+  if (address.isError()) {
+    return Error(address.error());
   }
 
+  return address->port;
+
   // No explicit cleanup of `socket` as we rely on the implementation
   // of `Socket` to close the socket on destruction.
 }


[06/14] mesos git commit: Refactored network::Address into inet::Address.

Posted by be...@apache.org.
Refactored network::Address into inet::Address.

Also added unix::Address.

Review: https://reviews.apache.org/r/53460


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/907ef806
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/907ef806
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/907ef806

Branch: refs/heads/master
Commit: 907ef80615a59183d3e78aba784df14595a482a8
Parents: 6a77817
Author: Benjamin Hindman <be...@gmail.com>
Authored: Thu Nov 3 23:36:41 2016 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Tue Nov 29 12:06:29 2016 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/Makefile.am                 |   1 +
 3rdparty/libprocess/include/process/address.hpp | 292 ++++++++++++++++---
 3rdparty/libprocess/include/process/network.hpp |  28 +-
 3rdparty/libprocess/include/process/pid.hpp     |   2 +-
 3rdparty/libprocess/include/process/socket.hpp  |  99 ++++++-
 .../libprocess/include/process/ssl/gtest.hpp    |   4 +-
 3rdparty/libprocess/src/http.cpp                |   2 +-
 3rdparty/libprocess/src/libevent_ssl_socket.cpp |  31 +-
 3rdparty/libprocess/src/pid.cpp                 |   2 +-
 3rdparty/libprocess/src/poll_socket.cpp         |  34 ++-
 3rdparty/libprocess/src/process.cpp             |   6 +-
 3rdparty/libprocess/src/socket.cpp              |  17 +-
 3rdparty/libprocess/src/tests/process_tests.cpp |   2 +-
 3rdparty/libprocess/src/tests/socket_tests.cpp  |  69 +++++
 3rdparty/libprocess/src/tests/ssl_tests.cpp     |  96 ++++--
 3rdparty/libprocess/src/tests/test_linkee.cpp   |   2 +-
 16 files changed, 569 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/907ef806/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index 7131989..9d496b8 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -232,6 +232,7 @@ libprocess_tests_SOURCES =					\
   src/tests/profiler_tests.cpp					\
   src/tests/queue_tests.cpp					\
   src/tests/reap_tests.cpp					\
+  src/tests/socket_tests.cpp					\
   src/tests/sequence_tests.cpp					\
   src/tests/shared_tests.cpp					\
   src/tests/statistics_tests.cpp				\

http://git-wip-us.apache.org/repos/asf/mesos/blob/907ef806/3rdparty/libprocess/include/process/address.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/address.hpp b/3rdparty/libprocess/include/process/address.hpp
index 5fd8ac4..01bc065 100644
--- a/3rdparty/libprocess/include/process/address.hpp
+++ b/3rdparty/libprocess/include/process/address.hpp
@@ -24,11 +24,16 @@
 
 #include <glog/logging.h>
 
+#ifndef __WINDOWS__
+#include <sys/un.h>
+#endif // __WINDOWS__
+
 #include <ostream>
 
 #include <boost/functional/hash.hpp>
 
 #include <stout/abort.hpp>
+#include <stout/check.hpp>
 #include <stout/ip.hpp>
 #include <stout/net.hpp>
 #include <stout/stringify.hpp>
@@ -36,43 +41,126 @@
 namespace process {
 namespace network {
 
-// Represents a network "address", subsuming the struct addrinfo and
-// struct sockaddr* that typically is used to encapsulate IP and port.
+namespace inet {
+class Address;
+} // namespace inet {
+
+#ifndef __WINDOWS__
+namespace unix {
+class Address;
+} // namespace unix {
+#endif // __WINDOWS__
+
+// Represents a network "address", subsuming the `struct addrinfo` and
+// `struct sockaddr` that typically is used to encapsulate an address.
 //
-// TODO(benh): Create a Family enumeration to replace sa_family_t.
 // TODO(jieyu): Move this class to stout.
 class Address
 {
 public:
-  Address() : ip(INADDR_ANY), port(0) {}
+  enum class Family {
+    INET,
+#ifndef __WINDOWS__
+    UNIX
+#endif // __WINDOWS__
+  };
+
+  static Try<Address> create(const sockaddr_storage& storage)
+  {
+    switch (storage.ss_family) {
+      case AF_INET:
+#ifndef __WINDOWS__
+      case AF_UNIX:
+#endif // __WINDOWS__
+        return Address(storage);
+      default:
+        return Error("Unsupported family: " + stringify(storage.ss_family));
+    }
+  }
 
-  Address(const net::IP& _ip, uint16_t _port) : ip(_ip), port(_port) {}
+  Family family() const
+  {
+    switch (sockaddr.storage.ss_family) {
+      case AF_INET:
+        return Family::INET;
+#ifndef __WINDOWS__
+      case AF_UNIX:
+        return Family::UNIX;
+#endif // __WINDOWS__
+      default:
+        ABORT("Unexpected family: " + stringify(sockaddr.storage.ss_family));
+    }
+  }
 
-  static Address LOCALHOST_ANY()
+  // Returns the storage size depending on the family of this address.
+  size_t size() const
   {
-    return Address(net::IP(INADDR_ANY), 0);
+    switch (family()) {
+      case Family::INET:
+        return sizeof(sockaddr_in);
+#ifndef __WINDOWS__
+      case Family::UNIX:
+        return sizeof(sockaddr_un);
+#endif // __WINDOWS__
+    }
   }
 
-  static Try<Address> create(const struct sockaddr_storage& storage)
+  operator sockaddr_storage() const
   {
-    switch (storage.ss_family) {
-       case AF_INET: {
-         struct sockaddr_in addr = *(struct sockaddr_in*) &storage;
-         return Address(net::IP(addr.sin_addr), ntohs(addr.sin_port));
-       }
-       default: {
-         return Error(
-             "Unsupported family type: " +
-             stringify(storage.ss_family));
-       }
-     }
+    return sockaddr.storage;
   }
 
-  int family() const
+private:
+  friend class inet::Address;
+  #ifndef __WINDOWS__
+  friend class unix::Address;
+#endif // __WINDOWS__
+  template <typename AddressType>
+  friend Try<AddressType> convert(Try<Address>&& address);
+  friend std::ostream& operator<<(std::ostream& stream, const Address& address);
+
+  Address(const sockaddr_storage& storage)
   {
-    return ip.family();
+    sockaddr.storage = storage;
   }
 
+  union {
+    sockaddr_storage storage;
+    sockaddr_in in;
+#ifndef __WINDOWS__
+    sockaddr_un un;
+#endif // __WINDOWS__
+  } sockaddr;
+};
+
+
+// Helper for converting between Address and other types.
+template <typename AddressType>
+Try<AddressType> convert(Try<Address>&& address);
+
+
+template <>
+inline Try<Address> convert(Try<Address>&& address)
+{
+  return address;
+}
+
+
+namespace inet {
+
+class Address
+{
+public:
+  Address(const net::IP& _ip, uint16_t _port)
+    : ip(_ip), port(_port) {}
+
+  Address(const sockaddr_in& in)
+    : Address(net::IP(in.sin_addr), ntohs(in.sin_port)) {}
+
+  static Address LOOPBACK_ANY() { return Address(net::IP(INADDR_LOOPBACK), 0); }
+
+  static Address ANY_ANY() { return Address(net::IP(INADDR_ANY), 0); }
+
   /**
    * Returns the hostname of this address's IP.
    *
@@ -93,18 +181,6 @@ public:
     return hostname.get();
   }
 
-  // Returns the storage size (i.e., either sizeof(sockaddr_in) or
-  // sizeof(sockaddr_in6) depending on the family) of this address.
-  size_t size() const
-  {
-    switch (family()) {
-      case AF_INET:
-        return sizeof(sockaddr_in);
-      default:
-        ABORT("Unsupported family type: " + stringify(family()));
-    }
-  }
-
   bool operator<(const Address& that) const
   {
     if (ip == that.ip) {
@@ -133,6 +209,22 @@ public:
     return !(*this == that);
   }
 
+  bool operator==(const network::Address& that) const;
+
+  operator network::Address() const
+  {
+    union {
+      sockaddr_storage storage;
+      sockaddr_in in;
+    } sockaddr;
+    memset(&sockaddr.storage, 0, sizeof(sockaddr_storage));
+    sockaddr.in.sin_family = AF_INET;
+    sockaddr.in.sin_addr = ip.in().get();
+    sockaddr.in.sin_port = htons(port);
+    return network::Address(sockaddr.storage);
+  }
+
+  // TODO(benh): Use a sockaddr_in here like we do for unix::Address.
   net::IP ip;
   uint16_t port;
 };
@@ -144,21 +236,149 @@ inline std::ostream& operator<<(std::ostream& stream, const Address& address)
   return stream;
 }
 
+} // namespace inet {
+
+
+template <>
+inline Try<inet::Address> convert(Try<Address>&& address)
+{
+  if (address.isError()) {
+    return Error(address.error());
+  }
+
+  if (address->family() == Address::Family::INET) {
+    return inet::Address(address->sockaddr.in);
+  }
+
+  return Error("Unexpected address family");
+}
+
+
 namespace inet {
-using Address = network::Address;
+
+inline bool Address::operator==(const network::Address& that) const
+{
+  Try<Address> address = convert<Address>(that);
+  if (address.isError()) {
+    return false;
+  }
+  return *this == address.get();
+}
+
 } // namespace inet {
 
+
+#ifndef __WINDOWS__
+namespace unix {
+
+class Address
+{
+public:
+  static Try<Address> create(const std::string& path)
+  {
+    sockaddr_un un;
+
+    const size_t PATH_LENGTH = sizeof(un.sun_path);
+
+    if (path.length() >= PATH_LENGTH) {
+      return Error("Path too long, must be less than " +
+                   stringify(PATH_LENGTH) + " bytes");
+    }
+
+    un.sun_family = AF_UNIX;
+    memcpy(un.sun_path, path.c_str(), path.length() + 1);
+
+    return Address(un);
+  }
+
+  Address(const sockaddr_un& un)
+    : sockaddr() // Zero initialize.
+  {
+    sockaddr.un = un;
+  }
+
+  std::string path() const
+  {
+    if (sockaddr.un.sun_path[0] == '\0') {
+      return '\0' + std::string(sockaddr.un.sun_path + 1);
+    }
+
+    return std::string(sockaddr.un.sun_path);
+  }
+
+  operator network::Address() const
+  {
+    return network::Address(sockaddr.storage);
+  }
+
+private:
+  friend std::ostream& operator<<(
+      std::ostream& stream,
+      const Address& address);
+
+  union {
+    sockaddr_storage storage;
+    sockaddr_un un;
+  } sockaddr;
+};
+
+
+inline std::ostream& operator<<(
+    std::ostream& stream,
+    const Address& address)
+{
+  std::string path = address.path();
+  if (!path.empty() && path[0] == '\0') {
+    path[0] = '@';
+  }
+  return stream << path;
+}
+
+} // namespace unix {
+
+
+template <>
+inline Try<unix::Address> convert(Try<Address>&& address)
+{
+  if (address.isError()) {
+    return Error(address.error());
+  }
+
+  if (address->family() == Address::Family::UNIX) {
+    return unix::Address(address->sockaddr.un);
+  }
+
+  return Error("Unexpected address family");
+}
+#endif // __WINDOWS__
+
+
+inline std::ostream& operator<<(std::ostream& stream, const Address& address)
+{
+  switch (address.family()) {
+    case Address::Family::INET: {
+      stream << inet::Address(address.sockaddr.in);
+    }
+#ifndef __WINDOWS__
+    case Address::Family::UNIX: {
+      stream << unix::Address(address.sockaddr.un);
+    }
+#endif // __WINDOWS__
+  }
+  return stream;
+}
+
 } // namespace network {
 } // namespace process {
 
 namespace std {
 
 template <>
-struct hash<process::network::Address>
+struct hash<process::network::inet::Address>
 {
   typedef size_t result_type;
 
-  typedef process::network::Address argument_type;
+  typedef process::network::inet::Address argument_type;
 
   result_type operator()(const argument_type& address) const
   {

http://git-wip-us.apache.org/repos/asf/mesos/blob/907ef806/3rdparty/libprocess/include/process/network.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/network.hpp b/3rdparty/libprocess/include/process/network.hpp
index 5211066..8234765 100644
--- a/3rdparty/libprocess/include/process/network.hpp
+++ b/3rdparty/libprocess/include/process/network.hpp
@@ -30,10 +30,10 @@ using net::socket;
 // TODO(benh): Remove and defer to Socket::accept.
 inline Try<int> accept(int s)
 {
-  struct sockaddr_storage storage;
-  socklen_t storagelen = sizeof(storage);
+  sockaddr_storage storage;
+  socklen_t length = sizeof(storage);
 
-  int accepted = ::accept(s, (struct sockaddr*) &storage, &storagelen);
+  int accepted = ::accept(s, (sockaddr*) &storage, &length);
   if (accepted < 0) {
     return ErrnoError("Failed to accept");
   }
@@ -45,10 +45,9 @@ inline Try<int> accept(int s)
 // TODO(benh): Remove and defer to Socket::bind.
 inline Try<Nothing> bind(int s, const Address& address)
 {
-  struct sockaddr_storage storage =
-    net::createSockaddrStorage(address.ip, address.port);
+  sockaddr_storage storage = address;
 
-  if (net::bind(s, (struct sockaddr*) &storage, address.size()) < 0) {
+  if (net::bind(s, (sockaddr*) &storage, address.size()) < 0) {
     return ErrnoError("Failed to bind on " + stringify(address));
   }
 
@@ -59,10 +58,9 @@ inline Try<Nothing> bind(int s, const Address& address)
 // TODO(benh): Remove and defer to Socket::connect.
 inline Try<Nothing, SocketError> connect(int s, const Address& address)
 {
-  struct sockaddr_storage storage =
-    net::createSockaddrStorage(address.ip, address.port);
+  sockaddr_storage storage = address;
 
-  if (net::connect(s, (struct sockaddr*) &storage, address.size()) < 0) {
+  if (net::connect(s, (sockaddr*) &storage, address.size()) < 0) {
     return SocketError("Failed to connect to " + stringify(address));
   }
 
@@ -78,10 +76,10 @@ inline Try<Nothing, SocketError> connect(int s, const Address& address)
  */
 inline Try<Address> address(int s)
 {
-  struct sockaddr_storage storage;
-  socklen_t storagelen = sizeof(storage);
+  sockaddr_storage storage;
+  socklen_t length = sizeof(storage);
 
-  if (::getsockname(s, (struct sockaddr*) &storage, &storagelen) < 0) {
+  if (::getsockname(s, (sockaddr*) &storage, &length) < 0) {
     return ErrnoError("Failed to getsockname");
   }
 
@@ -97,10 +95,10 @@ inline Try<Address> address(int s)
  */
 inline Try<Address> peer(int s)
 {
-  struct sockaddr_storage storage;
-  socklen_t storagelen = sizeof(storage);
+  sockaddr_storage storage;
+  socklen_t length = sizeof(storage);
 
-  if (::getpeername(s, (struct sockaddr*) &storage, &storagelen) < 0) {
+  if (::getpeername(s, (sockaddr*) &storage, &length) < 0) {
     return ErrnoError("Failed to getpeername");
   }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/907ef806/3rdparty/libprocess/include/process/pid.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/pid.hpp b/3rdparty/libprocess/include/process/pid.hpp
index b274c55..c634916 100644
--- a/3rdparty/libprocess/include/process/pid.hpp
+++ b/3rdparty/libprocess/include/process/pid.hpp
@@ -93,7 +93,7 @@ struct UPID
   }
 
   std::string id;
-  network::inet::Address address;
+  network::inet::Address address = network::inet::Address::ANY_ANY();
 };
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/907ef806/3rdparty/libprocess/include/process/socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp
index 087a072..7f489e9 100644
--- a/3rdparty/libprocess/include/process/socket.hpp
+++ b/3rdparty/libprocess/include/process/socket.hpp
@@ -95,6 +95,7 @@ public:
    // TODO(josephw): MESOS-5729: Consider making the CLOEXEC option
    // configurable by the caller of the interface.
   static Try<std::shared_ptr<SocketImpl>> create(
+      Address::Family family,
       Kind kind = DEFAULT_KIND());
 
   virtual ~SocketImpl()
@@ -238,6 +239,10 @@ template <typename AddressType>
 class Socket
 {
 public:
+  static_assert(
+      std::is_convertible<AddressType, network::Address>::value,
+      "Requires type convertible to `network::Address`");
+
   /**
    * Returns an instance of a `Socket` using the specified kind of
    * implementation.
@@ -273,6 +278,24 @@ public:
   static Try<Socket> create(SocketImpl::Kind kind = SocketImpl::DEFAULT_KIND());
 
   /**
+   * Returns an instance of a `Socket` using the specified
+   * `Address::Family` to determine the address family to use. An
+   * optional implementation kind can be specified. The NONBLOCK and
+   * CLOEXEC options will be set on the underlying file descriptor for
+   * the socket.
+   *
+   * NOTE: this is only defined for `AddressType` of
+   * `network::Address`, all others are explicitly deleted.
+   *
+   * @param kind Optional. The desired `Socket` implementation.
+   *
+   * @return An instance of a `Socket`.
+   */
+  static Try<Socket> create(
+      Address::Family family,
+      SocketImpl::Kind kind = SocketImpl::DEFAULT_KIND());
+
+  /**
    * Returns the kind representing the underlying implementation
    * of the `Socket` instance.
    *
@@ -295,12 +318,12 @@ public:
 
   Try<AddressType> address() const
   {
-    return impl->address();
+    return convert<AddressType>(impl->address());
   }
 
   Try<AddressType> peer() const
   {
-    return impl->peer();
+    return convert<AddressType>(impl->peer());
   }
 
   int get() const
@@ -310,7 +333,7 @@ public:
 
   Try<AddressType> bind(const AddressType& address)
   {
-    return impl->bind(address);
+    return convert<AddressType>(impl->bind(address));
   }
 
   Try<Nothing> listen(int backlog)
@@ -388,10 +411,76 @@ private:
 using Socket = network::internal::Socket<network::Address>;
 
 namespace inet {
-
 using Socket = network::internal::Socket<inet::Address>;
-
 } // namespace inet {
+
+#ifndef __WINDOWS__
+namespace unix {
+using Socket = network::internal::Socket<unix::Address>;
+} // namespace unix {
+#endif // __WINDOWS__
+
+
+namespace internal {
+
+template <>
+Try<Socket<network::Address>> Socket<network::Address>::create(
+    SocketImpl::Kind kind) = delete;
+
+
+template <>
+inline Try<Socket<network::Address>> Socket<network::Address>::create(
+    Address::Family family,
+    SocketImpl::Kind kind)
+{
+  Try<std::shared_ptr<SocketImpl>> impl = SocketImpl::create(family, kind);
+  if (impl.isError()) {
+    return Error(impl.error());
+  }
+  return Socket(impl.get());
+}
+
+
+template <>
+inline Try<Socket<inet::Address>> Socket<inet::Address>::create(
+    SocketImpl::Kind kind)
+{
+  Try<std::shared_ptr<SocketImpl>> impl =
+    SocketImpl::create(Address::Family::INET, kind);
+  if (impl.isError()) {
+    return Error(impl.error());
+  }
+  return Socket(impl.get());
+}
+
+
+template <>
+Try<Socket<inet::Address>> Socket<inet::Address>::create(
+    Address::Family family,
+    SocketImpl::Kind kind) = delete;
+
+
+#ifndef __WINDOWS__
+template <>
+inline Try<Socket<unix::Address>> Socket<unix::Address>::create(
+    SocketImpl::Kind kind)
+{
+  Try<std::shared_ptr<SocketImpl>> impl =
+    SocketImpl::create(Address::Family::UNIX, kind);
+  if (impl.isError()) {
+    return Error(impl.error());
+  }
+  return Socket(impl.get());
+}
+
+
+template <>
+Try<Socket<unix::Address>> Socket<unix::Address>::create(
+    Address::Family family,
+    SocketImpl::Kind kind) = delete;
+#endif // __WINDOWS__
+
+} // namespace internal {
 } // namespace network {
 } // namespace process {
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/907ef806/3rdparty/libprocess/include/process/ssl/gtest.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/ssl/gtest.hpp b/3rdparty/libprocess/include/process/ssl/gtest.hpp
index 8649da1..2a19b3b 100644
--- a/3rdparty/libprocess/include/process/ssl/gtest.hpp
+++ b/3rdparty/libprocess/include/process/ssl/gtest.hpp
@@ -302,10 +302,10 @@ protected:
 
     process::network::inet::Socket server = create.get();
 
-    // We need to explicitly bind to INADDR_LOOPBACK so the
+    // We need to explicitly bind to the loopback address so the
     // certificate we create in this test fixture can be verified.
     Try<process::network::inet::Address> bind =
-      server.bind(process::network::inet::Address(net::IP(INADDR_LOOPBACK), 0));
+      server.bind(process::network::inet::Address::LOOPBACK_ANY());
 
     if (bind.isError()) {
       return Error(bind.error());

http://git-wip-us.apache.org/repos/asf/mesos/blob/907ef806/3rdparty/libprocess/src/http.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/http.cpp b/3rdparty/libprocess/src/http.cpp
index 93a8a80..1f2de9d 100644
--- a/3rdparty/libprocess/src/http.cpp
+++ b/3rdparty/libprocess/src/http.cpp
@@ -1315,7 +1315,7 @@ Future<Nothing> Connection::disconnected()
 Future<Connection> connect(const URL& url)
 {
   // TODO(bmahler): Move address resolution into the URL class?
-  Address address;
+  Address address = Address::ANY_ANY();
 
   if (url.ip.isNone() && url.domain.isNone()) {
     return Failure("Expected URL.ip or URL.domain to be set");

http://git-wip-us.apache.org/repos/asf/mesos/blob/907ef806/3rdparty/libprocess/src/libevent_ssl_socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libevent_ssl_socket.cpp b/3rdparty/libprocess/src/libevent_ssl_socket.cpp
index f9551bf..dddd0e2 100644
--- a/3rdparty/libprocess/src/libevent_ssl_socket.cpp
+++ b/3rdparty/libprocess/src/libevent_ssl_socket.cpp
@@ -494,21 +494,25 @@ Future<Nothing> LibeventSSLSocketImpl::connect(const Address& address)
     return Failure("Failed to connect: bufferevent_openssl_socket_new");
   }
 
-  // Try and determine the 'peer_hostname' from the address we're
-  // connecting to in order to properly verify the certificate later.
-  const Try<string> hostname = address.hostname();
+  if (address.family() == Address::Family::INET) {
+    // Try and determine the 'peer_hostname' from the address we're
+    // connecting to in order to properly verify the certificate
+    // later.
+    const Try<string> hostname =
+      network::convert<inet::Address>(address)->hostname();
+
+    if (hostname.isError()) {
+      VLOG(2) << "Could not determine hostname of peer: " << hostname.error();
+    } else {
+      VLOG(2) << "Connecting to " << hostname.get();
+      peer_hostname = hostname.get();
+    }
 
-  if (hostname.isError()) {
-    VLOG(2) << "Could not determine hostname of peer: " << hostname.error();
-  } else {
-    VLOG(2) << "Connecting to " << hostname.get();
-    peer_hostname = hostname.get();
+    // Determine the 'peer_ip' from the address we're connecting to in
+    // order to properly verify the certificate later.
+    peer_ip = network::convert<inet::Address>(address)->ip;
   }
 
-  // Determine the 'peer_ip' from the address we're connecting to in
-  // order to properly verify the certificate later.
-  peer_ip = address.ip;
-
   // Optimistically construct a 'ConnectRequest' and future.
   Owned<ConnectRequest> request(new ConnectRequest());
   Future<Nothing> future = request->promise.future();
@@ -533,8 +537,7 @@ Future<Nothing> LibeventSSLSocketImpl::connect(const Address& address)
 
   run_in_event_loop(
       [self, address]() {
-        sockaddr_storage addr =
-          net::createSockaddrStorage(address.ip, address.port);
+        sockaddr_storage addr = address;
 
           // Assign the callbacks for the bufferevent. We do this
           // before the 'bufferevent_socket_connect()' call to avoid

http://git-wip-us.apache.org/repos/asf/mesos/blob/907ef806/3rdparty/libprocess/src/pid.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/pid.cpp b/3rdparty/libprocess/src/pid.cpp
index ffb1021..023f881 100644
--- a/3rdparty/libprocess/src/pid.cpp
+++ b/3rdparty/libprocess/src/pid.cpp
@@ -101,7 +101,7 @@ istream& operator>>(istream& stream, UPID& pid)
 
   string id;
   string host;
-  network::inet::Address address;
+  network::inet::Address address = network::inet::Address::ANY_ANY();
 
   size_t index = str.find('@');
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/907ef806/3rdparty/libprocess/src/poll_socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/poll_socket.cpp b/3rdparty/libprocess/src/poll_socket.cpp
index 9184ea3..93ca37f 100644
--- a/3rdparty/libprocess/src/poll_socket.cpp
+++ b/3rdparty/libprocess/src/poll_socket.cpp
@@ -75,21 +75,31 @@ Future<int> accept(int fd)
     return Failure("Failed to accept, cloexec: " + cloexec.error());
   }
 
+  Try<Address> address = network::address(s);
+  if (address.isError()) {
+    LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to get address: "
+                                << address.error();
+    os::close(s);
+    return Failure("Failed to get address: " + address.error());
+  }
+
   // Turn off Nagle (TCP_NODELAY) so pipelined requests don't wait.
   // NOTE: We cast to `char*` here because the function prototypes on Windows
   // use `char*` instead of `void*`.
-  int on = 1;
-  if (::setsockopt(
-          s,
-          SOL_TCP,
-          TCP_NODELAY,
-          reinterpret_cast<const char*>(&on),
-          sizeof(on)) < 0) {
-    const string error = os::strerror(errno);
-    VLOG(1) << "Failed to turn off the Nagle algorithm: " << error;
-    os::close(s);
-    return Failure(
-      "Failed to turn off the Nagle algorithm: " + stringify(error));
+  if (address->family() == Address::Family::INET) {
+    int on = 1;
+    if (::setsockopt(
+            s,
+            SOL_TCP,
+            TCP_NODELAY,
+            reinterpret_cast<const char*>(&on),
+            sizeof(on)) < 0) {
+      const string error = os::strerror(errno);
+      VLOG(1) << "Failed to turn off the Nagle algorithm: " << error;
+      os::close(s);
+      return Failure(
+          "Failed to turn off the Nagle algorithm: " + stringify(error));
+    }
   }
 
   return s;

http://git-wip-us.apache.org/repos/asf/mesos/blob/907ef806/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index a07d5e3..8204327 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -580,7 +580,7 @@ static std::mutex* socket_mutex = new std::mutex();
 static Future<Socket> future_accept;
 
 // Local socket address.
-static Address __address__;
+static Address __address__ = Address::ANY_ANY();
 
 // Active SocketManager (eventually will probably be thread-local).
 static SocketManager* socket_manager = nullptr;
@@ -1072,7 +1072,7 @@ bool initialize(
   Clock::initialize(lambda::bind(&timedout, lambda::_1));
 
   // Fill in the local IP and port for inter-libprocess communication.
-  __address__ = Address::LOCALHOST_ANY();
+  __address__ = Address::ANY_ANY();
 
   // Fetch and parse the libprocess environment variables.
   internal::Flags flags;
@@ -1310,7 +1310,7 @@ void finalize()
   // Clear the public address of the server socket.
   // NOTE: This variable is necessary for process communication, so it
   // cannot be cleared until after the `ProcessManager` is deleted.
-  __address__ = Address::LOCALHOST_ANY();
+  __address__ = Address::ANY_ANY();
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/907ef806/3rdparty/libprocess/src/socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/socket.cpp b/3rdparty/libprocess/src/socket.cpp
index 0b7631d..2eaa5fc 100644
--- a/3rdparty/libprocess/src/socket.cpp
+++ b/3rdparty/libprocess/src/socket.cpp
@@ -47,18 +47,29 @@ Try<std::shared_ptr<SocketImpl>> SocketImpl::create(int s, Kind kind)
 }
 
 
-Try<std::shared_ptr<SocketImpl>> SocketImpl::create(Kind kind)
+Try<std::shared_ptr<SocketImpl>> SocketImpl::create(
+    Address::Family family,
+    Kind kind)
 {
+  int domain = [=]() {
+    switch (family) {
+      case Address::Family::INET: return AF_INET;
+#ifndef __WINDOWS__
+      case Address::Family::UNIX: return AF_UNIX;
+#endif // __WINDOWS__
+    }
+  }();
+
   // Supported in Linux >= 2.6.27.
 #if defined(SOCK_NONBLOCK) && defined(SOCK_CLOEXEC)
   Try<int> s =
-    network::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
+    network::socket(domain, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
 
   if (s.isError()) {
     return Error("Failed to create socket: " + s.error());
   }
 #else
-  Try<int> s = network::socket(AF_INET, SOCK_STREAM, 0);
+  Try<int> s = network::socket(domain, SOCK_STREAM, 0);
   if (s.isError()) {
     return Error("Failed to create socket: " + s.error());
   }

http://git-wip-us.apache.org/repos/asf/mesos/blob/907ef806/3rdparty/libprocess/src/tests/process_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/process_tests.cpp b/3rdparty/libprocess/src/tests/process_tests.cpp
index 0424a10..cd159b6 100644
--- a/3rdparty/libprocess/src/tests/process_tests.cpp
+++ b/3rdparty/libprocess/src/tests/process_tests.cpp
@@ -1412,7 +1412,7 @@ TEST_TEMP_DISABLED_ON_WINDOWS(ProcessTest, Http2)
 
   Socket socket = create.get();
 
-  ASSERT_SOME(socket.bind(Address()));
+  ASSERT_SOME(socket.bind(Address::ANY_ANY()));
 
   // Create a UPID for 'Libprocess-From' based on the IP and port we
   // got assigned.

http://git-wip-us.apache.org/repos/asf/mesos/blob/907ef806/3rdparty/libprocess/src/tests/socket_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/socket_tests.cpp b/3rdparty/libprocess/src/tests/socket_tests.cpp
new file mode 100644
index 0000000..44c3c9a
--- /dev/null
+++ b/3rdparty/libprocess/src/tests/socket_tests.cpp
@@ -0,0 +1,69 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License
+
+#include <string>
+
+#include <gmock/gmock.h>
+
+#include <process/gtest.hpp>
+#include <process/future.hpp>
+#include <process/socket.hpp>
+
+#include <stout/gtest.hpp>
+#include <stout/try.hpp>
+
+#include <stout/tests/utils.hpp>
+
+#ifndef __WINDOWS__
+namespace unix = process::network::unix;
+#endif // __WINDOWS__
+
+using process::Future;
+
+using std::string;
+
+class SocketTest : public TemporaryDirectoryTest {};
+
+#ifndef __WINDOWS__
+TEST_F(SocketTest, Unix)
+{
+  Try<unix::Socket> server = unix::Socket::create();
+  ASSERT_SOME(server);
+
+  Try<unix::Socket> client = unix::Socket::create();
+  ASSERT_SOME(client);
+
+  // Use a path in the temporary directory so it gets cleaned up.
+  string path = path::join(sandbox.get(), "socket");
+
+  Try<unix::Address> address = unix::Address::create(path);
+  ASSERT_SOME(address);
+
+  ASSERT_SOME(server->bind(address.get()));
+  ASSERT_SOME(server->listen(1));
+
+  Future<unix::Socket> accept = server->accept();
+
+  AWAIT_READY(client->connect(address.get()));
+  AWAIT_READY(accept);
+
+  unix::Socket socket = accept.get();
+
+  const string data = "Hello World!";
+
+  AWAIT_READY(client->send(data));
+  AWAIT_EQ(data, socket.recv(data.size()));
+
+  AWAIT_READY(socket.send(data));
+  AWAIT_EQ(data, client->recv(data.size()));
+}
+#endif // __WINDOWS__

http://git-wip-us.apache.org/repos/asf/mesos/blob/907ef806/3rdparty/libprocess/src/tests/ssl_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/ssl_tests.cpp b/3rdparty/libprocess/src/tests/ssl_tests.cpp
index bdb9420..7fa46e4 100644
--- a/3rdparty/libprocess/src/tests/ssl_tests.cpp
+++ b/3rdparty/libprocess/src/tests/ssl_tests.cpp
@@ -48,6 +48,9 @@ namespace http = process::http;
 namespace io = process::io;
 namespace network = process::network;
 namespace openssl = network::openssl;
+#ifndef __WINDOWS__
+namespace unix = process::network::unix;
+#endif // __WINDOWS__
 
 using network::inet::Address;
 using network::inet::Socket;
@@ -132,50 +135,97 @@ TEST_P(SSLTest, BasicSameProcess)
 
   openssl::reinitialize();
 
-  const Try<Socket> server_create = Socket::create(SocketImpl::Kind::SSL);
-  ASSERT_SOME(server_create);
+  Try<Socket> server = Socket::create(SocketImpl::Kind::SSL);
+  ASSERT_SOME(server);
 
-  const Try<Socket> client_create = Socket::create(SocketImpl::Kind::SSL);
-  ASSERT_SOME(client_create);
+  Try<Socket> client = Socket::create(SocketImpl::Kind::SSL);
+  ASSERT_SOME(client);
 
-  Socket server = server_create.get();
-  Socket client = client_create.get();
+  // We need to explicitly bind to the loopback address so the
+  // certificate we create in this test fixture can be verified.
+  ASSERT_SOME(server->bind(Address::LOOPBACK_ANY()));
+  ASSERT_SOME(server->listen(BACKLOG));
 
-  // We need to explicitly bind to INADDR_LOOPBACK so the certificate
-  // we create in this test fixture can be verified.
-  ASSERT_SOME(server.bind(Address(net::IP(INADDR_LOOPBACK), 0)));
+  Try<Address> address = server->address();
+  ASSERT_SOME(address);
 
-  const Try<Nothing> listen = server.listen(BACKLOG);
-  ASSERT_SOME(listen);
+  Future<Socket> accept = server->accept();
 
-  const Try<Address> server_address = server.address();
-  ASSERT_SOME(server_address);
+  AWAIT_ASSERT_READY(client->connect(address.get()));
 
-  const Future<Socket> _socket = server.accept();
+  // Wait for the server to have accepted the client connection.
+  AWAIT_ASSERT_READY(accept);
 
-  const Future<Nothing> connect = client.connect(server_address.get());
+  Socket socket = accept.get();
+
+  // Send a message from the client to the server.
+  const string data = "Hello World!";
+  AWAIT_ASSERT_READY(client->send(data));
+
+  // Verify the server received the message.
+  AWAIT_ASSERT_EQ(data, socket.recv(data.size()));
+
+  // Send the message back from the server to the client.
+  AWAIT_ASSERT_READY(socket.send(data));
+
+  // Verify the client received the message.
+  AWAIT_ASSERT_EQ(data, client->recv(data.size()));
+}
+
+
+#ifndef __WINDOWS__
+TEST_P(SSLTest, BasicSameProcessUnix)
+{
+  os::setenv("LIBPROCESS_SSL_ENABLED", "true");
+  os::setenv("LIBPROCESS_SSL_KEY_FILE", key_path().string());
+  os::setenv("LIBPROCESS_SSL_CERT_FILE", certificate_path().string());
+  // NOTE: we must set LIBPROCESS_SSL_REQUIRE_CERT to false because we
+  // don't have a hostname or IP to verify!
+  os::setenv("LIBPROCESS_SSL_REQUIRE_CERT", "false");
+  os::setenv("LIBPROCESS_SSL_CA_DIR", os::getcwd());
+  os::setenv("LIBPROCESS_SSL_CA_FILE", certificate_path().string());
+  os::setenv("LIBPROCESS_SSL_VERIFY_IPADD", GetParam());
+
+  openssl::reinitialize();
+
+  Try<unix::Socket> server = unix::Socket::create(SocketImpl::Kind::SSL);
+  ASSERT_SOME(server);
+
+  Try<unix::Socket> client = unix::Socket::create(SocketImpl::Kind::SSL);
+  ASSERT_SOME(client);
+
+  // Use a path in the temporary directory so it gets cleaned up.
+  string path = path::join(sandbox.get(), "socket");
+
+  Try<unix::Address> address = unix::Address::create(path);
+  ASSERT_SOME(address);
+
+  ASSERT_SOME(server->bind(address.get()));
+  ASSERT_SOME(server->listen(BACKLOG));
+
+  Future<unix::Socket> accept = server->accept();
+
+  AWAIT_ASSERT_READY(client->connect(address.get()));
 
   // Wait for the server to have accepted the client connection.
-  AWAIT_ASSERT_READY(_socket);
-  Socket socket = _socket.get(); // TODO(jmlvanre): Remove const copy.
+  AWAIT_ASSERT_READY(accept);
 
-  // Verify that the client also views the connection as established.
-  AWAIT_ASSERT_READY(connect);
+  unix::Socket socket = accept.get();
 
   // Send a message from the client to the server.
   const string data = "Hello World!";
-  AWAIT_ASSERT_READY(client.send(data));
+  AWAIT_ASSERT_READY(client->send(data));
 
   // Verify the server received the message.
-  AWAIT_ASSERT_EQ(data, socket.recv());
+  AWAIT_ASSERT_EQ(data, socket.recv(data.size()));
 
   // Send the message back from the server to the client.
   AWAIT_ASSERT_READY(socket.send(data));
 
   // Verify the client received the message.
-  AWAIT_ASSERT_EQ(data, client.recv());
+  AWAIT_ASSERT_EQ(data, client->recv(data.size()));
 }
-
+#endif // __WINDOWS__
 
 // Test a basic back-and-forth communication using the 'ssl-client'
 // subprocess.

http://git-wip-us.apache.org/repos/asf/mesos/blob/907ef806/3rdparty/libprocess/src/tests/test_linkee.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/test_linkee.cpp b/3rdparty/libprocess/src/tests/test_linkee.cpp
index 99ea1eb..921d676 100644
--- a/3rdparty/libprocess/src/tests/test_linkee.cpp
+++ b/3rdparty/libprocess/src/tests/test_linkee.cpp
@@ -120,7 +120,7 @@ int main(int argc, char** argv)
   }
 
   // Bind to some random port.
-  Try<Address> bind = __s__->bind(Address::LOCALHOST_ANY());
+  Try<Address> bind = __s__->bind(Address::ANY_ANY());
   if (bind.isError()) {
     EXIT(EXIT_FAILURE) << "Failed to bind: " << bind.error();
   }


[09/14] mesos git commit: Inlined function only used in one place.

Posted by be...@apache.org.
Inlined function only used in one place.

Review: https://reviews.apache.org/r/53461


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2529d077
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2529d077
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2529d077

Branch: refs/heads/master
Commit: 2529d077a6a1603f7689a684a1e946f16e474006
Parents: 4694267
Author: Benjamin Hindman <be...@gmail.com>
Authored: Thu Nov 3 23:37:40 2016 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Tue Nov 29 12:06:29 2016 -0800

----------------------------------------------------------------------
 3rdparty/stout/include/stout/net.hpp | 48 ++++++++++++++-----------------
 1 file changed, 21 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/2529d077/3rdparty/stout/include/stout/net.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/stout/include/stout/net.hpp b/3rdparty/stout/include/stout/net.hpp
index 083b8d1..4803aeb 100644
--- a/3rdparty/stout/include/stout/net.hpp
+++ b/3rdparty/stout/include/stout/net.hpp
@@ -198,32 +198,6 @@ inline struct addrinfo createAddrInfo(int socktype, int family, int flags)
 }
 
 
-// TODO(evelinad): Move this to Address.
-inline struct sockaddr_storage createSockaddrStorage(const IP& ip, int port)
-{
-  struct sockaddr_storage storage;
-  memset(&storage, 0, sizeof(storage));
-
-  switch (ip.family()) {
-    case AF_INET: {
-      struct sockaddr_in addr;
-      memset(&addr, 0, sizeof(addr));
-      addr.sin_family = AF_INET;
-      addr.sin_addr = ip.in().get();
-      addr.sin_port = htons(port);
-
-      memcpy(&storage, &addr, sizeof(addr));
-      break;
-    }
-    default: {
-      ABORT("Unsupported family type: " + stringify(ip.family()));
-    }
-  }
-
-  return storage;
-}
-
-
 inline Try<std::string> hostname()
 {
   char host[512];
@@ -252,9 +226,29 @@ inline Try<std::string> hostname()
 // Returns a Try of the hostname for the provided IP. If the hostname
 // cannot be resolved, then a string version of the IP address is
 // returned.
+//
+// TODO(benh): Merge with `net::hostname`.
 inline Try<std::string> getHostname(const IP& ip)
 {
-  struct sockaddr_storage storage = createSockaddrStorage(ip, 0);
+  struct sockaddr_storage storage;
+  memset(&storage, 0, sizeof(storage));
+
+  switch (ip.family()) {
+    case AF_INET: {
+      struct sockaddr_in addr;
+      memset(&addr, 0, sizeof(addr));
+      addr.sin_family = AF_INET;
+      addr.sin_addr = ip.in().get();
+      addr.sin_port = 0;
+
+      memcpy(&storage, &addr, sizeof(addr));
+      break;
+    }
+    default: {
+      ABORT("Unsupported family type: " + stringify(ip.family()));
+    }
+  }
+
   char hostname[MAXHOSTNAMELEN];
 
   int error = getnameinfo(


[11/14] mesos git commit: Added support for http::connect to take a network::Address.

Posted by be...@apache.org.
Added support for http::connect to take a network::Address.

Review: https://reviews.apache.org/r/54112


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9a4b5e62
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9a4b5e62
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9a4b5e62

Branch: refs/heads/master
Commit: 9a4b5e62486e9b1b0ee8913f8b1ae83b165ad333
Parents: 448b860
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sun Nov 27 13:58:08 2016 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Tue Nov 29 12:15:20 2016 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/http.hpp | 10 +++++-
 3rdparty/libprocess/src/http.cpp             | 37 +++++++++++++++++------
 2 files changed, 37 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/9a4b5e62/3rdparty/libprocess/include/process/http.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/http.hpp b/3rdparty/libprocess/include/process/http.hpp
index 08a8390..7894b31 100644
--- a/3rdparty/libprocess/include/process/http.hpp
+++ b/3rdparty/libprocess/include/process/http.hpp
@@ -849,7 +849,8 @@ public:
   bool operator!=(const Connection& c) const { return !(*this == c); }
 
 private:
-  Connection(const network::inet::Socket& s);
+  Connection(const network::Socket& s);
+  friend Future<Connection> connect(const network::Address& address);
   friend Future<Connection> connect(const URL&);
 
   // Forward declaration.
@@ -859,6 +860,13 @@ private:
 };
 
 
+// TODO(benh): Currently we don't support SSL for this version of
+// connect. We should support this, perhaps with an enum or a bool and
+// then update the `connect(URL)` variant to just call this function
+// instead.
+Future<Connection> connect(const network::Address& address);
+
+
 Future<Connection> connect(const URL& url);
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/9a4b5e62/3rdparty/libprocess/src/http.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/http.cpp b/3rdparty/libprocess/src/http.cpp
index 1f2de9d..03512c6 100644
--- a/3rdparty/libprocess/src/http.cpp
+++ b/3rdparty/libprocess/src/http.cpp
@@ -1024,7 +1024,7 @@ Future<Response> convert(const Response& pipeResponse)
 class ConnectionProcess : public Process<ConnectionProcess>
 {
 public:
-  ConnectionProcess(const Socket& _socket)
+  ConnectionProcess(const network::Socket& _socket)
     : ProcessBase(ID::generate("__http_connection__")),
       socket(_socket),
       sendChain(Nothing()),
@@ -1061,7 +1061,7 @@ public:
 
     // We must chain the calls to Socket::send as it
     // otherwise interleaves data across calls.
-    Socket socket_ = socket;
+    network::Socket socket_ = socket;
 
     sendChain = sendChain
       .then([socket_, request]() {
@@ -1124,7 +1124,7 @@ protected:
   }
 
 private:
-  static Future<Nothing> _send(Socket socket, Pipe::Reader reader)
+  static Future<Nothing> _send(network::Socket socket, Pipe::Reader reader)
   {
     return reader.read()
       .then([socket, reader](const string& data) mutable -> Future<Nothing> {
@@ -1233,7 +1233,7 @@ private:
     read();
   }
 
-  Socket socket;
+  network::Socket socket;
   StreamingResponseDecoder decoder;
   Future<Nothing> sendChain;
   Promise<Nothing> disconnection;
@@ -1262,7 +1262,7 @@ struct Connection::Data
   // on within a different execution context. More generally,
   // we should be passing Process ownership to libprocess to
   // ensure all interaction with a Process occurs through a PID.
-  Data(const Socket& s)
+  Data(const network::Socket& s)
     : process(spawn(new internal::ConnectionProcess(s), true)) {}
 
   ~Data()
@@ -1279,7 +1279,7 @@ struct Connection::Data
 };
 
 
-Connection::Connection(const Socket& s)
+Connection::Connection(const network::Socket& s)
   : data(std::make_shared<Connection::Data>(s)) {}
 
 
@@ -1312,6 +1312,20 @@ Future<Nothing> Connection::disconnected()
 }
 
 
+Future<Connection> connect(const network::Address& address)
+{
+  Try<network::Socket> socket = network::Socket::create(address.family());
+  if (socket.isError()) {
+    return Failure("Failed to create socket: " + socket.error());
+  }
+
+  return socket->connect(address)
+    .then([socket]() {
+      return Connection(socket.get());
+    });
+}
+
+
 Future<Connection> connect(const URL& url)
 {
   // TODO(bmahler): Move address resolution into the URL class?
@@ -1340,15 +1354,20 @@ Future<Connection> connect(const URL& url)
 
   address.port = url.port.get();
 
-  Try<Socket> socket = [&url]() -> Try<Socket> {
+  // TODO(benh): Reuse `connect(address)` once it supports SSL.
+  Try<network::Socket> socket = [&url]() -> Try<network::Socket> {
     // Default to 'http' if no scheme was specified.
     if (url.scheme.isNone() || url.scheme == string("http")) {
-      return Socket::create(SocketImpl::Kind::POLL);
+      return network::Socket::create(
+          network::Address::Family::INET,
+          SocketImpl::Kind::POLL);
     }
 
     if (url.scheme == string("https")) {
 #ifdef USE_SSL_SOCKET
-      return Socket::create(SocketImpl::Kind::SSL);
+      return network::Socket::create(
+          network::Address::Family::INET,
+          SocketImpl::Kind::SSL);
 #else
       return Error("'https' scheme requires SSL enabled");
 #endif


[10/14] mesos git commit: Removed unused Socket from Encoder.

Posted by be...@apache.org.
Removed unused Socket from Encoder.

Review: https://reviews.apache.org/r/54111


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/448b860b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/448b860b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/448b860b

Branch: refs/heads/master
Commit: 448b860b295267fc249e7a640bd275f474b2ff23
Parents: 0d0805e
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sun Nov 27 12:04:32 2016 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Tue Nov 29 12:10:39 2016 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/src/encoder.hpp             | 26 +++++---------
 3rdparty/libprocess/src/process.cpp             | 37 ++++++++++----------
 3rdparty/libprocess/src/tests/decoder_tests.cpp |  1 -
 3 files changed, 28 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/448b860b/3rdparty/libprocess/src/encoder.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/encoder.hpp b/3rdparty/libprocess/src/encoder.hpp
index 9667a62..1447d6f 100644
--- a/3rdparty/libprocess/src/encoder.hpp
+++ b/3rdparty/libprocess/src/encoder.hpp
@@ -46,7 +46,8 @@ public:
     FILE
   };
 
-  explicit Encoder(const network::inet::Socket& _s) : s(_s) {}
+  Encoder() = default;
+
   virtual ~Encoder() {}
 
   virtual Kind kind() const = 0;
@@ -54,22 +55,14 @@ public:
   virtual void backup(size_t length) = 0;
 
   virtual size_t remaining() const = 0;
-
-  network::inet::Socket socket() const
-  {
-    return s;
-  }
-
-private:
-  const network::inet::Socket s; // The socket this encoder is associated with.
 };
 
 
 class DataEncoder : public Encoder
 {
 public:
-  DataEncoder(const network::inet::Socket& s, const std::string& _data)
-    : Encoder(s), data(_data), index(0) {}
+  DataEncoder(const std::string& _data)
+    : data(_data), index(0) {}
 
   virtual ~DataEncoder() {}
 
@@ -107,8 +100,8 @@ private:
 class MessageEncoder : public DataEncoder
 {
 public:
-  MessageEncoder(const network::inet::Socket& s, Message* _message)
-    : DataEncoder(s, encode(_message)), message(_message) {}
+  MessageEncoder(Message* _message)
+    : DataEncoder(encode(_message)), message(_message) {}
 
   virtual ~MessageEncoder()
   {
@@ -162,10 +155,9 @@ class HttpResponseEncoder : public DataEncoder
 {
 public:
   HttpResponseEncoder(
-      const network::inet::Socket& s,
       const http::Response& response,
       const http::Request& request)
-    : DataEncoder(s, encode(response, request)) {}
+    : DataEncoder(encode(response, request)) {}
 
   static std::string encode(
       const http::Response& response,
@@ -251,8 +243,8 @@ public:
 class FileEncoder : public Encoder
 {
 public:
-  FileEncoder(const network::inet::Socket& s, int _fd, size_t _size)
-    : Encoder(s), fd(_fd), size(_size), index(0) {}
+  FileEncoder(int _fd, size_t _size)
+    : fd(_fd), size(_size), index(0) {}
 
   virtual ~FileEncoder()
   {

http://git-wip-us.apache.org/repos/asf/mesos/blob/448b860b/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 200cefc..889a034 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -386,7 +386,7 @@ public:
   // This generally happens when `process::finalize` is called.
   void unproxy(const Socket& socket);
 
-  void send(Encoder* encoder, bool persist);
+  void send(Encoder* encoder, bool persist, const Socket& socket);
   void send(const Response& response,
             const Request& request,
             const Socket& socket);
@@ -1492,13 +1492,15 @@ bool HttpProxy::process(const Future<Response>& future, const Request& request)
         // TODO(benh): Consider a way to have the socket manager turn
         // on TCP_CORK for both sends and then turn it off.
         socket_manager->send(
-            new HttpResponseEncoder(socket, response, request),
-            true);
+            new HttpResponseEncoder(response, request),
+            true,
+            socket);
 
         // Note the file descriptor gets closed by FileEncoder.
         socket_manager->send(
-            new FileEncoder(socket, fd, s.st_size),
-            request.keepAlive);
+            new FileEncoder(fd, s.st_size),
+            request.keepAlive,
+            socket);
       }
     }
   } else if (response.type == Response::PIPE) {
@@ -1513,8 +1515,9 @@ bool HttpProxy::process(const Future<Response>& future, const Request& request)
     VLOG(3) << "Starting \"chunked\" streaming";
 
     socket_manager->send(
-        new HttpResponseEncoder(socket, response, request),
-        true);
+        new HttpResponseEncoder(response, request),
+        true,
+        socket);
 
     CHECK_SOME(response.reader);
     http::Pipe::Reader reader = response.reader.get();
@@ -1569,8 +1572,9 @@ void HttpProxy::stream(
 
     // Always persist the connection when streaming is not finished.
     socket_manager->send(
-        new DataEncoder(socket, out.str()),
-        finished ? request->keepAlive : true);
+        new DataEncoder(out.str()),
+        finished ? request->keepAlive : true,
+        socket);
   } else if (chunk.isFailed()) {
     VLOG(1) << "Failed to read from stream: " << chunk.failure();
     // TODO(bmahler): Have to close connection if headers were sent!
@@ -2033,12 +2037,11 @@ void _send(
 } // namespace internal {
 
 
-void SocketManager::send(Encoder* encoder, bool persist)
+void SocketManager::send(Encoder* encoder, bool persist, const Socket& socket)
 {
   CHECK(encoder != nullptr);
 
   synchronized (mutex) {
-    Socket socket = encoder->socket();
     if (sockets.count(socket) > 0) {
       // Update whether or not this socket should get disposed after
       // there is no more data to send.
@@ -2061,7 +2064,7 @@ void SocketManager::send(Encoder* encoder, bool persist)
   }
 
   if (encoder != nullptr) {
-    internal::send(encoder, encoder->socket());
+    internal::send(encoder, socket);
   }
 }
 
@@ -2081,7 +2084,7 @@ void SocketManager::send(
     }
   }
 
-  send(new HttpResponseEncoder(socket, response, request), persist);
+  send(new HttpResponseEncoder(response, request), persist, socket);
 }
 
 
@@ -2150,7 +2153,7 @@ void SocketManager::send_connect(
     return;
   }
 
-  Encoder* encoder = new MessageEncoder(socket, message);
+  Encoder* encoder = new MessageEncoder(message);
 
   // Receive and ignore data from this socket. Note that we don't
   // expect to receive anything other than HTTP '202 Accepted'
@@ -2195,7 +2198,7 @@ void SocketManager::send(Message* message, const SocketImpl::Kind& kind)
       }
 
       if (outgoing.count(socket.get()) > 0) {
-        outgoing[socket.get()].push(new MessageEncoder(socket.get(), message));
+        outgoing[socket.get()].push(new MessageEncoder(message));
         return;
       } else {
         // Initialize the outgoing queue.
@@ -2244,9 +2247,7 @@ void SocketManager::send(Message* message, const SocketImpl::Kind& kind)
   } else {
     // If we're not connecting and we haven't added the encoder to
     // the 'outgoing' queue then schedule it to be sent.
-    internal::send(
-        new MessageEncoder(socket.get(), message),
-        socket.get());
+    internal::send(new MessageEncoder(message), socket.get());
   }
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/448b860b/3rdparty/libprocess/src/tests/decoder_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/decoder_tests.cpp b/3rdparty/libprocess/src/tests/decoder_tests.cpp
index 7356716..87563f4 100644
--- a/3rdparty/libprocess/src/tests/decoder_tests.cpp
+++ b/3rdparty/libprocess/src/tests/decoder_tests.cpp
@@ -17,7 +17,6 @@
 
 #include <process/gtest.hpp>
 #include <process/owned.hpp>
-#include <process/socket.hpp>
 
 #include <stout/gtest.hpp>
 


[13/14] mesos git commit: Added support for specifying how a socket should be shutdown.

Posted by be...@apache.org.
Added support for specifying how a socket should be shutdown.

Review: https://reviews.apache.org/r/54113


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4417a4e8
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4417a4e8
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4417a4e8

Branch: refs/heads/master
Commit: 4417a4e8917e17e70942a77bd796857978778888
Parents: b3013ff
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sun Nov 27 12:10:14 2016 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Tue Nov 29 12:19:17 2016 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/socket.hpp | 27 ++++++++++++++++-----
 1 file changed, 21 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/4417a4e8/3rdparty/libprocess/include/process/socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp
index 7f489e9..e9e89ee 100644
--- a/3rdparty/libprocess/include/process/socket.hpp
+++ b/3rdparty/libprocess/include/process/socket.hpp
@@ -183,11 +183,9 @@ public:
    * Shutdown the receive-side of the socket. No further data can be
    * received from the socket.
    */
-  // TODO(neilc): Change this to allow the caller to specify `how`.
-  // See MESOS-5658.
-  virtual Try<Nothing> shutdown()
+  virtual Try<Nothing> shutdown(int how)
   {
-    if (::shutdown(s, SHUT_RD) < 0) {
+    if (::shutdown(s, how) < 0) {
       return ErrnoError();
     }
 
@@ -380,9 +378,26 @@ public:
     return impl->send(data);
   }
 
-  Try<Nothing> shutdown()
+  enum class Shutdown
   {
-    return impl->shutdown();
+    READ,
+    WRITE,
+    READ_WRITE
+  };
+
+  // TODO(benh): Replace the default with Shutdown::READ_WRITE or remove
+  // it altogether since it's unclear what the default should be.
+  Try<Nothing> shutdown(Shutdown shutdown = Shutdown::READ)
+  {
+    int how = [&]() {
+      switch (shutdown) {
+        case Shutdown::READ: return SHUT_RD;
+        case Shutdown::WRITE: return SHUT_WR;
+        case Shutdown::READ_WRITE: return SHUT_RDWR;
+      }
+    }();
+
+    return impl->shutdown(how);
   }
 
   // Support implicit conversion of any `Socket<AddressType>` to a


[04/14] mesos git commit: Refactored `Socket` to support a templated version.

Posted by be...@apache.org.
Refactored `Socket` to support a templated version.

The templated `Socket` will enable us to provide type safe
functionality for different addresses, ultimately `inet::Address` and
`unix::Address` (and likely eventually `inet6::Address` as well).

Also introduced the `inet` namespace and updated the code to
explicitly use `inet::Address` and `inet::Socket`.

Review: https://reviews.apache.org/r/53459


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6a77817e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6a77817e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6a77817e

Branch: refs/heads/master
Commit: 6a77817e5ea62e7a779a350c25f351b8915c6385
Parents: 7aa6849
Author: Benjamin Hindman <be...@gmail.com>
Authored: Wed Nov 2 23:08:55 2016 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Tue Nov 29 12:06:29 2016 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/address.hpp |   4 +
 .../libprocess/include/process/firewall.hpp     |   4 +-
 3rdparty/libprocess/include/process/http.hpp    |  10 +-
 3rdparty/libprocess/include/process/pid.hpp     |   7 +-
 3rdparty/libprocess/include/process/process.hpp |   2 +-
 3rdparty/libprocess/include/process/socket.hpp  | 400 +++++++++++--------
 .../libprocess/include/process/ssl/gtest.hpp    |  17 +-
 3rdparty/libprocess/src/encoder.hpp             |  14 +-
 3rdparty/libprocess/src/http.cpp                |  10 +-
 3rdparty/libprocess/src/libevent_ssl_socket.cpp |  22 +-
 3rdparty/libprocess/src/libevent_ssl_socket.hpp |  16 +-
 3rdparty/libprocess/src/pid.cpp                 |   2 +-
 3rdparty/libprocess/src/poll_socket.cpp         |  10 +-
 3rdparty/libprocess/src/poll_socket.hpp         |  15 +-
 3rdparty/libprocess/src/process.cpp             |  22 +-
 3rdparty/libprocess/src/socket.cpp              | 117 +++---
 3rdparty/libprocess/src/tests/decoder_tests.cpp |   6 +-
 3rdparty/libprocess/src/tests/http_tests.cpp    |   2 +-
 3rdparty/libprocess/src/tests/process_tests.cpp |   4 +-
 3rdparty/libprocess/src/tests/ssl_client.cpp    |  10 +-
 3rdparty/libprocess/src/tests/ssl_tests.cpp     |  16 +-
 3rdparty/libprocess/src/tests/test_linkee.cpp   |   4 +-
 22 files changed, 391 insertions(+), 323 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/6a77817e/3rdparty/libprocess/include/process/address.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/address.hpp b/3rdparty/libprocess/include/process/address.hpp
index 04e3155..5fd8ac4 100644
--- a/3rdparty/libprocess/include/process/address.hpp
+++ b/3rdparty/libprocess/include/process/address.hpp
@@ -144,6 +144,10 @@ inline std::ostream& operator<<(std::ostream& stream, const Address& address)
   return stream;
 }
 
+namespace inet {
+using Address = network::Address;
+} // namespace inet {
+
 } // namespace network {
 } // namespace process {
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/6a77817e/3rdparty/libprocess/include/process/firewall.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/firewall.hpp b/3rdparty/libprocess/include/process/firewall.hpp
index ad461ca..0a7b985 100644
--- a/3rdparty/libprocess/include/process/firewall.hpp
+++ b/3rdparty/libprocess/include/process/firewall.hpp
@@ -55,7 +55,7 @@ public:
    *     for failure. Otherwise an unset 'Option' object.
    */
   virtual Option<http::Response> apply(
-      const network::Socket& socket,
+      const network::inet::Socket& socket,
       const http::Request& request) = 0;
 };
 
@@ -75,7 +75,7 @@ public:
   virtual ~DisabledEndpointsFirewallRule() {}
 
   virtual Option<http::Response> apply(
-      const network::Socket&,
+      const network::inet::Socket&,
       const http::Request& request)
   {
     if (paths.contains(request.url.path)) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/6a77817e/3rdparty/libprocess/include/process/http.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/http.hpp b/3rdparty/libprocess/include/process/http.hpp
index a684e09..d464b9f 100644
--- a/3rdparty/libprocess/include/process/http.hpp
+++ b/3rdparty/libprocess/include/process/http.hpp
@@ -30,6 +30,7 @@
 #include <process/future.hpp>
 #include <process/owned.hpp>
 #include <process/pid.hpp>
+#include <process/socket.hpp>
 
 #include <stout/error.hpp>
 #include <stout/hashmap.hpp>
@@ -49,12 +50,7 @@ namespace process {
 template <typename T>
 class Future;
 
-namespace network {
-class Socket;
-} // namespace network {
-
 namespace http {
-
 namespace authentication {
 
 class Authenticator;
@@ -451,7 +447,7 @@ struct Request
 
   // For server requests, this contains the address of the client.
   // Note that this may correspond to a proxy or load balancer address.
-  network::Address client;
+  network::inet::Address client;
 
   // Clients can choose to provide the entire body at once
   // via BODY or can choose to stream the body over to the
@@ -853,7 +849,7 @@ public:
   bool operator!=(const Connection& c) const { return !(*this == c); }
 
 private:
-  Connection(const network::Socket& s);
+  Connection(const network::inet::Socket& s);
   friend Future<Connection> connect(const URL&);
 
   // Forward declaration.

http://git-wip-us.apache.org/repos/asf/mesos/blob/6a77817e/3rdparty/libprocess/include/process/pid.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/pid.hpp b/3rdparty/libprocess/include/process/pid.hpp
index 36453b6..b274c55 100644
--- a/3rdparty/libprocess/include/process/pid.hpp
+++ b/3rdparty/libprocess/include/process/pid.hpp
@@ -46,13 +46,13 @@ struct UPID
   UPID(const char* id_, const net::IP& ip_, uint16_t port_)
     : id(id_), address(ip_, port_) {}
 
-  UPID(const char* id_, const network::Address& address_)
+  UPID(const char* id_, const network::inet::Address& address_)
     : id(id_), address(address_) {}
 
   UPID(const std::string& id_, const net::IP& ip_, uint16_t port_)
     : id(id_), address(ip_, port_) {}
 
-  UPID(const std::string& id_, const network::Address& address_)
+  UPID(const std::string& id_, const network::inet::Address& address_)
     : id(id_), address(address_) {}
 
   /*implicit*/ UPID(const char* s);
@@ -91,8 +91,9 @@ struct UPID
   {
     return !(*this == that);
   }
+
   std::string id;
-  network::Address address;
+  network::inet::Address address;
 };
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/6a77817e/3rdparty/libprocess/include/process/process.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/process.hpp b/3rdparty/libprocess/include/process/process.hpp
index de23f0c..1b066b2 100644
--- a/3rdparty/libprocess/include/process/process.hpp
+++ b/3rdparty/libprocess/include/process/process.hpp
@@ -539,7 +539,7 @@ std::string absolutePath(const std::string& path);
 /**
  * Returns the socket address associated with this instance of the library.
  */
-network::Address address();
+network::inet::Address address();
 
 
 /**

http://git-wip-us.apache.org/repos/asf/mesos/blob/6a77817e/3rdparty/libprocess/include/process/socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp
index a70954a..087a072 100644
--- a/3rdparty/libprocess/include/process/socket.hpp
+++ b/3rdparty/libprocess/include/process/socket.hpp
@@ -32,24 +32,32 @@
 
 namespace process {
 namespace network {
+namespace internal {
 
 /**
- * An abstraction around a socket (file descriptor).
+ * Implementation interface for a `Socket`.
  *
- * Provides reference counting such that the socket is only closed
- * (and thus, has the possiblity of being reused) after there are no
- * more references.
+ * Each socket is:
+ *   - reference counted,
+ *   - shared by default,
+ *   - and a concurrent object.
+ *
+ * Multiple implementations are supported via the Pimpl pattern,
+ * rather than forcing each Socket implementation to do this themselves.
+ *
+ * @see process::network::Socket
+ * @see [Pimpl pattern](https://en.wikipedia.org/wiki/Opaque_pointer)
  */
-class Socket
+class SocketImpl : public std::enable_shared_from_this<SocketImpl>
 {
 public:
   /**
    * Available kinds of implementations.
    *
-   * @see process::network::PollSocketImpl
-   * @see process::network::LibeventSSLSocketImpl
+   * @see process::network::internal::PollSocketImpl
+   * @see process::network::internal::LibeventSSLSocketImpl
    */
-  enum Kind
+  enum class Kind
   {
     POLL,
 #ifdef USE_SSL_SOCKET
@@ -58,180 +66,222 @@ public:
   };
 
   /**
-   * Returns an instance of a `Socket` using the specified kind of
-   * implementation. All implementations will set the NONBLOCK and
-   * CLOEXEC options on the returned socket.
+   * Returns the default `Kind` of implementation.
+   */
+  static Kind DEFAULT_KIND();
+
+  /**
+   * Returns an instance of a `SocketImpl` using the specified kind of
+   * implementation.
    *
-   * @param kind Optional. The desired `Socket` implementation.
-   * @param s Optional.  The file descriptor to wrap with the `Socket`.
+   * @param s The existing file descriptor to use.
+   * @param kind Optional. The desired implementation.
    *
-   * @return An instance of a `Socket`.
+   * @return An instance of a `SocketImpl`.
+   */
+  static Try<std::shared_ptr<SocketImpl>> create(
+      int s,
+      Kind kind = DEFAULT_KIND());
+
+  /**
+   * Returns an instance of a `SocketImpl` using the specified kind of
+   * implementation. The NONBLOCK and CLOEXEC options will be set on
+   * the underlying file descriptor for the socket.
    *
-   * TODO(josephw): MESOS-5729: Consider making the CLOEXEC option
-   * configurable by the caller of the interface.
+   * @param kind Optional. The desired implementation.
+   *
+   * @return An instance of a `SocketImpl`.
    */
-  static Try<Socket> create(Kind kind = DEFAULT_KIND(), Option<int> s = None());
+   // TODO(josephw): MESOS-5729: Consider making the CLOEXEC option
+   // configurable by the caller of the interface.
+  static Try<std::shared_ptr<SocketImpl>> create(
+      Kind kind = DEFAULT_KIND());
+
+  virtual ~SocketImpl()
+  {
+    // Don't close if the socket was released.
+    if (s >= 0) {
+      CHECK_SOME(os::close(s)) << "Failed to close socket";
+    }
+  }
 
   /**
-   * Returns the default `Kind` of implementation of `Socket`.
+   * Returns the file descriptor wrapped by this implementation.
+   */
+  int get() const
+  {
+    return s;
+  }
+
+  /**
+   * @copydoc network::address
+   */
+  Try<Address> address() const;
+
+  /**
+   * @copydoc network::peer
+   */
+  Try<Address> peer() const;
+
+  /**
+   * Assigns the specified address to the socket.
    *
-   * @see process::network::Socket::Kind
+   * @return The assigned `Address` or an error if the bind system
+   *     call fails.
    */
-  static Kind DEFAULT_KIND();
+  Try<Address> bind(const Address& address);
+
+  virtual Try<Nothing> listen(int backlog) = 0;
 
   /**
-   * Returns the kind representing the underlying implementation
-   * of the `Socket` instance.
+   * Returns an implementation corresponding to the next pending
+   * connection for the listening socket. All implementations will set
+   * the NONBLOCK and CLOEXEC options on the returned socket.
    *
-   * @see process::network::Socket::Kind
+   * TODO(josephw): MESOS-5729: Consider making the CLOEXEC option
+   * configurable by the caller of the interface.
    */
-  Kind kind() const { return impl->kind(); }
+  virtual Future<std::shared_ptr<SocketImpl>> accept() = 0;
+
+  virtual Future<Nothing> connect(const Address& address) = 0;
+  virtual Future<size_t> recv(char* data, size_t size) = 0;
+  virtual Future<size_t> send(const char* data, size_t size) = 0;
+  virtual Future<size_t> sendfile(int fd, off_t offset, size_t size) = 0;
 
   /**
-   * Interface for a `Socket`.
+   * An overload of `recv`, which receives data based on the specified
+   * 'size' parameter.
    *
-   * Each socket is:
-   *   - reference counted,
-   *   - shared by default,
-   *   - and a concurrent object.
+   * @param size
+   *       Value  | Semantics
+   *     :-------:|-----------
+   *        0     | Returns an empty string.
+   *       -1     | Receives until EOF.
+   *        N     | Returns a string of size N.
+   *       'None' | Returns a string of the available data.
+   *     If 'None' is specified, whenever data becomes available on the
+   *     socket, that much data will be returned.
+   */
+  // TODO(benh): Consider returning Owned<std::string> or
+  // Shared<std::string>, the latter enabling reuse of a pool of
+  // preallocated strings/buffers.
+  virtual Future<std::string> recv(const Option<ssize_t>& size = None());
+
+  /**
+   * An overload of `send`, which sends all of the specified data.
    *
-   * Multiple implementations are supported via the Pimpl pattern,
-   * rather than forcing each Socket implementation to do this themselves.
+   * @param data The specified data to send.
    *
-   * @see process::network::Socket
-   * @see [Pimpl pattern](https://en.wikipedia.org/wiki/Opaque_pointer)
+   * @return Nothing or an error in case the sending fails.
+   */
+  // TODO(benh): Consider taking Shared<std::string>, the latter
+  // enabling reuse of a pool of preallocated strings/buffers.
+  virtual Future<Nothing> send(const std::string& data);
+
+  /**
+   * Shutdown the receive-side of the socket. No further data can be
+   * received from the socket.
    */
-  class Impl : public std::enable_shared_from_this<Impl>
+  // TODO(neilc): Change this to allow the caller to specify `how`.
+  // See MESOS-5658.
+  virtual Try<Nothing> shutdown()
   {
-  public:
-    virtual ~Impl()
-    {
-      // Don't close if the socket was released.
-      if (s >= 0) {
-        CHECK_SOME(os::close(s)) << "Failed to close socket";
-      }
+    if (::shutdown(s, SHUT_RD) < 0) {
+      return ErrnoError();
     }
 
-    /**
-     * Returns the file descriptor wrapped by the `Socket`.
-     */
-    int get() const
-    {
-      return s;
-    }
+    return Nothing();
+  }
 
-    /**
-     * @copydoc network::address
-     */
-    Try<Address> address() const;
-
-    /**
-     * @copydoc network::peer
-     */
-    Try<Address> peer() const;
-
-    /**
-     * Assigns the specified address to the `Socket`.
-     *
-     * @return The assigned `Address` or an error if the bind system
-     *     call fails.
-     */
-    Try<Address> bind(const Address& address);
-
-    virtual Try<Nothing> listen(int backlog) = 0;
-
-    /**
-     * Returns a socket corresponding to the next pending connection
-     * for the listening socket. All implementations will set the
-     * NONBLOCK and CLOEXEC options on the returned socket.
-     *
-     * TODO(josephw): MESOS-5729: Consider making the CLOEXEC option
-     * configurable by the caller of the interface.
-     */
-    virtual Future<std::shared_ptr<Impl>> accept() = 0;
-
-    virtual Future<Nothing> connect(const Address& address) = 0;
-    virtual Future<size_t> recv(char* data, size_t size) = 0;
-    virtual Future<size_t> send(const char* data, size_t size) = 0;
-    virtual Future<size_t> sendfile(int fd, off_t offset, size_t size) = 0;
-
-    /**
-     * An overload of `recv`, which receives data based on the specified
-     * 'size' parameter.
-     *
-     * @param size
-     *       Value  | Semantics
-     *     :-------:|-----------
-     *        0     | Returns an empty string.
-     *       -1     | Receives until EOF.
-     *        N     | Returns a string of size N.
-     *       'None' | Returns a string of the available data.
-     *     If 'None' is specified, whenever data becomes available on the
-     *     socket, that much data will be returned.
-     */
-    // TODO(benh): Consider returning Owned<std::string> or
-    // Shared<std::string>, the latter enabling reuse of a pool of
-    // preallocated strings/buffers.
-    virtual Future<std::string> recv(const Option<ssize_t>& size = None());
-
-    /**
-     * An overload of `send`, which sends all of the specified data.
-     *
-     * @param data The specified data to send.
-     *
-     * @return Nothing or an error in case the sending fails.
-     */
-    // TODO(benh): Consider taking Shared<std::string>, the latter
-    // enabling reuse of a pool of preallocated strings/buffers.
-    virtual Future<Nothing> send(const std::string& data);
-
-    /**
-     * Shutdown the receive-side of the socket. No further data can be
-     * received from the socket.
-     */
-    // TODO(neilc): Change this to allow the caller to specify `how`.
-    // See MESOS-5658.
-    virtual Try<Nothing> shutdown()
-    {
-      if (::shutdown(s, SHUT_RD) < 0) {
-        return ErrnoError();
-      }
-
-      return Nothing();
-    }
+  virtual Kind kind() const = 0;
 
-    virtual Socket::Kind kind() const = 0;
-
-  protected:
-    explicit Impl(int _s) : s(_s) { CHECK(s >= 0); }
-
-    /**
-     * Releases ownership of the file descriptor. Not exposed
-     * via the `Socket` interface as this is only intended to
-     * support `Socket::Impl` implementations that need to
-     * override the file descriptor ownership.
-     */
-    int release()
-    {
-      int released = s;
-      s = -1;
-      return released;
-    }
+protected:
+  explicit SocketImpl(int _s) : s(_s) { CHECK(s >= 0); }
+
+  /**
+   * Releases ownership of the file descriptor. Not exposed
+   * via the `Socket` interface as this is only intended to
+   * support `SocketImpl` implementations that need to
+   * override the file descriptor ownership.
+   */
+  int release()
+  {
+    int released = s;
+    s = -1;
+    return released;
+  }
 
-    /**
-     * Returns a `std::shared_ptr<T>` from this implementation.
-     */
-    template <typename T>
-    static std::shared_ptr<T> shared(T* t)
-    {
-      std::shared_ptr<T> pointer =
-        std::dynamic_pointer_cast<T>(CHECK_NOTNULL(t)->shared_from_this());
-      CHECK(pointer);
-      return pointer;
+  /**
+   * Returns a `std::shared_ptr<T>` from this implementation.
+   */
+  template <typename T>
+  static std::shared_ptr<T> shared(T* t)
+  {
+    std::shared_ptr<T> pointer =
+      std::dynamic_pointer_cast<T>(CHECK_NOTNULL(t)->shared_from_this());
+    CHECK(pointer);
+    return pointer;
+  }
+
+  int s;
+};
+
+
+/**
+ * An abstraction around a socket (file descriptor).
+ *
+ * Provides reference counting such that the socket is only closed
+ * (and thus, has the possiblity of being reused) after there are no
+ * more references.
+ */
+template <typename AddressType>
+class Socket
+{
+public:
+  /**
+   * Returns an instance of a `Socket` using the specified kind of
+   * implementation.
+   *
+   * @param s The file descriptor to wrap with the `Socket`.
+   * @param kind Optional. The desired `Socket` implementation.
+   *
+   * @return An instance of a `Socket`.
+   */
+  static Try<Socket> create(
+      int s,
+      SocketImpl::Kind kind = SocketImpl::DEFAULT_KIND())
+  {
+    Try<std::shared_ptr<SocketImpl>> impl = SocketImpl::create(s, kind);
+    if (impl.isError()) {
+      return Error(impl.error());
     }
+    return Socket(impl.get());
+  }
 
-    int s;
-  };
+  /**
+   * Returns an instance of a `Socket` using `AddressType` to determine
+   * the address family to use. An optional implementation kind can be
+   * specified. The NONBLOCK and CLOEXEC options will be set on the
+   * underlying file descriptor for the socket.
+   *
+   * @param kind Optional. The desired `Socket` implementation.
+   *
+   * @return An instance of a `Socket`.
+   */
+  // TODO(josephw): MESOS-5729: Consider making the CLOEXEC option
+  // configurable by the caller of the interface.
+  static Try<Socket> create(SocketImpl::Kind kind = SocketImpl::DEFAULT_KIND());
+
+  /**
+   * Returns the kind representing the underlying implementation
+   * of the `Socket` instance.
+   *
+   * @see process::network::internal::SocketImpl::Kind
+   */
+  SocketImpl::Kind kind() const
+  {
+    return impl->kind();
+  }
 
   bool operator==(const Socket& that) const
   {
@@ -243,12 +293,12 @@ public:
     return impl->get();
   }
 
-  Try<Address> address() const
+  Try<AddressType> address() const
   {
     return impl->address();
   }
 
-  Try<Address> peer() const
+  Try<AddressType> peer() const
   {
     return impl->peer();
   }
@@ -258,7 +308,7 @@ public:
     return impl->get();
   }
 
-  Try<Address> bind(const Address& address = Address::LOCALHOST_ANY())
+  Try<AddressType> bind(const AddressType& address)
   {
     return impl->bind(address);
   }
@@ -272,12 +322,12 @@ public:
   {
     return impl->accept()
       // TODO(benh): Use && for `impl` here!
-      .then([](const std::shared_ptr<Impl>& impl) {
+      .then([](const std::shared_ptr<SocketImpl>& impl) {
         return Socket(impl);
       });
   }
 
-  Future<Nothing> connect(const Address& address)
+  Future<Nothing> connect(const AddressType& address)
   {
     return impl->connect(address);
   }
@@ -312,14 +362,36 @@ public:
     return impl->shutdown();
   }
 
+  // Support implicit conversion of any `Socket<AddressType>` to a
+  // `Socket<network::Address>`.
+  operator Socket<network::Address>() const
+  {
+    return Socket<network::Address>(impl);
+  }
+
 private:
-  explicit Socket(std::shared_ptr<Impl>&& that) : impl(std::move(that)) {}
+  // Necessary to support the implicit conversion operator from any
+  // `Socket<AddressType>` to `Socket<network::Address>`.
+  template <typename T>
+  friend class Socket;
+
+  explicit Socket(std::shared_ptr<SocketImpl>&& that) : impl(std::move(that)) {}
 
-  explicit Socket(const std::shared_ptr<Impl>& that) : impl(that) {}
+  explicit Socket(const std::shared_ptr<SocketImpl>& that) : impl(that) {}
 
-  std::shared_ptr<Impl> impl;
+  std::shared_ptr<SocketImpl> impl;
 };
 
+} // namespace internal {
+
+
+using Socket = network::internal::Socket<network::Address>;
+
+namespace inet {
+
+using Socket = network::internal::Socket<inet::Address>;
+
+} // namespace inet {
 } // namespace network {
 } // namespace process {
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/6a77817e/3rdparty/libprocess/include/process/ssl/gtest.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/ssl/gtest.hpp b/3rdparty/libprocess/include/process/ssl/gtest.hpp
index 21a0fc4..8649da1 100644
--- a/3rdparty/libprocess/include/process/ssl/gtest.hpp
+++ b/3rdparty/libprocess/include/process/ssl/gtest.hpp
@@ -287,24 +287,25 @@ protected:
    *
    * @return Socket if successful otherwise an Error.
    */
-  Try<process::network::Socket> setup_server(
+  Try<process::network::inet::Socket> setup_server(
       const std::map<std::string, std::string>& environment)
   {
     set_environment_variables(environment);
 
-    const Try<process::network::Socket> create =
-      process::network::Socket::create(process::network::Socket::SSL);
+    const Try<process::network::inet::Socket> create =
+      process::network::inet::Socket::create(
+          process::network::internal::SocketImpl::Kind::SSL);
 
     if (create.isError()) {
       return Error(create.error());
     }
 
-    process::network::Socket server = create.get();
+    process::network::inet::Socket server = create.get();
 
     // We need to explicitly bind to INADDR_LOOPBACK so the
     // certificate we create in this test fixture can be verified.
-    Try<process::network::Address> bind =
-      server.bind(process::network::Address(net::IP(INADDR_LOOPBACK), 0));
+    Try<process::network::inet::Address> bind =
+      server.bind(process::network::inet::Address(net::IP(INADDR_LOOPBACK), 0));
 
     if (bind.isError()) {
       return Error(bind.error());
@@ -334,10 +335,10 @@ protected:
    */
   Try<process::Subprocess> launch_client(
       const std::map<std::string, std::string>& environment,
-      const process::network::Socket& server,
+      const process::network::inet::Socket& server,
       bool use_ssl_socket)
   {
-    const Try<process::network::Address> address = server.address();
+    const Try<process::network::inet::Address> address = server.address();
     if (address.isError()) {
       return Error(address.error());
     }

http://git-wip-us.apache.org/repos/asf/mesos/blob/6a77817e/3rdparty/libprocess/src/encoder.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/encoder.hpp b/3rdparty/libprocess/src/encoder.hpp
index 515821a..9667a62 100644
--- a/3rdparty/libprocess/src/encoder.hpp
+++ b/3rdparty/libprocess/src/encoder.hpp
@@ -46,7 +46,7 @@ public:
     FILE
   };
 
-  explicit Encoder(const network::Socket& _s) : s(_s) {}
+  explicit Encoder(const network::inet::Socket& _s) : s(_s) {}
   virtual ~Encoder() {}
 
   virtual Kind kind() const = 0;
@@ -55,20 +55,20 @@ public:
 
   virtual size_t remaining() const = 0;
 
-  network::Socket socket() const
+  network::inet::Socket socket() const
   {
     return s;
   }
 
 private:
-  const network::Socket s; // The socket this encoder is associated with.
+  const network::inet::Socket s; // The socket this encoder is associated with.
 };
 
 
 class DataEncoder : public Encoder
 {
 public:
-  DataEncoder(const network::Socket& s, const std::string& _data)
+  DataEncoder(const network::inet::Socket& s, const std::string& _data)
     : Encoder(s), data(_data), index(0) {}
 
   virtual ~DataEncoder() {}
@@ -107,7 +107,7 @@ private:
 class MessageEncoder : public DataEncoder
 {
 public:
-  MessageEncoder(const network::Socket& s, Message* _message)
+  MessageEncoder(const network::inet::Socket& s, Message* _message)
     : DataEncoder(s, encode(_message)), message(_message) {}
 
   virtual ~MessageEncoder()
@@ -162,7 +162,7 @@ class HttpResponseEncoder : public DataEncoder
 {
 public:
   HttpResponseEncoder(
-      const network::Socket& s,
+      const network::inet::Socket& s,
       const http::Response& response,
       const http::Request& request)
     : DataEncoder(s, encode(response, request)) {}
@@ -251,7 +251,7 @@ public:
 class FileEncoder : public Encoder
 {
 public:
-  FileEncoder(const network::Socket& s, int _fd, size_t _size)
+  FileEncoder(const network::inet::Socket& s, int _fd, size_t _size)
     : Encoder(s), fd(_fd), size(_size), index(0) {}
 
   virtual ~FileEncoder()

http://git-wip-us.apache.org/repos/asf/mesos/blob/6a77817e/3rdparty/libprocess/src/http.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/http.cpp b/3rdparty/libprocess/src/http.cpp
index 3f16f29..93a8a80 100644
--- a/3rdparty/libprocess/src/http.cpp
+++ b/3rdparty/libprocess/src/http.cpp
@@ -66,8 +66,10 @@ using std::vector;
 using process::http::Request;
 using process::http::Response;
 
-using process::network::Address;
-using process::network::Socket;
+using process::network::inet::Address;
+using process::network::inet::Socket;
+
+using process::network::internal::SocketImpl;
 
 namespace process {
 namespace http {
@@ -1341,12 +1343,12 @@ Future<Connection> connect(const URL& url)
   Try<Socket> socket = [&url]() -> Try<Socket> {
     // Default to 'http' if no scheme was specified.
     if (url.scheme.isNone() || url.scheme == string("http")) {
-      return Socket::create(Socket::POLL);
+      return Socket::create(SocketImpl::Kind::POLL);
     }
 
     if (url.scheme == string("https")) {
 #ifdef USE_SSL_SOCKET
-      return Socket::create(Socket::SSL);
+      return Socket::create(SocketImpl::Kind::SSL);
 #else
       return Error("'https' scheme requires SSL enabled");
 #endif

http://git-wip-us.apache.org/repos/asf/mesos/blob/6a77817e/3rdparty/libprocess/src/libevent_ssl_socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libevent_ssl_socket.cpp b/3rdparty/libprocess/src/libevent_ssl_socket.cpp
index 9cade79..f9551bf 100644
--- a/3rdparty/libprocess/src/libevent_ssl_socket.cpp
+++ b/3rdparty/libprocess/src/libevent_ssl_socket.cpp
@@ -78,8 +78,9 @@ static Synchronized<bufferevent> synchronize(bufferevent* bev)
 
 namespace process {
 namespace network {
+namespace internal {
 
-Try<std::shared_ptr<Socket::Impl>> LibeventSSLSocketImpl::create(int s)
+Try<std::shared_ptr<SocketImpl>> LibeventSSLSocketImpl::create(int s)
 {
   openssl::initialize();
 
@@ -437,7 +438,7 @@ void LibeventSSLSocketImpl::event_callback(short events)
 
 
 LibeventSSLSocketImpl::LibeventSSLSocketImpl(int _s)
-  : Socket::Impl(_s),
+  : SocketImpl(_s),
     bev(nullptr),
     listener(nullptr),
     recv_request(nullptr),
@@ -450,7 +451,7 @@ LibeventSSLSocketImpl::LibeventSSLSocketImpl(
     int _s,
     bufferevent* _bev,
     Option<string>&& _peer_hostname)
-  : Socket::Impl(_s),
+  : SocketImpl(_s),
     bev(_bev),
     listener(nullptr),
     recv_request(nullptr),
@@ -899,13 +900,13 @@ Try<Nothing> LibeventSSLSocketImpl::listen(int backlog)
 }
 
 
-Future<std::shared_ptr<Socket::Impl>> LibeventSSLSocketImpl::accept()
+Future<std::shared_ptr<SocketImpl>> LibeventSSLSocketImpl::accept()
 {
   // We explicitly specify the return type to avoid a type deduction
   // issue in some versions of clang. See MESOS-2943.
   return accept_queue.get()
-    .then([](const Future<std::shared_ptr<Socket::Impl>>& impl)
-      -> Future<std::shared_ptr<Socket::Impl>> {
+    .then([](const Future<std::shared_ptr<SocketImpl>>& impl)
+      -> Future<std::shared_ptr<SocketImpl>> {
       CHECK(!impl.isPending());
       return impl;
     });
@@ -967,7 +968,7 @@ void LibeventSSLSocketImpl::peek_callback(
     accept_SSL_callback(request);
   } else {
     // Downgrade to a non-SSL socket implementation.
-    Try<std::shared_ptr<Socket::Impl>> impl = PollSocketImpl::create(fd);
+    Try<std::shared_ptr<SocketImpl>> impl = PollSocketImpl::create(fd);
     if (impl.isError()) {
       request->promise.fail(impl.error());
     } else {
@@ -983,13 +984,13 @@ void LibeventSSLSocketImpl::accept_callback(AcceptRequest* request)
 {
   CHECK(__in_event_loop__);
 
-  Queue<Future<std::shared_ptr<Socket::Impl>>> accept_queue_ = accept_queue;
+  Queue<Future<std::shared_ptr<SocketImpl>>> accept_queue_ = accept_queue;
 
   // After the socket is accepted, it must complete the SSL
   // handshake (or be downgraded to a regular socket) before
   // we put it in the queue of connected sockets.
   request->promise.future()
-    .onAny([accept_queue_](Future<std::shared_ptr<Socket::Impl>> impl) mutable {
+    .onAny([accept_queue_](Future<std::shared_ptr<SocketImpl>> impl) mutable {
       accept_queue_.put(impl);
     });
 
@@ -1121,7 +1122,7 @@ void LibeventSSLSocketImpl::accept_SSL_callback(AcceptRequest* request)
               &LibeventSSLSocketImpl::event_callback,
               CHECK_NOTNULL(impl->event_loop_handle));
 
-          request->promise.set(std::dynamic_pointer_cast<Socket::Impl>(impl));
+          request->promise.set(std::dynamic_pointer_cast<SocketImpl>(impl));
         } else if (events & BEV_EVENT_ERROR) {
           std::ostringstream stream;
           if (EVUTIL_SOCKET_ERROR() != 0) {
@@ -1158,5 +1159,6 @@ void LibeventSSLSocketImpl::accept_SSL_callback(AcceptRequest* request)
       request);
 }
 
+} // namespace internal {
 } // namespace network {
 } // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/6a77817e/3rdparty/libprocess/src/libevent_ssl_socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libevent_ssl_socket.hpp b/3rdparty/libprocess/src/libevent_ssl_socket.hpp
index ed53976..57eaf4f 100644
--- a/3rdparty/libprocess/src/libevent_ssl_socket.hpp
+++ b/3rdparty/libprocess/src/libevent_ssl_socket.hpp
@@ -27,26 +27,27 @@
 
 namespace process {
 namespace network {
+namespace internal {
 
-class LibeventSSLSocketImpl : public Socket::Impl
+class LibeventSSLSocketImpl : public SocketImpl
 {
 public:
   // See 'Socket::create()'.
-  static Try<std::shared_ptr<Socket::Impl>> create(int s);
+  static Try<std::shared_ptr<SocketImpl>> create(int s);
 
   LibeventSSLSocketImpl(int _s);
 
   virtual ~LibeventSSLSocketImpl();
 
-  // Implement 'Socket::Impl' interface.
+  // Implement 'SocketImpl' interface.
   virtual Future<Nothing> connect(const Address& address);
   virtual Future<size_t> recv(char* data, size_t size);
   // Send does not currently support discard. See implementation.
   virtual Future<size_t> send(const char* data, size_t size);
   virtual Future<size_t> sendfile(int fd, off_t offset, size_t size);
   virtual Try<Nothing> listen(int backlog);
-  virtual Future<std::shared_ptr<Socket::Impl>> accept();
-  virtual Socket::Kind kind() const { return Socket::SSL; }
+  virtual Future<std::shared_ptr<SocketImpl>> accept();
+  virtual SocketImpl::Kind kind() const { return SocketImpl::Kind::SSL; }
 
   // This call is used to do the equivalent of shutting down the read
   // end. This means finishing the future of any outstanding read
@@ -74,7 +75,7 @@ private:
         socket(_socket),
         ip(_ip) {}
     event* peek_event;
-    Promise<std::shared_ptr<Socket::Impl>> promise;
+    Promise<std::shared_ptr<SocketImpl>> promise;
     evconnlistener* listener;
     int socket;
     Option<net::IP> ip;
@@ -175,12 +176,13 @@ private:
   // downgraded). The 'accept()' call returns sockets from this queue.
   // We wrap the socket in a 'Future' so that we can pass failures or
   // discards through.
-  Queue<Future<std::shared_ptr<Socket::Impl>>> accept_queue;
+  Queue<Future<std::shared_ptr<SocketImpl>>> accept_queue;
 
   Option<std::string> peer_hostname;
   Option<net::IP> peer_ip;
 };
 
+} // namespace internal {
 } // namespace network {
 } // namespace process {
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/6a77817e/3rdparty/libprocess/src/pid.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/pid.cpp b/3rdparty/libprocess/src/pid.cpp
index f9313cd..ffb1021 100644
--- a/3rdparty/libprocess/src/pid.cpp
+++ b/3rdparty/libprocess/src/pid.cpp
@@ -101,7 +101,7 @@ istream& operator>>(istream& stream, UPID& pid)
 
   string id;
   string host;
-  network::Address address;
+  network::inet::Address address;
 
   size_t index = str.find('@');
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/6a77817e/3rdparty/libprocess/src/poll_socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/poll_socket.cpp b/3rdparty/libprocess/src/poll_socket.cpp
index ff06e56..9184ea3 100644
--- a/3rdparty/libprocess/src/poll_socket.cpp
+++ b/3rdparty/libprocess/src/poll_socket.cpp
@@ -32,8 +32,9 @@ using std::string;
 
 namespace process {
 namespace network {
+namespace internal {
 
-Try<std::shared_ptr<Socket::Impl>> PollSocketImpl::create(int s)
+Try<std::shared_ptr<SocketImpl>> PollSocketImpl::create(int s)
 {
   return std::make_shared<PollSocketImpl>(s);
 }
@@ -97,12 +98,12 @@ Future<int> accept(int fd)
 } // namespace internal {
 
 
-Future<std::shared_ptr<Socket::Impl>> PollSocketImpl::accept()
+Future<std::shared_ptr<SocketImpl>> PollSocketImpl::accept()
 {
   return io::poll(get(), io::READ)
     .then(lambda::bind(&internal::accept, get()))
-    .then([](int s) -> Future<std::shared_ptr<Socket::Impl>> {
-      Try<std::shared_ptr<Socket::Impl>> impl = create(s);
+    .then([](int s) -> Future<std::shared_ptr<SocketImpl>> {
+      Try<std::shared_ptr<SocketImpl>> impl = create(s);
       if (impl.isError()) {
         os::close(s);
         return Failure("Failed to create socket: " + impl.error());
@@ -276,5 +277,6 @@ Future<size_t> PollSocketImpl::sendfile(int fd, off_t offset, size_t size)
         size));
 }
 
+} // namespace internal {
 } // namespace network {
 } // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/6a77817e/3rdparty/libprocess/src/poll_socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/poll_socket.hpp b/3rdparty/libprocess/src/poll_socket.hpp
index 3ba3678..89789e6 100644
--- a/3rdparty/libprocess/src/poll_socket.hpp
+++ b/3rdparty/libprocess/src/poll_socket.hpp
@@ -18,26 +18,27 @@
 
 namespace process {
 namespace network {
+namespace internal {
 
-class PollSocketImpl : public Socket::Impl
+class PollSocketImpl : public SocketImpl
 {
 public:
-  static Try<std::shared_ptr<Socket::Impl>> create(int s);
+  static Try<std::shared_ptr<SocketImpl>> create(int s);
 
-  PollSocketImpl(int s) : Socket::Impl(s) {}
+  PollSocketImpl(int s) : SocketImpl(s) {}
 
   virtual ~PollSocketImpl() {}
 
-  // Implementation of the Socket::Impl interface.
+  // Implementation of the SocketImpl interface.
   virtual Try<Nothing> listen(int backlog);
-  virtual Future<std::shared_ptr<Socket::Impl>> accept();
+  virtual Future<std::shared_ptr<SocketImpl>> accept();
   virtual Future<Nothing> connect(const Address& address);
   virtual Future<size_t> recv(char* data, size_t size);
   virtual Future<size_t> send(const char* data, size_t size);
   virtual Future<size_t> sendfile(int fd, off_t offset, size_t size);
-
-  virtual Socket::Kind kind() const { return Socket::POLL; }
+  virtual Kind kind() const { return SocketImpl::Kind::POLL; }
 };
 
+} // namespace internal {
 } // namespace network {
 } // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/6a77817e/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index e9a4bbb..a07d5e3 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -134,8 +134,10 @@ using process::http::authentication::AuthenticatorManager;
 
 using process::http::authorization::AuthorizationCallbacks;
 
-using process::network::Address;
-using process::network::Socket;
+using process::network::inet::Address;
+using process::network::inet::Socket;
+
+using process::network::internal::SocketImpl;
 
 using std::deque;
 using std::find;
@@ -371,7 +373,7 @@ public:
   void link(ProcessBase* process,
             const UPID& to,
             const ProcessBase::RemoteConnection remote,
-            const Socket::Kind& kind = Socket::DEFAULT_KIND());
+            const SocketImpl::Kind& kind = SocketImpl::DEFAULT_KIND());
 
   // Test-only method to fetch the file descriptor behind a
   // persistent socket.
@@ -389,7 +391,7 @@ public:
             const Request& request,
             const Socket& socket);
   void send(Message* message,
-            const Socket::Kind& kind = Socket::DEFAULT_KIND());
+            const SocketImpl::Kind& kind = SocketImpl::DEFAULT_KIND());
 
   Encoder* next(int s);
 
@@ -1683,7 +1685,7 @@ void SocketManager::link_connect(
       future.isFailed() &&
       network::openssl::flags().enabled &&
       network::openssl::flags().support_downgrade &&
-      socket.kind() == Socket::SSL;
+      socket.kind() == SocketImpl::Kind::SSL;
 
     Option<Socket> poll_socket = None();
 
@@ -1699,7 +1701,7 @@ void SocketManager::link_connect(
           return;
         }
 
-        Try<Socket> create = Socket::create(Socket::POLL);
+        Try<Socket> create = Socket::create(SocketImpl::Kind::POLL);
         if (create.isError()) {
           VLOG(1) << "Failed to link, create socket: " << create.error();
           socket_manager->close(socket);
@@ -1779,7 +1781,7 @@ void SocketManager::link(
     ProcessBase* process,
     const UPID& to,
     const ProcessBase::RemoteConnection remote,
-    const Socket::Kind& kind)
+    const SocketImpl::Kind& kind)
 {
   // TODO(benh): The semantics we want to support for link are such
   // that if there is nobody to link to (local or remote) then an
@@ -2101,7 +2103,7 @@ void SocketManager::send_connect(
       future.isFailed() &&
       network::openssl::flags().enabled &&
       network::openssl::flags().support_downgrade &&
-      socket.kind() == Socket::SSL;
+      socket.kind() == SocketImpl::Kind::SSL;
 
     Option<Socket> poll_socket = None();
 
@@ -2109,7 +2111,7 @@ void SocketManager::send_connect(
     // POLL socket.
     if (attempt_downgrade) {
       synchronized (mutex) {
-        Try<Socket> create = Socket::create(Socket::POLL);
+        Try<Socket> create = Socket::create(SocketImpl::Kind::POLL);
         if (create.isError()) {
           VLOG(1) << "Failed to link, create socket: " << create.error();
           socket_manager->close(socket);
@@ -2168,7 +2170,7 @@ void SocketManager::send_connect(
 }
 
 
-void SocketManager::send(Message* message, const Socket::Kind& kind)
+void SocketManager::send(Message* message, const SocketImpl::Kind& kind)
 {
   CHECK(message != nullptr);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/6a77817e/3rdparty/libprocess/src/socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/socket.cpp b/3rdparty/libprocess/src/socket.cpp
index 8f372aa..0b7631d 100644
--- a/3rdparty/libprocess/src/socket.cpp
+++ b/3rdparty/libprocess/src/socket.cpp
@@ -32,89 +32,72 @@ using std::string;
 
 namespace process {
 namespace network {
+namespace internal {
 
-Try<Socket> Socket::create(Kind kind, Option<int> s)
+Try<std::shared_ptr<SocketImpl>> SocketImpl::create(int s, Kind kind)
 {
-  // If the caller passed in a file descriptor, we do
-  // not own its life cycle and must not close it.
-  bool owned = s.isNone();
+  switch (kind) {
+    case Kind::POLL:
+      return PollSocketImpl::create(s);
+#ifdef USE_SSL_SOCKET
+    case Kind::SSL:
+      return LibeventSSLSocketImpl::create(s);
+#endif
+  }
+}
+
 
-  if (owned) {
-    // Supported in Linux >= 2.6.27.
+Try<std::shared_ptr<SocketImpl>> SocketImpl::create(Kind kind)
+{
+  // Supported in Linux >= 2.6.27.
 #if defined(SOCK_NONBLOCK) && defined(SOCK_CLOEXEC)
-    Try<int> fd =
-      network::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
+  Try<int> s =
+    network::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
 
-    if (fd.isError()) {
-      return Error("Failed to create socket: " + fd.error());
-    }
+  if (s.isError()) {
+    return Error("Failed to create socket: " + s.error());
+  }
 #else
-    Try<int> fd = network::socket(AF_INET, SOCK_STREAM, 0);
-    if (fd.isError()) {
-      return Error("Failed to create socket: " + fd.error());
-    }
-
-    Try<Nothing> nonblock = os::nonblock(fd.get());
-    if (nonblock.isError()) {
-      os::close(fd.get());
-      return Error("Failed to create socket, nonblock: " + nonblock.error());
-    }
-
-    Try<Nothing> cloexec = os::cloexec(fd.get());
-    if (cloexec.isError()) {
-      os::close(fd.get());
-      return Error("Failed to create socket, cloexec: " + cloexec.error());
-    }
-#endif
+  Try<int> s = network::socket(AF_INET, SOCK_STREAM, 0);
+  if (s.isError()) {
+    return Error("Failed to create socket: " + s.error());
+  }
 
-    s = fd.get();
+  Try<Nothing> nonblock = os::nonblock(s.get());
+  if (nonblock.isError()) {
+    os::close(s.get());
+    return Error("Failed to create socket, nonblock: " + nonblock.error());
   }
 
-  switch (kind) {
-    case POLL: {
-      Try<std::shared_ptr<Socket::Impl>> socket =
-        PollSocketImpl::create(s.get());
-      if (socket.isError()) {
-        if (owned) {
-          os::close(s.get());
-        }
-        return Error(socket.error());
-      }
-      return Socket(socket.get());
-    }
-#ifdef USE_SSL_SOCKET
-    case SSL: {
-      Try<std::shared_ptr<Socket::Impl>> socket =
-        LibeventSSLSocketImpl::create(s.get());
-      if (socket.isError()) {
-        if (owned) {
-          os::close(s.get());
-        }
-        return Error(socket.error());
-      }
-      return Socket(socket.get());
-    }
+  Try<Nothing> cloexec = os::cloexec(s.get());
+  if (cloexec.isError()) {
+    os::close(s.get());
+    return Error("Failed to create socket, cloexec: " + cloexec.error());
+  }
 #endif
-    // By not setting a default we leverage the compiler errors when
-    // the enumeration is augmented to find all the cases we need to
-    // provide.
+
+  Try<std::shared_ptr<SocketImpl>> impl = create(s.get(), kind);
+  if (impl.isError()) {
+    os::close(s.get());
   }
+
+  return impl;
 }
 
 
-Socket::Kind Socket::DEFAULT_KIND()
+SocketImpl::Kind SocketImpl::DEFAULT_KIND()
 {
   // NOTE: Some tests may change the OpenSSL flags and reinitialize
   // libprocess. In non-test code, the return value should be constant.
 #ifdef USE_SSL_SOCKET
-      return network::openssl::flags().enabled ? Socket::SSL : Socket::POLL;
+  return network::openssl::flags().enabled ? Kind::SSL : Kind::POLL;
 #else
-      return Socket::POLL;
+  return Kind::POLL;
 #endif
 }
 
 
-Try<Address> Socket::Impl::address() const
+Try<Address> SocketImpl::address() const
 {
   // TODO(benh): Cache this result so that we don't have to make
   // unnecessary system calls each time.
@@ -122,7 +105,7 @@ Try<Address> Socket::Impl::address() const
 }
 
 
-Try<Address> Socket::Impl::peer() const
+Try<Address> SocketImpl::peer() const
 {
   // TODO(benh): Cache this result so that we don't have to make
   // unnecessary system calls each time.
@@ -130,7 +113,7 @@ Try<Address> Socket::Impl::peer() const
 }
 
 
-Try<Address> Socket::Impl::bind(const Address& address)
+Try<Address> SocketImpl::bind(const Address& address)
 {
   Try<Nothing> bind = network::bind(get(), address);
   if (bind.isError()) {
@@ -143,7 +126,7 @@ Try<Address> Socket::Impl::bind(const Address& address)
 
 
 static Future<string> _recv(
-    const std::shared_ptr<Socket::Impl>& impl,
+    const std::shared_ptr<SocketImpl>& impl,
     const Option<ssize_t>& size,
     Owned<string> buffer,
     size_t chunk,
@@ -191,7 +174,7 @@ static Future<string> _recv(
 }
 
 
-Future<string> Socket::Impl::recv(const Option<ssize_t>& size)
+Future<string> SocketImpl::recv(const Option<ssize_t>& size)
 {
   // Default chunk size to attempt to receive when nothing is
   // specified represents roughly 16 pages.
@@ -216,7 +199,7 @@ Future<string> Socket::Impl::recv(const Option<ssize_t>& size)
 
 
 static Future<Nothing> _send(
-    const std::shared_ptr<Socket::Impl>& impl,
+    const std::shared_ptr<SocketImpl>& impl,
     Owned<string> data,
     size_t index,
     size_t length)
@@ -235,7 +218,7 @@ static Future<Nothing> _send(
 }
 
 
-Future<Nothing> Socket::Impl::send(const string& _data)
+Future<Nothing> SocketImpl::send(const string& _data)
 {
   Owned<string> data(new string(_data));
 
@@ -243,6 +226,6 @@ Future<Nothing> Socket::Impl::send(const string& _data)
     .then(lambda::bind(&_send, shared_from_this(), data, 0, lambda::_1));
 }
 
-
+} // namespace internal {
 } // namespace network {
 } // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/6a77817e/3rdparty/libprocess/src/tests/decoder_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/decoder_tests.cpp b/3rdparty/libprocess/src/tests/decoder_tests.cpp
index 5f84d84..7356716 100644
--- a/3rdparty/libprocess/src/tests/decoder_tests.cpp
+++ b/3rdparty/libprocess/src/tests/decoder_tests.cpp
@@ -32,10 +32,6 @@ using process::ResponseDecoder;
 using process::StreamingRequestDecoder;
 using process::StreamingResponseDecoder;
 
-using process::http::Request;
-
-using process::network::Socket;
-
 using std::deque;
 using std::string;
 
@@ -78,7 +74,7 @@ TYPED_TEST(RequestDecoderTest, Request)
   EXPECT_SOME_EQ("value2", request->url.query.get("key2"));
 
   Future<string> body = [&request]() -> Future<string> {
-    if (request->type == Request::BODY) {
+    if (request->type == http::Request::BODY) {
       return request->body;
     }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/6a77817e/3rdparty/libprocess/src/tests/http_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/http_tests.cpp b/3rdparty/libprocess/src/tests/http_tests.cpp
index d41929a..22ec432 100644
--- a/3rdparty/libprocess/src/tests/http_tests.cpp
+++ b/3rdparty/libprocess/src/tests/http_tests.cpp
@@ -62,7 +62,7 @@ using process::Promise;
 
 using process::http::URL;
 
-using process::network::Socket;
+using process::network::inet::Socket;
 
 using std::string;
 using std::vector;

http://git-wip-us.apache.org/repos/asf/mesos/blob/6a77817e/3rdparty/libprocess/src/tests/process_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/process_tests.cpp b/3rdparty/libprocess/src/tests/process_tests.cpp
index ea798d0..0424a10 100644
--- a/3rdparty/libprocess/src/tests/process_tests.cpp
+++ b/3rdparty/libprocess/src/tests/process_tests.cpp
@@ -93,8 +93,8 @@ using process::UPID;
 using process::firewall::DisabledEndpointsFirewallRule;
 using process::firewall::FirewallRule;
 
-using process::network::Address;
-using process::network::Socket;
+using process::network::inet::Address;
+using process::network::inet::Socket;
 
 using std::move;
 using std::string;

http://git-wip-us.apache.org/repos/asf/mesos/blob/6a77817e/3rdparty/libprocess/src/tests/ssl_client.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/ssl_client.cpp b/3rdparty/libprocess/src/tests/ssl_client.cpp
index 8d62fc2..de87b3b 100644
--- a/3rdparty/libprocess/src/tests/ssl_client.cpp
+++ b/3rdparty/libprocess/src/tests/ssl_client.cpp
@@ -32,8 +32,10 @@
 namespace network = process::network;
 namespace openssl = network::openssl;
 
-using network::Address;
-using network::Socket;
+using network::inet::Address;
+using network::inet::Socket;
+
+using network::internal::SocketImpl;
 
 using process::Future;
 
@@ -131,8 +133,8 @@ TEST_F(SSLClientTest, client)
   // Create the socket based on the 'use_ssl' flag. We use this to
   // test whether a regular socket could connect to an SSL server
   // socket.
-  const Try<Socket> create =
-    Socket::create(flags.use_ssl ? Socket::SSL : Socket::POLL);
+  const Try<Socket> create = Socket::create(
+      flags.use_ssl ? SocketImpl::Kind::SSL : SocketImpl::Kind::POLL);
   ASSERT_SOME(create);
 
   Socket socket = create.get();

http://git-wip-us.apache.org/repos/asf/mesos/blob/6a77817e/3rdparty/libprocess/src/tests/ssl_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/ssl_tests.cpp b/3rdparty/libprocess/src/tests/ssl_tests.cpp
index 55c8c30..bdb9420 100644
--- a/3rdparty/libprocess/src/tests/ssl_tests.cpp
+++ b/3rdparty/libprocess/src/tests/ssl_tests.cpp
@@ -49,8 +49,10 @@ namespace io = process::io;
 namespace network = process::network;
 namespace openssl = network::openssl;
 
-using network::Address;
-using network::Socket;
+using network::inet::Address;
+using network::inet::Socket;
+
+using network::internal::SocketImpl;
 
 using process::Clock;
 using process::Failure;
@@ -112,7 +114,7 @@ TEST(SSL, Disabled)
 {
   os::setenv("LIBPROCESS_SSL_ENABLED", "false");
   openssl::reinitialize();
-  EXPECT_ERROR(Socket::create(Socket::SSL));
+  EXPECT_ERROR(Socket::create(SocketImpl::Kind::SSL));
 }
 
 
@@ -130,10 +132,10 @@ TEST_P(SSLTest, BasicSameProcess)
 
   openssl::reinitialize();
 
-  const Try<Socket> server_create = Socket::create(Socket::SSL);
+  const Try<Socket> server_create = Socket::create(SocketImpl::Kind::SSL);
   ASSERT_SOME(server_create);
 
-  const Try<Socket> client_create = Socket::create(Socket::SSL);
+  const Try<Socket> client_create = Socket::create(SocketImpl::Kind::SSL);
   ASSERT_SOME(client_create);
 
   Socket server = server_create.get();
@@ -623,7 +625,7 @@ TEST_F(SSLTest, PeerAddress)
       {"LIBPROCESS_SSL_CERT_FILE", certificate_path().string()}});
   ASSERT_SOME(server);
 
-  const Try<Socket> client_create = Socket::create(Socket::SSL);
+  const Try<Socket> client_create = Socket::create(SocketImpl::Kind::SSL);
   ASSERT_SOME(client_create);
 
   Socket client = client_create.get();
@@ -754,7 +756,7 @@ TEST_F(SSLTest, SilentSocket)
   // not complete the SSL handshake, nor be downgraded.
   // As a result, we expect that the server will not see
   // an accepted socket for this connection.
-  Try<Socket> connection = Socket::create(Socket::POLL);
+  Try<Socket> connection = Socket::create(SocketImpl::Kind::POLL);
   ASSERT_SOME(connection);
   connection->connect(server->address().get());
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/6a77817e/3rdparty/libprocess/src/tests/test_linkee.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/test_linkee.cpp b/3rdparty/libprocess/src/tests/test_linkee.cpp
index 1f6cfaf..99ea1eb 100644
--- a/3rdparty/libprocess/src/tests/test_linkee.cpp
+++ b/3rdparty/libprocess/src/tests/test_linkee.cpp
@@ -30,8 +30,8 @@ using process::Message;
 using process::MessageEncoder;
 using process::UPID;
 
-using process::network::Address;
-using process::network::Socket;
+using process::network::inet::Address;
+using process::network::inet::Socket;
 
 
 static const int LISTEN_BACKLOG = 10;


[05/14] mesos git commit: Removed `Socket` dependency on `Socket::Impl`.

Posted by be...@apache.org.
Removed `Socket` dependency on `Socket::Impl`.

Review: https://reviews.apache.org/r/53458


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7aa68496
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7aa68496
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7aa68496

Branch: refs/heads/master
Commit: 7aa68496df050ed1fc7be8aed0f9b46230039c3a
Parents: 5c093c8
Author: Benjamin Hindman <be...@gmail.com>
Authored: Wed Nov 2 22:33:31 2016 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Tue Nov 29 12:06:29 2016 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/socket.hpp  | 26 -----------------
 3rdparty/libprocess/src/libevent_ssl_socket.cpp | 30 ++++++++++----------
 3rdparty/libprocess/src/libevent_ssl_socket.hpp |  6 ++--
 3rdparty/libprocess/src/poll_socket.cpp         | 30 +++++++++++---------
 3rdparty/libprocess/src/socket.cpp              | 20 ++++++-------
 5 files changed, 45 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/7aa68496/3rdparty/libprocess/include/process/socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp
index 97bdcb9..a70954a 100644
--- a/3rdparty/libprocess/include/process/socket.hpp
+++ b/3rdparty/libprocess/include/process/socket.hpp
@@ -202,27 +202,6 @@ public:
 
     virtual Socket::Kind kind() const = 0;
 
-    /**
-     * Construct a new `Socket` from the given impl.
-     *
-     * This is a proxy function, as `Impl`s derived from this won't have
-     * access to the Socket::Socket(...) constructors.
-     */
-    // TODO(jmlvanre): These should be protected; however, gcc complains
-    // when using them from within a lambda of a derived class.
-    static Socket socket(std::shared_ptr<Impl>&& that)
-    {
-      return Socket(std::move(that));
-    }
-
-    /**
-     * @copydoc process::network::Socket::Impl::socket
-     */
-    static Socket socket(const std::shared_ptr<Impl>& that)
-    {
-      return Socket(that);
-    }
-
   protected:
     explicit Impl(int _s) : s(_s) { CHECK(s >= 0); }
 
@@ -240,11 +219,6 @@ public:
     }
 
     /**
-     * Construct a `Socket` wrapper from this implementation.
-     */
-    Socket socket() { return Socket(shared_from_this()); }
-
-    /**
      * Returns a `std::shared_ptr<T>` from this implementation.
      */
     template <typename T>

http://git-wip-us.apache.org/repos/asf/mesos/blob/7aa68496/3rdparty/libprocess/src/libevent_ssl_socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libevent_ssl_socket.cpp b/3rdparty/libprocess/src/libevent_ssl_socket.cpp
index 5c0929d..9cade79 100644
--- a/3rdparty/libprocess/src/libevent_ssl_socket.cpp
+++ b/3rdparty/libprocess/src/libevent_ssl_socket.cpp
@@ -31,6 +31,7 @@
 #include "libevent.hpp"
 #include "libevent_ssl_socket.hpp"
 #include "openssl.hpp"
+#include "poll_socket.hpp"
 
 // Locking:
 //
@@ -898,14 +899,15 @@ Try<Nothing> LibeventSSLSocketImpl::listen(int backlog)
 }
 
 
-Future<Socket> LibeventSSLSocketImpl::accept()
+Future<std::shared_ptr<Socket::Impl>> LibeventSSLSocketImpl::accept()
 {
   // We explicitly specify the return type to avoid a type deduction
   // issue in some versions of clang. See MESOS-2943.
   return accept_queue.get()
-    .then([](const Future<Socket>& socket) -> Future<Socket> {
-      CHECK(!socket.isPending());
-      return socket;
+    .then([](const Future<std::shared_ptr<Socket::Impl>>& impl)
+      -> Future<std::shared_ptr<Socket::Impl>> {
+      CHECK(!impl.isPending());
+      return impl;
     });
 }
 
@@ -964,12 +966,12 @@ void LibeventSSLSocketImpl::peek_callback(
   if (ssl) {
     accept_SSL_callback(request);
   } else {
-    // Downgrade to a non-SSL socket.
-    Try<Socket> create = Socket::create(Socket::POLL, fd);
-    if (create.isError()) {
-      request->promise.fail(create.error());
+    // Downgrade to a non-SSL socket implementation.
+    Try<std::shared_ptr<Socket::Impl>> impl = PollSocketImpl::create(fd);
+    if (impl.isError()) {
+      request->promise.fail(impl.error());
     } else {
-      request->promise.set(create.get());
+      request->promise.set(impl.get());
     }
 
     delete request;
@@ -981,14 +983,14 @@ void LibeventSSLSocketImpl::accept_callback(AcceptRequest* request)
 {
   CHECK(__in_event_loop__);
 
-  Queue<Future<Socket>> accept_queue_ = accept_queue;
+  Queue<Future<std::shared_ptr<Socket::Impl>>> accept_queue_ = accept_queue;
 
   // After the socket is accepted, it must complete the SSL
   // handshake (or be downgraded to a regular socket) before
   // we put it in the queue of connected sockets.
   request->promise.future()
-    .onAny([accept_queue_](Future<Socket> socket) mutable {
-      accept_queue_.put(socket);
+    .onAny([accept_queue_](Future<std::shared_ptr<Socket::Impl>> impl) mutable {
+      accept_queue_.put(impl);
     });
 
   // If we support downgrading the connection, first wait for this
@@ -1119,9 +1121,7 @@ void LibeventSSLSocketImpl::accept_SSL_callback(AcceptRequest* request)
               &LibeventSSLSocketImpl::event_callback,
               CHECK_NOTNULL(impl->event_loop_handle));
 
-          Socket socket = Socket::Impl::socket(std::move(impl));
-
-          request->promise.set(socket);
+          request->promise.set(std::dynamic_pointer_cast<Socket::Impl>(impl));
         } else if (events & BEV_EVENT_ERROR) {
           std::ostringstream stream;
           if (EVUTIL_SOCKET_ERROR() != 0) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/7aa68496/3rdparty/libprocess/src/libevent_ssl_socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libevent_ssl_socket.hpp b/3rdparty/libprocess/src/libevent_ssl_socket.hpp
index acb00d4..ed53976 100644
--- a/3rdparty/libprocess/src/libevent_ssl_socket.hpp
+++ b/3rdparty/libprocess/src/libevent_ssl_socket.hpp
@@ -45,7 +45,7 @@ public:
   virtual Future<size_t> send(const char* data, size_t size);
   virtual Future<size_t> sendfile(int fd, off_t offset, size_t size);
   virtual Try<Nothing> listen(int backlog);
-  virtual Future<Socket> accept();
+  virtual Future<std::shared_ptr<Socket::Impl>> accept();
   virtual Socket::Kind kind() const { return Socket::SSL; }
 
   // This call is used to do the equivalent of shutting down the read
@@ -74,7 +74,7 @@ private:
         socket(_socket),
         ip(_ip) {}
     event* peek_event;
-    Promise<Socket> promise;
+    Promise<std::shared_ptr<Socket::Impl>> promise;
     evconnlistener* listener;
     int socket;
     Option<net::IP> ip;
@@ -175,7 +175,7 @@ private:
   // downgraded). The 'accept()' call returns sockets from this queue.
   // We wrap the socket in a 'Future' so that we can pass failures or
   // discards through.
-  Queue<Future<Socket>> accept_queue;
+  Queue<Future<std::shared_ptr<Socket::Impl>>> accept_queue;
 
   Option<std::string> peer_hostname;
   Option<net::IP> peer_ip;

http://git-wip-us.apache.org/repos/asf/mesos/blob/7aa68496/3rdparty/libprocess/src/poll_socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/poll_socket.cpp b/3rdparty/libprocess/src/poll_socket.cpp
index d04f048..ff06e56 100644
--- a/3rdparty/libprocess/src/poll_socket.cpp
+++ b/3rdparty/libprocess/src/poll_socket.cpp
@@ -114,12 +114,14 @@ Future<std::shared_ptr<Socket::Impl>> PollSocketImpl::accept()
 
 namespace internal {
 
-Future<Nothing> connect(const Socket& socket, const Address& to)
+Future<Nothing> connect(
+    const std::shared_ptr<PollSocketImpl>& socket,
+    const Address& to)
 {
   // Now check that a successful connection was made.
   int opt;
   socklen_t optlen = sizeof(opt);
-  int s = socket.get();
+  int s = socket->get();
 
   // NOTE: We cast to `char*` here because the function prototypes on Windows
   // use `char*` instead of `void*`.
@@ -149,7 +151,7 @@ Future<Nothing> PollSocketImpl::connect(const Address& address)
   if (connect.isError()) {
     if (net::is_inprogress_error(connect.error().code)) {
       return io::poll(get(), io::WRITE)
-        .then(lambda::bind(&internal::connect, socket(), address));
+        .then(lambda::bind(&internal::connect, shared(this), address));
     }
 
     return Failure(connect.error());
@@ -167,12 +169,14 @@ Future<size_t> PollSocketImpl::recv(char* data, size_t size)
 
 namespace internal {
 
-Future<size_t> socket_send_data(Socket socket, const char* data, size_t size)
+Future<size_t> socket_send_data(
+    const std::shared_ptr<PollSocketImpl>& impl,
+    const char* data, size_t size)
 {
   CHECK(size > 0);
 
   while (true) {
-    ssize_t length = net::send(socket.get(), data, size, MSG_NOSIGNAL);
+    ssize_t length = net::send(impl->get(), data, size, MSG_NOSIGNAL);
 
 #ifdef __WINDOWS__
     int error = WSAGetLastError();
@@ -185,8 +189,8 @@ Future<size_t> socket_send_data(Socket socket, const char* data, size_t size)
       continue;
     } else if (length < 0 && net::is_retryable_error(error)) {
       // Might block, try again later.
-      return io::poll(socket.get(), io::WRITE)
-        .then(lambda::bind(&internal::socket_send_data, socket, data, size));
+      return io::poll(impl->get(), io::WRITE)
+        .then(lambda::bind(&internal::socket_send_data, impl, data, size));
     } else if (length <= 0) {
       // Socket error or closed.
       if (length < 0) {
@@ -207,7 +211,7 @@ Future<size_t> socket_send_data(Socket socket, const char* data, size_t size)
 
 
 Future<size_t> socket_send_file(
-    Socket socket,
+    const std::shared_ptr<PollSocketImpl>& impl,
     int fd,
     off_t offset,
     size_t size)
@@ -216,7 +220,7 @@ Future<size_t> socket_send_file(
 
   while (true) {
     Try<ssize_t, SocketError> length =
-      os::sendfile(socket.get(), fd, offset, size);
+      os::sendfile(impl->get(), fd, offset, size);
 
     if (length.isSome()) {
       CHECK(length.get() >= 0);
@@ -232,10 +236,10 @@ Future<size_t> socket_send_file(
       continue;
     } else if (net::is_retryable_error(length.error().code)) {
       // Might block, try again later.
-      return io::poll(socket.get(), io::WRITE)
+      return io::poll(impl->get(), io::WRITE)
         .then(lambda::bind(
             &internal::socket_send_file,
-            socket,
+            impl,
             fd,
             offset,
             size));
@@ -255,7 +259,7 @@ Future<size_t> PollSocketImpl::send(const char* data, size_t size)
   return io::poll(get(), io::WRITE)
     .then(lambda::bind(
         &internal::socket_send_data,
-        socket(),
+        shared(this),
         data,
         size));
 }
@@ -266,7 +270,7 @@ Future<size_t> PollSocketImpl::sendfile(int fd, off_t offset, size_t size)
   return io::poll(get(), io::WRITE)
     .then(lambda::bind(
         &internal::socket_send_file,
-        socket(),
+        shared(this),
         fd,
         offset,
         size));

http://git-wip-us.apache.org/repos/asf/mesos/blob/7aa68496/3rdparty/libprocess/src/socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/socket.cpp b/3rdparty/libprocess/src/socket.cpp
index 7f93168..8f372aa 100644
--- a/3rdparty/libprocess/src/socket.cpp
+++ b/3rdparty/libprocess/src/socket.cpp
@@ -143,7 +143,7 @@ Try<Address> Socket::Impl::bind(const Address& address)
 
 
 static Future<string> _recv(
-    Socket socket,
+    const std::shared_ptr<Socket::Impl>& impl,
     const Option<ssize_t>& size,
     Owned<string> buffer,
     size_t chunk,
@@ -165,9 +165,9 @@ static Future<string> _recv(
     // We've been asked to receive until EOF so keep receiving since
     // according to the 'length == 0' check above we haven't reached
     // EOF yet.
-    return socket.recv(data.get(), chunk)
+    return impl->recv(data.get(), chunk)
       .then(lambda::bind(&_recv,
-                         socket,
+                         impl,
                          size,
                          buffer,
                          chunk,
@@ -176,9 +176,9 @@ static Future<string> _recv(
   } else if (static_cast<string::size_type>(size.get()) > buffer->size()) {
     // We've been asked to receive a particular amount of data and we
     // haven't yet received that much data so keep receiving.
-    return socket.recv(data.get(), size.get() - buffer->size())
+    return impl->recv(data.get(), size.get() - buffer->size())
       .then(lambda::bind(&_recv,
-                         socket,
+                         impl,
                          size,
                          buffer,
                          chunk,
@@ -206,7 +206,7 @@ Future<string> Socket::Impl::recv(const Option<ssize_t>& size)
 
   return recv(data.get(), chunk)
     .then(lambda::bind(&_recv,
-                       socket(),
+                       shared_from_this(),
                        size,
                        buffer,
                        chunk,
@@ -216,7 +216,7 @@ Future<string> Socket::Impl::recv(const Option<ssize_t>& size)
 
 
 static Future<Nothing> _send(
-    Socket socket,
+    const std::shared_ptr<Socket::Impl>& impl,
     Owned<string> data,
     size_t index,
     size_t length)
@@ -230,8 +230,8 @@ static Future<Nothing> _send(
   }
 
   // Keep sending!
-  return socket.send(data->data() + index, data->size() - index)
-    .then(lambda::bind(&_send, socket, data, index, lambda::_1));
+  return impl->send(data->data() + index, data->size() - index)
+    .then(lambda::bind(&_send, impl, data, index, lambda::_1));
 }
 
 
@@ -240,7 +240,7 @@ Future<Nothing> Socket::Impl::send(const string& _data)
   Owned<string> data(new string(_data));
 
   return send(data->data(), data->size())
-    .then(lambda::bind(&_send, socket(), data, 0, lambda::_1));
+    .then(lambda::bind(&_send, shared_from_this(), data, 0, lambda::_1));
 }
 
 


[08/14] mesos git commit: Refactor necessary after removing default Address constructor.

Posted by be...@apache.org.
Refactor necessary after removing default Address constructor.

Review: https://reviews.apache.org/r/54173


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/46942677
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/46942677
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/46942677

Branch: refs/heads/master
Commit: 46942677e388d3a9835bbb1ddaabadf4b6e92ccd
Parents: 907ef80
Author: Benjamin Hindman <be...@gmail.com>
Authored: Tue Nov 29 11:23:45 2016 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Tue Nov 29 12:06:29 2016 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/src/process.cpp | 58 +++++++++++++++++---------------
 1 file changed, 30 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/46942677/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 8204327..200cefc 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -442,24 +442,24 @@ private:
   set<int> dispose;
 
   // Map from socket to socket address for outbound sockets.
-  map<int, Address> addresses;
+  hashmap<int, Address> addresses;
 
   // Map from socket address to temporary sockets (outbound sockets
   // that will be closed once there is no more data to send on them).
-  map<Address, int> temps;
+  hashmap<Address, int> temps;
 
   // Map from socket address (ip, port) to persistent sockets
   // (outbound sockets that will remain open even if there is no more
   // data to send on them).  We distinguish these from the 'temps'
   // collection so we can tell when a persistent socket has been lost
   // (and thus generate ExitedEvents).
-  map<Address, int> persists;
+  hashmap<Address, int> persists;
 
   // Map from outbound socket to outgoing queue.
-  map<int, queue<Encoder*>> outgoing;
+  hashmap<int, queue<Encoder*>> outgoing;
 
   // HTTP proxies.
-  map<int, HttpProxy*> proxies;
+  hashmap<int, HttpProxy*> proxies;
 
   // Protects instance variables.
   std::recursive_mutex mutex;
@@ -1822,9 +1822,9 @@ void SocketManager::link(
         CHECK(sockets.count(s) == 0);
         sockets.emplace(s, socket.get());
 
-        addresses[s] = to.address;
+        addresses.emplace(s, to.address);
 
-        persists[to.address] = s;
+        persists.emplace(to.address, s);
 
         // Initialize 'outgoing' to prevent a race with
         // SocketManager::send() while the socket is not yet connected.
@@ -2220,8 +2220,8 @@ void SocketManager::send(Message* message, const SocketImpl::Kind& kind)
       CHECK(sockets.count(s) == 0);
       sockets.emplace(s, socket.get());
 
-      addresses[s] = address;
-      temps[address] = s;
+      addresses.emplace(s, address);
+      temps.emplace(address, s);
 
       dispose.insert(s);
 
@@ -2285,10 +2285,10 @@ Encoder* SocketManager::next(int s)
           // This is either a temporary socket we created or it's a
           // socket that we were receiving data from and possibly
           // sending HTTP responses back on. Clean up either way.
-          if (addresses.count(s) > 0) {
-            const Address& address = addresses[s];
-            CHECK(temps.count(address) > 0 && temps[address] == s);
-            temps.erase(address);
+          Option<Address> address = addresses.get(s);
+          if (address.isSome()) {
+            CHECK(temps.count(address.get()) > 0 && temps[address.get()] == s);
+            temps.erase(address.get());
             addresses.erase(s);
           }
 
@@ -2356,15 +2356,15 @@ void SocketManager::close(int s)
       }
 
       // Clean up after sockets used for remote communication.
-      if (addresses.count(s) > 0) {
-        const Address& address = addresses[s];
-
+      Option<Address> address = addresses.get(s);
+      if (address.isSome()) {
         // Don't bother invoking `exited` unless socket was persistent.
-        if (persists.count(address) > 0 && persists[address] == s) {
-          persists.erase(address);
-          exited(address); // Generate ExitedEvent(s)!
-        } else if (temps.count(address) > 0 && temps[address] == s) {
-          temps.erase(address);
+        if (persists.count(address.get()) > 0 && persists[address.get()] == s) {
+          persists.erase(address.get());
+          exited(address.get()); // Generate ExitedEvent(s)!
+        } else if (temps.count(address.get()) > 0 &&
+                   temps[address.get()] == s) {
+          temps.erase(address.get());
         }
 
         addresses.erase(s);
@@ -2551,18 +2551,20 @@ void SocketManager::swap_implementing_socket(
     // Update the fd that this address is associated with. Once we've
     // done this we can update the 'temps' and 'persists'
     // data structures using this updated address.
-    addresses[to_fd] = addresses[from_fd];
+    Option<Address> address = addresses.get(from_fd);
+    CHECK_SOME(address);
+    addresses.emplace(to_fd, address.get());
     addresses.erase(from_fd);
 
     // If this address is a persistent or temporary link
     // that matches the original FD.
-    if (persists.count(addresses[to_fd]) > 0 &&
-        persists.at(addresses[to_fd]) == from_fd) {
-      persists[addresses[to_fd]] = to_fd;
+    if (persists.count(address.get()) > 0 &&
+        persists.at(address.get()) == from_fd) {
+      persists[address.get()] = to_fd;
       // No need to erase as we're changing the value, not the key.
-    } else if (temps.count(addresses[to_fd]) > 0 &&
-        temps.at(addresses[to_fd]) == from_fd) {
-      temps[addresses[to_fd]] = to_fd;
+    } else if (temps.count(address.get()) > 0 &&
+               temps.at(address.get()) == from_fd) {
+      temps[address.get()] = to_fd;
       // No need to erase as we're changing the value, not the key.
     }