You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2015/03/31 02:12:44 UTC
[2/3] mesos git commit: Added a StreamingResponseDecoder.
Added a StreamingResponseDecoder.
Review: https://reviews.apache.org/r/32347
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6ac8eb1c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6ac8eb1c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6ac8eb1c
Branch: refs/heads/master
Commit: 6ac8eb1c524dc4adbc09d20dcb8e4e31d60eeb56
Parents: 8b0bba1
Author: Benjamin Mahler <be...@gmail.com>
Authored: Fri Mar 20 11:44:51 2015 -0700
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Mon Mar 30 16:38:22 2015 -0700
----------------------------------------------------------------------
3rdparty/libprocess/src/decoder.hpp | 232 +++++++++++++++++++
3rdparty/libprocess/src/tests/decoder_tests.cpp | 91 ++++++++
2 files changed, 323 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/6ac8eb1c/3rdparty/libprocess/src/decoder.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/decoder.hpp b/3rdparty/libprocess/src/decoder.hpp
index b3a667c..56adde0 100644
--- a/3rdparty/libprocess/src/decoder.hpp
+++ b/3rdparty/libprocess/src/decoder.hpp
@@ -14,6 +14,7 @@
#include <stout/foreach.hpp>
#include <stout/gzip.hpp>
+#include <stout/option.hpp>
#include <stout/try.hpp>
@@ -469,6 +470,237 @@ private:
};
+// Provides a response decoder that returns 'PIPE' responses once
+// the response headers are received, but before the body data
+// is received. Callers are expected to read the body from the
+// Pipe::Reader in the response.
+//
+// TODO(bmahler): Consolidate streaming and non-streaming response
+// decoders.
+class StreamingResponseDecoder
+{
+public:
+ StreamingResponseDecoder()
+ : failure(false), header(HEADER_FIELD), response(NULL)
+ {
+ settings.on_message_begin =
+ &StreamingResponseDecoder::on_message_begin;
+
+#if !(HTTP_PARSER_VERSION_MAJOR >=2)
+ settings.on_path =
+ &StreamingResponseDecoder::on_path;
+ settings.on_fragment =
+ &StreamingResponseDecoder::on_fragment;
+ settings.on_query_string =
+ &StreamingResponseDecoder::on_query_string;
+#endif
+
+ settings.on_url =
+ &StreamingResponseDecoder::on_url;
+ settings.on_header_field =
+ &StreamingResponseDecoder::on_header_field;
+ settings.on_header_value =
+ &StreamingResponseDecoder::on_header_value;
+ settings.on_headers_complete =
+ &StreamingResponseDecoder::on_headers_complete;
+ settings.on_body =
+ &StreamingResponseDecoder::on_body;
+ settings.on_message_complete =
+ &StreamingResponseDecoder::on_message_complete;
+
+ http_parser_init(&parser, HTTP_RESPONSE);
+
+ parser.data = this;
+ }
+
+ std::deque<http::Response*> decode(const char* data, size_t length)
+ {
+ size_t parsed = http_parser_execute(&parser, &settings, data, length);
+
+ if (parsed != length) {
+ // TODO(bmahler): joyent/http-parser exposes error reasons.
+ failure = true;
+
+ // If we're still writing the body, fail the writer!
+ if (writer.isSome()) {
+ http::Pipe::Writer writer_ = writer.get(); // Remove const.
+ writer_.fail("failed to decode body");
+ writer = None();
+ }
+ }
+
+ if (!responses.empty()) {
+ std::deque<http::Response*> result = responses;
+ responses.clear();
+ return result;
+ }
+
+ return std::deque<http::Response*>();
+ }
+
+ bool failed() const
+ {
+ return failure;
+ }
+
+private:
+ static int on_message_begin(http_parser* p)
+ {
+ StreamingResponseDecoder* decoder = (StreamingResponseDecoder*) p->data;
+
+ CHECK(!decoder->failure);
+
+ decoder->header = HEADER_FIELD;
+ decoder->field.clear();
+ decoder->value.clear();
+
+ CHECK(decoder->response == NULL);
+ CHECK(decoder->writer.isNone());
+
+ decoder->response = new http::Response();
+ decoder->response->type = http::Response::PIPE;
+ decoder->writer = None();
+
+ return 0;
+ }
+
+#if !(HTTP_PARSER_VERSION_MAJOR >= 2)
+ static int on_path(http_parser* p, const char* data, size_t length)
+ {
+ return 0;
+ }
+
+ static int on_query_string(http_parser* p, const char* data, size_t length)
+ {
+ return 0;
+ }
+
+ static int on_fragment(http_parser* p, const char* data, size_t length)
+ {
+ return 0;
+ }
+#endif // !(HTTP_PARSER_VERSION_MAJOR >= 2)
+
+ static int on_url(http_parser* p, const char* data, size_t length)
+ {
+ return 0;
+ }
+
+ static int on_header_field(http_parser* p, const char* data, size_t length)
+ {
+ StreamingResponseDecoder* decoder = (StreamingResponseDecoder*) p->data;
+
+ CHECK_NOTNULL(decoder->response);
+
+ if (decoder->header != HEADER_FIELD) {
+ decoder->response->headers[decoder->field] = decoder->value;
+ decoder->field.clear();
+ decoder->value.clear();
+ }
+
+ decoder->field.append(data, length);
+ decoder->header = HEADER_FIELD;
+
+ return 0;
+ }
+
+ static int on_header_value(http_parser* p, const char* data, size_t length)
+ {
+ StreamingResponseDecoder* decoder = (StreamingResponseDecoder*) p->data;
+
+ CHECK_NOTNULL(decoder->response);
+
+ decoder->value.append(data, length);
+ decoder->header = HEADER_VALUE;
+ return 0;
+ }
+
+ static int on_headers_complete(http_parser* p)
+ {
+ StreamingResponseDecoder* decoder = (StreamingResponseDecoder*) p->data;
+
+ CHECK_NOTNULL(decoder->response);
+
+ // Add final header.
+ decoder->response->headers[decoder->field] = decoder->value;
+ decoder->field.clear();
+ decoder->value.clear();
+
+ // Get the response status string.
+ if (http::statuses.contains(decoder->parser.status_code)) {
+ decoder->response->status = http::statuses[decoder->parser.status_code];
+ } else {
+ decoder->failure = true;
+ return 1;
+ }
+
+ // We cannot provide streaming gzip decompression!
+ Option<std::string> encoding =
+ decoder->response->headers.get("Content-Encoding");
+ if (encoding.isSome() && encoding.get() == "gzip") {
+ decoder->failure = true;
+ return 1;
+ }
+
+ CHECK(decoder->writer.isNone());
+
+ http::Pipe pipe;
+ decoder->writer = pipe.writer();
+ decoder->response->reader = pipe.reader();
+
+ // Send the response to the caller, but keep a Pipe::Writer for
+ // streaming the body content into the response.
+ decoder->responses.push_back(decoder->response);
+ decoder->response = NULL;
+
+ return 0;
+ }
+
+ static int on_body(http_parser* p, const char* data, size_t length)
+ {
+ StreamingResponseDecoder* decoder = (StreamingResponseDecoder*) p->data;
+
+ CHECK_SOME(decoder->writer);
+
+ http::Pipe::Writer writer = decoder->writer.get(); // Remove const.
+ writer.write(std::string(data, length));
+
+ return 0;
+ }
+
+ static int on_message_complete(http_parser* p)
+ {
+ StreamingResponseDecoder* decoder = (StreamingResponseDecoder*) p->data;
+
+ CHECK_SOME(decoder->writer);
+
+ http::Pipe::Writer writer = decoder->writer.get(); // Remove const.
+ writer.close();
+
+ decoder->writer = None();
+
+ return 0;
+ }
+
+ bool failure;
+
+ http_parser parser;
+ http_parser_settings settings;
+
+ enum {
+ HEADER_FIELD,
+ HEADER_VALUE
+ } header;
+
+ std::string field;
+ std::string value;
+
+ http::Response* response;
+ Option<http::Pipe::Writer> writer;
+
+ std::deque<http::Response*> responses;
+};
+
} // namespace process {
#endif // __DECODER_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/6ac8eb1c/3rdparty/libprocess/src/tests/decoder_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/decoder_tests.cpp b/3rdparty/libprocess/src/tests/decoder_tests.cpp
index b996f00..efe364a 100644
--- a/3rdparty/libprocess/src/tests/decoder_tests.cpp
+++ b/3rdparty/libprocess/src/tests/decoder_tests.cpp
@@ -133,3 +133,94 @@ TEST(Decoder, Response)
delete response;
}
+
+
+TEST(Decoder, StreamingResponse)
+{
+ StreamingResponseDecoder decoder;
+
+ const string& headers =
+ "HTTP/1.1 200 OK\r\n"
+ "Date: Fri, 31 Dec 1999 23:59:59 GMT\r\n"
+ "Content-Type: text/plain\r\n"
+ "Content-Length: 2\r\n"
+ "\r\n";
+
+ const string& body = "hi";
+
+ deque<Response*> responses = decoder.decode(headers.data(), headers.length());
+ ASSERT_FALSE(decoder.failed());
+ ASSERT_EQ(1, responses.size());
+
+ Response* response = responses[0];
+
+ EXPECT_EQ("200 OK", response->status);
+ EXPECT_EQ(3, response->headers.size());
+
+ ASSERT_EQ(Response::PIPE, response->type);
+ ASSERT_SOME(response->reader);
+
+ http::Pipe::Reader reader = response->reader.get();
+ Future<string> read = reader.read();
+ EXPECT_TRUE(read.isPending());
+
+ decoder.decode(body.data(), body.length());
+
+ // Feeding EOF to the decoder should be ok.
+ decoder.decode("", 0);
+
+ EXPECT_TRUE(read.isReady());
+ EXPECT_EQ("hi", read.get());
+
+ // Response should be complete.
+ read = reader.read();
+ EXPECT_TRUE(read.isReady());
+ EXPECT_EQ("", read.get());
+}
+
+
+TEST(Decoder, StreamingResponseFailure)
+{
+ StreamingResponseDecoder decoder;
+
+ const string& headers =
+ "HTTP/1.1 200 OK\r\n"
+ "Date: Fri, 31 Dec 1999 23:59:59 GMT\r\n"
+ "Content-Type: text/plain\r\n"
+ "Content-Length: 2\r\n"
+ "\r\n";
+
+ // The body is shorter than the content length!
+ const string& body = "1";
+
+ deque<Response*> responses = decoder.decode(headers.data(), headers.length());
+ ASSERT_FALSE(decoder.failed());
+ ASSERT_EQ(1, responses.size());
+
+ Response* response = responses[0];
+
+ EXPECT_EQ("200 OK", response->status);
+ EXPECT_EQ(3, response->headers.size());
+
+ ASSERT_EQ(Response::PIPE, response->type);
+ ASSERT_SOME(response->reader);
+
+ http::Pipe::Reader reader = response->reader.get();
+ Future<string> read = reader.read();
+ EXPECT_TRUE(read.isPending());
+
+ decoder.decode(body.data(), body.length());
+
+ EXPECT_TRUE(read.isReady());
+ EXPECT_EQ("1", read.get());
+
+ // Body is not yet complete.
+ read = reader.read();
+ EXPECT_TRUE(read.isPending());
+
+ // Feeding EOF to the decoder should trigger a failure!
+ decoder.decode("", 0);
+
+ EXPECT_TRUE(read.isFailed());
+ EXPECT_EQ("failed to decode body", read.failure());
+}