You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2013/03/06 03:38:03 UTC

svn commit: r1453139 - in /incubator/mesos/trunk/third_party/libprocess: include/process/http.hpp src/encoder.hpp src/process.cpp

Author: vinodkone
Date: Wed Mar  6 02:38:02 2013
New Revision: 1453139

URL: http://svn.apache.org/r1453139
Log:
Used gzip encoding only when acceptable.

From: Ben Mahler <be...@gmail.com>
Review: https://reviews.apache.org/r/8797

Modified:
    incubator/mesos/trunk/third_party/libprocess/include/process/http.hpp
    incubator/mesos/trunk/third_party/libprocess/src/encoder.hpp
    incubator/mesos/trunk/third_party/libprocess/src/process.cpp

Modified: incubator/mesos/trunk/third_party/libprocess/include/process/http.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/process/http.hpp?rev=1453139&r1=1453138&r2=1453139&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/process/http.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/include/process/http.hpp Wed Mar  6 02:38:02 2013
@@ -35,6 +35,71 @@ struct Request
   hashmap<std::string, std::string> query;
   std::string body;
   bool keepAlive;
+
+  // Returns whether the encoding is considered acceptable in the request.
+  // TODO(bmahler): Consider this logic being in decoder.hpp, and having the
+  // Request contain a member variable for each popular HTTP 1.0/1.1 header.
+  bool accepts(const std::string& encoding) const
+  {
+    // See RFC 2616, section 14.3 for the details.
+    Option<std::string> accepted = headers.get("Accept-Encoding");
+
+    if (accepted.isNone()) {
+      return false; // No 'Accept-Encoding' header in the request.
+    }
+
+    // Remove spaces, tabs, and newlines for easier parsing.
+    accepted = strings::remove(accepted.get(), " ");
+    accepted = strings::remove(accepted.get(), "\t");
+    accepted = strings::remove(accepted.get(), "\n");
+
+    // From RFC 2616:
+    // 1. If the content-coding is one of the content-codings listed in
+    //    the Accept-Encoding field, then it is acceptable, unless it is
+    //    accompanied by a qvalue of 0. (As defined in section 3.9, a
+    //    qvalue of 0 means "not acceptable.")
+    // 2. The special "*" symbol in an Accept-Encoding field matches any
+    //    available content-coding not explicitly listed in the header
+    //    field.
+
+    // First we'll look for the encoding specified explicitly, then '*'.
+    std::vector<std::string> candidates;
+    candidates.push_back(encoding);      // Rule 1.
+    candidates.push_back("*");           // Rule 2.
+
+    foreach (std::string& candidate, candidates) {
+      // Does one of the accepted encodings start with the candidate?
+      foreach (const std::string& _encoding,
+               strings::tokenize(accepted.get(), ",")) {
+        if (strings::startsWith(_encoding, candidate)) {
+          // Is there a 0 q value? Ex: 'gzip;q=0.0'.
+          const std::map<std::string, std::vector<std::string> >& values =
+            strings::pairs(_encoding, ";", "=");
+
+          // Look for { "q": ["0"] }.
+          if (values.count("q") == 0 || values.find("q")->second.size() != 1) {
+            // No q value, or malformed q value: treat as acceptable.
+            return true;
+          }
+
+          // Is the q value > 0? An unparsable q value is not acceptable.
+          Try<double> value = numify<double>(values.find("q")->second[0]);
+          return value.isSome() && value.get() > 0;
+        }
+      }
+    }
+
+    // NOTE: 3 and 4 are partially ignored since we can only provide gzip.
+    // 3. If multiple content-codings are acceptable, then the acceptable
+    //    content-coding with the highest non-zero qvalue is preferred.
+    // 4. The "identity" content-coding is always acceptable, unless
+    //    specifically refused because the Accept-Encoding field includes
+    //    "identity;q=0", or because the field includes "*;q=0" and does
+    //    not explicitly include the "identity" content-coding. If the
+    //    Accept-Encoding field-value is empty, then only the "identity"
+    //    encoding is acceptable.
+    return false;
+  }
 };
 
 

Modified: incubator/mesos/trunk/third_party/libprocess/src/encoder.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/src/encoder.hpp?rev=1453139&r1=1453138&r2=1453139&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/src/encoder.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/src/encoder.hpp Wed Mar  6 02:38:02 2013
@@ -3,6 +3,7 @@
 
 #include <ev.h>
 
+#include <map>
 #include <sstream>
 
 #include <process/http.hpp>
@@ -16,6 +17,8 @@
 
 namespace process {
 
+const uint32_t GZIP_MINIMUM_BODY_LENGTH = 1024; // Shorter bodies are not gzipped.
+
 typedef void (*Sender)(struct ev_loop*, ev_io*, int);
 
 extern void send_data(struct ev_loop*, ev_io*, int);
@@ -115,10 +118,14 @@ private:
 class HttpResponseEncoder : public DataEncoder
 {
 public:
-  HttpResponseEncoder(const http::Response& response)
-    : DataEncoder(encode(response)) {}
-
-  static std::string encode(const http::Response& response)
+  HttpResponseEncoder(
+      const http::Response& response,
+      const http::Request& request)
+    : DataEncoder(encode(response, request)) {}
+
+  static std::string encode(
+      const http::Response& response,
+      const http::Request& request)
   {
     std::ostringstream out;
 
@@ -141,12 +148,13 @@ public:
 
     headers["Date"] = date;
 
-    // Swap the body with a gzip compressed version, if no encoding has
-    // been specified.
+    // Should we compress this response?
     std::string body = response.body;
 
     if (response.type == http::Response::BODY &&
-        !headers.contains("Content-Encoding")) {
+        response.body.length() >= GZIP_MINIMUM_BODY_LENGTH &&
+        !headers.contains("Content-Encoding") &&
+        request.accepts("gzip")) {
       Try<std::string> compressed = gzip::compress(body);
       if (compressed.isError()) {
         LOG(WARNING) << "Failed to gzip response body: " << compressed.error();

Modified: incubator/mesos/trunk/third_party/libprocess/src/process.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/src/process.cpp?rev=1453139&r1=1453138&r2=1453139&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/src/process.cpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/src/process.cpp Wed Mar  6 02:38:02 2013
@@ -255,12 +255,12 @@ public:
 
   // Enqueues the response to be sent once all previously enqueued
   // responses have been processed (e.g., waited for and sent).
-  void enqueue(const Response& response, bool persist);
+  void enqueue(const Response& response, const Request& request);
 
   // Enqueues a future to a response that will get waited on (up to
   // some timeout) and then sent once all previously enqueued
   // responses have been processed (e.g., waited for and sent).
-  void handle(Future<Response>* future, bool persist);
+  void handle(Future<Response>* future, const Request& request);
 
 private:
   // Starts "waiting" on the next available future response.
@@ -270,27 +270,29 @@ private:
   void waited(const Future<Response>& future);
 
   // Demuxes and handles a response.
-  bool process(const Future<Response>& future, bool persist);
+  bool process(const Future<Response>& future, const Request& request);
 
   // Handles stream (i.e., pipe) based responses.
-  void stream(const Future<short>& poll, bool persist);
+  void stream(const Future<short>& poll, const Request& request);
 
   Socket socket; // Wrap the socket to keep it from getting closed.
 
   // Describes a queue "item" that wraps the future to the response
-  // and whether or not the socket should be persisted (vs closed).
+  // and the original request.
+  // The original request contains needed information such as what encodings
+  // are acceptable and whether to persist the connection.
   struct Item
   {
-    Item(Future<Response>* _future, bool _persist)
-      : future(_future), persist(_persist) {}
+    Item(const Request& _request, Future<Response>* _future)
+      : request(_request), future(_future) {}
 
     ~Item()
     {
       delete future;
     }
 
+    const Request request; // Make a copy.
     Future<Response>* future;
-    bool persist;
   };
 
   queue<Item*> items;
@@ -312,7 +314,9 @@ public:
   PID<HttpProxy> proxy(int s);
 
   void send(Encoder* encoder, int s, bool persist);
-  void send(const Response& response, int s, bool persist);
+  void send(const Response& response,
+            const Request& request,
+            int s);
   void send(Message* message);
 
   Encoder* next(int s);
@@ -1465,15 +1469,15 @@ HttpProxy::~HttpProxy()
 }
 
 
-void HttpProxy::enqueue(const Response& response, bool persist)
+void HttpProxy::enqueue(const Response& response, const Request& request)
 {
-  handle(new Future<Response>(response), persist);
+  handle(new Future<Response>(response), request);
 }
 
 
-void HttpProxy::handle(Future<Response>* future, bool persist)
+void HttpProxy::handle(Future<Response>* future, const Request& request)
 {
-  items.push(new Item(future, persist));
+  items.push(new Item(request, future));
 
   if (items.size() == 1) {
     next();
@@ -1500,7 +1504,7 @@ void HttpProxy::waited(const Future<Resp
 
   // Process the item and determine if we're done or not (so we know
   // whether to start waiting on the next responses).
-  bool processed = process(*item->future, item->persist);
+  bool processed = process(*item->future, item->request);
 
   items.pop();
   delete item;
@@ -1511,25 +1515,17 @@ void HttpProxy::waited(const Future<Resp
 }
 
 
-bool HttpProxy::process(const Future<Response>& future, bool persist)
+bool HttpProxy::process(const Future<Response>& future, const Request& request)
 {
   if (!future.isReady()) {
     // TODO(benh): Consider handling other "states" of future
     // (discarded, failed, etc) with different HTTP statuses.
-    socket_manager->send(ServiceUnavailable(), socket, persist);
+    socket_manager->send(ServiceUnavailable(), request, socket);
     return true; // All done, can process next response.
   }
 
   Response response = future.get();
 
-  // Don't persist connection if headers include 'Connection: close'.
-  if (response.headers.count("Connection") > 0) {
-    const string& connection = response.headers.find("Connection")->second;
-    if (connection == "close") {
-      persist = false;
-    }
-  }
-
   // If the response specifies a path, try and perform a sendfile.
   if (response.type == Response::PATH) {
     // Make sure no body is sent (this is really an error and
@@ -1541,21 +1537,21 @@ bool HttpProxy::process(const Future<Res
     if (fd < 0) {
       if (errno == ENOENT || errno == ENOTDIR) {
           VLOG(1) << "Returning '404 Not Found' for path '" << path << "'";
-          socket_manager->send(NotFound(), socket, persist);
+          socket_manager->send(NotFound(), request, socket);
       } else {
         const char* error = strerror(errno);
         VLOG(1) << "Failed to send file at '" << path << "': " << error;
-        socket_manager->send(InternalServerError(), socket, persist);
+        socket_manager->send(InternalServerError(), request, socket);
       }
     } else {
       struct stat s; // Need 'struct' because of function named 'stat'.
       if (fstat(fd, &s) != 0) {
         const char* error = strerror(errno);
         VLOG(1) << "Failed to send file at '" << path << "': " << error;
-        socket_manager->send(InternalServerError(), socket, persist);
+        socket_manager->send(InternalServerError(), request, socket);
       } else if (S_ISDIR(s.st_mode)) {
         VLOG(1) << "Returning '404 Not Found' for directory '" << path << "'";
-        socket_manager->send(NotFound(), socket, persist);
+        socket_manager->send(NotFound(), request, socket);
       } else {
         // While the user is expected to properly set a 'Content-Type'
         // header, we fill in (or overwrite) 'Content-Length' header.
@@ -1564,7 +1560,7 @@ bool HttpProxy::process(const Future<Res
         response.headers["Content-Length"] = out.str();
 
         if (s.st_size == 0) {
-          socket_manager->send(response, socket, persist);
+          socket_manager->send(response, request, socket);
           return true; // All done, can process next request.
         }
 
@@ -1572,11 +1568,12 @@ bool HttpProxy::process(const Future<Res
 
         // TODO(benh): Consider a way to have the socket manager turn
         // on TCP_CORK for both sends and then turn it off.
-        socket_manager->send(response, socket, true);
+        Encoder* encoder = new HttpResponseEncoder(response, request);
+        socket_manager->send(encoder, socket, true);
 
         // Note the file descriptor gets closed by FileEncoder.
-        Encoder* encoder = new FileEncoder(fd, s.st_size);
-        socket_manager->send(encoder, socket, persist);
+        encoder = new FileEncoder(fd, s.st_size);
+        socket_manager->send(encoder, socket, request.keepAlive);
       }
     }
   } else if (response.type == Response::PIPE) {
@@ -1589,7 +1586,7 @@ bool HttpProxy::process(const Future<Res
     if (nonblock.isError()) {
       const char* error = strerror(errno);
       VLOG(1) << "Failed make pipe nonblocking: " << error;
-      socket_manager->send(InternalServerError(), socket, persist);
+      socket_manager->send(InternalServerError(), request, socket);
       return true; // All done, can process next response.
     }
 
@@ -1599,29 +1596,30 @@ bool HttpProxy::process(const Future<Res
 
     VLOG(1) << "Starting \"chunked\" streaming";
 
-    socket_manager->send(response, socket, true);
+    Encoder* encoder = new HttpResponseEncoder(response, request);
+    socket_manager->send(encoder, socket, true);
 
     pipe = response.pipe;
 
     io::poll(pipe.get(), io::READ).onAny(
-        defer(self(), &Self::stream, lambda::_1, persist));
+        defer(self(), &Self::stream, lambda::_1, request));
 
     return false; // Streaming, don't process next response (yet)!
   } else {
-    socket_manager->send(response, socket, persist);
+    socket_manager->send(response, request, socket);
   }
 
   return true; // All done, can process next response.
 }
 
 
-void HttpProxy::stream(const Future<short>& poll, bool persist)
+void HttpProxy::stream(const Future<short>& poll, const Request& request)
 {
   // TODO(benh): Use 'splice' on Linux.
 
   CHECK(pipe.isSome());
 
-  bool finished = false; // Whether or not we're done streaming.
+  bool finished = false; // Whether we're done streaming.
 
   if (poll.isReady()) {
     // Read and write.
@@ -1636,7 +1634,7 @@ void HttpProxy::stream(const Future<shor
       } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
         // Might block, try again later.
         io::poll(pipe.get(), io::READ).onAny(
-            defer(self(), &Self::stream, lambda::_1, persist));
+            defer(self(), &Self::stream, lambda::_1, request));
         break;
       } else {
         std::ostringstream out;
@@ -1655,16 +1653,22 @@ void HttpProxy::stream(const Future<shor
           out.write(data, length);
           out << "\r\n";
         }
-        socket_manager->send(new DataEncoder(out.str()), socket, persist);
+
+        // We always persist the connection when we're not finished
+        // streaming.
+        socket_manager->send(
+          new DataEncoder(out.str()),
+          socket,
+          finished ? request.keepAlive : true);
       }
     }
   } else if (poll.isFailed()) {
     VLOG(1) << "Failed to poll: " << poll.failure();
-    socket_manager->send(InternalServerError(), socket, persist);
+    socket_manager->send(InternalServerError(), request, socket);
     finished = true;
   } else {
     VLOG(1) << "Unexpected discarded future while polling";
-    socket_manager->send(InternalServerError(), socket, persist);
+    socket_manager->send(InternalServerError(), request, socket);
     finished = true;
   }
 
@@ -1845,9 +1849,22 @@ void SocketManager::send(Encoder* encode
 }
 
 
-void SocketManager::send(const Response& response, int s, bool persist)
-{
-  send(new HttpResponseEncoder(response), s, persist);
+void SocketManager::send(
+    const Response& response,
+    const Request& request,
+    int s)
+{
+  bool persist = request.keepAlive; // Default to the request's keep-alive.
+
+  // Don't persist the connection if the response headers include
+  // 'Connection: close'.
+  if (response.headers.contains("Connection")) {
+    if (response.headers.get("Connection").get() == "close") {
+      persist = false;
+    }
+  }
+
+  send(new HttpResponseEncoder(response, request), s, persist);
+}
 
 
@@ -2195,7 +2212,7 @@ bool ProcessManager::handle(
 
     // Enqueue the response with the HttpProxy so that it respects the
     // order of requests to account for HTTP/1.1 pipelining.
-    dispatch(proxy, &HttpProxy::enqueue, BadRequest(), request->keepAlive);
+    dispatch(proxy, &HttpProxy::enqueue, BadRequest(), *request);
 
     // Cleanup request.
     delete request;
@@ -2212,7 +2229,7 @@ bool ProcessManager::handle(
 
     // Enqueue the response with the HttpProxy so that it respects the
     // order of requests to account for HTTP/1.1 pipelining.
-    dispatch(proxy, &HttpProxy::enqueue, NotFound(), request->keepAlive);
+    dispatch(proxy, &HttpProxy::enqueue, NotFound(), *request);
 
     // Cleanup request.
     delete request;
@@ -2252,7 +2269,7 @@ bool ProcessManager::handle(
 
   // Enqueue the response with the HttpProxy so that it respects the
   // order of requests to account for HTTP/1.1 pipelining.
-  dispatch(proxy, &HttpProxy::enqueue, NotFound(), request->keepAlive);
+  dispatch(proxy, &HttpProxy::enqueue, NotFound(), *request);
 
   // Cleanup request.
   delete request;
@@ -2917,7 +2934,7 @@ void ProcessBase::visit(const HttpEvent&
     PID<HttpProxy> proxy = socket_manager->proxy(event.socket);
 
     // Let the HttpProxy know about this request (via the future).
-    dispatch(proxy, &HttpProxy::handle, future, event.request->keepAlive);
+    dispatch(proxy, &HttpProxy::handle, future, *event.request);
 
     // Now call the handler and associate the response with the promise.
     promise->associate(handlers.http[name](*event.request));
@@ -2952,7 +2969,7 @@ void ProcessBase::visit(const HttpEvent&
 
     // Enqueue the response with the HttpProxy so that it respects the
     // order of requests to account for HTTP/1.1 pipelining.
-    dispatch(proxy, &HttpProxy::enqueue, response, event.request->keepAlive);
+    dispatch(proxy, &HttpProxy::enqueue, response, *event.request);
   } else {
     VLOG(1) << "Returning '404 Not Found' for '" << event.request->path << "'";
 
@@ -2961,8 +2978,7 @@ void ProcessBase::visit(const HttpEvent&
 
     // Enqueue the response with the HttpProxy so that it respects the
     // order of requests to account for HTTP/1.1 pipelining.
-    dispatch(proxy, &HttpProxy::enqueue,
-             NotFound(), event.request->keepAlive);
+    dispatch(proxy, &HttpProxy::enqueue, NotFound(), *event.request);
   }
 }