You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/12/25 22:06:03 UTC
[12/12] mesos git commit: Move poll based socket implementation into
poll_socket.cpp.
Move poll based socket implementation into poll_socket.cpp.
Review: https://reviews.apache.org/r/28467
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/62ad3f27
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/62ad3f27
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/62ad3f27
Branch: refs/heads/master
Commit: 62ad3f27c44dfbad8cb68f952ed2d076e349fd6a
Parents: f4a4180
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Dec 20 12:44:46 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Dec 25 11:54:13 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/Makefile.am | 1 +
3rdparty/libprocess/src/poll_socket.cpp | 216 +++++++++++++++++++++++++++
3rdparty/libprocess/src/process.cpp | 203 -------------------------
3 files changed, 217 insertions(+), 203 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/62ad3f27/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index 09fce46..f401232 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -42,6 +42,7 @@ libprocess_la_SOURCES = \
src/latch.cpp \
src/metrics/metrics.cpp \
src/pid.cpp \
+ src/poll_socket.cpp \
src/process.cpp \
src/process_reference.hpp \
src/reap.cpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/62ad3f27/3rdparty/libprocess/src/poll_socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/poll_socket.cpp b/3rdparty/libprocess/src/poll_socket.cpp
new file mode 100644
index 0000000..5202750
--- /dev/null
+++ b/3rdparty/libprocess/src/poll_socket.cpp
@@ -0,0 +1,216 @@
+#include <netinet/tcp.h>
+
+#include <process/io.hpp>
+#include <process/socket.hpp>
+
+#include "config.hpp"
+
+using std::string;
+
+namespace process {
+namespace network {
+
+namespace internal {
+
+Future<Nothing> connect(const Socket& socket)
+{
+ // Now check that a successful connection was made.
+ int opt;
+ socklen_t optlen = sizeof(opt);
+ int s = socket.get();
+
+ if (getsockopt(s, SOL_SOCKET, SO_ERROR, &opt, &optlen) < 0 || opt != 0) {
+ // Connect failure.
+ VLOG(1) << "Socket error while connecting";
+ return Failure("Socket error while connecting");
+ }
+
+ return Nothing();
+}
+
+} // namespace internal {
+
+
+Future<Nothing> Socket::Impl::connect(const Node& node)
+{
+ Try<int> connect = network::connect(get(), node);
+ if (connect.isError()) {
+ if (errno == EINPROGRESS) {
+ return io::poll(get(), io::WRITE)
+ .then(lambda::bind(&internal::connect, Socket(shared_from_this())));
+ }
+
+ return Failure(connect.error());
+ }
+
+ return Nothing();
+}
+
+
+Future<size_t> Socket::Impl::recv(char* data, size_t size)
+{
+ return io::read(get(), data, size);
+}
+
+
+namespace internal {
+
+Future<size_t> socket_send_data(int s, const char* data, size_t size)
+{
+ CHECK(size > 0);
+
+ while (true) {
+ ssize_t length = send(s, data, size, MSG_NOSIGNAL);
+
+ if (length < 0 && (errno == EINTR)) {
+ // Interrupted, try again now.
+ continue;
+ } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
+ // Might block, try again later.
+ return io::poll(s, io::WRITE)
+ .then(lambda::bind(&internal::socket_send_data, s, data, size));
+ } else if (length <= 0) {
+ // Socket error or closed.
+ if (length < 0) {
+ const char* error = strerror(errno);
+ VLOG(1) << "Socket error while sending: " << error;
+ } else {
+ VLOG(1) << "Socket closed while sending";
+ }
+ if (length == 0) {
+ return length;
+ } else {
+ return Failure(ErrnoError("Socket send failed"));
+ }
+ } else {
+ CHECK(length > 0);
+
+ return length;
+ }
+ }
+}
+
+
+Future<size_t> socket_send_file(int s, int fd, off_t offset, size_t size)
+{
+ CHECK(size > 0);
+
+ while (true) {
+ ssize_t length = os::sendfile(s, fd, offset, size);
+
+ if (length < 0 && (errno == EINTR)) {
+ // Interrupted, try again now.
+ continue;
+ } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
+ // Might block, try again later.
+ return io::poll(s, io::WRITE)
+ .then(lambda::bind(&internal::socket_send_file, s, fd, offset, size));
+ } else if (length <= 0) {
+ // Socket error or closed.
+ if (length < 0) {
+ const char* error = strerror(errno);
+ VLOG(1) << "Socket error while sending: " << error;
+ } else {
+ VLOG(1) << "Socket closed while sending";
+ }
+ if (length == 0) {
+ return length;
+ } else {
+ return Failure(ErrnoError("Socket sendfile failed"));
+ }
+ } else {
+ CHECK(length > 0);
+
+ return length;
+ }
+ }
+}
+
+} // namespace internal {
+
+
+Future<size_t> Socket::Impl::send(const char* data, size_t size)
+{
+ return io::poll(get(), io::WRITE)
+ .then(lambda::bind(&internal::socket_send_data, get(), data, size));
+}
+
+
+Future<size_t> Socket::Impl::sendfile(int fd, off_t offset, size_t size)
+{
+ return io::poll(get(), io::WRITE)
+ .then(lambda::bind(&internal::socket_send_file, get(), fd, offset, size));
+}
+
+
+Try<Node> Socket::Impl::bind(const Node& node)
+{
+ Try<int> bind = network::bind(get(), node);
+ if (bind.isError()) {
+ return Error(bind.error());
+ }
+
+ // Lookup and store assigned ip and assigned port.
+ return network::getsockname(get(), AF_INET);
+}
+
+
+Try<Nothing> Socket::Impl::listen(int backlog)
+{
+ if (::listen(get(), backlog) < 0) {
+ return ErrnoError();
+ }
+ return Nothing();
+}
+
+
+namespace internal {
+
+Future<Socket> accept(int fd)
+{
+ Try<int> accepted = network::accept(fd, AF_INET);
+ if (accepted.isError()) {
+ return Failure(accepted.error());
+ }
+
+ int s = accepted.get();
+ Try<Nothing> nonblock = os::nonblock(s);
+ if (nonblock.isError()) {
+ LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, nonblock: "
+ << nonblock.error();
+ os::close(s);
+ return Failure("Failed to accept, nonblock: " + nonblock.error());
+ }
+
+ Try<Nothing> cloexec = os::cloexec(s);
+ if (cloexec.isError()) {
+ LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, cloexec: "
+ << cloexec.error();
+ os::close(s);
+ return Failure("Failed to accept, cloexec: " + cloexec.error());
+ }
+
+ // Turn off Nagle (TCP_NODELAY) so pipelined requests don't wait.
+ int on = 1;
+ if (setsockopt(s, SOL_TCP, TCP_NODELAY, &on, sizeof(on)) < 0) {
+ const char* error = strerror(errno);
+ VLOG(1) << "Failed to turn off the Nagle algorithm: " << error;
+ os::close(s);
+ return Failure(
+ "Failed to turn off the Nagle algorithm: " + stringify(error));
+ }
+
+ return Socket(s);
+}
+
+} // namespace internal {
+
+
+Future<Socket> Socket::Impl::accept()
+{
+ return io::poll(get(), io::READ)
+ .then(lambda::bind(&internal::accept, get()));
+}
+
+} // namespace network {
+} // namespace process {
http://git-wip-us.apache.org/repos/asf/mesos/blob/62ad3f27/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 441ce48..72c0ad4 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -1220,209 +1220,6 @@ void HttpProxy::stream(const Future<short>& poll, const Request& request)
}
-namespace internal {
-
-Future<Nothing> connect(const Socket& socket)
-{
- // Now check that a successful connection was made.
- int opt;
- socklen_t optlen = sizeof(opt);
- int s = socket.get();
-
- if (getsockopt(s, SOL_SOCKET, SO_ERROR, &opt, &optlen) < 0 || opt != 0) {
- // Connect failure.
- VLOG(1) << "Socket error while connecting";
- return Failure("Socket error while connecting");
- }
-
- return Nothing();
-}
-
-} // namespace internal {
-
-
-Future<Nothing> Socket::Impl::connect(const Node& node)
-{
- Try<int> connect = network::connect(get(), node);
- if (connect.isError()) {
- if (errno == EINPROGRESS) {
- return io::poll(get(), io::WRITE)
- .then(lambda::bind(&internal::connect, Socket(shared_from_this())));
- }
-
- return Failure(connect.error());
- }
-
- return Nothing();
-}
-
-
-Future<size_t> Socket::Impl::recv(char* data, size_t size)
-{
- return io::read(get(), data, size);
-}
-
-
-namespace internal {
-
-Future<size_t> socket_send_data(int s, const char* data, size_t size)
-{
- CHECK(size > 0);
-
- while (true) {
- ssize_t length = send(s, data, size, MSG_NOSIGNAL);
-
- if (length < 0 && (errno == EINTR)) {
- // Interrupted, try again now.
- continue;
- } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
- // Might block, try again later.
- return io::poll(s, io::WRITE)
- .then(lambda::bind(&internal::socket_send_data, s, data, size));
- } else if (length <= 0) {
- // Socket error or closed.
- if (length < 0) {
- const char* error = strerror(errno);
- VLOG(1) << "Socket error while sending: " << error;
- } else {
- VLOG(1) << "Socket closed while sending";
- }
- if (length == 0) {
- return length;
- } else {
- return Failure(ErrnoError("Socket send failed"));
- }
- } else {
- CHECK(length > 0);
-
- return length;
- }
- }
-}
-
-
-Future<size_t> socket_send_file(int s, int fd, off_t offset, size_t size)
-{
- CHECK(size > 0);
-
- while (true) {
- ssize_t length = os::sendfile(s, fd, offset, size);
-
- if (length < 0 && (errno == EINTR)) {
- // Interrupted, try again now.
- continue;
- } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
- // Might block, try again later.
- return io::poll(s, io::WRITE)
- .then(lambda::bind(&internal::socket_send_file, s, fd, offset, size));
- } else if (length <= 0) {
- // Socket error or closed.
- if (length < 0) {
- const char* error = strerror(errno);
- VLOG(1) << "Socket error while sending: " << error;
- } else {
- VLOG(1) << "Socket closed while sending";
- }
- if (length == 0) {
- return length;
- } else {
- return Failure(ErrnoError("Socket sendfile failed"));
- }
- } else {
- CHECK(length > 0);
-
- return length;
- }
- }
-}
-
-} // namespace internal {
-
-
-Future<size_t> Socket::Impl::send(const char* data, size_t size)
-{
- return io::poll(get(), io::WRITE)
- .then(lambda::bind(&internal::socket_send_data, get(), data, size));
-}
-
-
-Future<size_t> Socket::Impl::sendfile(int fd, off_t offset, size_t size)
-{
- return io::poll(get(), io::WRITE)
- .then(lambda::bind(&internal::socket_send_file, get(), fd, offset, size));
-}
-
-
-Try<Node> Socket::Impl::bind(const Node& node)
-{
- Try<int> bind = network::bind(get(), node);
- if (bind.isError()) {
- return Error(bind.error());
- }
-
- // Lookup and store assigned ip and assigned port.
- return network::getsockname(get(), AF_INET);
-}
-
-
-Try<Nothing> Socket::Impl::listen(int backlog)
-{
- if (::listen(get(), backlog) < 0) {
- return ErrnoError();
- }
- return Nothing();
-}
-
-
-namespace internal {
-
-Future<Socket> accept(int fd)
-{
- Try<int> accepted = network::accept(fd, AF_INET);
- if (accepted.isError()) {
- return Failure(accepted.error());
- }
-
- int s = accepted.get();
- Try<Nothing> nonblock = os::nonblock(s);
- if (nonblock.isError()) {
- LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, nonblock: "
- << nonblock.error();
- os::close(s);
- return Failure("Failed to accept, nonblock: " + nonblock.error());
- }
-
- Try<Nothing> cloexec = os::cloexec(s);
- if (cloexec.isError()) {
- LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, cloexec: "
- << cloexec.error();
- os::close(s);
- return Failure("Failed to accept, cloexec: " + cloexec.error());
- }
-
- // Turn off Nagle (TCP_NODELAY) so pipelined requests don't wait.
- int on = 1;
- if (setsockopt(s, SOL_TCP, TCP_NODELAY, &on, sizeof(on)) < 0) {
- const char* error = strerror(errno);
- VLOG(1) << "Failed to turn off the Nagle algorithm: " << error;
- os::close(s);
- return Failure(
- "Failed to turn off the Nagle algorithm: " + stringify(error));
- }
-
- return Socket(s);
-}
-
-} // namespace internal {
-
-
-Future<Socket> Socket::Impl::accept()
-{
- return io::poll(get(), io::READ)
- .then(lambda::bind(&internal::accept, get()));
-}
-
-
SocketManager::SocketManager()
{
synchronizer(this) = SYNCHRONIZED_INITIALIZER_RECURSIVE;