You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2013/03/06 03:38:03 UTC
svn commit: r1453139 - in /incubator/mesos/trunk/third_party/libprocess:
include/process/http.hpp src/encoder.hpp src/process.cpp
Author: vinodkone
Date: Wed Mar 6 02:38:02 2013
New Revision: 1453139
URL: http://svn.apache.org/r1453139
Log:
Used gzip encoding only when acceptable.
From: Ben Mahler <be...@gmail.com>
Review: https://reviews.apache.org/r/8797
Modified:
incubator/mesos/trunk/third_party/libprocess/include/process/http.hpp
incubator/mesos/trunk/third_party/libprocess/src/encoder.hpp
incubator/mesos/trunk/third_party/libprocess/src/process.cpp
Modified: incubator/mesos/trunk/third_party/libprocess/include/process/http.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/process/http.hpp?rev=1453139&r1=1453138&r2=1453139&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/process/http.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/include/process/http.hpp Wed Mar 6 02:38:02 2013
@@ -35,6 +35,71 @@ struct Request
hashmap<std::string, std::string> query;
std::string body;
bool keepAlive;
+
+ // Returns whether the encoding is considered acceptable in the request.
+ // TODO(bmahler): Consider moving this logic into decoder.hpp, and having
+ // Request contain a member variable for each popular HTTP 1.0/1.1 header.
+ bool accepts(const std::string& encoding) const
+ {
+ // See RFC 2616, section 14.3 for the details.
+ Option<std::string> accepted = headers.get("Accept-Encoding");
+
+ if (accepted.isNone()) {
+ return false;
+ }
+
+ // Remove spaces, tabs, and newlines for easier parsing.
+ accepted = strings::remove(accepted.get(), " ");
+ accepted = strings::remove(accepted.get(), "\t");
+ accepted = strings::remove(accepted.get(), "\n");
+
+ // From RFC 2616:
+ // 1. If the content-coding is one of the content-codings listed in
+ // the Accept-Encoding field, then it is acceptable, unless it is
+ // accompanied by a qvalue of 0. (As defined in section 3.9, a
+ // qvalue of 0 means "not acceptable.")
+ // 2. The special "*" symbol in an Accept-Encoding field matches any
+ // available content-coding not explicitly listed in the header
+ // field.
+
+ // First we'll look for the encoding specified explicitly, then '*'.
+ std::vector<std::string> candidates;
+ candidates.push_back(encoding); // Rule 1.
+ candidates.push_back("*"); // Rule 2.
+
+ foreach (std::string& candidate, candidates) {
+ // Is the candidate one of the accepted encodings?
+ foreach (const std::string& _encoding,
+ strings::tokenize(accepted.get(), ",")) {
+ if (strings::startsWith(_encoding, candidate)) {
+ // Is there a 0 q value? Ex: 'gzip;q=0.0'.
+ const std::map<std::string, std::vector<std::string> >& values =
+ strings::pairs(_encoding, ";", "=");
+
+ // Look for { "q": ["0"] }.
+ if (values.count("q") == 0 || values.find("q")->second.size() != 1) {
+ // No q value, or malformed q value.
+ return true;
+ }
+
+ // Is the q value > 0?
+ Try<double> value = numify<double>(values.find("q")->second[0]);
+ return value.isSome() && value.get() > 0;
+ }
+ }
+ }
+
+ // NOTE: 3 and 4 are partially ignored since we can only provide gzip.
+ // 3. If multiple content-codings are acceptable, then the acceptable
+ // content-coding with the highest non-zero qvalue is preferred.
+ // 4. The "identity" content-coding is always acceptable, unless
+ // specifically refused because the Accept-Encoding field includes
+ // "identity;q=0", or because the field includes "*;q=0" and does
+ // not explicitly include the "identity" content-coding. If the
+ // Accept-Encoding field-value is empty, then only the "identity"
+ // encoding is acceptable.
+ return false;
+ }
};
Modified: incubator/mesos/trunk/third_party/libprocess/src/encoder.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/src/encoder.hpp?rev=1453139&r1=1453138&r2=1453139&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/src/encoder.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/src/encoder.hpp Wed Mar 6 02:38:02 2013
@@ -3,6 +3,7 @@
#include <ev.h>
+#include <map>
#include <sstream>
#include <process/http.hpp>
@@ -16,6 +17,8 @@
namespace process {
+const uint32_t GZIP_MINIMUM_BODY_LENGTH = 1024;
+
typedef void (*Sender)(struct ev_loop*, ev_io*, int);
extern void send_data(struct ev_loop*, ev_io*, int);
@@ -115,10 +118,14 @@ private:
class HttpResponseEncoder : public DataEncoder
{
public:
- HttpResponseEncoder(const http::Response& response)
- : DataEncoder(encode(response)) {}
-
- static std::string encode(const http::Response& response)
+ HttpResponseEncoder(
+ const http::Response& response,
+ const http::Request& request)
+ : DataEncoder(encode(response, request)) {}
+
+ static std::string encode(
+ const http::Response& response,
+ const http::Request& request)
{
std::ostringstream out;
@@ -141,12 +148,13 @@ public:
headers["Date"] = date;
- // Swap the body with a gzip compressed version, if no encoding has
- // been specified.
+ // Should we compress this response?
std::string body = response.body;
if (response.type == http::Response::BODY &&
- !headers.contains("Content-Encoding")) {
+ response.body.length() >= GZIP_MINIMUM_BODY_LENGTH &&
+ !headers.contains("Content-Encoding") &&
+ request.accepts("gzip")) {
Try<std::string> compressed = gzip::compress(body);
if (compressed.isError()) {
LOG(WARNING) << "Failed to gzip response body: " << compressed.error();
Modified: incubator/mesos/trunk/third_party/libprocess/src/process.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/src/process.cpp?rev=1453139&r1=1453138&r2=1453139&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/src/process.cpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/src/process.cpp Wed Mar 6 02:38:02 2013
@@ -255,12 +255,12 @@ public:
// Enqueues the response to be sent once all previously enqueued
// responses have been processed (e.g., waited for and sent).
- void enqueue(const Response& response, bool persist);
+ void enqueue(const Response& response, const Request& request);
// Enqueues a future to a response that will get waited on (up to
// some timeout) and then sent once all previously enqueued
// responses have been processed (e.g., waited for and sent).
- void handle(Future<Response>* future, bool persist);
+ void handle(Future<Response>* future, const Request& request);
private:
// Starts "waiting" on the next available future response.
@@ -270,27 +270,29 @@ private:
void waited(const Future<Response>& future);
// Demuxes and handles a response.
- bool process(const Future<Response>& future, bool persist);
+ bool process(const Future<Response>& future, const Request& request);
// Handles stream (i.e., pipe) based responses.
- void stream(const Future<short>& poll, bool persist);
+ void stream(const Future<short>& poll, const Request& request);
Socket socket; // Wrap the socket to keep it from getting closed.
// Describes a queue "item" that wraps the future to the response
- // and whether or not the socket should be persisted (vs closed).
+ // and the original request.
+ // The original request contains needed information such as what encodings
+ // are acceptable and whether to persist the connection.
struct Item
{
- Item(Future<Response>* _future, bool _persist)
- : future(_future), persist(_persist) {}
+ Item(const Request& _request, Future<Response>* _future)
+ : request(_request), future(_future) {}
~Item()
{
delete future;
}
+ const Request request; // Make a copy.
Future<Response>* future;
- bool persist;
};
queue<Item*> items;
@@ -312,7 +314,9 @@ public:
PID<HttpProxy> proxy(int s);
void send(Encoder* encoder, int s, bool persist);
- void send(const Response& response, int s, bool persist);
+ void send(const Response& response,
+ const Request& request,
+ int s);
void send(Message* message);
Encoder* next(int s);
@@ -1465,15 +1469,15 @@ HttpProxy::~HttpProxy()
}
-void HttpProxy::enqueue(const Response& response, bool persist)
+void HttpProxy::enqueue(const Response& response, const Request& request)
{
- handle(new Future<Response>(response), persist);
+ handle(new Future<Response>(response), request);
}
-void HttpProxy::handle(Future<Response>* future, bool persist)
+void HttpProxy::handle(Future<Response>* future, const Request& request)
{
- items.push(new Item(future, persist));
+ items.push(new Item(request, future));
if (items.size() == 1) {
next();
@@ -1500,7 +1504,7 @@ void HttpProxy::waited(const Future<Resp
// Process the item and determine if we're done or not (so we know
// whether to start waiting on the next responses).
- bool processed = process(*item->future, item->persist);
+ bool processed = process(*item->future, item->request);
items.pop();
delete item;
@@ -1511,25 +1515,17 @@ void HttpProxy::waited(const Future<Resp
}
-bool HttpProxy::process(const Future<Response>& future, bool persist)
+bool HttpProxy::process(const Future<Response>& future, const Request& request)
{
if (!future.isReady()) {
// TODO(benh): Consider handling other "states" of future
// (discarded, failed, etc) with different HTTP statuses.
- socket_manager->send(ServiceUnavailable(), socket, persist);
+ socket_manager->send(ServiceUnavailable(), request, socket);
return true; // All done, can process next response.
}
Response response = future.get();
- // Don't persist connection if headers include 'Connection: close'.
- if (response.headers.count("Connection") > 0) {
- const string& connection = response.headers.find("Connection")->second;
- if (connection == "close") {
- persist = false;
- }
- }
-
// If the response specifies a path, try and perform a sendfile.
if (response.type == Response::PATH) {
// Make sure no body is sent (this is really an error and
@@ -1541,21 +1537,21 @@ bool HttpProxy::process(const Future<Res
if (fd < 0) {
if (errno == ENOENT || errno == ENOTDIR) {
VLOG(1) << "Returning '404 Not Found' for path '" << path << "'";
- socket_manager->send(NotFound(), socket, persist);
+ socket_manager->send(NotFound(), request, socket);
} else {
const char* error = strerror(errno);
VLOG(1) << "Failed to send file at '" << path << "': " << error;
- socket_manager->send(InternalServerError(), socket, persist);
+ socket_manager->send(InternalServerError(), request, socket);
}
} else {
struct stat s; // Need 'struct' because of function named 'stat'.
if (fstat(fd, &s) != 0) {
const char* error = strerror(errno);
VLOG(1) << "Failed to send file at '" << path << "': " << error;
- socket_manager->send(InternalServerError(), socket, persist);
+ socket_manager->send(InternalServerError(), request, socket);
} else if (S_ISDIR(s.st_mode)) {
VLOG(1) << "Returning '404 Not Found' for directory '" << path << "'";
- socket_manager->send(NotFound(), socket, persist);
+ socket_manager->send(NotFound(), request, socket);
} else {
// While the user is expected to properly set a 'Content-Type'
// header, we fill in (or overwrite) 'Content-Length' header.
@@ -1564,7 +1560,7 @@ bool HttpProxy::process(const Future<Res
response.headers["Content-Length"] = out.str();
if (s.st_size == 0) {
- socket_manager->send(response, socket, persist);
+ socket_manager->send(response, request, socket);
return true; // All done, can process next request.
}
@@ -1572,11 +1568,12 @@ bool HttpProxy::process(const Future<Res
// TODO(benh): Consider a way to have the socket manager turn
// on TCP_CORK for both sends and then turn it off.
- socket_manager->send(response, socket, true);
+ Encoder* encoder = new HttpResponseEncoder(response, request);
+ socket_manager->send(encoder, socket, true);
// Note the file descriptor gets closed by FileEncoder.
- Encoder* encoder = new FileEncoder(fd, s.st_size);
- socket_manager->send(encoder, socket, persist);
+ encoder = new FileEncoder(fd, s.st_size);
+ socket_manager->send(encoder, socket, request.keepAlive);
}
}
} else if (response.type == Response::PIPE) {
@@ -1589,7 +1586,7 @@ bool HttpProxy::process(const Future<Res
if (nonblock.isError()) {
const char* error = strerror(errno);
VLOG(1) << "Failed make pipe nonblocking: " << error;
- socket_manager->send(InternalServerError(), socket, persist);
+ socket_manager->send(InternalServerError(), request, socket);
return true; // All done, can process next response.
}
@@ -1599,29 +1596,30 @@ bool HttpProxy::process(const Future<Res
VLOG(1) << "Starting \"chunked\" streaming";
- socket_manager->send(response, socket, true);
+ Encoder* encoder = new HttpResponseEncoder(response, request);
+ socket_manager->send(encoder, socket, true);
pipe = response.pipe;
io::poll(pipe.get(), io::READ).onAny(
- defer(self(), &Self::stream, lambda::_1, persist));
+ defer(self(), &Self::stream, lambda::_1, request));
return false; // Streaming, don't process next response (yet)!
} else {
- socket_manager->send(response, socket, persist);
+ socket_manager->send(response, request, socket);
}
return true; // All done, can process next response.
}
-void HttpProxy::stream(const Future<short>& poll, bool persist)
+void HttpProxy::stream(const Future<short>& poll, const Request& request)
{
// TODO(benh): Use 'splice' on Linux.
CHECK(pipe.isSome());
- bool finished = false; // Whether or not we're done streaming.
+ bool finished = false; // Whether we're done streaming.
if (poll.isReady()) {
// Read and write.
@@ -1636,7 +1634,7 @@ void HttpProxy::stream(const Future<shor
} else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
// Might block, try again later.
io::poll(pipe.get(), io::READ).onAny(
- defer(self(), &Self::stream, lambda::_1, persist));
+ defer(self(), &Self::stream, lambda::_1, request));
break;
} else {
std::ostringstream out;
@@ -1655,16 +1653,22 @@ void HttpProxy::stream(const Future<shor
out.write(data, length);
out << "\r\n";
}
- socket_manager->send(new DataEncoder(out.str()), socket, persist);
+
+ // We always persist the connection when we're not finished
+ // streaming.
+ socket_manager->send(
+ new DataEncoder(out.str()),
+ socket,
+ finished ? request.keepAlive : true);
}
}
} else if (poll.isFailed()) {
VLOG(1) << "Failed to poll: " << poll.failure();
- socket_manager->send(InternalServerError(), socket, persist);
+ socket_manager->send(InternalServerError(), request, socket);
finished = true;
} else {
VLOG(1) << "Unexpected discarded future while polling";
- socket_manager->send(InternalServerError(), socket, persist);
+ socket_manager->send(InternalServerError(), request, socket);
finished = true;
}
@@ -1845,9 +1849,22 @@ void SocketManager::send(Encoder* encode
}
-void SocketManager::send(const Response& response, int s, bool persist)
-{
- send(new HttpResponseEncoder(response), s, persist);
+void SocketManager::send(
+ const Response& response,
+ const Request& request,
+ int s)
+{
+ bool persist = request.keepAlive;
+
+ // Don't persist the connection if the headers include
+ // 'Connection: close'.
+ if (response.headers.contains("Connection")) {
+ if (response.headers.get("Connection").get() == "close") {
+ persist = false;
+ }
+ }
+
+ send(new HttpResponseEncoder(response, request), s, persist);
}
@@ -2195,7 +2212,7 @@ bool ProcessManager::handle(
// Enqueue the response with the HttpProxy so that it respects the
// order of requests to account for HTTP/1.1 pipelining.
- dispatch(proxy, &HttpProxy::enqueue, BadRequest(), request->keepAlive);
+ dispatch(proxy, &HttpProxy::enqueue, BadRequest(), *request);
// Cleanup request.
delete request;
@@ -2212,7 +2229,7 @@ bool ProcessManager::handle(
// Enqueue the response with the HttpProxy so that it respects the
// order of requests to account for HTTP/1.1 pipelining.
- dispatch(proxy, &HttpProxy::enqueue, NotFound(), request->keepAlive);
+ dispatch(proxy, &HttpProxy::enqueue, NotFound(), *request);
// Cleanup request.
delete request;
@@ -2252,7 +2269,7 @@ bool ProcessManager::handle(
// Enqueue the response with the HttpProxy so that it respects the
// order of requests to account for HTTP/1.1 pipelining.
- dispatch(proxy, &HttpProxy::enqueue, NotFound(), request->keepAlive);
+ dispatch(proxy, &HttpProxy::enqueue, NotFound(), *request);
// Cleanup request.
delete request;
@@ -2917,7 +2934,7 @@ void ProcessBase::visit(const HttpEvent&
PID<HttpProxy> proxy = socket_manager->proxy(event.socket);
// Let the HttpProxy know about this request (via the future).
- dispatch(proxy, &HttpProxy::handle, future, event.request->keepAlive);
+ dispatch(proxy, &HttpProxy::handle, future, *event.request);
// Now call the handler and associate the response with the promise.
promise->associate(handlers.http[name](*event.request));
@@ -2952,7 +2969,7 @@ void ProcessBase::visit(const HttpEvent&
// Enqueue the response with the HttpProxy so that it respects the
// order of requests to account for HTTP/1.1 pipelining.
- dispatch(proxy, &HttpProxy::enqueue, response, event.request->keepAlive);
+ dispatch(proxy, &HttpProxy::enqueue, response, *event.request);
} else {
VLOG(1) << "Returning '404 Not Found' for '" << event.request->path << "'";
@@ -2961,8 +2978,7 @@ void ProcessBase::visit(const HttpEvent&
// Enqueue the response with the HttpProxy so that it respects the
// order of requests to account for HTTP/1.1 pipelining.
- dispatch(proxy, &HttpProxy::enqueue,
- NotFound(), event.request->keepAlive);
+ dispatch(proxy, &HttpProxy::enqueue, NotFound(), *event.request);
}
}