You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2013/04/24 22:37:41 UTC
svn commit: r1471657 - in /incubator/mesos/trunk/third_party/libprocess:
include/process/socket.hpp src/encoder.hpp src/process.cpp
Author: benh
Date: Wed Apr 24 20:37:41 2013
New Revision: 1471657
URL: http://svn.apache.org/r1471657
Log:
Added a Socket reference to Encoder.
Review: https://reviews.apache.org/r/10742
Modified:
incubator/mesos/trunk/third_party/libprocess/include/process/socket.hpp
incubator/mesos/trunk/third_party/libprocess/src/encoder.hpp
incubator/mesos/trunk/third_party/libprocess/src/process.cpp
Modified: incubator/mesos/trunk/third_party/libprocess/include/process/socket.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/process/socket.hpp?rev=1471657&r1=1471656&r2=1471657&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/process/socket.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/include/process/socket.hpp Wed Apr 24 20:37:41 2013
@@ -15,7 +15,7 @@ public:
Socket()
: refs(new int(1)), s(-1) {}
- Socket(int _s)
+ explicit Socket(int _s)
: refs(new int(1)), s(_s) {}
~Socket()
@@ -62,7 +62,10 @@ private:
if (__sync_sub_and_fetch(refs, 1) == 0) {
delete refs;
if (s >= 0) {
- close(s);
+ if (close(s) != 0) {
+ perror("Failed to close socket");
+ abort();
+ }
}
}
}
Modified: incubator/mesos/trunk/third_party/libprocess/src/encoder.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/src/encoder.hpp?rev=1471657&r1=1471656&r2=1471657&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/src/encoder.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/src/encoder.hpp Wed Apr 24 20:37:41 2013
@@ -28,16 +28,26 @@ extern void send_file(struct ev_loop*, e
class Encoder
{
public:
+ Encoder(const Socket& _s) : s(_s) {}
virtual ~Encoder() {}
+
virtual Sender sender() = 0;
+
+ Socket socket() const
+ {
+ return s;
+ }
+
+private:
+ const Socket s; // The socket this encoder is associated with.
};
class DataEncoder : public Encoder
{
public:
- DataEncoder(const std::string& _data)
- : data(_data), index(0) {}
+ DataEncoder(const Socket& s, const std::string& _data)
+ : Encoder(s), data(_data), index(0) {}
virtual ~DataEncoder() {}
@@ -75,8 +85,8 @@ private:
class MessageEncoder : public DataEncoder
{
public:
- MessageEncoder(Message* _message)
- : DataEncoder(encode(_message)), message(_message) {}
+ MessageEncoder(const Socket& s, Message* _message)
+ : DataEncoder(s, encode(_message)), message(_message) {}
virtual ~MessageEncoder()
{
@@ -119,9 +129,10 @@ class HttpResponseEncoder : public DataE
{
public:
HttpResponseEncoder(
+ const Socket& s,
const http::Response& response,
const http::Request& request)
- : DataEncoder(encode(response, request)) {}
+ : DataEncoder(s, encode(response, request)) {}
static std::string encode(
const http::Response& response,
@@ -202,8 +213,8 @@ public:
class FileEncoder : public Encoder
{
public:
- FileEncoder(int _fd, size_t _size)
- : fd(_fd), size(_size), index(0) {}
+ FileEncoder(const Socket& s, int _fd, size_t _size)
+ : Encoder(s), fd(_fd), size(_size), index(0) {}
virtual ~FileEncoder()
{
Modified: incubator/mesos/trunk/third_party/libprocess/src/process.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/src/process.cpp?rev=1471657&r1=1471656&r2=1471657&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/src/process.cpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/src/process.cpp Wed Apr 24 20:37:41 2013
@@ -310,12 +310,12 @@ public:
void link(ProcessBase* process, const UPID& to);
- PID<HttpProxy> proxy(int s);
+ PID<HttpProxy> proxy(const Socket& socket);
- void send(Encoder* encoder, int s, bool persist);
+ void send(Encoder* encoder, bool persist);
void send(const Response& response,
const Request& request,
- int s);
+ const Socket& socket);
void send(Message* message);
Encoder* next(int s);
@@ -1560,12 +1560,14 @@ bool HttpProxy::process(const Future<Res
// TODO(benh): Consider a way to have the socket manager turn
// on TCP_CORK for both sends and then turn it off.
- Encoder* encoder = new HttpResponseEncoder(response, request);
- socket_manager->send(encoder, socket, true);
+ socket_manager->send(
+ new HttpResponseEncoder(socket, response, request),
+ true);
// Note the file descriptor gets closed by FileEncoder.
- encoder = new FileEncoder(fd, s.st_size);
- socket_manager->send(encoder, socket, request.keepAlive);
+ socket_manager->send(
+ new FileEncoder(socket, fd, s.st_size),
+ request.keepAlive);
}
}
} else if (response.type == Response::PIPE) {
@@ -1588,8 +1590,9 @@ bool HttpProxy::process(const Future<Res
VLOG(1) << "Starting \"chunked\" streaming";
- Encoder* encoder = new HttpResponseEncoder(response, request);
- socket_manager->send(encoder, socket, true);
+ socket_manager->send(
+ new HttpResponseEncoder(socket, response, request),
+ true);
pipe = response.pipe;
@@ -1649,9 +1652,8 @@ void HttpProxy::stream(const Future<shor
// We always persist the connection when we're not finished
// streaming.
socket_manager->send(
- new DataEncoder(out.str()),
- socket,
- finished ? request.keepAlive : true);
+ new DataEncoder(socket, out.str()),
+ finished ? request.keepAlive : true);
}
}
} else if (poll.isFailed()) {
@@ -1724,28 +1726,26 @@ void SocketManager::link(ProcessBase* pr
LOG(FATAL) << "Failed to link, cloexec: " << cloexec.error();
}
- Socket socket = Socket(s);
-
- sockets[s] = socket;
+ sockets[s] = Socket(s);
nodes[s] = node;
persists[node] = s;
- sockaddr_in addr;
- memset(&addr, 0, sizeof(addr));
- addr.sin_family = PF_INET;
- addr.sin_port = htons(to.port);
- addr.sin_addr.s_addr = to.ip;
-
// Allocate and initialize the decoder and watcher (we really
// only "receive" on this socket so that we can react when it
// gets closed and generate appropriate lost events).
- DataDecoder* decoder = new DataDecoder(socket);
+ DataDecoder* decoder = new DataDecoder(sockets[s]);
ev_io* watcher = new ev_io();
watcher->data = decoder;
// Try and connect to the node using this socket.
+ sockaddr_in addr;
+ memset(&addr, 0, sizeof(addr));
+ addr.sin_family = PF_INET;
+ addr.sin_port = htons(to.port);
+ addr.sin_addr.s_addr = to.ip;
+
if (connect(s, (sockaddr*) &addr, sizeof(addr)) < 0) {
if (errno != EINPROGRESS) {
PLOG(FATAL) << "Failed to link, connect";
@@ -1771,7 +1771,7 @@ void SocketManager::link(ProcessBase* pr
}
-PID<HttpProxy> SocketManager::proxy(int s)
+PID<HttpProxy> SocketManager::proxy(const Socket& socket)
{
HttpProxy* proxy = NULL;
@@ -1779,12 +1779,12 @@ PID<HttpProxy> SocketManager::proxy(int
// This socket might have been asked to get closed (e.g., remote
// side hang up) while a process is attempting to handle an HTTP
// request. Thus, if there is no more socket, return an empty PID.
- if (sockets.count(s) > 0) {
- if (proxies.count(s) > 0) {
- return proxies[s]->self();
+ if (sockets.count(socket) > 0) {
+ if (proxies.count(socket) > 0) {
+ return proxies[socket]->self();
} else {
- proxy = new HttpProxy(sockets[s]);
- proxies[s] = proxy;
+ proxy = new HttpProxy(sockets[socket]);
+ proxies[socket] = proxy;
}
}
}
@@ -1803,29 +1803,29 @@ PID<HttpProxy> SocketManager::proxy(int
}
-void SocketManager::send(Encoder* encoder, int s, bool persist)
+void SocketManager::send(Encoder* encoder, bool persist)
{
CHECK(encoder != NULL);
synchronized (this) {
- if (sockets.count(s) > 0) {
+ if (sockets.count(encoder->socket()) > 0) {
// Update whether or not this socket should get disposed after
// there is no more data to send.
if (!persist) {
- dispose.insert(s);
+ dispose.insert(encoder->socket());
}
- if (outgoing.count(s) > 0) {
- outgoing[s].push(encoder);
+ if (outgoing.count(encoder->socket()) > 0) {
+ outgoing[encoder->socket()].push(encoder);
} else {
// Initialize the outgoing queue.
- outgoing[s];
+ outgoing[encoder->socket()];
// Allocate and initialize the watcher.
ev_io* watcher = new ev_io();
watcher->data = encoder;
- ev_io_init(watcher, encoder->sender(), s, EV_WRITE);
+ ev_io_init(watcher, encoder->sender(), encoder->socket(), EV_WRITE);
synchronized (watchers) {
watchers->push(watcher);
@@ -1844,7 +1844,7 @@ void SocketManager::send(Encoder* encode
void SocketManager::send(
const Response& response,
const Request& request,
- int s)
+ const Socket& socket)
{
bool persist = request.keepAlive;
@@ -1856,7 +1856,7 @@ void SocketManager::send(
}
}
- send(new HttpResponseEncoder(response, request), s, persist);
+ send(new HttpResponseEncoder(socket, response, request), persist);
}
@@ -1864,8 +1864,6 @@ void SocketManager::send(Message* messag
{
CHECK(message != NULL);
- DataEncoder* encoder = new MessageEncoder(message);
-
Node node(message->to.ip, message->to.port);
synchronized (this) {
@@ -1874,7 +1872,8 @@ void SocketManager::send(Message* messag
bool temp = temps.count(node) > 0;
if (persist || temp) {
int s = persist ? persists[node] : temps[node];
- send(encoder, s, persist);
+ CHECK(sockets.count(s) > 0);
+ send(new MessageEncoder(sockets[s], message), persist);
} else {
// No peristant or temporary socket to the node currently
// exists, so we create a temporary one.
@@ -1903,6 +1902,10 @@ void SocketManager::send(Message* messag
// Initialize the outgoing queue.
outgoing[s];
+ // Allocate and initialize the watcher.
+ ev_io* watcher = new ev_io();
+ watcher->data = new MessageEncoder(sockets[s], message);
+
// Try and connect to the node using this socket.
sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
@@ -1910,10 +1913,6 @@ void SocketManager::send(Message* messag
addr.sin_port = htons(message->to.port);
addr.sin_addr.s_addr = message->to.ip;
- // Allocate and initialize the watcher.
- ev_io* watcher = new ev_io();
- watcher->data = encoder;
-
if (connect(s, (sockaddr*) &addr, sizeof(addr)) < 0) {
if (errno != EINPROGRESS) {
PLOG(FATAL) << "Failed to send, connect";
@@ -3153,9 +3152,9 @@ void read(int fd,
return;
}
- // Since promise->future() will be discarded before future is discarded, we
- // should never see a discarded future here because of the check in the
- // beginning of this function.
+ // Since promise->future() will be discarded before future is
+ // discarded, we should never see a discarded future here because of
+ // the check in the beginning of this function.
CHECK(!future.isDiscarded());
if (future.isFailed()) {
@@ -3218,7 +3217,7 @@ Future<size_t> read(int fd, void* data,
Try<bool> nonblock = os::isNonblock(fd);
if (nonblock.isError()) {
// The file descriptor is not valid (e.g. fd has been closed).
- promise->fail(strerror(errno));
+ promise->fail(string("Failed to check O_NONBLOCK") + strerror(errno));
return promise->future();
} else if (!nonblock.get()) {
// The fd is not opened with O_NONBLOCK set.
@@ -3254,8 +3253,7 @@ Future<string> _read(int fd,
return io::read(fd, data.get(), length)
.then([=] (size_t size) {
if (size == 0) { // EOF.
- string result(*buffer);
- return Future<string>(result);
+ return string(*buffer);
}
buffer->append(data, size);
return _read(fd, buffer, data, length);
@@ -3278,11 +3276,11 @@ Future<string> __read(
size_t length)
{
if (size == 0) { // EOF.
- string result(*buffer);
- return Future<string>(result);
+ return string(*buffer);
}
buffer->append(data.get(), size);
+
return _read(fd, buffer, data, length);
}
@@ -3367,7 +3365,8 @@ Future<Response> get(const UPID& upid, c
addr.sin_addr.s_addr = upid.ip;
if (connect(s, (sockaddr*) &addr, sizeof(addr)) < 0) {
- return Future<Response>::failed(strerror(errno));
+ return Future<Response>::failed(
+ string("Failed to connect: ") + strerror(errno));
}
std::ostringstream out;
@@ -3388,7 +3387,8 @@ Future<Response> get(const UPID& upid, c
if (errno == EINTR) {
continue;
}
- return Future<Response>::failed(strerror(errno));
+ return Future<Response>::failed(
+ string("Failed to write: ") + strerror(errno));
}
remaining -= n;