You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/10/13 02:55:08 UTC
svn commit: r1397753 - in /incubator/mesos/trunk/third_party/libprocess/src:
process.cpp tests.cpp
Author: benh
Date: Sat Oct 13 00:55:08 2012
New Revision: 1397753
URL: http://svn.apache.org/viewvc?rev=1397753&view=rev
Log:
Added streaming HTTP responses (via pipes).
Review: https://reviews.apache.org/r/7544
Modified:
incubator/mesos/trunk/third_party/libprocess/src/process.cpp
incubator/mesos/trunk/third_party/libprocess/src/tests.cpp
Modified: incubator/mesos/trunk/third_party/libprocess/src/process.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/src/process.cpp?rev=1397753&r1=1397752&r2=1397753&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/src/process.cpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/src/process.cpp Sat Oct 13 00:55:08 2012
@@ -259,9 +259,17 @@ public:
void handle(Future<Response>* future, bool persist);
private:
+ // Starts "waiting" on the next available future response.
void next();
+
+ // Invoked once a future response has been satisfied.
void waited(const Future<Response>& future);
- void process(Future<Response>* future, bool persist);
+
+ // Demuxes and handles a response.
+ bool process(const Future<Response>& future, bool persist);
+
+ // Handles stream (i.e., pipe) based responses.
+ void stream(const Future<short>& poll, bool persist);
Socket socket; // Wrap the socket to keep it from getting closed.
@@ -282,6 +290,8 @@ private:
};
queue<Item*> items;
+
+ Option<int> pipe; // Current pipe, if streaming.
};
@@ -1415,8 +1425,27 @@ HttpProxy::HttpProxy(const Socket& _sock
HttpProxy::~HttpProxy()
{
+ // Need to make sure response producers know not to continue to
+ // create a response (streaming or otherwise).
+ if (pipe.isSome()) {
+ close(pipe.get());
+ }
+ pipe = Option<int>::none();
+
while (!items.empty()) {
Item* item = items.front();
+
+ // Attempt to discard the future.
+ item->future->discard();
+
+ // But it might have already been ready ...
+ if (item->future->isReady()) {
+ const Response& response = item->future->get();
+ if (response.type == Response::PIPE) {
+ close(response.pipe);
+ }
+ }
+
items.pop();
delete item;
}
@@ -1453,28 +1482,32 @@ void HttpProxy::waited(const Future<Resp
{
CHECK(items.size() > 0);
Item* item = items.front();
+
CHECK(future == *item->future);
- if (item->future->isReady()) {
- process(item->future, item->persist);
- } else {
- // TODO(benh): Consider handling other "states" of future
- // (discarded, failed, etc) with different HTTP statuses.
- socket_manager->send(ServiceUnavailable(), socket, item->persist);
- }
+ // Process the item and determine if we're done or not (so we know
+ // whether to start waiting on the next response).
+ bool processed = process(*item->future, item->persist);
items.pop();
delete item;
- next();
+ if (processed) {
+ next();
+ }
}
-void HttpProxy::process(Future<Response>* future, bool persist)
+bool HttpProxy::process(const Future<Response>& future, bool persist)
{
- CHECK(future->isReady());
+ if (!future.isReady()) {
+ // TODO(benh): Consider handling other "states" of future
+ // (discarded, failed, etc) with different HTTP statuses.
+ socket_manager->send(ServiceUnavailable(), socket, persist);
+ return true; // All done, can process next response.
+ }
- Response response = future->get();
+ Response response = future.get();
// Don't persist connection if headers include 'Connection: close'.
if (response.headers.count("Connection") > 0) {
@@ -1512,14 +1545,14 @@ void HttpProxy::process(Future<Response>
socket_manager->send(NotFound(), socket, persist);
} else {
// While the user is expected to properly set a 'Content-Type'
- // header, we fill in the 'Content-Length' header.
+ // header, we fill in (or overwrite) the 'Content-Length' header.
stringstream out;
out << s.st_size;
response.headers["Content-Length"] = out.str();
if (s.st_size == 0) {
socket_manager->send(response, socket, persist);
- return;
+ return true; // All done, can process next request.
}
VLOG(1) << "Sending file at '" << path << "' with length " << s.st_size;
@@ -1533,9 +1566,100 @@ void HttpProxy::process(Future<Response>
socket_manager->send(encoder, socket, persist);
}
}
+ } else if (response.type == Response::PIPE) {
+ // Make sure no body is sent (this is really an error and
+ // should be reported and no response sent).
+ response.body.clear();
+
+ // Make sure the pipe is nonblocking.
+ Try<Nothing> nonblock = os::nonblock(response.pipe);
+ if (nonblock.isError()) {
+ const char* error = strerror(errno);
+ VLOG(1) << "Failed make pipe nonblocking: " << error;
+ socket_manager->send(InternalServerError(), socket, persist);
+ return true; // All done, can process next response.
+ }
+
+ // While the user is expected to properly set a 'Content-Type'
+ // header, we fill in (or overwrite) the 'Transfer-Encoding' header.
+ response.headers["Transfer-Encoding"] = "chunked";
+
+ VLOG(1) << "Starting \"chunked\" streaming";
+
+ socket_manager->send(response, socket, true);
+
+ pipe = response.pipe;
+
+ io::poll(pipe.get(), io::READ).onAny(
+ defer(self(), &Self::stream, lambda::_1, persist));
+
+ return false; // Streaming, don't process next response (yet)!
} else {
socket_manager->send(response, socket, persist);
}
+
+ return true; // All done, can process next response.
+}
+
+
+void HttpProxy::stream(const Future<short>& poll, bool persist)
+{
+ // TODO(benh): Use 'splice' on Linux.
+
+ CHECK(pipe.isSome());
+
+ bool finished = false; // Whether or not we're done streaming.
+
+ if (poll.isReady()) {
+ // Read and write.
+ CHECK(poll.get() == io::READ);
+ const size_t size = 4 * 1024; // 4K.
+ char data[size];
+ while (!finished) {
+ ssize_t length = ::read(pipe.get(), data, size);
+ if (length < 0 && (errno == EINTR)) {
+ // Interrupted, try again now.
+ continue;
+ } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
+ // Might block, try again later.
+ io::poll(pipe.get(), io::READ).onAny(
+ defer(self(), &Self::stream, lambda::_1, persist));
+ break;
+ } else {
+ std::ostringstream out;
+ if (length <= 0) {
+ // Error or closed, treat both as closed.
+ if (length < 0) {
+ // Error.
+ const char* error = strerror(errno);
+ VLOG(1) << "Read error while streaming: " << error;
+ }
+ out << "0\r\n" << "\r\n";
+ finished = true;
+ } else {
+ // Data!
+ out << std::hex << length << "\r\n";
+ out.write(data, length);
+ out << "\r\n";
+ }
+ socket_manager->send(new DataEncoder(out.str()), socket, persist);
+ }
+ }
+ } else if (poll.isFailed()) {
+ VLOG(1) << "Failed to poll: " << poll.failure();
+ socket_manager->send(InternalServerError(), socket, persist);
+ finished = true;
+ } else {
+ VLOG(1) << "Unexpected discarded future while polling";
+ socket_manager->send(InternalServerError(), socket, persist);
+ finished = true;
+ }
+
+ if (finished) {
+ close(pipe.get());
+ pipe = Option<int>::none();
+ next();
+ }
}
Modified: incubator/mesos/trunk/third_party/libprocess/src/tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/src/tests.cpp?rev=1397753&r1=1397752&r2=1397753&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/src/tests.cpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/src/tests.cpp Sat Oct 13 00:55:08 2012
@@ -32,6 +32,7 @@ using namespace process;
using testing::_;
using testing::Assign;
+using testing::DoAll;
using testing::Return;
using testing::ReturnArg;
@@ -1092,10 +1093,12 @@ class HttpProcess : public Process<HttpP
public:
HttpProcess()
{
- route("/handler", &HttpProcess::handler);
+ route("/body", &HttpProcess::body);
+ route("/pipe", &HttpProcess::pipe);
}
- MOCK_METHOD1(handler, Future<http::Response>(const http::Request&));
+ MOCK_METHOD1(body, Future<http::Response>(const http::Request&));
+ MOCK_METHOD1(pipe, Future<http::Response>(const http::Request&));
};
@@ -1105,11 +1108,9 @@ TEST(Process, http)
HttpProcess process;
- EXPECT_CALL(process, handler(_))
- .WillOnce(Return(http::OK()));
-
spawn(process);
+ // First hit '/body' (using explicit sockets and HTTP/1.0).
int s = socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
ASSERT_LE(0, s);
@@ -1123,14 +1124,16 @@ TEST(Process, http)
ASSERT_EQ(0, connect(s, (sockaddr*) &addr, sizeof(addr)));
std::ostringstream out;
-
- out << "GET /" << process.self().id << "/" << "handler"
+ out << "GET /" << process.self().id << "/body"
<< " HTTP/1.0\r\n"
<< "Connection: Keep-Alive\r\n"
<< "\r\n";
const std::string& data = out.str();
+ EXPECT_CALL(process, body(_))
+ .WillOnce(Return(http::OK()));
+
ASSERT_EQ(data.size(), write(s, data.data(), data.size()));
std::string response = "HTTP/1.1 200 OK";
@@ -1143,6 +1146,33 @@ TEST(Process, http)
ASSERT_EQ(0, close(s));
+ // Now hit '/pipe' (by using http::get).
+ int pipes[2];
+ ASSERT_NE(-1, ::pipe(pipes));
+
+ http::OK ok;
+ ok.type = http::Response::PIPE;
+ ok.pipe = pipes[0];
+
+ volatile bool pipeCalled = false;
+
+ EXPECT_CALL(process, pipe(_))
+ .WillOnce(DoAll(Assign(&pipeCalled, true),
+ Return(ok)));
+
+ Future<http::Response> future = http::get(process.self(), "pipe");
+
+ while (!pipeCalled);
+
+ ASSERT_TRUE(os::write(pipes[1], "Hello World\n").isSome());
+ ASSERT_TRUE(os::close(pipes[1]).isSome());
+
+ future.await(Seconds(1.0));
+ ASSERT_TRUE(future.isReady());
+ ASSERT_EQ(http::statuses[200], future.get().status);
+ ASSERT_EQ("chunked", future.get().headers["Transfer-Encoding"]);
+ ASSERT_EQ("Hello World\n", future.get().body);
+
terminate(process);
wait(process);
}