You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2015/02/08 16:51:28 UTC
[01/20] mesos git commit: Added std::string overloads of
Socket::recv/send.
Repository: mesos
Updated Branches:
refs/heads/master d4dd71690 -> 3c8c382e5
Added std::string overloads of Socket::recv/send.
Review: https://reviews.apache.org/r/29528
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/46964382
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/46964382
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/46964382
Branch: refs/heads/master
Commit: 46964382d05e79f1578f34804b90ad26ee81165f
Parents: 544411a
Author: Benjamin Hindman <be...@gmail.com>
Authored: Fri Dec 26 09:34:31 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Feb 7 14:42:45 2015 -0800
----------------------------------------------------------------------
3rdparty/libprocess/include/process/socket.hpp | 35 ++++++
3rdparty/libprocess/src/socket.cpp | 113 ++++++++++++++++++++
2 files changed, 148 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/46964382/3rdparty/libprocess/include/process/socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp
index 7e92cc8..06161f2 100644
--- a/3rdparty/libprocess/include/process/socket.hpp
+++ b/3rdparty/libprocess/include/process/socket.hpp
@@ -68,6 +68,31 @@ public:
virtual Future<size_t> send(const char* data, size_t size) = 0;
virtual Future<size_t> sendfile(int fd, off_t offset, size_t size) = 0;
+ // An overload of 'recv', receives data based on the specified
+ // 'size' parameter:
+ //
+ // Value of 'size' | Semantics
+ // --------------------|-----------------
+ // 0 | Returns an empty string.
+ // -1 | Receives until EOF.
+ // N | Returns a string of size N.
+ // 'None' | Returns a string of the available data.
+ //
+ // That is, if 'None' is specified than whenever data becomes
+ // available on the socket that much data will be returned.
+ //
+ // TODO(benh): Consider returning Owned<std::string> or
+ // Shared<std::string>, the latter enabling reuse of a pool of
+ // preallocated strings/buffers.
+ virtual Future<std::string> recv(const Option<ssize_t>& size = None());
+
+ // An overload of 'send', sends all of the specified data unless
+ // sending fails in which case a failure is returned.
+ //
+ // TODO(benh): Consider taking Shared<std::string>, the latter
+ // enabling reuse of a pool of preallocated strings/buffers.
+ virtual Future<Nothing> send(const std::string& data);
+
protected:
explicit Impl(int _s) : s(_s) { CHECK(s >= 0); }
@@ -127,6 +152,16 @@ public:
return impl->sendfile(fd, offset, size);
}
+ Future<std::string> recv(const Option<ssize_t>& size)
+ {
+ return impl->recv(size);
+ }
+
+ Future<Nothing> send(const std::string& data)
+ {
+ return impl->send(data);
+ }
+
private:
explicit Socket(std::shared_ptr<Impl>&& that) : impl(std::move(that)) {}
http://git-wip-us.apache.org/repos/asf/mesos/blob/46964382/3rdparty/libprocess/src/socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/socket.cpp b/3rdparty/libprocess/src/socket.cpp
index df68077..fb5a5ab 100644
--- a/3rdparty/libprocess/src/socket.cpp
+++ b/3rdparty/libprocess/src/socket.cpp
@@ -1,8 +1,18 @@
+#include <unistd.h> // For sysconf.
+
+#include <memory>
+#include <string>
+
+#include <boost/shared_array.hpp>
+
#include <process/network.hpp>
+#include <process/owned.hpp>
#include <process/socket.hpp>
#include "poll_socket.hpp"
+using std::string;
+
namespace process {
namespace network {
@@ -72,5 +82,108 @@ Try<Node> Socket::Impl::bind(const Node& node)
return network::getsockname(get(), AF_INET);
}
+
+static Future<string> _recv(
+ Socket socket,
+ const Option<ssize_t>& size,
+ Owned<string> buffer,
+ size_t chunk,
+ boost::shared_array<char> data,
+ size_t length)
+{
+ if (length == 0) { // EOF.
+ // Return everything we've received thus far, a subsequent receive
+ // will return an empty string.
+ return string(*buffer);
+ }
+
+ buffer->append(data.get(), length);
+
+ if (size.isNone()) {
+ // We've been asked just to return any data that we receive!
+ return string(*buffer);
+ } else if (size.get() < 0) {
+ // We've been asked to receive until EOF so keep receiving since
+ // according to the 'length == 0' check above we haven't reached
+ // EOF yet.
+ return socket.recv(data.get(), chunk)
+ .then(lambda::bind(&_recv,
+ socket,
+ size,
+ buffer,
+ chunk,
+ data,
+ lambda::_1));
+ } else if (size.get() > buffer->size()) {
+ // We've been asked to receive a particular amount of data and we
+ // haven't yet received that much data so keep receiving.
+ return socket.recv(data.get(), size.get() - buffer->size())
+ .then(lambda::bind(&_recv,
+ socket,
+ size,
+ buffer,
+ chunk,
+ data,
+ lambda::_1));
+ }
+
+ // We've received as much data as requested, so return that data!
+ return string(*buffer);
+}
+
+
+Future<string> Socket::Impl::recv(const Option<ssize_t>& size)
+{
+ // Default chunk size to attempt to receive when nothing is
+ // specified represents roughly 16 pages.
+ static const size_t DEFAULT_CHUNK = 16 * sysconf(_SC_PAGESIZE);
+
+ size_t chunk = (size.isNone() || size.get() < 0)
+ ? DEFAULT_CHUNK
+ : size.get();
+
+ Owned<string> buffer(new string());
+ boost::shared_array<char> data(new char[chunk]);
+
+ return recv(data.get(), chunk)
+ .then(lambda::bind(&_recv,
+ socket(),
+ size,
+ buffer,
+ chunk,
+ data,
+ lambda::_1));
+}
+
+
+static Future<Nothing> _send(
+ Socket socket,
+ Owned<string> data,
+ size_t index,
+ size_t length)
+{
+ // Increment the index into the data.
+ index += length;
+
+ // Check if we've sent all of the data.
+ if (index == data->size()) {
+ return Nothing();
+ }
+
+ // Keep sending!
+ return socket.send(data->data() + index, data->size() - index)
+ .then(lambda::bind(&_send, socket, data, index, lambda::_1));
+}
+
+
+Future<Nothing> Socket::Impl::send(const std::string& _data)
+{
+ Owned<string> data(new string(_data));
+
+ return send(data->data(), data->size())
+ .then(lambda::bind(&_send, socket(), data, 0, lambda::_1));
+}
+
+
} // namespace network {
} // namespace process {
[02/20] mesos git commit: Moved network::* functions into network.hpp.
Posted by be...@apache.org.
Moved network::* functions into network.hpp.
Review: https://reviews.apache.org/r/29527
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/544411a1
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/544411a1
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/544411a1
Branch: refs/heads/master
Commit: 544411a11b39799037e2c87858d426ac70a71794
Parents: 8b48850
Author: Benjamin Hindman <be...@gmail.com>
Authored: Thu Dec 25 17:10:57 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Feb 7 14:42:45 2015 -0800
----------------------------------------------------------------------
3rdparty/libprocess/include/Makefile.am | 1 +
3rdparty/libprocess/include/process/network.hpp | 102 +++++++++++++++++++
3rdparty/libprocess/include/process/socket.hpp | 87 ----------------
3rdparty/libprocess/src/http.cpp | 1 +
3rdparty/libprocess/src/poll_socket.cpp | 1 +
3rdparty/libprocess/src/socket.cpp | 1 +
3rdparty/libprocess/src/tests/process_tests.cpp | 1 +
7 files changed, 107 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/544411a1/3rdparty/libprocess/include/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/Makefile.am b/3rdparty/libprocess/include/Makefile.am
index 542ae1c..988c8d5 100644
--- a/3rdparty/libprocess/include/Makefile.am
+++ b/3rdparty/libprocess/include/Makefile.am
@@ -31,6 +31,7 @@ nobase_include_HEADERS = \
process/metrics/timer.hpp \
process/mime.hpp \
process/mutex.hpp \
+ process/network.hpp \
process/node.hpp \
process/once.hpp \
process/owned.hpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/544411a1/3rdparty/libprocess/include/process/network.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/network.hpp b/3rdparty/libprocess/include/process/network.hpp
new file mode 100644
index 0000000..2ceea8c
--- /dev/null
+++ b/3rdparty/libprocess/include/process/network.hpp
@@ -0,0 +1,102 @@
+#ifndef __PROCESS_NETWORK_HPP__
+#define __PROCESS_NETWORK_HPP__
+
+#include <process/node.hpp>
+
+#include <stout/net.hpp>
+#include <stout/try.hpp>
+
+namespace process {
+namespace network {
+
+// Returns a socket file descriptor for the specified options. Note
+// that on OS X, the returned socket will have the SO_NOSIGPIPE option
+// set.
+inline Try<int> socket(int family, int type, int protocol)
+{
+ int s;
+ if ((s = ::socket(family, type, protocol)) == -1) {
+ return ErrnoError();
+ }
+
+#ifdef __APPLE__
+ // Disable SIGPIPE via setsockopt because OS X does not support
+ // the MSG_NOSIGNAL flag on send(2).
+ const int enable = 1;
+ if (setsockopt(s, SOL_SOCKET, SO_NOSIGPIPE, &enable, sizeof(int)) == -1) {
+ return ErrnoError();
+ }
+#endif // __APPLE__
+
+ return s;
+}
+
+// accept, bind, connect, getsockname wrappers for different protocol families
+inline Try<int> accept(int s, sa_family_t family)
+{
+ switch (family) {
+ case AF_INET: {
+ sockaddr_in addr = net::createSockaddrIn(0, 0);
+ socklen_t addrlen = sizeof(addr);
+
+ int accepted = ::accept(s, (sockaddr*) &addr, &addrlen);
+ if (accepted < 0) {
+ return ErrnoError("Failed to accept");
+ }
+
+ return accepted;
+ }
+ default:
+ return Error("Unsupported family type: " + stringify(family));
+ }
+}
+
+
+inline Try<int> bind(int s, const Node& node)
+{
+ sockaddr_in addr = net::createSockaddrIn(node.ip, node.port);
+
+ int error = ::bind(s, (sockaddr*) &addr, sizeof(addr));
+ if (error < 0) {
+ return ErrnoError("Failed to bind on " + stringify(node));
+ }
+
+ return error;
+}
+
+
+inline Try<int> connect(int s, const Node& node)
+{
+ sockaddr_in addr = net::createSockaddrIn(node.ip, node.port);
+
+ int error = ::connect(s, (sockaddr*) &addr, sizeof(addr));
+ if (error < 0) {
+ return ErrnoError("Failed to connect to " + stringify(node));
+ }
+
+ return error;
+}
+
+
+inline Try<Node> getsockname(int s, sa_family_t family)
+{
+ switch (family) {
+ case AF_INET: {
+ sockaddr_in addr = net::createSockaddrIn(0, 0);
+ socklen_t addrlen = sizeof(addr);
+
+ if(::getsockname(s, (sockaddr*) &addr, &addrlen) < 0) {
+ return ErrnoError("Failed to getsockname");
+ }
+
+ return Node(addr.sin_addr.s_addr, ntohs(addr.sin_port));
+ }
+ default:
+ return Error("Unsupported family type: " + stringify(family));
+ }
+}
+
+} // namespace network {
+} // namespace process {
+
+#endif // __PROCESS_NETWORK_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/544411a1/3rdparty/libprocess/include/process/socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp
index ddb9e36..7e92cc8 100644
--- a/3rdparty/libprocess/include/process/socket.hpp
+++ b/3rdparty/libprocess/include/process/socket.hpp
@@ -1,8 +1,6 @@
#ifndef __PROCESS_SOCKET_HPP__
#define __PROCESS_SOCKET_HPP__
-#include <assert.h>
-
#include <memory>
#include <process/future.hpp>
@@ -14,98 +12,13 @@
#include <stout/os.hpp>
#include <stout/try.hpp>
-
namespace process {
namespace network {
-// Returns a socket fd for the specified options. Note that on OS X,
-// the returned socket will have the SO_NOSIGPIPE option set.
-inline Try<int> socket(int family, int type, int protocol)
-{
- int s;
- if ((s = ::socket(family, type, protocol)) == -1) {
- return ErrnoError();
- }
-
-#ifdef __APPLE__
- // Disable SIGPIPE via setsockopt because OS X does not support
- // the MSG_NOSIGNAL flag on send(2).
- const int enable = 1;
- if (setsockopt(s, SOL_SOCKET, SO_NOSIGPIPE, &enable, sizeof(int)) == -1) {
- return ErrnoError();
- }
-#endif // __APPLE__
-
- return s;
-}
-
-// accept, bind, connect, getsockname wrappers for different protocol families
-inline Try<int> accept(int s, sa_family_t family)
-{
- switch (family) {
- case AF_INET: {
- sockaddr_in addr = net::createSockaddrIn(0, 0);
- socklen_t addrlen = sizeof(addr);
-
- int accepted = ::accept(s, (sockaddr*) &addr, &addrlen);
- if (accepted < 0) {
- return ErrnoError("Failed to accept");
- }
-
- return accepted;
- }
- default:
- return Error("Unsupported family type: " + stringify(family));
- }
-}
-
-inline Try<int> bind(int s, const Node& node)
-{
- sockaddr_in addr = net::createSockaddrIn(node.ip, node.port);
-
- int error = ::bind(s, (sockaddr*) &addr, sizeof(addr));
- if (error < 0) {
- return ErrnoError("Failed to bind on " + stringify(node));
- }
-
- return error;
-}
-
-inline Try<int> connect(int s, const Node& node)
-{
- sockaddr_in addr = net::createSockaddrIn(node.ip, node.port);
-
- int error = ::connect(s, (sockaddr*) &addr, sizeof(addr));
- if (error < 0) {
- return ErrnoError("Failed to connect to " + stringify(node));
- }
-
- return error;
-}
-
-inline Try<Node> getsockname(int s, sa_family_t family)
-{
- switch (family) {
- case AF_INET: {
- sockaddr_in addr = net::createSockaddrIn(0, 0);
- socklen_t addrlen = sizeof(addr);
-
- if(::getsockname(s, (sockaddr*) &addr, &addrlen) < 0) {
- return ErrnoError("Failed to getsockname");
- }
-
- return Node(addr.sin_addr.s_addr, ntohs(addr.sin_port));
- }
- default:
- return Error("Unsupported family type: " + stringify(family));
- }
-}
-
// An abstraction around a socket (file descriptor) that provides
// reference counting such that the socket is only closed (and thus,
// has the possiblity of being reused) after there are no more
// references.
-
class Socket
{
public:
http://git-wip-us.apache.org/repos/asf/mesos/blob/544411a1/3rdparty/libprocess/src/http.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/http.cpp b/3rdparty/libprocess/src/http.cpp
index ec9823e..5063014 100644
--- a/3rdparty/libprocess/src/http.cpp
+++ b/3rdparty/libprocess/src/http.cpp
@@ -10,6 +10,7 @@
#include <process/future.hpp>
#include <process/http.hpp>
#include <process/io.hpp>
+#include <process/network.hpp>
#include <process/socket.hpp>
#include <stout/lambda.hpp>
http://git-wip-us.apache.org/repos/asf/mesos/blob/544411a1/3rdparty/libprocess/src/poll_socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/poll_socket.cpp b/3rdparty/libprocess/src/poll_socket.cpp
index 2e70c6c..eaeabd7 100644
--- a/3rdparty/libprocess/src/poll_socket.cpp
+++ b/3rdparty/libprocess/src/poll_socket.cpp
@@ -1,6 +1,7 @@
#include <netinet/tcp.h>
#include <process/io.hpp>
+#include <process/network.hpp>
#include <process/socket.hpp>
#include "config.hpp"
http://git-wip-us.apache.org/repos/asf/mesos/blob/544411a1/3rdparty/libprocess/src/socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/socket.cpp b/3rdparty/libprocess/src/socket.cpp
index 4b0f6be..df68077 100644
--- a/3rdparty/libprocess/src/socket.cpp
+++ b/3rdparty/libprocess/src/socket.cpp
@@ -1,3 +1,4 @@
+#include <process/network.hpp>
#include <process/socket.hpp>
#include "poll_socket.hpp"
http://git-wip-us.apache.org/repos/asf/mesos/blob/544411a1/3rdparty/libprocess/src/tests/process_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/process_tests.cpp b/3rdparty/libprocess/src/tests/process_tests.cpp
index a4bf28c..5580546 100644
--- a/3rdparty/libprocess/src/tests/process_tests.cpp
+++ b/3rdparty/libprocess/src/tests/process_tests.cpp
@@ -20,6 +20,7 @@
#include <process/gc.hpp>
#include <process/gmock.hpp>
#include <process/gtest.hpp>
+#include <process/network.hpp>
#include <process/process.hpp>
#include <process/run.hpp>
#include <process/socket.hpp>
[17/20] mesos git commit: Added an Address::family() function.
Posted by be...@apache.org.
Added an Address::family() function.
Review: https://reviews.apache.org/r/29540
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4065fa69
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4065fa69
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4065fa69
Branch: refs/heads/master
Commit: 4065fa6926e10b541a73e380da027942bdc8cd13
Parents: 4a93512
Author: Benjamin Hindman <be...@gmail.com>
Authored: Thu Jan 1 20:01:25 2015 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Feb 7 14:46:44 2015 -0800
----------------------------------------------------------------------
3rdparty/libprocess/include/process/address.hpp | 7 +++++++
1 file changed, 7 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/4065fa69/3rdparty/libprocess/include/process/address.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/address.hpp b/3rdparty/libprocess/include/process/address.hpp
index 7db6271..f5fb4dd 100644
--- a/3rdparty/libprocess/include/process/address.hpp
+++ b/3rdparty/libprocess/include/process/address.hpp
@@ -17,6 +17,8 @@ namespace network {
// Represents a network "address", subsuming the struct addrinfo and
// struct sockaddr* that typically is used to encapsulate IP and port.
+//
+// TODO(benh): Create a Family enumeration to replace sa_family_t.
class Address
{
public:
@@ -52,6 +54,11 @@ public:
return !(*this == that);
}
+ sa_family_t family() const
+ {
+ return AF_INET;
+ }
+
uint32_t ip;
uint16_t port;
};
[20/20] mesos git commit: Fixed a flaky test.
Posted by be...@apache.org.
Fixed a flaky test.
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3c8c382e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3c8c382e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3c8c382e
Branch: refs/heads/master
Commit: 3c8c382e514a9486f73d8254f9d989a2fd85f2d0
Parents: 9f9afca
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sat Feb 7 15:37:40 2015 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Feb 8 07:51:05 2015 -0800
----------------------------------------------------------------------
src/tests/authentication_tests.cpp | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/3c8c382e/src/tests/authentication_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/authentication_tests.cpp b/src/tests/authentication_tests.cpp
index c594f3f..48f73e2 100644
--- a/src/tests/authentication_tests.cpp
+++ b/src/tests/authentication_tests.cpp
@@ -750,6 +750,7 @@ TEST_F(AuthenticationTest, SchedulerFailover)
driver2.start();
AWAIT_READY(sched2Registered);
+ AWAIT_READY(sched1Error);
driver2.stop();
driver2.join();
[11/20] mesos git commit: Used new std::string Socket::recv/send
overloads in tests.
Posted by be...@apache.org.
Used new std::string Socket::recv/send overloads in tests.
Review: https://reviews.apache.org/r/29530
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/16dc6c55
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/16dc6c55
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/16dc6c55
Branch: refs/heads/master
Commit: 16dc6c55b0c224a741fa5597a589f1c6d29a19f1
Parents: 5eba5e2
Author: Benjamin Hindman <be...@gmail.com>
Authored: Fri Dec 26 09:38:50 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Feb 7 14:42:46 2015 -0800
----------------------------------------------------------------------
3rdparty/libprocess/src/tests/http_tests.cpp | 7 ++-----
3rdparty/libprocess/src/tests/process_tests.cpp | 7 ++-----
2 files changed, 4 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/16dc6c55/3rdparty/libprocess/src/tests/http_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/http_tests.cpp b/3rdparty/libprocess/src/tests/http_tests.cpp
index 83ca10e..ec64e60 100644
--- a/3rdparty/libprocess/src/tests/http_tests.cpp
+++ b/3rdparty/libprocess/src/tests/http_tests.cpp
@@ -133,14 +133,11 @@ TEST(HTTP, Endpoints)
EXPECT_CALL(process, body(_))
.WillOnce(Return(http::OK()));
- AWAIT_EXPECT_EQ(data.size(), socket.send(data.data(), data.size()));
+ AWAIT_READY(socket.send(data));
string response = "HTTP/1.1 200 OK";
- char temp[response.size()];
-
- AWAIT_EXPECT_EQ(response.size(), socket.recv(temp, response.size()));
- ASSERT_EQ(response, string(temp, response.size()));
+ AWAIT_EXPECT_EQ(response, socket.recv(response.size()));
// Now hit '/pipe' (by using http::get).
int pipes[2];
http://git-wip-us.apache.org/repos/asf/mesos/blob/16dc6c55/3rdparty/libprocess/src/tests/process_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/process_tests.cpp b/3rdparty/libprocess/src/tests/process_tests.cpp
index 5580546..fe758ab 100644
--- a/3rdparty/libprocess/src/tests/process_tests.cpp
+++ b/3rdparty/libprocess/src/tests/process_tests.cpp
@@ -1435,7 +1435,7 @@ TEST(Process, remote)
const string& data = MessageEncoder::encode(&message);
- AWAIT_EXPECT_EQ(data.size(), socket.send(data.data(), data.size()));
+ AWAIT_READY(socket.send(data));
AWAIT_READY(handler);
@@ -1533,10 +1533,7 @@ TEST(Process, http2)
const string data = "POST /" + name + " HTTP/1.1";
- char temp[data.size()];
-
- AWAIT_ASSERT_EQ(data.size(), client.recv(temp, data.size()));
- ASSERT_EQ(data, string(temp, data.size()));
+ AWAIT_EXPECT_EQ(data, client.recv(data.size()));
terminate(process);
wait(process);
[12/20] mesos git commit: Fix weird outputing when running configure.
Posted by be...@apache.org.
Fix weird outputing when running configure.
Review: https://reviews.apache.org/r/29531
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/64e509b8
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/64e509b8
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/64e509b8
Branch: refs/heads/master
Commit: 64e509b896b2f4cdc73e1d4cc76b2a2698e99cfe
Parents: 16dc6c5
Author: Benjamin Hindman <be...@gmail.com>
Authored: Tue Dec 16 13:13:35 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Feb 7 14:42:46 2015 -0800
----------------------------------------------------------------------
3rdparty/libprocess/configure.ac | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/64e509b8/3rdparty/libprocess/configure.ac
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/configure.ac b/3rdparty/libprocess/configure.ac
index 19bc601..a126ecf 100644
--- a/3rdparty/libprocess/configure.ac
+++ b/3rdparty/libprocess/configure.ac
@@ -703,7 +703,7 @@ esac
# Also pass the flags to 3rdparty libraries.
CONFIGURE_ARGS="$CONFIGURE_ARGS CXXFLAGS='$CXXFLAGS'"
-AC_CHECK_LIB([dl], [dlopen], [AC_MSG_RESULT([found])],
+AC_CHECK_LIB([dl], [dlopen], [],
[AC_MSG_ERROR([cannot find libdl
-------------------------------------------------------------------
libdl was not found, and is required for compilation.
[05/20] mesos git commit: Update Mesos for HTTP query string
encode/decode.
Posted by be...@apache.org.
Update Mesos for HTTP query string encode/decode.
Review: https://reviews.apache.org/r/30442
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e93672be
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e93672be
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e93672be
Branch: refs/heads/master
Commit: e93672bef01f124f60fbce4699efb6c2112703b0
Parents: 93bed98
Author: Cody Maloney <co...@mesosphere.io>
Authored: Fri Jan 30 18:02:23 2015 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Feb 7 14:42:45 2015 -0800
----------------------------------------------------------------------
src/master/http.cpp | 20 ++++++++++++++++----
src/tests/repair_tests.cpp | 4 +++-
2 files changed, 19 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/e93672be/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 138624d..6d7b9f6 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -278,8 +278,14 @@ Future<Response> Master::Http::observe(const Request& request)
{
LOG(INFO) << "HTTP request for '" << request.path << "'";
- hashmap<string, string> values =
- process::http::query::parse(request.body);
+ Try<hashmap<string, string>> decode =
+ process::http::query::decode(request.body);
+
+ if (decode.isError()) {
+ return BadRequest("Unable to decode query string: " + decode.error());
+ }
+
+ hashmap<string, string> values = decode.get();
// Build up a JSON object of the values we recieved and send them back
// down the wire as JSON for validation / confirmation.
@@ -655,8 +661,14 @@ Future<Response> Master::Http::shutdown(const Request& request)
// Parse the query string in the request body (since this is a POST)
// in order to determine the framework ID to shutdown.
- hashmap<string, string> values =
- process::http::query::parse(request.body);
+ Try<hashmap<string, string>> decode =
+ process::http::query::decode(request.body);
+
+ if (decode.isError()) {
+ return BadRequest("Unable to decode query string: " + decode.error());
+ }
+
+ hashmap<string, string> values = decode.get();
if (values.get("frameworkId").isNone()) {
return BadRequest("Missing 'frameworkId' query parameter");
http://git-wip-us.apache.org/repos/asf/mesos/blob/e93672be/src/tests/repair_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/repair_tests.cpp b/src/tests/repair_tests.cpp
index 734d3b7..1aad5c1 100644
--- a/src/tests/repair_tests.cpp
+++ b/src/tests/repair_tests.cpp
@@ -114,7 +114,9 @@ TEST_F(HealthTest, ObserveEndpoint)
"observe",
None(),
"monitor=foo%");
- VALIDATE_BAD_RESPONSE(response, "Malformed % escape in 'foo%': '%'");
+ VALIDATE_BAD_RESPONSE(
+ response,
+ "Unable to decode query string: Malformed % escape in 'foo%': '%'");
// Empty value causes error.
response = process::http::post(
[09/20] mesos git commit: Used network::Socket instead of
network::socket for http::get/post.
Posted by be...@apache.org.
Used network::Socket instead of network::socket for http::get/post.
Also updated the implementation of http::get/post to be completely
asynchronous. This refactor was made significantly easier thanks to
the new std::string Socket::recv/send overloads.
Review: https://reviews.apache.org/r/29529
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/5eba5e2d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/5eba5e2d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/5eba5e2d
Branch: refs/heads/master
Commit: 5eba5e2d4c4d66186eb30c82e24f508360e53ba7
Parents: 4696438
Author: Benjamin Hindman <be...@gmail.com>
Authored: Fri Dec 26 09:35:27 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Feb 7 14:42:46 2015 -0800
----------------------------------------------------------------------
3rdparty/libprocess/include/process/http.hpp | 16 ++---
3rdparty/libprocess/src/http.cpp | 86 ++++++++++++++---------
2 files changed, 58 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/5eba5e2d/3rdparty/libprocess/include/process/http.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/http.hpp b/3rdparty/libprocess/include/process/http.hpp
index 1d1ad41..689f56d 100644
--- a/3rdparty/libprocess/include/process/http.hpp
+++ b/3rdparty/libprocess/include/process/http.hpp
@@ -493,25 +493,21 @@ inline Try<std::string> decode(const std::string& s)
}
-// Sends a blocking HTTP GET request to the process with the given upid.
-// Returns the HTTP response from the process, read asynchronously.
-//
-// TODO(bmahler): Have the request sent asynchronously as well.
-// TODO(bmahler): For efficiency, this should properly use the ResponseDecoder
-// on the read stream, rather than parsing the full string response at the end.
+// Asynchronously sends an HTTP GET request to the process with the
+// given UPID and returns the HTTP response from the process.
Future<Response> get(
const UPID& upid,
const Option<std::string>& path = None(),
const Option<std::string>& query = None(),
- const Option<hashmap<std::string, std::string> >& headers = None());
+ const Option<hashmap<std::string, std::string>>& headers = None());
-// Sends a blocking HTTP POST request to the process with the given upid.
-// Returns the HTTP response from the process, read asyncronously.
+// Asynchronously sends an HTTP POST request to the process with the
+// given UPID and returns the HTTP response from the process.
Future<Response> post(
const UPID& upid,
const Option<std::string>& path = None(),
- const Option<hashmap<std::string, std::string> >& headers = None(),
+ const Option<hashmap<std::string, std::string>>& headers = None(),
const Option<std::string>& body = None(),
const Option<std::string>& contentType = None());
http://git-wip-us.apache.org/repos/asf/mesos/blob/5eba5e2d/3rdparty/libprocess/src/http.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/http.cpp b/3rdparty/libprocess/src/http.cpp
index 5063014..30e825d 100644
--- a/3rdparty/libprocess/src/http.cpp
+++ b/3rdparty/libprocess/src/http.cpp
@@ -9,14 +9,12 @@
#include <process/future.hpp>
#include <process/http.hpp>
-#include <process/io.hpp>
-#include <process/network.hpp>
+#include <process/owned.hpp>
#include <process/socket.hpp>
#include <stout/lambda.hpp>
#include <stout/nothing.hpp>
#include <stout/option.hpp>
-#include <stout/os.hpp>
#include <stout/try.hpp>
#include "decoder.hpp"
@@ -27,6 +25,8 @@ using std::string;
using process::http::Request;
using process::http::Response;
+using process::network::Socket;
+
namespace process {
namespace http {
@@ -108,37 +108,58 @@ Future<Response> decode(const string& buffer)
}
-Future<Response> request(
+// Forward declaration.
+Future<Response> _request(
+ Socket socket,
const UPID& upid,
const string& method,
const Option<string>& path,
const Option<string>& query,
const Option<hashmap<string, string> >& _headers,
const Option<string>& body,
- const Option<string>& contentType)
-{
- Try<int> socket = network::socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
+ const Option<string>& contentType);
- if (socket.isError()) {
- return Failure("Failed to create socket: " + socket.error());
- }
- int s = socket.get();
+Future<Response> request(
+ const UPID& upid,
+ const string& method,
+ const Option<string>& path,
+ const Option<string>& query,
+ const Option<hashmap<string, string> >& headers,
+ const Option<string>& body,
+ const Option<string>& contentType)
+{
+ Try<Socket> create = Socket::create();
- Try<Nothing> cloexec = os::cloexec(s);
- if (!cloexec.isSome()) {
- os::close(s);
- return Failure("Failed to cloexec: " + cloexec.error());
+ if (create.isError()) {
+ return Failure("Failed to create socket: " + create.error());
}
- const string host = stringify(upid.node);
+ Socket socket = create.get();
+
+ return socket.connect(upid.node)
+ .then(lambda::bind(&_request,
+ socket,
+ upid,
+ method,
+ path,
+ query,
+ headers,
+ body,
+ contentType));
+}
- Try<int> connect = network::connect(s, upid.node);
- if (connect.isError()) {
- os::close(s);
- return Failure(connect.error());
- }
+Future<Response> _request(
+ Socket socket,
+ const UPID& upid,
+ const string& method,
+ const Option<string>& path,
+ const Option<string>& query,
+ const Option<hashmap<string, string> >& _headers,
+ const Option<string>& body,
+ const Option<string>& contentType)
+{
std::ostringstream out;
out << method << " /" << upid.id;
@@ -161,7 +182,7 @@ Future<Response> request(
}
// Need to specify the 'Host' header.
- headers["Host"] = host;
+ headers["Host"] = stringify(upid.node);
// Tell the server to close the connection when it's done.
headers["Connection"] = "close";
@@ -187,19 +208,16 @@ Future<Response> request(
out << body.get();
}
- Try<Nothing> nonblock = os::nonblock(s);
- if (!nonblock.isSome()) {
- os::close(s);
- return Failure("Failed to set nonblock: " + nonblock.error());
- }
-
- // Need to disambiguate the io::read we want when binding below.
- Future<string> (*read)(int) = io::read;
+ // Need to disambiguate the Socket::recv for binding below.
+ Future<string> (Socket::*recv)(const Option<ssize_t>&) = &Socket::recv;
- return io::write(s, out.str())
- .then(lambda::bind(read, s))
- .then(lambda::bind(&internal::decode, lambda::_1))
- .onAny(lambda::bind(&os::close, s));
+ // TODO(bmahler): For efficiency, this should properly use the
+ // ResponseDecoder when reading, rather than parsing the full string
+ // response.
+ return socket.send(out.str())
+ .then(lambda::function<Future<string>(void)>(
+ lambda::bind(recv, socket, -1)))
+ .then(lambda::bind(&internal::decode, lambda::_1));
}
[18/20] mesos git commit: Replaced Node with Address in Mesos.
Posted by be...@apache.org.
Replaced Node with Address in Mesos.
Review: https://reviews.apache.org/r/29539
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4a935124
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4a935124
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4a935124
Branch: refs/heads/master
Commit: 4a9351243dec77f182fd34a861ec753aac170276
Parents: 257bd1a
Author: Benjamin Hindman <be...@gmail.com>
Authored: Thu Jan 1 17:29:36 2015 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Feb 7 14:46:44 2015 -0800
----------------------------------------------------------------------
src/common/protobuf_utils.cpp | 6 +-
src/master/master.cpp | 10 +--
src/sched/sched.cpp | 2 +-
src/scheduler/scheduler.cpp | 2 +-
src/slave/http.cpp | 6 +-
src/slave/slave.cpp | 6 +-
src/tests/fetcher_tests.cpp | 4 +-
src/tests/files_tests.cpp | 8 +--
src/tests/gc_tests.cpp | 6 +-
src/tests/logging_tests.cpp | 2 +-
src/tests/master_contender_detector_tests.cpp | 73 +++++++++++++---------
src/tests/master_tests.cpp | 12 ++--
src/tests/metrics_tests.cpp | 4 +-
src/tests/monitor_tests.cpp | 2 +-
14 files changed, 78 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/4a935124/src/common/protobuf_utils.cpp
----------------------------------------------------------------------
diff --git a/src/common/protobuf_utils.cpp b/src/common/protobuf_utils.cpp
index 345d96e..af6b785 100644
--- a/src/common/protobuf_utils.cpp
+++ b/src/common/protobuf_utils.cpp
@@ -141,11 +141,11 @@ MasterInfo createMasterInfo(const process::UPID& pid)
{
MasterInfo info;
info.set_id(stringify(pid) + "-" + UUID::random().toString());
- info.set_ip(pid.node.ip);
- info.set_port(pid.node.port);
+ info.set_ip(pid.address.ip);
+ info.set_port(pid.address.port);
info.set_pid(pid);
- Try<string> hostname = net::getHostname(pid.node.ip);
+ Try<string> hostname = net::getHostname(pid.address.ip);
if (hostname.isSome()) {
info.set_hostname(hostname.get());
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/4a935124/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index bc1f8c9..7c3aa22 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -289,20 +289,20 @@ Master::Master(
// address and port from self() and the OS PID.
Try<string> id =
strings::format("%s-%u-%u-%d", DateUtils::currentDate(),
- self().node.ip, self().node.port, getpid());
+ self().address.ip, self().address.port, getpid());
CHECK(!id.isError()) << id.error();
info_.set_id(id.get());
- info_.set_ip(self().node.ip);
- info_.set_port(self().node.port);
+ info_.set_ip(self().address.ip);
+ info_.set_port(self().address.port);
info_.set_pid(self());
// Determine our hostname or use the hostname provided.
string hostname;
if (flags.hostname.isNone()) {
- Try<string> result = net::getHostname(self().node.ip);
+ Try<string> result = net::getHostname(self().address.ip);
if (result.isError()) {
LOG(FATAL) << "Failed to get hostname: " << result.error();
@@ -344,7 +344,7 @@ void Master::initialize()
LOG(INFO) << "Master " << info_.id() << " (" << info_.hostname() << ")"
<< " started on " << string(self()).substr(7);
- if (stringify(net::IP(ntohl(self().node.ip))) == "127.0.0.1") {
+ if (stringify(net::IP(ntohl(self().address.ip))) == "127.0.0.1") {
LOG(WARNING) << "\n**************************************************\n"
<< "Master bound to loopback interface!"
<< " Cannot communicate with remote schedulers or slaves."
http://git-wip-us.apache.org/repos/asf/mesos/blob/4a935124/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index ce6ff6d..1af52b2 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -1239,7 +1239,7 @@ void MesosSchedulerDriver::initialize() {
// Initialize libprocess.
process::initialize(schedulerId);
- if (stringify(net::IP(ntohl(process::node().ip))) == "127.0.0.1") {
+ if (stringify(net::IP(ntohl(process::address().ip))) == "127.0.0.1") {
LOG(WARNING) << "\n**************************************************\n"
<< "Scheduler driver bound to loopback interface!"
<< " Cannot communicate with remote master(s)."
http://git-wip-us.apache.org/repos/asf/mesos/blob/4a935124/src/scheduler/scheduler.cpp
----------------------------------------------------------------------
diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp
index f5ee1b0..de7d0bb 100644
--- a/src/scheduler/scheduler.cpp
+++ b/src/scheduler/scheduler.cpp
@@ -126,7 +126,7 @@ public:
// want to use flags to initialize libprocess).
process::initialize();
- if (stringify(net::IP(ntohl(self().node.ip))) == "127.0.0.1") {
+ if (stringify(net::IP(ntohl(self().address.ip))) == "127.0.0.1") {
LOG(WARNING) << "\n**************************************************\n"
<< "Scheduler driver bound to loopback interface!"
<< " Cannot communicate with remote master(s)."
http://git-wip-us.apache.org/repos/asf/mesos/blob/4a935124/src/slave/http.cpp
----------------------------------------------------------------------
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index c9b5731..80b02d6 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -360,9 +360,9 @@ Future<Response> Slave::Http::state(const Request& request)
object.values["lost_tasks"] = slave->stats.tasks[TASK_LOST];
if (slave->master.isSome()) {
- Try<string> masterHostname = net::getHostname(slave->master.get().node.ip);
- if (masterHostname.isSome()) {
- object.values["master_hostname"] = masterHostname.get();
+ Try<string> hostname = net::getHostname(slave->master.get().address.ip);
+ if (hostname.isSome()) {
+ object.values["master_hostname"] = hostname.get();
}
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/4a935124/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 336e877..fff2d72 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -171,7 +171,7 @@ void Slave::initialize()
{
LOG(INFO) << "Slave started on " << string(self()).substr(6);
- if (stringify(net::IP(ntohl(self().node.ip))) == "127.0.0.1") {
+ if (stringify(net::IP(ntohl(self().address.ip))) == "127.0.0.1") {
LOG(WARNING) << "\n**************************************************\n"
<< "Slave bound to loopback interface!"
<< " Cannot communicate with remote master(s)."
@@ -307,7 +307,7 @@ void Slave::initialize()
string hostname;
if (flags.hostname.isNone()) {
- Try<string> result = net::getHostname(self().node.ip);
+ Try<string> result = net::getHostname(self().address.ip);
if (result.isError()) {
LOG(FATAL) << "Failed to get hostname: " << result.error();
@@ -320,7 +320,7 @@ void Slave::initialize()
// Initialize slave info.
info.set_hostname(hostname);
- info.set_port(self().node.port);
+ info.set_port(self().address.port);
info.mutable_resources()->CopyFrom(resources.get());
info.mutable_attributes()->CopyFrom(attributes);
info.set_checkpoint(flags.checkpoint);
http://git-wip-us.apache.org/repos/asf/mesos/blob/4a935124/src/tests/fetcher_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fetcher_tests.cpp b/src/tests/fetcher_tests.cpp
index d7266f5..fcbf7ad 100644
--- a/src/tests/fetcher_tests.cpp
+++ b/src/tests/fetcher_tests.cpp
@@ -409,8 +409,8 @@ TEST_F(FetcherTest, OSNetUriTest)
spawn(process);
- string url = "http://" + net::getHostname(process.self().node.ip).get() +
- ":" + stringify(process.self().node.port) + "/help";
+ string url = "http://" + net::getHostname(process.self().address.ip).get() +
+ ":" + stringify(process.self().address.port) + "/help";
string localFile = path::join(os::getcwd(), "help");
EXPECT_FALSE(os::exists(localFile));
http://git-wip-us.apache.org/repos/asf/mesos/blob/4a935124/src/tests/files_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/files_tests.cpp b/src/tests/files_tests.cpp
index 2e16665..650f1b9 100644
--- a/src/tests/files_tests.cpp
+++ b/src/tests/files_tests.cpp
@@ -84,7 +84,7 @@ TEST_F(FilesTest, DetachTest)
TEST_F(FilesTest, ReadTest)
{
Files files;
- process::UPID upid("files", process::node());
+ process::UPID upid("files", process::address());
Future<Response> response =
process::http::get(upid, "read.json");
@@ -138,7 +138,7 @@ TEST_F(FilesTest, ReadTest)
TEST_F(FilesTest, ResolveTest)
{
Files files;
- process::UPID upid("files", process::node());
+ process::UPID upid("files", process::address());
// Test the directory / file resolution.
ASSERT_SOME(os::mkdir("1/2"));
@@ -214,7 +214,7 @@ TEST_F(FilesTest, ResolveTest)
TEST_F(FilesTest, BrowseTest)
{
Files files;
- process::UPID upid("files", process::node());
+ process::UPID upid("files", process::address());
ASSERT_SOME(os::mkdir("1/2"));
ASSERT_SOME(os::mkdir("1/3"));
@@ -267,7 +267,7 @@ TEST_F(FilesTest, BrowseTest)
TEST_F(FilesTest, DownloadTest)
{
Files files;
- process::UPID upid("files", process::node());
+ process::UPID upid("files", process::address());
// This is a one-pixel black gif image.
const unsigned char gifData[] = {
http://git-wip-us.apache.org/repos/asf/mesos/blob/4a935124/src/tests/gc_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/gc_tests.cpp b/src/tests/gc_tests.cpp
index 2cf7d16..45efce3 100644
--- a/src/tests/gc_tests.cpp
+++ b/src/tests/gc_tests.cpp
@@ -459,7 +459,7 @@ TEST_F(GarbageCollectorIntegrationTest, ExitedFramework)
ASSERT_FALSE(os::exists(frameworkDir));
- process::UPID filesUpid("files", process::node());
+ process::UPID filesUpid("files", process::address());
AWAIT_EXPECT_RESPONSE_STATUS_EQ(
process::http::NotFound().status,
process::http::get(filesUpid, "browse.json", "path=" + frameworkDir));
@@ -560,7 +560,7 @@ TEST_F(GarbageCollectorIntegrationTest, ExitedExecutor)
// Executor's directory should be gc'ed by now.
ASSERT_FALSE(os::exists(executorDir));
- process::UPID files("files", process::node());
+ process::UPID files("files", process::address());
AWAIT_EXPECT_RESPONSE_STATUS_EQ(
process::http::NotFound().status,
process::http::get(files, "browse.json", "path=" + executorDir));
@@ -675,7 +675,7 @@ TEST_F(GarbageCollectorIntegrationTest, DiskUsage)
// Executor's directory should be gc'ed by now.
ASSERT_FALSE(os::exists(executorDir));
- process::UPID files("files", process::node());
+ process::UPID files("files", process::address());
AWAIT_EXPECT_RESPONSE_STATUS_EQ(
process::http::NotFound().status,
process::http::get(files, "browse.json", "path=" + executorDir));
http://git-wip-us.apache.org/repos/asf/mesos/blob/4a935124/src/tests/logging_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/logging_tests.cpp b/src/tests/logging_tests.cpp
index dcfbe5f..0f2d30d 100644
--- a/src/tests/logging_tests.cpp
+++ b/src/tests/logging_tests.cpp
@@ -37,7 +37,7 @@ TEST(LoggingTest, Toggle)
{
process::PID<> pid;
pid.id = "logging";
- pid.node = process::node();
+ pid.address = process::address();
process::Future<Response> response = process::http::get(pid, "toggle");
http://git-wip-us.apache.org/repos/asf/mesos/blob/4a935124/src/tests/master_contender_detector_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_contender_detector_tests.cpp b/src/tests/master_contender_detector_tests.cpp
index e94cd79..78b1a6a 100644
--- a/src/tests/master_contender_detector_tests.cpp
+++ b/src/tests/master_contender_detector_tests.cpp
@@ -129,8 +129,8 @@ TEST_F(MasterContenderDetectorTest, File)
TEST(BasicMasterContenderDetectorTest, Contender)
{
PID<Master> master;
- master.node.ip = 10000000;
- master.node.port = 10000;
+ master.address.ip = 10000000;
+ master.address.port = 10000;
MasterContender* contender = new StandaloneMasterContender();
@@ -154,8 +154,8 @@ TEST(BasicMasterContenderDetectorTest, Contender)
TEST(BasicMasterContenderDetectorTest, Detector)
{
PID<Master> master;
- master.node.ip = 10000000;
- master.node.port = 10000;
+ master.address.ip = 10000000;
+ master.address.port = 10000;
StandaloneMasterDetector detector;
@@ -198,8 +198,9 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterContender)
ZooKeeperMasterContender* contender = new ZooKeeperMasterContender(group);
PID<Master> pid;
- pid.node.ip = 10000000;
- pid.node.port = 10000;
+ pid.address.ip = 10000000;
+ pid.address.port = 10000;
+
MasterInfo master = mesos::protobuf::createMasterInfo(pid);
contender->initialize(master);
@@ -256,8 +257,9 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, ContenderPendingElection)
ZooKeeperMasterContender contender(url.get());
PID<Master> pid;
- pid.node.ip = 10000000;
- pid.node.port = 10000;
+ pid.address.ip = 10000000;
+ pid.address.port = 10000;
+
MasterInfo master = mesos::protobuf::createMasterInfo(pid);
contender.initialize(master);
@@ -310,8 +312,9 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterContenders)
new ZooKeeperMasterContender(url.get());
PID<Master> pid1;
- pid1.node.ip = 10000000;
- pid1.node.port = 10000;
+ pid1.address.ip = 10000000;
+ pid1.address.port = 10000;
+
MasterInfo master1 = mesos::protobuf::createMasterInfo(pid1);
contender1->initialize(master1);
@@ -328,8 +331,9 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterContenders)
ZooKeeperMasterContender contender2(url.get());
PID<Master> pid2;
- pid2.node.ip = 10000001;
- pid2.node.port = 10001;
+ pid2.address.ip = 10000001;
+ pid2.address.port = 10001;
+
MasterInfo master2 = mesos::protobuf::createMasterInfo(pid2);
contender2.initialize(master2);
@@ -367,8 +371,9 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, NonRetryableFrrors)
AWAIT_READY(group1.join("data"));
PID<Master> pid;
- pid.node.ip = 10000000;
- pid.node.port = 10000;
+ pid.address.ip = 10000000;
+ pid.address.port = 10000;
+
MasterInfo master = mesos::protobuf::createMasterInfo(pid);
// group2's password is wrong and operations on it will fail.
@@ -431,8 +436,9 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, ContenderDetectorShutdownNetwork)
ZooKeeperMasterContender contender(url.get());
PID<Master> pid;
- pid.node.ip = 10000000;
- pid.node.port = 10000;
+ pid.address.ip = 10000000;
+ pid.address.port = 10000;
+
MasterInfo master = mesos::protobuf::createMasterInfo(pid);
contender.initialize(master);
@@ -507,8 +513,9 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterDetectorTimedoutSession)
ZooKeeperMasterContender leaderContender(leaderGroup);
PID<Master> pid;
- pid.node.ip = 10000000;
- pid.node.port = 10000;
+ pid.address.ip = 10000000;
+ pid.address.port = 10000;
+
MasterInfo leader = mesos::protobuf::createMasterInfo(pid);
leaderContender.initialize(leader);
@@ -528,8 +535,9 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterDetectorTimedoutSession)
ZooKeeperMasterContender followerContender(followerGroup);
PID<Master> pid2;
- pid2.node.ip = 10000001;
- pid2.node.port = 10001;
+ pid2.address.ip = 10000001;
+ pid2.address.port = 10001;
+
MasterInfo follower = mesos::protobuf::createMasterInfo(pid2);
followerContender.initialize(follower);
@@ -617,8 +625,9 @@ TEST_F(ZooKeeperMasterContenderDetectorTest,
ASSERT_SOME(url);
PID<Master> pid;
- pid.node.ip = 10000000;
- pid.node.port = 10000;
+ pid.address.ip = 10000000;
+ pid.address.port = 10000;
+
MasterInfo leader = mesos::protobuf::createMasterInfo(pid);
// Create the group instance so we can expire its session.
@@ -646,8 +655,9 @@ TEST_F(ZooKeeperMasterContenderDetectorTest,
// Simulate a following master.
PID<Master> pid2;
- pid2.node.ip = 10000001;
- pid2.node.port = 10001;
+ pid2.address.ip = 10000001;
+ pid2.address.port = 10001;
+
MasterInfo follower = mesos::protobuf::createMasterInfo(pid2);
ZooKeeperMasterDetector followerDetector(url.get());
@@ -693,8 +703,9 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterDetectorExpireSlaveZKSession)
ASSERT_SOME(url);
PID<Master> pid;
- pid.node.ip = 10000000;
- pid.node.port = 10000;
+ pid.address.ip = 10000000;
+ pid.address.port = 10000;
+
MasterInfo master = mesos::protobuf::createMasterInfo(pid);
ZooKeeperMasterContender masterContender(url.get());
@@ -754,8 +765,9 @@ TEST_F(ZooKeeperMasterContenderDetectorTest,
ZooKeeperMasterDetector leaderDetector(leaderGroup);
PID<Master> pid;
- pid.node.ip = 10000000;
- pid.node.port = 10000;
+ pid.address.ip = 10000000;
+ pid.address.port = 10000;
+
MasterInfo leader = mesos::protobuf::createMasterInfo(pid);
leaderContender.initialize(leader);
@@ -774,8 +786,9 @@ TEST_F(ZooKeeperMasterContenderDetectorTest,
ZooKeeperMasterDetector followerDetector(followerGroup);
PID<Master> pid2;
- pid2.node.ip = 10000001;
- pid2.node.port = 10001;
+ pid2.address.ip = 10000001;
+ pid2.address.port = 10001;
+
MasterInfo follower = mesos::protobuf::createMasterInfo(pid2);
followerContender.initialize(follower);
http://git-wip-us.apache.org/repos/asf/mesos/blob/4a935124/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 62ba35b..b52d2ca 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -964,8 +964,8 @@ TEST_F(MasterTest, MasterInfo)
driver.start();
AWAIT_READY(masterInfo);
- EXPECT_EQ(master.get().node.port, masterInfo.get().port());
- EXPECT_EQ(master.get().node.ip, masterInfo.get().ip());
+ EXPECT_EQ(master.get().address.port, masterInfo.get().port());
+ EXPECT_EQ(master.get().address.ip, masterInfo.get().ip());
driver.stop();
driver.join();
@@ -1022,8 +1022,8 @@ TEST_F(MasterTest, MasterInfoOnReElection)
AWAIT_READY(disconnected);
AWAIT_READY(masterInfo);
- EXPECT_EQ(master.get().node.port, masterInfo.get().port());
- EXPECT_EQ(master.get().node.ip, masterInfo.get().ip());
+ EXPECT_EQ(master.get().address.port, masterInfo.get().port());
+ EXPECT_EQ(master.get().address.ip, masterInfo.get().ip());
// The re-registered framework should get offers.
AWAIT_READY(resourceOffers2);
@@ -2138,8 +2138,8 @@ TEST_F(MasterTest, MaxExecutorsPerSlave)
driver.start();
AWAIT_READY(masterInfo);
- EXPECT_EQ(master.get().node.port, masterInfo.get().port());
- EXPECT_EQ(master.get().node.ip, masterInfo.get().ip());
+ EXPECT_EQ(master.get().address.port, masterInfo.get().port());
+ EXPECT_EQ(master.get().address.ip, masterInfo.get().ip());
driver.stop();
driver.join();
http://git-wip-us.apache.org/repos/asf/mesos/blob/4a935124/src/tests/metrics_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/metrics_tests.cpp b/src/tests/metrics_tests.cpp
index 751f82a..3b2dc24 100644
--- a/src/tests/metrics_tests.cpp
+++ b/src/tests/metrics_tests.cpp
@@ -41,7 +41,7 @@ TEST_F(MetricsTest, Master)
ASSERT_SOME(master);
// Get the snapshot.
- process::UPID upid("metrics", process::node());
+ process::UPID upid("metrics", process::address());
process::Future<process::http::Response> response =
process::http::get(upid, "snapshot");
@@ -158,7 +158,7 @@ TEST_F(MetricsTest, Slave)
ASSERT_SOME(slave);
// Get the snapshot.
- process::UPID upid("metrics", process::node());
+ process::UPID upid("metrics", process::address());
process::Future<process::http::Response> response =
process::http::get(upid, "snapshot");
http://git-wip-us.apache.org/repos/asf/mesos/blob/4a935124/src/tests/monitor_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/monitor_tests.cpp b/src/tests/monitor_tests.cpp
index 9946741..b640d89 100644
--- a/src/tests/monitor_tests.cpp
+++ b/src/tests/monitor_tests.cpp
@@ -218,7 +218,7 @@ TEST(MonitorTest, Statistics)
// Now wait for ResouorceMonitorProcess::watch to finish.
process::Clock::settle();
- process::UPID upid("monitor", process::node());
+ process::UPID upid("monitor", process::address());
// Request the statistics, this will ask the isolator.
Future<Response> response = process::http::get(upid, "statistics.json");
[13/20] mesos git commit: Fixed style formatting.
Posted by be...@apache.org.
Fixed style formatting.
Review: https://reviews.apache.org/r/29532
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e369699c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e369699c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e369699c
Branch: refs/heads/master
Commit: e369699ce932727690409f86cd5d9b18aa578622
Parents: 64e509b
Author: Benjamin Hindman <be...@gmail.com>
Authored: Tue Dec 16 13:16:17 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Feb 7 14:42:46 2015 -0800
----------------------------------------------------------------------
3rdparty/libprocess/3rdparty/stout/include/stout/strings.hpp | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/e369699c/3rdparty/libprocess/3rdparty/stout/include/stout/strings.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/include/stout/strings.hpp b/3rdparty/libprocess/3rdparty/stout/include/stout/strings.hpp
index 7976f22..46b79bc 100644
--- a/3rdparty/libprocess/3rdparty/stout/include/stout/strings.hpp
+++ b/3rdparty/libprocess/3rdparty/stout/include/stout/strings.hpp
@@ -287,7 +287,7 @@ std::string join(
// Ensure std::string doesn't fall into the iterable case
-inline std::string join(const std::string &seperator, const std::string &s) {
+inline std::string join(const std::string& seperator, const std::string& s) {
return s;
}
[10/20] mesos git commit: Added Future::repair.
Posted by be...@apache.org.
Added Future::repair.
Review: https://reviews.apache.org/r/29535
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b134530f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b134530f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b134530f
Branch: refs/heads/master
Commit: b134530f43dceb7b06639260d004c63cea989261
Parents: 853bd8f
Author: Benjamin Hindman <be...@gmail.com>
Authored: Tue Dec 16 13:13:55 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Feb 7 14:42:46 2015 -0800
----------------------------------------------------------------------
3rdparty/libprocess/include/process/future.hpp | 60 +++++++++++++++++---
3rdparty/libprocess/src/tests/process_tests.cpp | 42 ++++++++++++++
2 files changed, 94 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/b134530f/3rdparty/libprocess/include/process/future.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/future.hpp b/3rdparty/libprocess/include/process/future.hpp
index 1b427fe..a26122f 100644
--- a/3rdparty/libprocess/include/process/future.hpp
+++ b/3rdparty/libprocess/include/process/future.hpp
@@ -468,6 +468,15 @@ public:
#undef TEMPLATE
#endif // __cplusplus >= 201103L
+ // Installs callbacks that get executed if this future completes
+ // because it failed.
+ Future<T> repair(
+ const lambda::function<Future<T>(const Future<T>&)>& f) const;
+
+ // TODO(benh): Add overloads of 'repair' that don't require passing
+ // in a function that takes the 'const Future<T>&' parameter and use
+ // Prefer/LessPrefer to disambiguate.
+
// Invokes the specified function after some duration if this future
// has not been completed (set, failed, or discarded). Note that
// this function is agnostic of discard semantics and while it will
@@ -1467,9 +1476,12 @@ const Future<T>& Future<T>::onAny(const AnyCallback& callback) const
namespace internal {
+// NOTE: We need to name this 'thenf' versus 'then' to distinguish it
+// from the function 'then' whose parameter 'f' doesn't return a
+// Future since the compiler can't properly infer otherwise.
template <typename T, typename X>
-void thenf(const memory::shared_ptr<Promise<X> >& promise,
- const lambda::function<Future<X>(const T&)>& f,
+void thenf(const lambda::function<Future<X>(const T&)>& f,
+ const memory::shared_ptr<Promise<X>>& promise,
const Future<T>& future)
{
if (future.isReady()) {
@@ -1487,8 +1499,8 @@ void thenf(const memory::shared_ptr<Promise<X> >& promise,
template <typename T, typename X>
-void then(const memory::shared_ptr<Promise<X> >& promise,
- const lambda::function<X(const T&)>& f,
+void then(const lambda::function<X(const T&)>& f,
+ const memory::shared_ptr<Promise<X>>& promise,
const Future<T>& future)
{
if (future.isReady()) {
@@ -1506,6 +1518,21 @@ void then(const memory::shared_ptr<Promise<X> >& promise,
template <typename T>
+void repair(
+ const lambda::function<Future<T>(const Future<T>&)>& f,
+ const memory::shared_ptr<Promise<T>>& promise,
+ const Future<T>& future)
+{
+ CHECK(!future.isPending());
+ if (future.isFailed()) {
+ promise->associate(f(future));
+ } else {
+ promise->associate(future);
+ }
+}
+
+
+template <typename T>
void expired(
const lambda::function<Future<T>(const Future<T>&)>& f,
const memory::shared_ptr<Latch>& latch,
@@ -1546,10 +1573,10 @@ template <typename T>
template <typename X>
Future<X> Future<T>::then(const lambda::function<Future<X>(const T&)>& f) const
{
- memory::shared_ptr<Promise<X> > promise(new Promise<X>());
+ memory::shared_ptr<Promise<X>> promise(new Promise<X>());
lambda::function<void(const Future<T>&)> thenf =
- lambda::bind(&internal::thenf<T, X>, promise, f, lambda::_1);
+ lambda::bind(&internal::thenf<T, X>, f, promise, lambda::_1);
onAny(thenf);
@@ -1566,10 +1593,10 @@ template <typename T>
template <typename X>
Future<X> Future<T>::then(const lambda::function<X(const T&)>& f) const
{
- memory::shared_ptr<Promise<X> > promise(new Promise<X>());
+ memory::shared_ptr<Promise<X>> promise(new Promise<X>());
lambda::function<void(const Future<T>&)> then =
- lambda::bind(&internal::then<T, X>, promise, f, lambda::_1);
+ lambda::bind(&internal::then<T, X>, f, promise, lambda::_1);
onAny(then);
@@ -1583,6 +1610,23 @@ Future<X> Future<T>::then(const lambda::function<X(const T&)>& f) const
template <typename T>
+Future<T> Future<T>::repair(
+ const lambda::function<Future<T>(const Future<T>&)>& f) const
+{
+ memory::shared_ptr<Promise<T>> promise(new Promise<T>());
+
+ onAny(lambda::bind(&internal::repair<T>, f, promise, lambda::_1));
+
+ // Propagate discarding up the chain. To avoid cyclic dependencies,
+ // we keep a weak future in the callback.
+ promise->future().onDiscard(
+ lambda::bind(&internal::discard<T>, WeakFuture<T>(*this)));
+
+ return promise->future();
+}
+
+
+template <typename T>
Future<T> Future<T>::after(
const Duration& duration,
const lambda::function<Future<T>(const Future<T>&)>& f) const
http://git-wip-us.apache.org/repos/asf/mesos/blob/b134530f/3rdparty/libprocess/src/tests/process_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/process_tests.cpp b/3rdparty/libprocess/src/tests/process_tests.cpp
index fe758ab..1bcb3c6 100644
--- a/3rdparty/libprocess/src/tests/process_tests.cpp
+++ b/3rdparty/libprocess/src/tests/process_tests.cpp
@@ -164,6 +164,48 @@ TEST(Process, then)
}
+Future<int> repair(const Future<int>& future)
+{
+ EXPECT_TRUE(future.isFailed());
+ EXPECT_EQ("Failure", future.failure());
+ return 43;
+}
+
+
+// Checks that 'repair' callback gets executed if the future failed
+// and not executed if the future is completed successfully.
+TEST(Process, repair)
+{
+ // Check that the 'repair' callback _does not_ get executed by
+ // making sure that when we complete the promise with a value that's
+ // the value that we get back.
+ Promise<int> promise1;
+
+ Future<int> future1 = promise1.future()
+ .repair(lambda::bind(&repair, lambda::_1));
+
+ EXPECT_TRUE(future1.isPending());
+
+ promise1.set(42); // So this means 'repair' should not get executed.
+
+ AWAIT_EXPECT_EQ(42, future1);
+
+ // Check that the 'repair' callback gets executed by failing the
+ // promise which should invoke the 'repair' callback.
+ Promise<int> promise2;
+
+ Future<int> future2 = promise2.future()
+ .repair(lambda::bind(&repair, lambda::_1));
+
+ EXPECT_TRUE(future2.isPending());
+
+ promise2.fail("Failure"); // So 'repair' should get called returning '43'.
+
+ AWAIT_EXPECT_EQ(43, future2);
+}
+
+
+
Future<Nothing> after(volatile bool* executed, const Future<Nothing>& future)
{
EXPECT_TRUE(future.hasDiscard());
[19/20] mesos git commit: Refactored Once abstraction to not use
Promise.
Posted by be...@apache.org.
Refactored Once abstraction to not use Promise.
Review: https://reviews.apache.org/r/29569
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9f9afca4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9f9afca4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9f9afca4
Branch: refs/heads/master
Commit: 9f9afca4e8556653880559e9d1dac18a0321c5e8
Parents: 4065fa6
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sat Jan 3 09:37:39 2015 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Feb 7 14:46:44 2015 -0800
----------------------------------------------------------------------
3rdparty/libprocess/include/process/once.hpp | 45 +++++++++++++++++++----
1 file changed, 37 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/9f9afca4/3rdparty/libprocess/include/process/once.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/once.hpp b/3rdparty/libprocess/include/process/once.hpp
index e85b382..256ed07 100644
--- a/3rdparty/libprocess/include/process/once.hpp
+++ b/3rdparty/libprocess/include/process/once.hpp
@@ -12,7 +12,17 @@ namespace process {
class Once
{
public:
- Once() {}
+ Once() : started(false), finished(false)
+ {
+ pthread_mutex_init(&mutex, NULL);
+ pthread_cond_init(&cond, NULL);
+ }
+
+ ~Once()
+ {
+ pthread_cond_destroy(&cond);
+ pthread_mutex_destroy(&mutex);
+ }
// Returns true if this Once instance has already transitioned to a
// 'done' state (i.e., the action you wanted to perform "once" has
@@ -20,18 +30,35 @@ public:
// called.
bool once()
{
- if (!outer.set(&inner)) {
- inner.future().await();
- return true;
+ bool result = false;
+
+ pthread_mutex_lock(&mutex);
+ {
+ if (started) {
+ while (!finished) {
+ pthread_cond_wait(&cond, &mutex);
+ }
+ result = true;
+ } else {
+ started = true;
+ }
}
+ pthread_mutex_unlock(&mutex);
- return false;
+ return result;
}
// Transitions this Once instance to a 'done' state.
void done()
{
- inner.set(Nothing());
+ pthread_mutex_lock(&mutex);
+ {
+ if (started && !finished) {
+ finished = true;
+ pthread_cond_broadcast(&cond);
+ }
+ }
+ pthread_mutex_unlock(&mutex);
}
private:
@@ -39,8 +66,10 @@ private:
Once(const Once& that);
Once& operator = (const Once& that);
- Promise<Nothing> inner;
- Promise<Promise<Nothing>*> outer;
+ pthread_mutex_t mutex;
+ pthread_cond_t cond;
+ bool started;
+ bool finished;
};
} // namespace process {
[14/20] mesos git commit: Added a URL abstraction.
Posted by be...@apache.org.
Added a URL abstraction.
Review: https://reviews.apache.org/r/29533
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3c285f14
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3c285f14
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3c285f14
Branch: refs/heads/master
Commit: 3c285f14fa2f8af8a0109480299dca0b7f93d47d
Parents: e369699
Author: Benjamin Hindman <be...@gmail.com>
Authored: Tue Dec 16 13:14:42 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Feb 7 14:42:46 2015 -0800
----------------------------------------------------------------------
3rdparty/libprocess/include/process/http.hpp | 75 +++++++++++++++++++++++
3rdparty/libprocess/src/tests/http_tests.cpp | 38 ++++++++++++
2 files changed, 113 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/3c285f14/3rdparty/libprocess/include/process/http.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/http.hpp b/3rdparty/libprocess/include/process/http.hpp
index 689f56d..819cee1 100644
--- a/3rdparty/libprocess/include/process/http.hpp
+++ b/3rdparty/libprocess/include/process/http.hpp
@@ -3,18 +3,22 @@
#include <limits.h>
#include <stdint.h>
+#include <unistd.h>
#include <cctype>
#include <cstdlib>
#include <iomanip>
#include <sstream>
#include <string>
+#include <vector>
#include <process/pid.hpp>
#include <stout/error.hpp>
+#include <stout/foreach.hpp>
#include <stout/hashmap.hpp>
#include <stout/json.hpp>
+#include <stout/net.hpp>
#include <stout/none.hpp>
#include <stout/numify.hpp>
#include <stout/option.hpp>
@@ -493,6 +497,77 @@ inline Try<std::string> decode(const std::string& s)
}
+// Represents a Uniform Resource Locator:
+// scheme://domain|ip:port/path?query#fragment
+struct URL
+{
+ URL(const std::string& _scheme,
+ const std::string& _domain,
+ const uint16_t _port = 80,
+ const std::string& _path = "/",
+ const hashmap<std::string, std::string>& _query =
+ hashmap<std::string, std::string>(),
+ const Option<std::string>& _fragment = None())
+ : scheme(_scheme),
+ domain(_domain),
+ port(_port),
+ path(_path),
+ query(_query),
+ fragment(_fragment) {}
+
+ URL(const std::string& _scheme,
+ const net::IP& _ip,
+ const uint16_t _port = 80,
+ const std::string& _path = "/",
+ const hashmap<std::string, std::string>& _query =
+ hashmap<std::string, std::string>(),
+ const Option<std::string>& _fragment = None())
+ : scheme(_scheme),
+ ip(_ip),
+ port(_port),
+ path(_path),
+ query(_query),
+ fragment(_fragment) {}
+
+ std::string scheme;
+ // TODO(benh): Consider using unrestricted union for 'domain' and 'ip'.
+ Option<std::string> domain;
+ Option<net::IP> ip;
+ uint16_t port;
+ std::string path;
+ hashmap<std::string, std::string> query;
+ Option<std::string> fragment;
+};
+
+
+inline std::ostream& operator << (
+ std::ostream& stream,
+ const URL& url)
+{
+ stream << url.scheme << "://";
+
+ if (url.domain.isSome()) {
+ stream << url.domain.get();
+ } else if (url.ip.isSome()) {
+ stream << url.ip.get();
+ }
+
+ stream << ":" << url.port;
+
+ stream << "/" << strings::remove(url.path, "/", strings::PREFIX);
+
+ if (!url.query.empty()) {
+ stream << "?" << query::encode(url.query);
+ }
+
+ if (url.fragment.isSome()) {
+ stream << "#" << url.fragment.get();
+ }
+
+ return stream;
+}
+
+
// Asynchronously sends an HTTP GET request to the process with the
// given UPID and returns the HTTP response from the process.
Future<Response> get(
http://git-wip-us.apache.org/repos/asf/mesos/blob/3c285f14/3rdparty/libprocess/src/tests/http_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/http_tests.cpp b/3rdparty/libprocess/src/tests/http_tests.cpp
index ec64e60..73949b8 100644
--- a/3rdparty/libprocess/src/tests/http_tests.cpp
+++ b/3rdparty/libprocess/src/tests/http_tests.cpp
@@ -19,11 +19,14 @@
#include <stout/none.hpp>
#include <stout/nothing.hpp>
#include <stout/os.hpp>
+#include <stout/stringify.hpp>
#include "encoder.hpp"
using namespace process;
+using process::http::URL;
+
using process::network::Socket;
using std::string;
@@ -389,3 +392,38 @@ TEST(HTTP, QueryEncodeDecode)
EXPECT_SOME_EQ(HashmapStringString({{"a&b=c", "d&e=fg"}}),
http::query::decode("a%26b%3Dc=d%26e%3Dfg"));
}
+
+
+TEST(URLTest, stringification)
+{
+ EXPECT_EQ("http://mesos.apache.org:80/",
+ stringify(URL("http", "mesos.apache.org")));
+
+ EXPECT_EQ("https://mesos.apache.org:8080/",
+ stringify(URL("https", "mesos.apache.org", 8080)));
+
+ Try<net::IP> ip = net::IP::fromDotDecimal("172.158.1.23");
+ ASSERT_SOME(ip);
+
+ EXPECT_EQ("http://172.158.1.23:8080/",
+ stringify(URL("http", ip.get(), 8080)));
+
+ EXPECT_EQ("http://172.158.1.23:80/path",
+ stringify(URL("http", ip.get(), 80, "/path")));
+
+ hashmap<string, string> query;
+ query["foo"] = "bar";
+ query["baz"] = "bam";
+
+ EXPECT_EQ("http://172.158.1.23:80/?baz=bam&foo=bar",
+ stringify(URL("http", ip.get(), 80, "/", query)));
+
+ EXPECT_EQ("http://172.158.1.23:80/path?baz=bam&foo=bar",
+ stringify(URL("http", ip.get(), 80, "/path", query)));
+
+ EXPECT_EQ("http://172.158.1.23:80/?baz=bam&foo=bar#fragment",
+ stringify(URL("http", ip.get(), 80, "/", query, "fragment")));
+
+ EXPECT_EQ("http://172.158.1.23:80/path?baz=bam&foo=bar#fragment",
+ stringify(URL("http", ip.get(), 80, "/path", query, "fragment")));
+}
[16/20] mesos git commit: Disambiguated use of 'URL' in Mesos.
Posted by be...@apache.org.
Disambiguated use of 'URL' in Mesos.
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/1993ff24
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/1993ff24
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/1993ff24
Branch: refs/heads/master
Commit: 1993ff24bb3cd6498da0391048e17f203f49ebaf
Parents: 3c285f1
Author: Benjamin Hindman <be...@gmail.com>
Authored: Fri Jan 30 15:49:50 2015 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Feb 7 14:42:46 2015 -0800
----------------------------------------------------------------------
src/master/contender.cpp | 6 +++---
src/master/detector.cpp | 8 ++++----
src/master/main.cpp | 2 +-
3 files changed, 8 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/1993ff24/src/master/contender.cpp
----------------------------------------------------------------------
diff --git a/src/master/contender.cpp b/src/master/contender.cpp
index 2270400..0a8c099 100644
--- a/src/master/contender.cpp
+++ b/src/master/contender.cpp
@@ -75,7 +75,7 @@ Try<MasterContender*> MasterContender::create(const string& zk)
if (zk == "") {
return new StandaloneMasterContender();
} else if (strings::startsWith(zk, "zk://")) {
- Try<zookeeper::URL> url = URL::parse(zk);
+ Try<zookeeper::URL> url = zookeeper::URL::parse(zk);
if (url.isError()) {
return Error(url.error());
}
@@ -139,7 +139,7 @@ Future<Future<Nothing> > StandaloneMasterContender::contend()
}
-ZooKeeperMasterContender::ZooKeeperMasterContender(const URL& url)
+ZooKeeperMasterContender::ZooKeeperMasterContender(const zookeeper::URL& url)
{
process = new ZooKeeperMasterContenderProcess(url);
spawn(process);
@@ -174,7 +174,7 @@ Future<Future<Nothing> > ZooKeeperMasterContender::contend()
ZooKeeperMasterContenderProcess::ZooKeeperMasterContenderProcess(
- const URL& url)
+ const zookeeper::URL& url)
: ProcessBase(ID::generate("zookeeper-master-contender")),
group(new Group(url, MASTER_CONTENDER_ZK_SESSION_TIMEOUT)),
contender(NULL) {}
http://git-wip-us.apache.org/repos/asf/mesos/blob/1993ff24/src/master/detector.cpp
----------------------------------------------------------------------
diff --git a/src/master/detector.cpp b/src/master/detector.cpp
index ebeca61..367d1e1 100644
--- a/src/master/detector.cpp
+++ b/src/master/detector.cpp
@@ -165,7 +165,7 @@ class ZooKeeperMasterDetectorProcess
: public Process<ZooKeeperMasterDetectorProcess>
{
public:
- explicit ZooKeeperMasterDetectorProcess(const URL& url);
+ explicit ZooKeeperMasterDetectorProcess(const zookeeper::URL& url);
explicit ZooKeeperMasterDetectorProcess(Owned<Group> group);
~ZooKeeperMasterDetectorProcess();
@@ -200,7 +200,7 @@ Try<MasterDetector*> MasterDetector::create(const string& master)
if (master == "") {
return new StandaloneMasterDetector();
} else if (master.find("zk://") == 0) {
- Try<URL> url = URL::parse(master);
+ Try<zookeeper::URL> url = zookeeper::URL::parse(master);
if (url.isError()) {
return Error(url.error());
}
@@ -290,7 +290,7 @@ Future<Option<MasterInfo> > StandaloneMasterDetector::detect(
// TODO(benh): Get ZooKeeper timeout from configuration.
// TODO(xujyan): Use peer constructor after switching to C++ 11.
ZooKeeperMasterDetectorProcess::ZooKeeperMasterDetectorProcess(
- const URL& url)
+ const zookeeper::URL& url)
: ProcessBase(ID::generate("zookeeper-master-detector")),
group(new Group(url.servers,
MASTER_DETECTOR_ZK_SESSION_TIMEOUT,
@@ -436,7 +436,7 @@ void ZooKeeperMasterDetectorProcess::fetched(
}
-ZooKeeperMasterDetector::ZooKeeperMasterDetector(const URL& url)
+ZooKeeperMasterDetector::ZooKeeperMasterDetector(const zookeeper::URL& url)
{
process = new ZooKeeperMasterDetectorProcess(url);
spawn(process);
http://git-wip-us.apache.org/repos/asf/mesos/blob/1993ff24/src/master/main.cpp
----------------------------------------------------------------------
diff --git a/src/master/main.cpp b/src/master/main.cpp
index a295490..d4adae5 100644
--- a/src/master/main.cpp
+++ b/src/master/main.cpp
@@ -235,7 +235,7 @@ int main(int argc, char** argv)
zk_ = zk.get();
}
- Try<URL> url = URL::parse(zk_);
+ Try<zookeeper::URL> url = zookeeper::URL::parse(zk_);
if (url.isError()) {
EXIT(1) << "Error parsing ZooKeeper URL: " << url.error();
}
[08/20] mesos git commit: Removed unnecessary overloads of
Future::after.
Posted by be...@apache.org.
Removed unnecessary overloads of Future::after.
Review: https://reviews.apache.org/r/29536
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/55f23255
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/55f23255
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/55f23255
Branch: refs/heads/master
Commit: 55f23255c9b9b424a7a85e5b0b645eaecc3d28ae
Parents: b134530
Author: Benjamin Hindman <be...@gmail.com>
Authored: Mon Dec 29 10:04:01 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Feb 7 14:42:46 2015 -0800
----------------------------------------------------------------------
3rdparty/libprocess/include/process/future.hpp | 42 ---------------------
1 file changed, 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/55f23255/3rdparty/libprocess/include/process/future.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/future.hpp b/3rdparty/libprocess/include/process/future.hpp
index a26122f..05bfa34 100644
--- a/3rdparty/libprocess/include/process/future.hpp
+++ b/3rdparty/libprocess/include/process/future.hpp
@@ -491,48 +491,6 @@ public:
// in a function that takes the 'const Future<T>&' parameter and use
// Prefer/LessPrefer to disambiguate.
-#if __cplusplus >= 201103L
- template <typename F>
- Future<T> after(
- const Duration& duration,
- _Deferred<F>&& f,
- typename std::enable_if<std::is_convertible<_Deferred<F>, std::function<Future<T>(const Future<T>&)>>::value>::type* = NULL) const // NOLINT(whitespace/line_length)
- {
- return after(duration, std::function<Future<T>(const Future<T>&)>(f));
- }
-
- template <typename F>
- Future<T> after(
- const Duration& duration,
- _Deferred<F>&& f,
- typename std::enable_if<std::is_convertible<_Deferred<F>, std::function<Future<T>()>>::value>::type* = NULL) const // NOLINT(whitespace/line_length)
- {
- return after(
- duration,
- std::function<Future<T>(const Future<T>&)>(std::bind(f)));
- }
-#else
- template <typename F>
- Future<T> after(
- const Duration& duration,
- const _Defer<F>& f,
- typename boost::enable_if<boost::is_convertible<_Defer<F>, std::tr1::function<Future<T>(const Future<T>&)> > >::type* = NULL) const // NOLINT(whitespace/line_length)
- {
- return after(duration, std::tr1::function<Future<T>(const Future<T>&)>(f));
- }
-
- template <typename F>
- Future<T> after(
- const Duration& duration,
- const _Defer<F>& f,
- typename boost::enable_if<boost::is_convertible<_Defer<F>, std::tr1::function<Future<T>()> > >::type* = NULL) const // NOLINT(whitespace/line_length)
- {
- return after(
- duration,
- std::tr1::function<Future<T>(const Future<T>&)>(std::tr1::bind(f)));
- }
-#endif // __cplusplus >= 201103L
-
private:
friend class Promise<T>;
friend class WeakFuture<T>;
[06/20] mesos git commit: Added HTTP query string encode / decode.
Posted by be...@apache.org.
Added HTTP query string encode / decode.
Review: https://reviews.apache.org/r/30441
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/93bed983
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/93bed983
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/93bed983
Branch: refs/heads/master
Commit: 93bed983262e4a78865b2c50548df9b2e9d19b43
Parents: dc9be77
Author: Cody Maloney <co...@mesosphere.io>
Authored: Fri Jan 30 17:57:29 2015 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Feb 7 14:42:45 2015 -0800
----------------------------------------------------------------------
3rdparty/libprocess/include/process/http.hpp | 23 ++------
3rdparty/libprocess/src/decoder.hpp | 8 ++-
3rdparty/libprocess/src/http.cpp | 71 ++++++++++++++++++++++-
3rdparty/libprocess/src/tests/http_tests.cpp | 40 +++++++++++++
4 files changed, 120 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/93bed983/3rdparty/libprocess/include/process/http.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/http.hpp b/3rdparty/libprocess/include/process/http.hpp
index 9cf05ac..1d1ad41 100644
--- a/3rdparty/libprocess/include/process/http.hpp
+++ b/3rdparty/libprocess/include/process/http.hpp
@@ -380,12 +380,12 @@ inline Try<hashmap<std::string, std::string> > parse(
namespace query {
-// Parses an HTTP query string into a map. For example:
+// Decodes an HTTP query string into a map. For example:
//
-// parse("foo=1;bar=2;baz;foo=3")
+// decode("foo=1&bar=%20&baz&foo=3")
//
// Would return a map with the following:
-// bar: "2"
+// bar: " "
// baz: ""
// foo: "3"
//
@@ -394,22 +394,9 @@ namespace query {
// http://en.wikipedia.org/wiki/Query_string
// TODO(bmahler): If needed, investigate populating the query map inline
// for better performance.
-inline hashmap<std::string, std::string> parse(const std::string& query)
-{
- hashmap<std::string, std::string> result;
+Try<hashmap<std::string, std::string>> decode(const std::string& query);
- const std::vector<std::string>& tokens = strings::tokenize(query, ";&");
- foreach (const std::string& token, tokens) {
- const std::vector<std::string>& pairs = strings::split(token, "=");
- if (pairs.size() == 2) {
- result[pairs[0]] = pairs[1];
- } else if (pairs.size() == 1) {
- result[pairs[0]] = "";
- }
- }
-
- return result;
-}
+std::string encode(const hashmap<std::string, std::string>& query);
} // namespace query {
http://git-wip-us.apache.org/repos/asf/mesos/blob/93bed983/3rdparty/libprocess/src/decoder.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/decoder.hpp b/3rdparty/libprocess/src/decoder.hpp
index 8168464..45f3d7f 100644
--- a/3rdparty/libprocess/src/decoder.hpp
+++ b/3rdparty/libprocess/src/decoder.hpp
@@ -120,12 +120,16 @@ private:
// std::cout << "http::Request:" << std::endl;
// std::cout << " method: " << decoder->request->method << std::endl;
// std::cout << " path: " << decoder->request->path << std::endl;
+
// Parse the query key/values.
- Try<std::string> decoded = http::decode(decoder->query);
+ Try<hashmap<std::string, std::string>> decoded =
+ http::query::decode(decoder->query);
+
if (decoded.isError()) {
return 1;
}
- decoder->request->query = http::query::parse(decoded.get());
+
+ decoder->request->query = decoded.get();
Option<std::string> encoding =
decoder->request->headers.get("Content-Encoding");
http://git-wip-us.apache.org/repos/asf/mesos/blob/93bed983/3rdparty/libprocess/src/http.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/http.cpp b/3rdparty/libprocess/src/http.cpp
index 869b205..ec9823e 100644
--- a/3rdparty/libprocess/src/http.cpp
+++ b/3rdparty/libprocess/src/http.cpp
@@ -32,6 +32,56 @@ namespace http {
hashmap<uint16_t, string> statuses;
+namespace query {
+
+Try<hashmap<std::string, std::string>> decode(const std::string& query)
+{
+ hashmap<std::string, std::string> result;
+
+ const std::vector<std::string>& tokens = strings::tokenize(query, ";&");
+ foreach (const std::string& token, tokens) {
+ const std::vector<std::string>& pairs = strings::split(token, "=", 2);
+ if (pairs.size() == 0) {
+ continue;
+ }
+
+ Try<std::string> key = http::decode(pairs[0]);
+ if (key.isError()) {
+ return Error(key.error());
+ }
+
+ if (pairs.size() == 2) {
+ Try<std::string> value = http::decode(pairs[1]);
+ if (value.isError()) {
+ return Error(value.error());
+ }
+ result[key.get()] = value.get();
+
+ } else if (pairs.size() == 1) {
+ result[key.get()] = "";
+ }
+ }
+
+ return result;
+}
+
+
+std::string encode(const hashmap<std::string, std::string>& query)
+{
+ std::string output;
+
+ foreachpair (const std::string& key, const std::string& value, query) {
+ output += http::encode(key);
+ if (!value.empty()) {
+ output += "=" + http::encode(value);
+ }
+ output += '&';
+ }
+ return strings::remove(output, "&", strings::SUFFIX);
+}
+
+} // namespace query {
+
namespace internal {
Future<Response> decode(const string& buffer)
@@ -161,8 +211,25 @@ Future<Response> get(
const Option<string>& query,
const Option<hashmap<string, string> >& headers)
{
- return internal::request(
- upid, "GET", path, query, headers, None(), None());
+ URL url("http", net::IP(upid.address.ip), upid.address.port, upid.id);
+
+ if (path.isSome()) {
+ // TODO(benh): Get 'query' and/or 'fragment' out of 'path'.
+ url.path = strings::join("/", url.path, path.get());
+ }
+
+ if (query.isSome()) {
+ Try<hashmap<string, string>> decode = http::query::decode(
+ strings::remove(query.get(), "?", strings::PREFIX));
+
+ if (decode.isError()) {
+ return Failure("Failed to decode HTTP query string: " + decode.error());
+ }
+
+ url.query = decode.get();
+ }
+
+ return get(url, headers);
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/93bed983/3rdparty/libprocess/src/tests/http_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/http_tests.cpp b/3rdparty/libprocess/src/tests/http_tests.cpp
index c71a91d..f7084ef 100644
--- a/3rdparty/libprocess/src/tests/http_tests.cpp
+++ b/3rdparty/libprocess/src/tests/http_tests.cpp
@@ -353,3 +353,43 @@ TEST(HTTP, Post)
terminate(process);
wait(process);
}
+
+TEST(HTTP, QueryEncodeDecode)
+{
+ // If we use Type<a, b> directly inside a macro without surrounding
+ // parenthesis the comma will be eaten by the macro rather than the
+ // template. Typedef to avoid the problem.
+ typedef hashmap<string, string> HashmapStringString;
+
+ EXPECT_EQ("",
+ http::query::encode(HashmapStringString({})));
+
+ EXPECT_EQ("foo=bar",
+ http::query::encode(HashmapStringString({{"foo", "bar"}})));
+
+ EXPECT_EQ("c%7E%2Fasdf=%25asdf&a()=b%2520",
+ http::query::encode(
+ HashmapStringString({{"a()", "b%20"}, {"c~/asdf", "%asdf"}})));
+
+ EXPECT_EQ("d",
+ http::query::encode(HashmapStringString({{"d", ""}})));
+
+ EXPECT_EQ("a%26b%3Dc=d%26e%3Dfg",
+ http::query::encode(HashmapStringString({{"a&b=c", "d&e=fg"}})));
+
+ // Explicitly not testing decoding failures.
+ EXPECT_SOME_EQ(HashmapStringString(),
+ http::query::decode(""));
+
+ EXPECT_SOME_EQ(HashmapStringString({{"foo", "bar"}}),
+ http::query::decode("foo=bar"));
+
+ EXPECT_SOME_EQ(HashmapStringString({{"a()", "b%20"}, {"c~/asdf", "%asdf"}}),
+ http::query::decode("c%7E%2Fasdf=%25asdf&a()=b%2520"));
+
+ EXPECT_SOME_EQ(HashmapStringString({{"d", ""}}),
+ http::query::decode("d"));
+
+ EXPECT_SOME_EQ(HashmapStringString({{"a&b=c", "d&e=fg"}}),
+ http::query::decode("a%26b%3Dc=d%26e%3Dfg"));
+}
[15/20] mesos git commit: Added http::get/put/post that take URL.
Posted by be...@apache.org.
Added http::get/put/post that take URL.
Review: https://reviews.apache.org/r/29534
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/853bd8f8
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/853bd8f8
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/853bd8f8
Branch: refs/heads/master
Commit: 853bd8f8bc50c8d5c80acd1326714732000b4d66
Parents: 1993ff2
Author: Benjamin Hindman <be...@gmail.com>
Authored: Tue Dec 16 13:15:02 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Feb 7 14:42:46 2015 -0800
----------------------------------------------------------------------
3rdparty/libprocess/include/process/http.hpp | 31 ++++-
3rdparty/libprocess/src/http.cpp | 125 ++++++++++++++-----
3rdparty/libprocess/src/tests/http_tests.cpp | 4 +-
3rdparty/libprocess/src/tests/metrics_tests.cpp | 6 +-
4 files changed, 129 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/853bd8f8/3rdparty/libprocess/include/process/http.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/http.hpp b/3rdparty/libprocess/include/process/http.hpp
index 819cee1..f90f2c5 100644
--- a/3rdparty/libprocess/include/process/http.hpp
+++ b/3rdparty/libprocess/include/process/http.hpp
@@ -43,11 +43,15 @@ struct Request
// Tracked by: https://issues.apache.org/jira/browse/MESOS-328.
hashmap<std::string, std::string> headers;
std::string method;
+
+ // TODO(benh): Replace 'url', 'path', 'query', and 'fragment' with URL.
std::string url; // (path?query#fragment)
std::string path;
- std::string fragment;
hashmap<std::string, std::string> query;
+ std::string fragment;
+
std::string body;
+
bool keepAlive;
// Returns whether the encoding is considered acceptable in the request.
@@ -568,6 +572,31 @@ inline std::ostream& operator << (
}
+// Asynchronously sends an HTTP GET request to the specified URL and
+// returns the HTTP response.
+Future<Response> get(
+ const URL& url,
+ const Option<hashmap<std::string, std::string>>& headers = None());
+
+
+// Asynchronously sends an HTTP PUT request to the specified URL and
+// returns the HTTP response.
+Future<Response> put(
+ const URL& url,
+ const Option<hashmap<std::string, std::string>>& headers = None(),
+ const Option<std::string>& body = None(),
+ const Option<std::string>& contentType = None());
+
+
+// Asynchronously sends an HTTP POST request to the specified URL and
+// returns the HTTP response.
+Future<Response> post(
+ const URL& url,
+ const Option<hashmap<std::string, std::string>>& headers = None(),
+ const Option<std::string>& body = None(),
+ const Option<std::string>& contentType = None());
+
+
// Asynchronously sends an HTTP GET request to the process with the
// given UPID and returns the HTTP response from the process.
Future<Response> get(
http://git-wip-us.apache.org/repos/asf/mesos/blob/853bd8f8/3rdparty/libprocess/src/http.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/http.cpp b/3rdparty/libprocess/src/http.cpp
index 30e825d..c18de81 100644
--- a/3rdparty/libprocess/src/http.cpp
+++ b/3rdparty/libprocess/src/http.cpp
@@ -6,6 +6,7 @@
#include <deque>
#include <iostream>
#include <string>
+#include <vector>
#include <process/future.hpp>
#include <process/http.hpp>
@@ -21,6 +22,7 @@
using std::deque;
using std::string;
+using std::vector;
using process::http::Request;
using process::http::Response;
@@ -28,7 +30,6 @@ using process::http::Response;
using process::network::Socket;
namespace process {
-
namespace http {
hashmap<uint16_t, string> statuses;
@@ -111,24 +112,25 @@ Future<Response> decode(const string& buffer)
// Forward declaration.
Future<Response> _request(
Socket socket,
- const UPID& upid,
+ const Node& node,
+ const URL& url,
const string& method,
- const Option<string>& path,
- const Option<string>& query,
- const Option<hashmap<string, string> >& _headers,
+ const Option<hashmap<string, string>>& _headers,
const Option<string>& body,
const Option<string>& contentType);
Future<Response> request(
- const UPID& upid,
+ const URL& url,
const string& method,
- const Option<string>& path,
- const Option<string>& query,
- const Option<hashmap<string, string> >& headers,
+ const Option<hashmap<string, string>>& headers,
const Option<string>& body,
const Option<string>& contentType)
{
+ if (url.scheme != "http") {
+ return Failure("Unsupported URL scheme");
+ }
+
Try<Socket> create = Socket::create();
if (create.isError()) {
@@ -137,13 +139,31 @@ Future<Response> request(
Socket socket = create.get();
- return socket.connect(upid.node)
+ Node node;
+
+ if (url.ip.isSome()) {
+ node.ip = url.ip.get().address();
+ } else if (url.domain.isNone()) {
+ return Failure("Missing URL domain or IP");
+ } else {
+ Try<uint32_t> ip = net::getIP(url.domain.get(), AF_INET);
+
+ if (ip.isError()) {
+ return Failure("Failed to determine IP of domain '" +
+ url.domain.get() + "': " + ip.error());
+ }
+
+ node.ip = ip.get();
+ }
+
+ node.port = url.port;
+
+ return socket.connect(node)
.then(lambda::bind(&_request,
socket,
- upid,
+ node,
+ url,
method,
- path,
- query,
headers,
body,
contentType));
@@ -152,24 +172,30 @@ Future<Response> request(
Future<Response> _request(
Socket socket,
- const UPID& upid,
+ const Node& node,
+ const URL& url,
const string& method,
- const Option<string>& path,
- const Option<string>& query,
- const Option<hashmap<string, string> >& _headers,
+ const Option<hashmap<string, string>>& _headers,
const Option<string>& body,
const Option<string>& contentType)
{
std::ostringstream out;
- out << method << " /" << upid.id;
+ out << method << " /" << strings::remove(url.path, "/", strings::PREFIX);
- if (path.isSome()) {
- out << "/" << path.get();
+ if (!url.query.empty()) {
+ // Convert the query to a string that we join via '=' and '&'.
+ vector<string> query;
+
+ foreachpair (const string& key, const string& value, url.query) {
+ query.push_back(key + "=" + value);
+ }
+
+ out << "?" << strings::join("&", query);
}
- if (query.isSome()) {
- out << "?" << query.get();
+ if (url.fragment.isSome()) {
+ out << "#" << url.fragment.get();
}
out << " HTTP/1.1\r\n";
@@ -182,7 +208,7 @@ Future<Response> _request(
}
// Need to specify the 'Host' header.
- headers["Host"] = stringify(upid.node);
+ headers["Host"] = stringify(node);
// Tell the server to close the connection when it's done.
headers["Connection"] = "close";
@@ -220,17 +246,52 @@ Future<Response> _request(
.then(lambda::bind(&internal::decode, lambda::_1));
}
-
} // namespace internal {
Future<Response> get(
+ const URL& url,
+ const Option<hashmap<string, string>>& headers)
+{
+ return internal::request(url, "GET", headers, None(), None());
+}
+
+
+Future<Response> put(
+ const URL& url,
+ const Option<hashmap<string, string>>& headers,
+ const Option<string>& body,
+ const Option<string>& contentType)
+{
+ if (body.isNone() && contentType.isSome()) {
+ return Failure("Attempted to do a PUT with a Content-Type but no body");
+ }
+
+ return internal::request(url, "PUT", headers, body, contentType);
+}
+
+
+Future<Response> post(
+ const URL& url,
+ const Option<hashmap<string, string>>& headers,
+ const Option<string>& body,
+ const Option<string>& contentType)
+{
+ if (body.isNone() && contentType.isSome()) {
+ return Failure("Attempted to do a POST with a Content-Type but no body");
+ }
+
+ return internal::request(url, "POST", headers, body, contentType);
+}
+
+
+Future<Response> get(
const UPID& upid,
const Option<string>& path,
const Option<string>& query,
- const Option<hashmap<string, string> >& headers)
+ const Option<hashmap<string, string>>& headers)
{
- URL url("http", net::IP(upid.address.ip), upid.address.port, upid.id);
+ URL url("http", net::IP(upid.node.ip), upid.node.port, upid.id);
if (path.isSome()) {
// TODO(benh): Get 'query' and/or 'fragment' out of 'path'.
@@ -255,16 +316,18 @@ Future<Response> get(
Future<Response> post(
const UPID& upid,
const Option<string>& path,
- const Option<hashmap<string, string> >& headers,
+ const Option<hashmap<string, string>>& headers,
const Option<string>& body,
const Option<string>& contentType)
{
- if (body.isNone() && contentType.isSome()) {
- return Failure("Attempted to do a POST with a Content-Type but no body");
+ URL url("http", net::IP(upid.node.ip), upid.node.port, upid.id);
+
+ if (path.isSome()) {
+ // TODO(benh): Get 'query' and/or 'fragment' out of 'path'.
+ url.path = strings::join("/", url.path, path.get());
}
- return internal::request(
- upid, "POST", path, None(), headers, body, contentType);
+ return post(url, headers, body, contentType);
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/853bd8f8/3rdparty/libprocess/src/tests/http_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/http_tests.cpp b/3rdparty/libprocess/src/tests/http_tests.cpp
index 73949b8..aaa1646 100644
--- a/3rdparty/libprocess/src/tests/http_tests.cpp
+++ b/3rdparty/libprocess/src/tests/http_tests.cpp
@@ -259,7 +259,7 @@ http::Response validateGetWithQuery(const http::Request& request)
EXPECT_EQ("GET", request.method);
EXPECT_THAT(request.path, EndsWith("get"));
EXPECT_EQ("", request.body);
- EXPECT_EQ("frag", request.fragment);
+ EXPECT_EQ("", request.fragment);
EXPECT_EQ("bar", request.query.at("foo"));
EXPECT_EQ(1, request.query.size());
@@ -287,7 +287,7 @@ TEST(HTTP, Get)
.WillOnce(Invoke(validateGetWithQuery));
Future<http::Response> queryFuture =
- http::get(process.self(), "get", "foo=bar#frag");
+ http::get(process.self(), "get", "foo=bar");
AWAIT_READY(queryFuture);
ASSERT_EQ(http::statuses[200], queryFuture.get().status);
http://git-wip-us.apache.org/repos/asf/mesos/blob/853bd8f8/3rdparty/libprocess/src/tests/metrics_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/metrics_tests.cpp b/3rdparty/libprocess/src/tests/metrics_tests.cpp
index 0c80c69..f9b2dc0 100644
--- a/3rdparty/libprocess/src/tests/metrics_tests.cpp
+++ b/3rdparty/libprocess/src/tests/metrics_tests.cpp
@@ -229,7 +229,7 @@ TEST(Metrics, SnapshotTimeout)
// Ensure the timeout parameter is validated.
AWAIT_EXPECT_RESPONSE_STATUS_EQ(
BadRequest().status,
- http::get(upid, "snapshot?timeout=foobar"));
+ http::get(upid, "snapshot", "timeout=foobar"));
// Advance the clock to avoid rate limit.
Clock::advance(Seconds(1));
@@ -253,7 +253,7 @@ TEST(Metrics, SnapshotTimeout)
Clock::advance(Seconds(1));
// Get the snapshot.
- Future<Response> response = http::get(upid, "snapshot?timeout=2secs");
+ Future<Response> response = http::get(upid, "snapshot", "timeout=2secs");
// Make sure the request is pending before the timeout is exceeded.
Clock::settle();
@@ -296,7 +296,7 @@ TEST(Metrics, SnapshotTimeout)
// Ensure MetricsProcess has removed the metrics.
Clock::settle();
- response = http::get(upid, "snapshot?timeout=2secs");
+ response = http::get(upid, "snapshot", "timeout=2secs");
AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
[07/20] mesos git commit: Replaced Node with network::Address.
Posted by be...@apache.org.
Replaced Node with network::Address.
Review: https://reviews.apache.org/r/29538
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/257bd1ad
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/257bd1ad
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/257bd1ad
Branch: refs/heads/master
Commit: 257bd1adaf8f0e236d975c4a2d93aa06cc6a79f1
Parents: 55f2325
Author: Benjamin Hindman <be...@gmail.com>
Authored: Wed Dec 24 08:48:36 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Feb 7 14:42:46 2015 -0800
----------------------------------------------------------------------
3rdparty/libprocess/include/Makefile.am | 2 +-
3rdparty/libprocess/include/process/address.hpp | 84 +++++++++++
3rdparty/libprocess/include/process/future.hpp | 2 +-
3rdparty/libprocess/include/process/network.hpp | 47 +++---
3rdparty/libprocess/include/process/node.hpp | 72 ---------
3rdparty/libprocess/include/process/pid.hpp | 36 ++---
3rdparty/libprocess/include/process/process.hpp | 5 +-
3rdparty/libprocess/include/process/socket.hpp | 20 ++-
3rdparty/libprocess/src/http.cpp | 23 +--
3rdparty/libprocess/src/pid.cpp | 20 +--
3rdparty/libprocess/src/poll_socket.cpp | 4 +-
3rdparty/libprocess/src/poll_socket.hpp | 2 +-
3rdparty/libprocess/src/process.cpp | 151 ++++++++++---------
3rdparty/libprocess/src/socket.cpp | 14 +-
3rdparty/libprocess/src/tests/benchmarks.cpp | 4 +-
3rdparty/libprocess/src/tests/http_tests.cpp | 2 +-
3rdparty/libprocess/src/tests/metrics_tests.cpp | 6 +-
3rdparty/libprocess/src/tests/process_tests.cpp | 13 +-
18 files changed, 273 insertions(+), 234 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/257bd1ad/3rdparty/libprocess/include/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/Makefile.am b/3rdparty/libprocess/include/Makefile.am
index 988c8d5..3da3e6c 100644
--- a/3rdparty/libprocess/include/Makefile.am
+++ b/3rdparty/libprocess/include/Makefile.am
@@ -1,5 +1,6 @@
# Headers.
nobase_include_HEADERS = \
+ process/address.hpp \
process/async.hpp \
process/check.hpp \
process/clock.hpp \
@@ -32,7 +33,6 @@ nobase_include_HEADERS = \
process/mime.hpp \
process/mutex.hpp \
process/network.hpp \
- process/node.hpp \
process/once.hpp \
process/owned.hpp \
process/pid.hpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/257bd1ad/3rdparty/libprocess/include/process/address.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/address.hpp b/3rdparty/libprocess/include/process/address.hpp
new file mode 100644
index 0000000..7db6271
--- /dev/null
+++ b/3rdparty/libprocess/include/process/address.hpp
@@ -0,0 +1,84 @@
+#ifndef __PROCESS_ADDRESS_HPP__
+#define __PROCESS_ADDRESS_HPP__
+
+#include <stdint.h>
+#include <unistd.h>
+
+#include <arpa/inet.h>
+
+#include <glog/logging.h>
+
+#include <sstream>
+
+#include <boost/functional/hash.hpp>
+
+namespace process {
+namespace network {
+
+// Represents a network "address", subsuming the struct addrinfo and
+// struct sockaddr* that typically is used to encapsulate IP and port.
+class Address
+{
+public:
+ Address() : ip(0), port(0) {}
+
+ Address(uint32_t _ip, uint16_t _port) : ip(_ip), port(_port) {}
+
+ bool operator < (const Address& that) const
+ {
+ if (ip == that.ip) {
+ return port < that.port;
+ } else {
+ return ip < that.ip;
+ }
+ }
+
+ bool operator > (const Address& that) const
+ {
+ if (ip == that.ip) {
+ return port > that.port;
+ } else {
+ return ip > that.ip;
+ }
+ }
+
+ bool operator == (const Address& that) const
+ {
+ return (ip == that.ip && port == that.port);
+ }
+
+ bool operator != (const Address& that) const
+ {
+ return !(*this == that);
+ }
+
+ uint32_t ip;
+ uint16_t port;
+};
+
+
+inline std::ostream& operator << (std::ostream& stream, const Address& address)
+{
+ char ip[INET_ADDRSTRLEN];
+ if (inet_ntop(AF_INET, (in_addr*) &address.ip, ip, INET_ADDRSTRLEN) == NULL) {
+ PLOG(FATAL) << "Failed to get human-readable IP address for '"
+ << address.ip << "'";
+ }
+ stream << ip << ":" << address.port;
+ return stream;
+}
+
+
+// Address hash value (for example, to use in Boost's unordered maps).
+inline std::size_t hash_value(const Address& address)
+{
+ size_t seed = 0;
+ boost::hash_combine(seed, address.ip);
+ boost::hash_combine(seed, address.port);
+ return seed;
+}
+
+} // namespace network {
+} // namespace process {
+
+#endif // __PROCESS_ADDRESS_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/257bd1ad/3rdparty/libprocess/include/process/future.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/future.hpp b/3rdparty/libprocess/include/process/future.hpp
index 05bfa34..68d6682 100644
--- a/3rdparty/libprocess/include/process/future.hpp
+++ b/3rdparty/libprocess/include/process/future.hpp
@@ -37,7 +37,7 @@
namespace process {
-// Forward declaration (instead of include to break circular dependency).
+// Forward declarations (instead of include to break circular dependency).
template <typename _F>
struct _Defer;
http://git-wip-us.apache.org/repos/asf/mesos/blob/257bd1ad/3rdparty/libprocess/include/process/network.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/network.hpp b/3rdparty/libprocess/include/process/network.hpp
index 2ceea8c..7620278 100644
--- a/3rdparty/libprocess/include/process/network.hpp
+++ b/3rdparty/libprocess/include/process/network.hpp
@@ -1,7 +1,7 @@
#ifndef __PROCESS_NETWORK_HPP__
#define __PROCESS_NETWORK_HPP__
-#include <process/node.hpp>
+#include <process/address.hpp>
#include <stout/net.hpp>
#include <stout/try.hpp>
@@ -31,7 +31,8 @@ inline Try<int> socket(int family, int type, int protocol)
return s;
}
-// accept, bind, connect, getsockname wrappers for different protocol families
+
+// TODO(benh): Remove and defer to Socket::accept.
inline Try<int> accept(int s, sa_family_t family)
{
switch (family) {
@@ -52,48 +53,54 @@ inline Try<int> accept(int s, sa_family_t family)
}
-inline Try<int> bind(int s, const Node& node)
+// TODO(benh): Remove and defer to Socket::bind.
+inline Try<int> bind(int s, const Address& address)
{
- sockaddr_in addr = net::createSockaddrIn(node.ip, node.port);
+ sockaddr_in addr = net::createSockaddrIn(address.ip, address.port);
int error = ::bind(s, (sockaddr*) &addr, sizeof(addr));
if (error < 0) {
- return ErrnoError("Failed to bind on " + stringify(node));
+ return ErrnoError("Failed to bind on " + stringify(address));
}
return error;
}
-inline Try<int> connect(int s, const Node& node)
+// TODO(benh): Remove and defer to Socket::connect.
+inline Try<int> connect(int s, const Address& address)
{
- sockaddr_in addr = net::createSockaddrIn(node.ip, node.port);
+ sockaddr_in addr = net::createSockaddrIn(address.ip, address.port);
int error = ::connect(s, (sockaddr*) &addr, sizeof(addr));
if (error < 0) {
- return ErrnoError("Failed to connect to " + stringify(node));
+ return ErrnoError("Failed to connect to " + stringify(address));
}
return error;
}
-inline Try<Node> getsockname(int s, sa_family_t family)
+inline Try<Address> address(int s)
{
- switch (family) {
- case AF_INET: {
- sockaddr_in addr = net::createSockaddrIn(0, 0);
- socklen_t addrlen = sizeof(addr);
+ union {
+ struct sockaddr s;
+ struct sockaddr_in v4;
+ struct sockaddr_in6 v6;
+ } addr;
- if(::getsockname(s, (sockaddr*) &addr, &addrlen) < 0) {
- return ErrnoError("Failed to getsockname");
- }
+ socklen_t addrlen = sizeof(addr);
- return Node(addr.sin_addr.s_addr, ntohs(addr.sin_port));
- }
- default:
- return Error("Unsupported family type: " + stringify(family));
+ if (::getsockname(s, (sockaddr*) &addr, &addrlen) < 0) {
+ return ErrnoError("Failed to getsockname");
}
+
+ if (addr.s.sa_family == AF_INET) {
+ return Address(addr.v4.sin_addr.s_addr, ntohs(addr.v4.sin_port));
+ }
+
+ return Error("Unsupported IP address family '" +
+ stringify(addr.s.sa_family) + "'");
}
} // namespace network {
http://git-wip-us.apache.org/repos/asf/mesos/blob/257bd1ad/3rdparty/libprocess/include/process/node.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/node.hpp b/3rdparty/libprocess/include/process/node.hpp
deleted file mode 100644
index 173eb8a..0000000
--- a/3rdparty/libprocess/include/process/node.hpp
+++ /dev/null
@@ -1,72 +0,0 @@
-#ifndef __PROCESS_NODE_HPP__
-#define __PROCESS_NODE_HPP__
-
-#include <stdint.h>
-#include <unistd.h>
-
-#include <arpa/inet.h>
-
-#include <glog/logging.h>
-
-#include <sstream>
-
-#include <boost/functional/hash.hpp>
-
-namespace process {
-
-// Represents a remote "node" (encapsulates IP address and port).
-class Node
-{
-public:
- Node() : ip(0), port(0) {}
-
- Node(uint32_t _ip, uint16_t _port) : ip(_ip), port(_port) {}
-
- bool operator < (const Node& that) const
- {
- if (ip == that.ip) {
- return port < that.port;
- } else {
- return ip < that.ip;
- }
- }
-
- bool operator == (const Node& that) const
- {
- return (ip == that.ip && port == that.port);
- }
-
- bool operator != (const Node& that) const
- {
- return !(*this == that);
- }
-
- uint32_t ip;
- uint16_t port;
-};
-
-
-inline std::ostream& operator << (std::ostream& stream, const Node& node)
-{
- char ip[INET_ADDRSTRLEN];
- if (inet_ntop(AF_INET, (in_addr*) &node.ip, ip, INET_ADDRSTRLEN) == NULL) {
- PLOG(FATAL) << "Failed to get human-readable IP address for '"
- << node.ip << "'";
- }
- stream << ip << ":" << node.port;
- return stream;
-}
-
-
-// UPID hash value (for example, to use in Boost's unordered maps).
-inline std::size_t hash_value(const Node& node)
-{
- size_t seed = 0;
- boost::hash_combine(seed, node.ip);
- boost::hash_combine(seed, node.port);
- return seed;
-}
-
-} // namespace process {
-
-#endif // __PROCESS_NODE_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/257bd1ad/3rdparty/libprocess/include/process/pid.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/pid.hpp b/3rdparty/libprocess/include/process/pid.hpp
index 7dccf29..30c466c 100644
--- a/3rdparty/libprocess/include/process/pid.hpp
+++ b/3rdparty/libprocess/include/process/pid.hpp
@@ -7,7 +7,7 @@
#include <sstream>
#include <string>
-#include <process/node.hpp>
+#include <process/address.hpp>
namespace process {
@@ -20,19 +20,19 @@ struct UPID
UPID() = default;
UPID(const UPID& that)
- : id(that.id), node(that.node) {}
+ : id(that.id), address(that.address) {}
UPID(const char* id_, uint32_t ip_, uint16_t port_)
- : id(id_), node(ip_, port_) {}
+ : id(id_), address(ip_, port_) {}
- UPID(const char* id_, const Node& node_)
- : id(id_), node(node_) {}
+ UPID(const char* id_, const network::Address& address_)
+ : id(id_), address(address_) {}
UPID(const std::string& id_, uint32_t ip_, uint16_t port_)
- : id(id_), node(ip_, port_) {}
+ : id(id_), address(ip_, port_) {}
- UPID(const std::string& id_, const Node& node_)
- : id(id_), node(node_) {}
+ UPID(const std::string& id_, const network::Address& address_)
+ : id(id_), address(address_) {}
/*implicit*/ UPID(const char* s);
@@ -44,24 +44,26 @@ struct UPID
operator bool () const
{
- return id != "" && node.ip != 0 && node.port != 0;
+ return id != "" && address.ip != 0 && address.port != 0;
}
bool operator ! () const // NOLINT(whitespace/operators)
{
- return id == "" && node.ip == 0 && node.port == 0;
+ return id == "" && address.ip == 0 && address.port == 0;
}
bool operator < (const UPID& that) const
{
- if (node == that.node)
- return id < that.id;
- else return node < that.node;
+ if (address == that.address) {
+ return id < that.id;
+ } else {
+ return address < that.address;
+ }
}
bool operator == (const UPID& that) const
{
- return (id == that.id && node == that.node);
+ return (id == that.id && address == that.address);
}
bool operator != (const UPID& that) const
@@ -70,7 +72,7 @@ struct UPID
}
std::string id;
- Node node;
+ network::Address address;
};
@@ -91,7 +93,7 @@ struct PID : UPID
(void)base; // Eliminate unused base warning.
PID<Base> pid;
pid.id = id;
- pid.node = node;
+ pid.address = address;
return pid;
}
};
@@ -107,6 +109,4 @@ std::size_t hash_value(const UPID&);
} // namespace process {
-
-
#endif // __PROCESS_PID_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/257bd1ad/3rdparty/libprocess/include/process/process.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/process.hpp b/3rdparty/libprocess/include/process/process.hpp
index 3708f98..392c74d 100644
--- a/3rdparty/libprocess/include/process/process.hpp
+++ b/3rdparty/libprocess/include/process/process.hpp
@@ -7,6 +7,7 @@
#include <map>
#include <queue>
+#include <process/address.hpp>
#include <process/clock.hpp>
#include <process/event.hpp>
#include <process/filter.hpp>
@@ -276,9 +277,9 @@ void finalize();
/**
- * Returns the node associated with this instance of the library.
+ * Returns the socket address associated with this instance of the library.
*/
-Node node();
+network::Address address();
/**
http://git-wip-us.apache.org/repos/asf/mesos/blob/257bd1ad/3rdparty/libprocess/include/process/socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp
index 06161f2..426b3fa 100644
--- a/3rdparty/libprocess/include/process/socket.hpp
+++ b/3rdparty/libprocess/include/process/socket.hpp
@@ -3,8 +3,8 @@
#include <memory>
+#include <process/address.hpp>
#include <process/future.hpp>
-#include <process/node.hpp>
#include <stout/abort.hpp>
#include <stout/nothing.hpp>
@@ -60,10 +60,11 @@ public:
}
// Socket::Impl interface.
- virtual Try<Node> bind(const Node& node);
+ virtual Try<Address> address() const;
+ virtual Try<Address> bind(const Address& address);
virtual Try<Nothing> listen(int backlog) = 0;
virtual Future<Socket> accept() = 0;
- virtual Future<Nothing> connect(const Node& node) = 0;
+ virtual Future<Nothing> connect(const Address& address) = 0;
virtual Future<size_t> recv(char* data, size_t size) = 0;
virtual Future<size_t> send(const char* data, size_t size) = 0;
virtual Future<size_t> sendfile(int fd, off_t offset, size_t size) = 0;
@@ -112,14 +113,19 @@ public:
return impl->get();
}
+ Try<Address> address() const
+ {
+ return impl->address();
+ }
+
int get() const
{
return impl->get();
}
- Try<Node> bind(const Node& node)
+ Try<Address> bind(const Address& address)
{
- return impl->bind(node);
+ return impl->bind(address);
}
Try<Nothing> listen(int backlog)
@@ -132,9 +138,9 @@ public:
return impl->accept();
}
- Future<Nothing> connect(const Node& node)
+ Future<Nothing> connect(const Address& address)
{
- return impl->connect(node);
+ return impl->connect(address);
}
Future<size_t> recv(char* data, size_t size) const
http://git-wip-us.apache.org/repos/asf/mesos/blob/257bd1ad/3rdparty/libprocess/src/http.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/http.cpp b/3rdparty/libprocess/src/http.cpp
index c18de81..7503313 100644
--- a/3rdparty/libprocess/src/http.cpp
+++ b/3rdparty/libprocess/src/http.cpp
@@ -27,6 +27,7 @@ using std::vector;
using process::http::Request;
using process::http::Response;
+using process::network::Address;
using process::network::Socket;
namespace process {
@@ -112,7 +113,7 @@ Future<Response> decode(const string& buffer)
// Forward declaration.
Future<Response> _request(
Socket socket,
- const Node& node,
+ const Address& address,
const URL& url,
const string& method,
const Option<hashmap<string, string>>& _headers,
@@ -139,10 +140,10 @@ Future<Response> request(
Socket socket = create.get();
- Node node;
+ Address address;
if (url.ip.isSome()) {
- node.ip = url.ip.get().address();
+ address.ip = url.ip.get().address();
} else if (url.domain.isNone()) {
return Failure("Missing URL domain or IP");
} else {
@@ -153,15 +154,15 @@ Future<Response> request(
url.domain.get() + "': " + ip.error());
}
- node.ip = ip.get();
+ address.ip = ip.get();
}
- node.port = url.port;
+ address.port = url.port;
- return socket.connect(node)
+ return socket.connect(address)
.then(lambda::bind(&_request,
socket,
- node,
+ address,
url,
method,
headers,
@@ -172,7 +173,7 @@ Future<Response> request(
Future<Response> _request(
Socket socket,
- const Node& node,
+ const Address& address,
const URL& url,
const string& method,
const Option<hashmap<string, string>>& _headers,
@@ -208,7 +209,7 @@ Future<Response> _request(
}
// Need to specify the 'Host' header.
- headers["Host"] = stringify(node);
+ headers["Host"] = stringify(address);
// Tell the server to close the connection when it's done.
headers["Connection"] = "close";
@@ -291,7 +292,7 @@ Future<Response> get(
const Option<string>& query,
const Option<hashmap<string, string>>& headers)
{
- URL url("http", net::IP(upid.node.ip), upid.node.port, upid.id);
+ URL url("http", net::IP(upid.address.ip), upid.address.port, upid.id);
if (path.isSome()) {
// TODO(benh): Get 'query' and/or 'fragment' out of 'path'.
@@ -320,7 +321,7 @@ Future<Response> post(
const Option<string>& body,
const Option<string>& contentType)
{
- URL url("http", net::IP(upid.node.ip), upid.node.port, upid.id);
+ URL url("http", net::IP(upid.address.ip), upid.address.port, upid.id);
if (path.isSome()) {
// TODO(benh): Get 'query' and/or 'fragment' out of 'path'.
http://git-wip-us.apache.org/repos/asf/mesos/blob/257bd1ad/3rdparty/libprocess/src/pid.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/pid.cpp b/3rdparty/libprocess/src/pid.cpp
index 085e0b9..d28a154 100644
--- a/3rdparty/libprocess/src/pid.cpp
+++ b/3rdparty/libprocess/src/pid.cpp
@@ -48,7 +48,7 @@ UPID::UPID(const std::string& s)
UPID::UPID(const ProcessBase& process)
{
id = process.self().id;
- node = process.self().node;
+ address = process.self().address;
}
@@ -62,7 +62,7 @@ UPID::operator std::string() const
ostream& operator << (ostream& stream, const UPID& pid)
{
- stream << pid.id << "@" << pid.node;
+ stream << pid.id << "@" << pid.address;
return stream;
}
@@ -70,8 +70,8 @@ ostream& operator << (ostream& stream, const UPID& pid)
istream& operator >> (istream& stream, UPID& pid)
{
pid.id = "";
- pid.node.ip = 0;
- pid.node.port = 0;
+ pid.address.ip = 0;
+ pid.address.port = 0;
string str;
if (!(stream >> str)) {
@@ -88,7 +88,7 @@ istream& operator >> (istream& stream, UPID& pid)
string id;
string host;
- Node node;
+ network::Address address;
size_t index = str.find('@');
@@ -119,17 +119,17 @@ istream& operator >> (istream& stream, UPID& pid)
return stream;
}
- node.ip = ip.get();
+ address.ip = ip.get();
str = str.substr(index + 1);
- if (sscanf(str.c_str(), "%hu", &node.port) != 1) {
+ if (sscanf(str.c_str(), "%hu", &address.port) != 1) {
stream.setstate(std::ios_base::badbit);
return stream;
}
pid.id = id;
- pid.node = node;
+ pid.address = address;
return stream;
}
@@ -139,8 +139,8 @@ size_t hash_value(const UPID& pid)
{
size_t seed = 0;
boost::hash_combine(seed, pid.id);
- boost::hash_combine(seed, pid.node.ip);
- boost::hash_combine(seed, pid.node.port);
+ boost::hash_combine(seed, pid.address.ip);
+ boost::hash_combine(seed, pid.address.port);
return seed;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/257bd1ad/3rdparty/libprocess/src/poll_socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/poll_socket.cpp b/3rdparty/libprocess/src/poll_socket.cpp
index eaeabd7..ec781a3 100644
--- a/3rdparty/libprocess/src/poll_socket.cpp
+++ b/3rdparty/libprocess/src/poll_socket.cpp
@@ -101,9 +101,9 @@ Future<Nothing> connect(const Socket& socket)
} // namespace internal {
-Future<Nothing> PollSocketImpl::connect(const Node& node)
+Future<Nothing> PollSocketImpl::connect(const Address& address)
{
- Try<int> connect = network::connect(get(), node);
+ Try<int> connect = network::connect(get(), address);
if (connect.isError()) {
if (errno == EINPROGRESS) {
return io::poll(get(), io::WRITE)
http://git-wip-us.apache.org/repos/asf/mesos/blob/257bd1ad/3rdparty/libprocess/src/poll_socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/poll_socket.hpp b/3rdparty/libprocess/src/poll_socket.hpp
index f7ca08e..553aa64 100644
--- a/3rdparty/libprocess/src/poll_socket.hpp
+++ b/3rdparty/libprocess/src/poll_socket.hpp
@@ -19,7 +19,7 @@ public:
// Implementation of the Socket::Impl interface.
virtual Try<Nothing> listen(int backlog);
virtual Future<Socket> accept();
- virtual Future<Nothing> connect(const Node& node);
+ virtual Future<Nothing> connect(const Address& address);
virtual Future<size_t> recv(char* data, size_t size);
virtual Future<size_t> send(const char* data, size_t size);
virtual Future<size_t> sendfile(int fd, off_t offset, size_t size);
http://git-wip-us.apache.org/repos/asf/mesos/blob/257bd1ad/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 67b6b3b..a1d9121 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -40,6 +40,7 @@
#include <stdexcept>
#include <vector>
+#include <process/address.hpp>
#include <process/check.hpp>
#include <process/clock.hpp>
#include <process/defer.hpp>
@@ -54,7 +55,6 @@
#include <process/io.hpp>
#include <process/logging.hpp>
#include <process/mime.hpp>
-#include <process/node.hpp>
#include <process/process.hpp>
#include <process/profiler.hpp>
#include <process/socket.hpp>
@@ -96,6 +96,8 @@ using process::http::OK;
using process::http::Request;
using process::http::Response;
using process::http::ServiceUnavailable;
+
+using process::network::Address;
using process::network::Socket;
using std::deque;
@@ -283,7 +285,7 @@ public:
void close(int s);
- void exited(const Node& node);
+ void exited(const Address& address);
void exited(ProcessBase* process);
private:
@@ -293,11 +295,12 @@ private:
{
// For links, we maintain a bidirectional mapping between the
// "linkers" (Processes) and the "linkees" (remote / local UPIDs).
- // For remote nodes, we also need a mapping to the linkees on the
- // node, because socket closure only notifies at the node level.
+ // For remote socket addresses, we also need a mapping to the
+ // linkees for that socket address, because socket closure only
+ // notifies at the address level.
hashmap<UPID, hashset<ProcessBase*>> linkers;
hashmap<ProcessBase*, hashset<UPID>> linkees;
- hashmap<Node, hashset<UPID>> remotes;
+ hashmap<Address, hashset<UPID>> remotes;
} links;
// Collection of all actice sockets.
@@ -308,19 +311,19 @@ private:
// them).
set<int> dispose;
- // Map from socket to node (ip, port).
- map<int, Node> nodes;
+ // Map from socket to socket address (ip, port).
+ map<int, Address> addresses;
- // Maps from node (ip, port) to temporary sockets (i.e., they will
- // get closed once there is no more data to send on them).
- map<Node, int> temps;
+ // Maps from socket address (ip, port) to temporary sockets (i.e.,
+ // they will get closed once there is no more data to send on them).
+ map<Address, int> temps;
- // Maps from node (ip, port) to persistent sockets (i.e., they will
+ // Maps from socket address (ip, port) to persistent sockets (i.e., they will
// remain open even if there is no more data to send on them). We
// distinguish these from the 'temps' collection so we can tell when
// a persistant socket has been lost (and thus generate
// ExitedEvents).
- map<Node, int> persists;
+ map<Address, int> persists;
// Map from socket to outgoing queue.
map<int, queue<Encoder*> > outgoing;
@@ -449,8 +452,8 @@ static const int LISTEN_BACKLOG = 500000;
// Local server socket.
static Socket* __s__ = NULL;
-// Local node.
-static Node __node__;
+// Local socket address.
+static Address __address__;
// Active SocketManager (eventually will probably be thread-local).
static SocketManager* socket_manager = NULL;
@@ -508,7 +511,7 @@ static Message* encode(const UPID& from,
static void transport(Message* message, ProcessBase* sender = NULL)
{
- if (message->to.node == __node__) {
+ if (message->to.address == __address__) {
// Local message.
process_manager->deliver(message->to, new MessageEvent(message), sender);
} else {
@@ -565,7 +568,7 @@ static Message* parse(Request* request)
return NULL;
}
- const UPID to(decode.get(), __node__);
+ const UPID to(decode.get(), __address__);
// And now determine 'name'.
index = index != string::npos ? index + 2: request->path.size();
@@ -821,15 +824,15 @@ void initialize(const string& delegate)
LOG(FATAL) << "Failed to initialize, pthread_create";
}
- __node__.ip = 0;
- __node__.port = 0;
+ __address__.ip = 0;
+ __address__.port = 0;
char* value;
// Check environment for ip.
value = getenv("LIBPROCESS_IP");
if (value != NULL) {
- int result = inet_pton(AF_INET, value, &__node__.ip);
+ int result = inet_pton(AF_INET, value, &__address__.ip);
if (result == 0) {
LOG(FATAL) << "LIBPROCESS_IP=" << value << " was unparseable";
} else if (result < 0) {
@@ -844,10 +847,10 @@ void initialize(const string& delegate)
if (result < 0 || result > USHRT_MAX) {
LOG(FATAL) << "LIBPROCESS_PORT=" << value << " is not a valid port";
}
- __node__.port = result;
+ __address__.port = result;
}
- // Create a "server" socket for communicating with other nodes.
+ // Create a "server" socket for communicating.
Try<Socket> create = Socket::create();
if (create.isError()) {
PLOG(FATAL) << "Failed to construct server socket:" << create.error();
@@ -860,18 +863,18 @@ void initialize(const string& delegate)
PLOG(FATAL) << "Failed to initialize, setsockopt(SO_REUSEADDR)";
}
- Try<Node> bind = __s__->bind(__node__);
+ Try<Address> bind = __s__->bind(__address__);
if (bind.isError()) {
PLOG(FATAL) << "Failed to initialize: " << bind.error();
}
- __node__ = bind.get();
+ __address__ = bind.get();
// Lookup hostname if missing ip or if ip is 127.0.0.1 in case we
// actually have a valid external ip address. Note that we need only
// one ip address, so that other processes can send and receive and
// don't get confused as to whom they are sending to.
- if (__node__.ip == 0 || __node__.ip == 2130706433) {
+ if (__address__.ip == 0 || __address__.ip == 2130706433) {
char hostname[512];
if (gethostname(hostname, sizeof(hostname)) < 0) {
@@ -886,7 +889,7 @@ void initialize(const string& delegate)
LOG(FATAL) << ip.error();
}
- __node__.ip = ip.get();
+ __address__.ip = ip.get();
}
Try<Nothing> listen = __s__->listen(LISTEN_BACKLOG);
@@ -955,7 +958,7 @@ void initialize(const string& delegate)
new Route("/__processes__", None(), __processes__);
- VLOG(1) << "libprocess is initialized on " << node() << " for " << cpus
+ VLOG(1) << "libprocess is initialized on " << address() << " for " << cpus
<< " cpus";
}
@@ -969,10 +972,10 @@ void finalize()
}
-Node node()
+Address address()
{
process::initialize();
- return __node__;
+ return __address__;
}
@@ -1328,8 +1331,8 @@ void SocketManager::link(ProcessBase* process, const UPID& to)
bool connect = false;
synchronized (this) {
- // Check if node is remote and there isn't a persistant link.
- if (to.node != __node__ && persists.count(to.node) == 0) {
+ // Check if the socket address is remote and there isn't a persistant link.
+ if (to.address != __address__ && persists.count(to.address) == 0) {
// Okay, no link, let's create a socket.
Try<Socket> create = Socket::create();
if (create.isError()) {
@@ -1340,9 +1343,9 @@ void SocketManager::link(ProcessBase* process, const UPID& to)
int s = socket.get().get();
sockets[s] = new Socket(socket.get());
- nodes[s] = to.node;
+ addresses[s] = to.address;
- persists[to.node] = s;
+ persists[to.address] = s;
// Initialize 'outgoing' to prevent a race with
// SocketManager::send() while the socket is not yet connected.
@@ -1356,14 +1359,14 @@ void SocketManager::link(ProcessBase* process, const UPID& to)
links.linkers[to].insert(process);
links.linkees[process].insert(to);
- if (to.node != __node__) {
- links.remotes[to.node].insert(to);
+ if (to.address != __address__) {
+ links.remotes[to.address].insert(to);
}
}
if (connect) {
CHECK_SOME(socket);
- Socket(socket.get()).connect(to.node) // Copy to drop const.
+ Socket(socket.get()).connect(to.address) // Copy to drop const.
.onAny(lambda::bind(
&internal::link_connect,
lambda::_1,
@@ -1574,17 +1577,17 @@ void SocketManager::send(Message* message)
{
CHECK(message != NULL);
- const Node& node = message->to.node;
+ const Address& address = message->to.address;
Option<Socket> socket = None();
bool connect = false;
synchronized (this) {
// Check if there is already a socket.
- bool persist = persists.count(node) > 0;
- bool temp = temps.count(node) > 0;
+ bool persist = persists.count(address) > 0;
+ bool temp = temps.count(address) > 0;
if (persist || temp) {
- int s = persist ? persists[node] : temps[node];
+ int s = persist ? persists[address] : temps[address];
CHECK(sockets.count(s) > 0);
socket = *sockets[s];
@@ -1603,8 +1606,8 @@ void SocketManager::send(Message* message)
}
} else {
- // No peristent or temporary socket to the node currently
- // exists, so we create a temporary one.
+ // No peristent or temporary socket to the socket address
+ // currently exists, so we create a temporary one.
Try<Socket> create = Socket::create();
if (create.isError()) {
VLOG(1) << "Failed to send, create socket: " << create.error();
@@ -1615,8 +1618,8 @@ void SocketManager::send(Message* message)
int s = socket.get();
sockets[s] = new Socket(socket.get());
- nodes[s] = node;
- temps[node] = s;
+ addresses[s] = address;
+ temps[address] = s;
dispose.insert(s);
@@ -1629,7 +1632,7 @@ void SocketManager::send(Message* message)
if (connect) {
CHECK_SOME(socket);
- Socket(socket.get()).connect(node) // Copy to drop const.
+ Socket(socket.get()).connect(address) // Copy to drop const.
.onAny(lambda::bind(
&internal::send_connect,
lambda::_1,
@@ -1679,11 +1682,11 @@ Encoder* SocketManager::next(int s)
// This is either a temporary socket we created or it's a
// socket that we were receiving data from and possibly
// sending HTTP responses back on. Clean up either way.
- if (nodes.count(s) > 0) {
- const Node& node = nodes[s];
- CHECK(temps.count(node) > 0 && temps[node] == s);
- temps.erase(node);
- nodes.erase(s);
+ if (addresses.count(s) > 0) {
+ const Address& address = addresses[s];
+ CHECK(temps.count(address) > 0 && temps[address] == s);
+ temps.erase(address);
+ addresses.erase(s);
}
if (proxies.count(s) > 0) {
@@ -1739,19 +1742,19 @@ void SocketManager::close(int s)
outgoing.erase(s);
}
- // Clean up after sockets used for node communication.
- if (nodes.count(s) > 0) {
- const Node& node = nodes[s];
+ // Clean up after sockets used for remote communication.
+ if (addresses.count(s) > 0) {
+ const Address& address = addresses[s];
// Don't bother invoking exited unless socket was persistant.
- if (persists.count(node) > 0 && persists[node] == s) {
- persists.erase(node);
- exited(node); // Generate ExitedEvent(s)!
- } else if (temps.count(node) > 0 && temps[node] == s) {
- temps.erase(node);
+ if (persists.count(address) > 0 && persists[address] == s) {
+ persists.erase(address);
+ exited(address); // Generate ExitedEvent(s)!
+ } else if (temps.count(address) > 0 && temps[address] == s) {
+ temps.erase(address);
}
- nodes.erase(s);
+ addresses.erase(s);
}
// Clean up any proxy associated with this socket.
@@ -1803,18 +1806,18 @@ void SocketManager::close(int s)
}
-void SocketManager::exited(const Node& node)
+void SocketManager::exited(const Address& address)
{
// TODO(benh): It would be cleaner if this routine could call back
// into ProcessManager ... then we wouldn't have to convince
// ourselves that the accesses to each Process object will always be
// valid.
synchronized (this) {
- if (!links.remotes.contains(node)) {
- return; // No linkees for this node!
+ if (!links.remotes.contains(address)) {
+ return; // No linkees for this socket address!
}
- foreach (const UPID& linkee, links.remotes[node]) {
+ foreach (const UPID& linkee, links.remotes[address]) {
// Find and notify the linkers.
CHECK(links.linkers.contains(linkee));
@@ -1833,7 +1836,7 @@ void SocketManager::exited(const Node& node)
links.linkers.erase(linkee);
}
- links.remotes.erase(node);
+ links.remotes.erase(address);
}
}
@@ -1865,12 +1868,12 @@ void SocketManager::exited(ProcessBase* process)
// The exited process was the last linker for this linkee,
// so we need to remove the linkee from the remotes.
- if (linkee.node != __node__) {
- CHECK(links.remotes.contains(linkee.node));
+ if (linkee.address != __address__) {
+ CHECK(links.remotes.contains(linkee.address));
- links.remotes[linkee.node].erase(linkee);
- if (links.remotes[linkee.node].empty()) {
- links.remotes.erase(linkee.node);
+ links.remotes[linkee.address].erase(linkee);
+ if (links.remotes[linkee.address].empty()) {
+ links.remotes.erase(linkee.address);
}
}
}
@@ -1932,7 +1935,7 @@ ProcessManager::~ProcessManager()
ProcessReference ProcessManager::use(const UPID& pid)
{
- if (pid.node == __node__) {
+ if (pid.address == __address__) {
synchronized (processes) {
if (processes.count(pid.id) > 0) {
// Note that the ProcessReference constructor _must_ get
@@ -2038,12 +2041,12 @@ bool ProcessManager::handle(
if (tokens.size() == 0 && delegate != "") {
request->path = "/" + delegate;
- receiver = use(UPID(delegate, __node__));
+ receiver = use(UPID(delegate, __address__));
} else if (tokens.size() > 0) {
// Decode possible percent-encoded path.
Try<string> decode = http::decode(tokens[0]);
if (!decode.isError()) {
- receiver = use(UPID(decode.get(), __node__));
+ receiver = use(UPID(decode.get(), __address__));
} else {
VLOG(1) << "Failed to decode URL path: " << decode.error();
}
@@ -2052,7 +2055,7 @@ bool ProcessManager::handle(
if (!receiver && delegate != "") {
// Try and delegate the request.
request->path = "/" + delegate + request->path;
- receiver = use(UPID(delegate, __node__));
+ receiver = use(UPID(delegate, __address__));
}
if (receiver) {
@@ -2363,7 +2366,7 @@ void ProcessManager::cleanup(ProcessBase* process)
void ProcessManager::link(ProcessBase* process, const UPID& to)
{
// Check if the pid is local.
- if (to.node != __node__) {
+ if (to.address != __address__) {
socket_manager->link(process, to);
} else {
// Since the pid is local we want to get a reference to it's
@@ -2665,7 +2668,7 @@ ProcessBase::ProcessBase(const string& id)
refs = 0;
pid.id = id != "" ? id : ID::generate();
- pid.node = __node__;
+ pid.address = __address__;
// If using a manual clock, try and set current time of process
// using happens before relationship between creator (__process__)
http://git-wip-us.apache.org/repos/asf/mesos/blob/257bd1ad/3rdparty/libprocess/src/socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/socket.cpp b/3rdparty/libprocess/src/socket.cpp
index fb5a5ab..0e1cebb 100644
--- a/3rdparty/libprocess/src/socket.cpp
+++ b/3rdparty/libprocess/src/socket.cpp
@@ -71,15 +71,23 @@ const Socket::Kind& Socket::DEFAULT_KIND()
}
-Try<Node> Socket::Impl::bind(const Node& node)
+Try<Address> Socket::Impl::address() const
{
- Try<int> bind = network::bind(get(), node);
+ // TODO(benh): Cache this result so that we don't have to make
+ // unnecessary system calls each time.
+ return network::address(get());
+}
+
+
+Try<Address> Socket::Impl::bind(const Address& address)
+{
+ Try<int> bind = network::bind(get(), address);
if (bind.isError()) {
return Error(bind.error());
}
// Lookup and store assigned IP and assigned port.
- return network::getsockname(get(), AF_INET);
+ return network::address(get());
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/257bd1ad/3rdparty/libprocess/src/tests/benchmarks.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/benchmarks.cpp b/3rdparty/libprocess/src/tests/benchmarks.cpp
index b286259..a927e4e 100644
--- a/3rdparty/libprocess/src/tests/benchmarks.cpp
+++ b/3rdparty/libprocess/src/tests/benchmarks.cpp
@@ -108,9 +108,9 @@ public:
private:
void ping(const UPID& from, const string& body)
{
- if (linkedPorts.find(from.node.port) == linkedPorts.end()) {
+ if (linkedPorts.find(from.address.port) == linkedPorts.end()) {
setLink(from);
- linkedPorts.insert(from.node.port);
+ linkedPorts.insert(from.address.port);
}
static const string message("hi");
send(from, "pong", message.c_str(), message.size());
http://git-wip-us.apache.org/repos/asf/mesos/blob/257bd1ad/3rdparty/libprocess/src/tests/http_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/http_tests.cpp b/3rdparty/libprocess/src/tests/http_tests.cpp
index aaa1646..0a00ae5 100644
--- a/3rdparty/libprocess/src/tests/http_tests.cpp
+++ b/3rdparty/libprocess/src/tests/http_tests.cpp
@@ -123,7 +123,7 @@ TEST(HTTP, Endpoints)
Socket socket = create.get();
- AWAIT_READY(socket.connect(process.self().node));
+ AWAIT_READY(socket.connect(process.self().address));
std::ostringstream out;
out << "GET /" << process.self().id << "/body"
http://git-wip-us.apache.org/repos/asf/mesos/blob/257bd1ad/3rdparty/libprocess/src/tests/metrics_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/metrics_tests.cpp b/3rdparty/libprocess/src/tests/metrics_tests.cpp
index f9b2dc0..8ce1e6e 100644
--- a/3rdparty/libprocess/src/tests/metrics_tests.cpp
+++ b/3rdparty/libprocess/src/tests/metrics_tests.cpp
@@ -147,7 +147,7 @@ TEST(Metrics, Snapshot)
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
- UPID upid("metrics", process::node());
+ UPID upid("metrics", process::address());
Clock::pause();
@@ -219,7 +219,7 @@ TEST(Metrics, SnapshotTimeout)
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
- UPID upid("metrics", process::node());
+ UPID upid("metrics", process::address());
Clock::pause();
@@ -320,7 +320,7 @@ TEST(Metrics, SnapshotTimeout)
// Ensures that the aggregate statistics are correct in the snapshot.
TEST(Metrics, SnapshotStatistics)
{
- UPID upid("metrics", process::node());
+ UPID upid("metrics", process::address());
Clock::pause();
http://git-wip-us.apache.org/repos/asf/mesos/blob/257bd1ad/3rdparty/libprocess/src/tests/process_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/process_tests.cpp b/3rdparty/libprocess/src/tests/process_tests.cpp
index 1bcb3c6..9f9220e 100644
--- a/3rdparty/libprocess/src/tests/process_tests.cpp
+++ b/3rdparty/libprocess/src/tests/process_tests.cpp
@@ -41,6 +41,7 @@
using namespace process;
+using process::network::Address;
using process::network::Socket;
using std::string;
@@ -1468,7 +1469,7 @@ TEST(Process, remote)
Socket socket = create.get();
- AWAIT_READY(socket.connect(process.self().node));
+ AWAIT_READY(socket.connect(process.self().address));
Message message;
message.name = "handler";
@@ -1531,14 +1532,14 @@ TEST(Process, http2)
Socket socket = create.get();
- ASSERT_SOME(socket.bind(Node()));
+ ASSERT_SOME(socket.bind(Address()));
// Create a UPID for 'Libprocess-From' based on the IP and port we
// got assigned.
- Try<Node> node = network::getsockname(socket.get(), AF_INET);
- ASSERT_SOME(node);
+ Try<Address> address = socket.address();
+ ASSERT_SOME(address);
- UPID from("", node.get());
+ UPID from("", address.get());
ASSERT_SOME(socket.listen(1));
@@ -1869,7 +1870,7 @@ TEST(Process, PercentEncodedURLs)
spawn(process);
// Construct the PID using percent-encoding.
- UPID pid("id%2842%29", process.self().node);
+ UPID pid("id%2842%29", process.self().address);
// Mimic a libprocess message sent to an installed handler.
Future<Nothing> handler1;
[04/20] mesos git commit: Replaced network::socket with
network::Socket.
Posted by be...@apache.org.
Replaced network::socket with network::Socket.
Review: https://reviews.apache.org/r/29526
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8b488501
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8b488501
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8b488501
Branch: refs/heads/master
Commit: 8b48850157b106acec23c2cb189694808975f0b5
Parents: e93672b
Author: Benjamin Hindman <be...@gmail.com>
Authored: Thu Dec 25 17:06:47 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Feb 7 14:42:45 2015 -0800
----------------------------------------------------------------------
3rdparty/libprocess/src/tests/http_tests.cpp | 19 ++++-----
3rdparty/libprocess/src/tests/process_tests.cpp | 43 ++++++++++----------
2 files changed, 30 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/8b488501/3rdparty/libprocess/src/tests/http_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/http_tests.cpp b/3rdparty/libprocess/src/tests/http_tests.cpp
index f7084ef..83ca10e 100644
--- a/3rdparty/libprocess/src/tests/http_tests.cpp
+++ b/3rdparty/libprocess/src/tests/http_tests.cpp
@@ -24,6 +24,8 @@
using namespace process;
+using process::network::Socket;
+
using std::string;
using testing::_;
@@ -33,7 +35,6 @@ using testing::EndsWith;
using testing::Invoke;
using testing::Return;
-
class HttpProcess : public Process<HttpProcess>
{
public:
@@ -114,13 +115,12 @@ TEST(HTTP, Endpoints)
spawn(process);
// First hit '/body' (using explicit sockets and HTTP/1.0).
- Try<int> socket = network::socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
-
- ASSERT_TRUE(socket.isSome());
+ Try<Socket> create = Socket::create();
+ ASSERT_SOME(create);
- int s = socket.get();
+ Socket socket = create.get();
- ASSERT_TRUE(network::connect(s, process.self().node).isSome());
+ AWAIT_READY(socket.connect(process.self().node));
std::ostringstream out;
out << "GET /" << process.self().id << "/body"
@@ -133,15 +133,14 @@ TEST(HTTP, Endpoints)
EXPECT_CALL(process, body(_))
.WillOnce(Return(http::OK()));
- ASSERT_SOME(os::write(s, data));
+ AWAIT_EXPECT_EQ(data.size(), socket.send(data.data(), data.size()));
string response = "HTTP/1.1 200 OK";
char temp[response.size()];
- ASSERT_LT(0, ::read(s, temp, response.size()));
- ASSERT_EQ(response, string(temp, response.size()));
- ASSERT_EQ(0, close(s));
+ AWAIT_EXPECT_EQ(response.size(), socket.recv(temp, response.size()));
+ ASSERT_EQ(response, string(temp, response.size()));
// Now hit '/pipe' (by using http::get).
int pipes[2];
http://git-wip-us.apache.org/repos/asf/mesos/blob/8b488501/3rdparty/libprocess/src/tests/process_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/process_tests.cpp b/3rdparty/libprocess/src/tests/process_tests.cpp
index bf2768f..a4bf28c 100644
--- a/3rdparty/libprocess/src/tests/process_tests.cpp
+++ b/3rdparty/libprocess/src/tests/process_tests.cpp
@@ -40,6 +40,8 @@
using namespace process;
+using process::network::Socket;
+
using std::string;
using testing::_;
@@ -1418,13 +1420,12 @@ TEST(Process, remote)
EXPECT_CALL(process, handler(_, _))
.WillOnce(FutureSatisfy(&handler));
- Try<int> socket = network::socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
-
- ASSERT_TRUE(socket.isSome());
+ Try<Socket> create = Socket::create();
+ ASSERT_SOME(create);
- int s = socket.get();
+ Socket socket = create.get();
- ASSERT_TRUE(network::connect(s, process.self().node).isSome());
+ AWAIT_READY(socket.connect(process.self().node));
Message message;
message.name = "handler";
@@ -1433,9 +1434,7 @@ TEST(Process, remote)
const string& data = MessageEncoder::encode(&message);
- ASSERT_EQ(data.size(), write(s, data.data(), data.size()));
-
- ASSERT_EQ(0, close(s));
+ AWAIT_EXPECT_EQ(data.size(), socket.send(data.data(), data.size()));
AWAIT_READY(handler);
@@ -1484,21 +1483,21 @@ TEST(Process, http2)
spawn(process);
// Create a receiving socket so we can get messages back.
- Try<int> socket = network::socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
- ASSERT_TRUE(socket.isSome());
+ Try<Socket> create = Socket::create();
+ ASSERT_SOME(create);
- int s = socket.get();
+ Socket socket = create.get();
- ASSERT_TRUE(network::bind(s, Node()).isSome());
+ ASSERT_SOME(socket.bind(Node()));
// Create a UPID for 'Libprocess-From' based on the IP and port we
// got assigned.
- Try<Node> node = network::getsockname(s, AF_INET);
- ASSERT_TRUE(node.isSome());
+ Try<Node> node = network::getsockname(socket.get(), AF_INET);
+ ASSERT_SOME(node);
UPID from("", node.get());
- ASSERT_EQ(0, listen(s, 1));
+ ASSERT_SOME(socket.listen(1));
Future<UPID> pid;
Future<string> body;
@@ -1526,17 +1525,17 @@ TEST(Process, http2)
post(process.self(), from, name);
// Accept the incoming connection.
- Try<int> accepted = network::accept(s, AF_INET);
- ASSERT_TRUE(accepted.isSome());
+ Future<Socket> accept = socket.accept();
+ AWAIT_READY(accept);
- int c = accepted.get();
- ASSERT_LT(0, c);
+ Socket client = accept.get();
const string data = "POST /" + name + " HTTP/1.1";
- EXPECT_SOME_EQ(data, os::read(c, data.size()));
- close(c);
- close(s);
+ char temp[data.size()];
+
+ AWAIT_ASSERT_EQ(data.size(), client.recv(temp, data.size()));
+ ASSERT_EQ(data, string(temp, data.size()));
terminate(process);
wait(process);
[03/20] mesos git commit: Add initializer_list constructor to hashmap.
Posted by be...@apache.org.
Add initializer_list constructor to hashmap.
Simplified signature from the C++11 standard specification.
Review: https://reviews.apache.org/r/28252
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/dc9be770
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/dc9be770
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/dc9be770
Branch: refs/heads/master
Commit: dc9be770a71ebaaab1cfc2fc4c4a87846faed9bd
Parents: d4dd716
Author: Cody Maloney <co...@mesosphere.io>
Authored: Fri Jan 30 18:02:10 2015 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Feb 7 14:42:45 2015 -0800
----------------------------------------------------------------------
.../3rdparty/stout/include/stout/hashmap.hpp | 17 ++++++++++++++---
.../3rdparty/stout/tests/hashmap_tests.cpp | 16 ++++++++++++++++
2 files changed, 30 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/dc9be770/3rdparty/libprocess/3rdparty/stout/include/stout/hashmap.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/include/stout/hashmap.hpp b/3rdparty/libprocess/3rdparty/stout/include/stout/hashmap.hpp
index aa4d9ba..24dc369 100644
--- a/3rdparty/libprocess/3rdparty/stout/include/stout/hashmap.hpp
+++ b/3rdparty/libprocess/3rdparty/stout/include/stout/hashmap.hpp
@@ -39,6 +39,19 @@ public:
// 'const hashmap<T> map;' is not an error.
hashmap() {}
+ // Allow simple construction via initializer list.
+ hashmap(std::initializer_list<std::pair<Key, Value>> list)
+ {
+ boost::unordered_map<Key, Value>::reserve(list.size());
+
+ // TODO(cmaloney): Use 'foreach*' once supported.
+ auto it = list.begin();
+ while (it != list.end()) {
+ boost::unordered_map<Key, Value>::emplace(it->first, it->second);
+ ++it;
+ }
+ }
+
// Checks whether this map contains a binding for a key.
bool contains(const Key& key) const
{
@@ -67,9 +80,7 @@ public:
// Returns an Option for the binding to the key.
Option<Value> get(const Key& key) const
{
- typedef typename boost::unordered_map<Key, Value>::const_iterator
- const_iterator;
- const_iterator it = boost::unordered_map<Key, Value>::find(key);
+ auto it = boost::unordered_map<Key, Value>::find(key);
if (it == boost::unordered_map<Key, Value>::end()) {
return None();
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/dc9be770/3rdparty/libprocess/3rdparty/stout/tests/hashmap_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/tests/hashmap_tests.cpp b/3rdparty/libprocess/3rdparty/stout/tests/hashmap_tests.cpp
index eb3abfc..e8a932e 100644
--- a/3rdparty/libprocess/3rdparty/stout/tests/hashmap_tests.cpp
+++ b/3rdparty/libprocess/3rdparty/stout/tests/hashmap_tests.cpp
@@ -10,6 +10,22 @@
using std::string;
+TEST(HashMapTest, InitializerList)
+{
+ hashmap<string, int> map{{"hello", 1}};
+ EXPECT_EQ(1, map.size());
+
+ EXPECT_TRUE((hashmap<int, int>{}.empty()));
+
+ hashmap<int, int> map2{{1, 2}, {2, 3}, {3, 4}};
+ EXPECT_EQ(3, map2.size());
+ EXPECT_SOME_EQ(2, map2.get(1));
+ EXPECT_SOME_EQ(3, map2.get(2));
+ EXPECT_SOME_EQ(4, map2.get(3));
+ EXPECT_NONE(map2.get(4));
+}
+
+
TEST(HashMapTest, Insert)
{
hashmap<string, int> map;