You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2015/03/31 02:12:44 UTC

[2/3] mesos git commit: Added a StreamingResponseDecoder.

Added a StreamingResponseDecoder.

Review: https://reviews.apache.org/r/32347


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6ac8eb1c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6ac8eb1c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6ac8eb1c

Branch: refs/heads/master
Commit: 6ac8eb1c524dc4adbc09d20dcb8e4e31d60eeb56
Parents: 8b0bba1
Author: Benjamin Mahler <be...@gmail.com>
Authored: Fri Mar 20 11:44:51 2015 -0700
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Mon Mar 30 16:38:22 2015 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/src/decoder.hpp             | 232 +++++++++++++++++++
 3rdparty/libprocess/src/tests/decoder_tests.cpp |  91 ++++++++
 2 files changed, 323 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/6ac8eb1c/3rdparty/libprocess/src/decoder.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/decoder.hpp b/3rdparty/libprocess/src/decoder.hpp
index b3a667c..56adde0 100644
--- a/3rdparty/libprocess/src/decoder.hpp
+++ b/3rdparty/libprocess/src/decoder.hpp
@@ -14,6 +14,7 @@
 
 #include <stout/foreach.hpp>
 #include <stout/gzip.hpp>
+#include <stout/option.hpp>
 #include <stout/try.hpp>
 
 
@@ -469,6 +470,237 @@ private:
 };
 
 
+// Provides a response decoder that returns 'PIPE' responses once
+// the response headers are received, but before the body data
+// is received. Callers are expected to read the body from the
+// Pipe::Reader in the response.
+//
+// TODO(bmahler): Consolidate streaming and non-streaming response
+// decoders.
+class StreamingResponseDecoder
+{
+public:
+  StreamingResponseDecoder()
+    : failure(false), header(HEADER_FIELD), response(NULL)
+  {
+    settings.on_message_begin =
+      &StreamingResponseDecoder::on_message_begin;
+
+#if !(HTTP_PARSER_VERSION_MAJOR >=2)
+    settings.on_path =
+      &StreamingResponseDecoder::on_path;
+    settings.on_fragment =
+      &StreamingResponseDecoder::on_fragment;
+    settings.on_query_string =
+      &StreamingResponseDecoder::on_query_string;
+#endif
+
+    settings.on_url =
+      &StreamingResponseDecoder::on_url;
+    settings.on_header_field =
+      &StreamingResponseDecoder::on_header_field;
+    settings.on_header_value =
+      &StreamingResponseDecoder::on_header_value;
+    settings.on_headers_complete =
+      &StreamingResponseDecoder::on_headers_complete;
+    settings.on_body =
+      &StreamingResponseDecoder::on_body;
+    settings.on_message_complete =
+      &StreamingResponseDecoder::on_message_complete;
+
+    http_parser_init(&parser, HTTP_RESPONSE);
+
+    parser.data = this;
+  }
+
+  std::deque<http::Response*> decode(const char* data, size_t length)
+  {
+    size_t parsed = http_parser_execute(&parser, &settings, data, length);
+
+    if (parsed != length) {
+      // TODO(bmahler): joyent/http-parser exposes error reasons.
+      failure = true;
+
+      // If we're still writing the body, fail the writer!
+      if (writer.isSome()) {
+        http::Pipe::Writer writer_ = writer.get(); // Remove const.
+        writer_.fail("failed to decode body");
+        writer = None();
+      }
+    }
+
+    if (!responses.empty()) {
+      std::deque<http::Response*> result = responses;
+      responses.clear();
+      return result;
+    }
+
+    return std::deque<http::Response*>();
+  }
+
+  bool failed() const
+  {
+    return failure;
+  }
+
+private:
+  static int on_message_begin(http_parser* p)
+  {
+    StreamingResponseDecoder* decoder = (StreamingResponseDecoder*) p->data;
+
+    CHECK(!decoder->failure);
+
+    decoder->header = HEADER_FIELD;
+    decoder->field.clear();
+    decoder->value.clear();
+
+    CHECK(decoder->response == NULL);
+    CHECK(decoder->writer.isNone());
+
+    decoder->response = new http::Response();
+    decoder->response->type = http::Response::PIPE;
+    decoder->writer = None();
+
+    return 0;
+  }
+
+#if !(HTTP_PARSER_VERSION_MAJOR >= 2)
+  static int on_path(http_parser* p, const char* data, size_t length)
+  {
+    return 0;
+  }
+
+  static int on_query_string(http_parser* p, const char* data, size_t length)
+  {
+    return 0;
+  }
+
+  static int on_fragment(http_parser* p, const char* data, size_t length)
+  {
+    return 0;
+  }
+#endif // !(HTTP_PARSER_VERSION_MAJOR >= 2)
+
+  static int on_url(http_parser* p, const char* data, size_t length)
+  {
+    return 0;
+  }
+
+  static int on_header_field(http_parser* p, const char* data, size_t length)
+  {
+    StreamingResponseDecoder* decoder = (StreamingResponseDecoder*) p->data;
+
+    CHECK_NOTNULL(decoder->response);
+
+    if (decoder->header != HEADER_FIELD) {
+      decoder->response->headers[decoder->field] = decoder->value;
+      decoder->field.clear();
+      decoder->value.clear();
+    }
+
+    decoder->field.append(data, length);
+    decoder->header = HEADER_FIELD;
+
+    return 0;
+  }
+
+  static int on_header_value(http_parser* p, const char* data, size_t length)
+  {
+    StreamingResponseDecoder* decoder = (StreamingResponseDecoder*) p->data;
+
+    CHECK_NOTNULL(decoder->response);
+
+    decoder->value.append(data, length);
+    decoder->header = HEADER_VALUE;
+    return 0;
+  }
+
+  static int on_headers_complete(http_parser* p)
+  {
+    StreamingResponseDecoder* decoder = (StreamingResponseDecoder*) p->data;
+
+    CHECK_NOTNULL(decoder->response);
+
+    // Add final header.
+    decoder->response->headers[decoder->field] = decoder->value;
+    decoder->field.clear();
+    decoder->value.clear();
+
+    // Get the response status string.
+    if (http::statuses.contains(decoder->parser.status_code)) {
+      decoder->response->status = http::statuses[decoder->parser.status_code];
+    } else {
+      decoder->failure = true;
+      return 1;
+    }
+
+    // We cannot provide streaming gzip decompression!
+    Option<std::string> encoding =
+      decoder->response->headers.get("Content-Encoding");
+    if (encoding.isSome() && encoding.get() == "gzip") {
+      decoder->failure = true;
+      return 1;
+    }
+
+    CHECK(decoder->writer.isNone());
+
+    http::Pipe pipe;
+    decoder->writer = pipe.writer();
+    decoder->response->reader = pipe.reader();
+
+    // Send the response to the caller, but keep a Pipe::Writer for
+    // streaming the body content into the response.
+    decoder->responses.push_back(decoder->response);
+    decoder->response = NULL;
+
+    return 0;
+  }
+
+  static int on_body(http_parser* p, const char* data, size_t length)
+  {
+    StreamingResponseDecoder* decoder = (StreamingResponseDecoder*) p->data;
+
+    CHECK_SOME(decoder->writer);
+
+    http::Pipe::Writer writer = decoder->writer.get(); // Remove const.
+    writer.write(std::string(data, length));
+
+    return 0;
+  }
+
+  static int on_message_complete(http_parser* p)
+  {
+    StreamingResponseDecoder* decoder = (StreamingResponseDecoder*) p->data;
+
+    CHECK_SOME(decoder->writer);
+
+    http::Pipe::Writer writer = decoder->writer.get(); // Remove const.
+    writer.close();
+
+    decoder->writer = None();
+
+    return 0;
+  }
+
+  bool failure;
+
+  http_parser parser;
+  http_parser_settings settings;
+
+  enum {
+    HEADER_FIELD,
+    HEADER_VALUE
+  } header;
+
+  std::string field;
+  std::string value;
+
+  http::Response* response;
+  Option<http::Pipe::Writer> writer;
+
+  std::deque<http::Response*> responses;
+};
+
 }  // namespace process {
 
 #endif // __DECODER_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/6ac8eb1c/3rdparty/libprocess/src/tests/decoder_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/decoder_tests.cpp b/3rdparty/libprocess/src/tests/decoder_tests.cpp
index b996f00..efe364a 100644
--- a/3rdparty/libprocess/src/tests/decoder_tests.cpp
+++ b/3rdparty/libprocess/src/tests/decoder_tests.cpp
@@ -133,3 +133,94 @@ TEST(Decoder, Response)
 
   delete response;
 }
+
+
+TEST(Decoder, StreamingResponse)
+{
+  StreamingResponseDecoder decoder;
+
+  const string& headers =
+    "HTTP/1.1 200 OK\r\n"
+    "Date: Fri, 31 Dec 1999 23:59:59 GMT\r\n"
+    "Content-Type: text/plain\r\n"
+    "Content-Length: 2\r\n"
+    "\r\n";
+
+  const string& body = "hi";
+
+  deque<Response*> responses = decoder.decode(headers.data(), headers.length());
+  ASSERT_FALSE(decoder.failed());
+  ASSERT_EQ(1, responses.size());
+
+  Response* response = responses[0];
+
+  EXPECT_EQ("200 OK", response->status);
+  EXPECT_EQ(3, response->headers.size());
+
+  ASSERT_EQ(Response::PIPE, response->type);
+  ASSERT_SOME(response->reader);
+
+  http::Pipe::Reader reader = response->reader.get();
+  Future<string> read = reader.read();
+  EXPECT_TRUE(read.isPending());
+
+  decoder.decode(body.data(), body.length());
+
+  // Feeding EOF to the decoder should be ok.
+  decoder.decode("", 0);
+
+  EXPECT_TRUE(read.isReady());
+  EXPECT_EQ("hi", read.get());
+
+  // Response should be complete.
+  read = reader.read();
+  EXPECT_TRUE(read.isReady());
+  EXPECT_EQ("", read.get());
+}
+
+
+TEST(Decoder, StreamingResponseFailure)
+{
+  StreamingResponseDecoder decoder;
+
+  const string& headers =
+    "HTTP/1.1 200 OK\r\n"
+    "Date: Fri, 31 Dec 1999 23:59:59 GMT\r\n"
+    "Content-Type: text/plain\r\n"
+    "Content-Length: 2\r\n"
+    "\r\n";
+
+  // The body is shorter than the content length!
+  const string& body = "1";
+
+  deque<Response*> responses = decoder.decode(headers.data(), headers.length());
+  ASSERT_FALSE(decoder.failed());
+  ASSERT_EQ(1, responses.size());
+
+  Response* response = responses[0];
+
+  EXPECT_EQ("200 OK", response->status);
+  EXPECT_EQ(3, response->headers.size());
+
+  ASSERT_EQ(Response::PIPE, response->type);
+  ASSERT_SOME(response->reader);
+
+  http::Pipe::Reader reader = response->reader.get();
+  Future<string> read = reader.read();
+  EXPECT_TRUE(read.isPending());
+
+  decoder.decode(body.data(), body.length());
+
+  EXPECT_TRUE(read.isReady());
+  EXPECT_EQ("1", read.get());
+
+  // Body is not yet complete.
+  read = reader.read();
+  EXPECT_TRUE(read.isPending());
+
+  // Feeding EOF to the decoder should trigger a failure!
+  decoder.decode("", 0);
+
+  EXPECT_TRUE(read.isFailed());
+  EXPECT_EQ("failed to decode body", read.failure());
+}