You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2019/12/04 17:15:18 UTC

[mesos] branch master updated (3dda362 -> d9fe311)

This is an automated email from the ASF dual-hosted git repository.

bmahler pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git.


    from 3dda362  Simplified V0 -> V1 direct serialization logic.
     new 80cec58  Updated stout recordio encoder/decoder to be lower-level.
     new ed39b42  Updated mesos code to work against recordio encoder/decoder changes.
     new a15e5a3  Updated StreamingHttpConnection to allow writing serialized records.
     new d9fe311  Improved operator api subscribe initial payload performance.

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 3rdparty/stout/include/stout/recordio.hpp        |  49 +++-----
 3rdparty/stout/tests/recordio_tests.cpp          |  50 +++------
 src/checks/checker_process.cpp                   |  25 ++---
 src/common/http.hpp                              |  19 +++-
 src/common/recordio.hpp                          |  21 ++--
 src/executor/executor.cpp                        |   8 +-
 src/master/http.cpp                              | 136 +++++++++++++++++++++--
 src/master/master.hpp                            |   5 +
 src/resource_provider/http_connection.hpp        |   5 +-
 src/resource_provider/manager.cpp                |  48 ++------
 src/scheduler/scheduler.cpp                      |   8 +-
 src/slave/containerizer/mesos/io/switchboard.cpp |  15 ++-
 src/slave/http.cpp                               |  25 ++---
 src/tests/api_tests.cpp                          | 111 ++++++------------
 src/tests/common/recordio_tests.cpp              |  52 +++------
 src/tests/containerizer/io_switchboard_tests.cpp |  39 +++----
 src/tests/executor_http_api_tests.cpp            |  17 +--
 src/tests/master/mock_master_api_subscriber.cpp  |   2 +-
 src/tests/resource_provider_manager_tests.cpp    |  15 +--
 src/tests/scheduler_http_api_tests.cpp           |  22 ++--
 20 files changed, 329 insertions(+), 343 deletions(-)


[mesos] 01/04: Updated stout recordio encoder/decoder to be lower-level.

Posted by bm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bmahler pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 80cec581102f94d0d4630758d68e87c9605a9cba
Author: Benjamin Mahler <bm...@apache.org>
AuthorDate: Mon Nov 25 17:25:43 2019 -0500

    Updated stout recordio encoder/decoder to be lower-level.
    
    These previously were templated to operate against typed records,
    which imposes a few limitations. Namely, the caller cannot encode
    records of different types without using some wrapper type.
    Similarly, on the decoding side if the caller expects different
    types of records based on some higher level protocol, it needs to
    use a wrapper type that just stores the record's bytes for the
    caller to decode.
    
    The actual motivation for this patch came from a case where the
    caller already has the record in bytes when encoding. We could
    use Encoder<string> but it imposes an extra copy of each record,
    which is not so straightforward to eliminate while keeping the
    interface simple and functional for generic T records.
    
    So, this patch makes the recordio encoder/decoder operate on
    records as bytes, and callers can do whatever they wish with
    the bytes.
    
    Review: https://reviews.apache.org/r/71824
---
 3rdparty/stout/include/stout/recordio.hpp | 49 ++++++++----------------------
 3rdparty/stout/tests/recordio_tests.cpp   | 50 +++++++++----------------------
 2 files changed, 27 insertions(+), 72 deletions(-)

diff --git a/3rdparty/stout/include/stout/recordio.hpp b/3rdparty/stout/include/stout/recordio.hpp
index 9d226c2..4dd0a87 100644
--- a/3rdparty/stout/include/stout/recordio.hpp
+++ b/3rdparty/stout/include/stout/recordio.hpp
@@ -44,11 +44,6 @@
  * other "Record-IO" implementations use a fixed-size header
  * of 4 bytes to directly encode an unsigned 32 bit length.
  *
- * TODO(bmahler): Make the encoder and decoder non-templated.
- * They can just take records as bytes and let a wrapper at
- * a higher level do what they like with the bytes of each
- * record.
- *
  * TODO(bmahler): Make the encoder and decoder zero-copy,
  * once they're just dealing with bytes. To make the encoder
  * zero-copy, we need to make "writes" directly to an output
@@ -60,40 +55,24 @@
 namespace recordio {
 
 /**
- * Given an encoding function for individual records, this
- * provides encoding from typed records into "Record-IO" data.
+ * Returns the "Record-IO" encoded record. Unlike the
+ * decoder, this can just be a stateless function since
+ * we're taking entire records and each encoded record
+ * is independent.
  */
-template <typename T>
-class Encoder
+inline std::string encode(const std::string& record)\
 {
-public:
-  Encoder(std::function<std::string(const T&)> _serialize)
-    : serialize(_serialize) {}
-
-  /**
-   * Returns the "Record-IO" encoded record.
-   */
-  std::string encode(const T& record) const
-  {
-    std::string s = serialize(record);
-    return stringify(s.size()) + "\n" + s;
-  }
-
-private:
-  std::function<std::string(const T&)> serialize;
-};
+  return stringify(record.size()) + "\n" + record;
+}
 
 
 /**
- * Given a decoding function for individual records, this
- * provides decoding from "Record-IO" data into typed records.
+ * Decodes records from "Record-IO" data (see above).
  */
-template <typename T>
 class Decoder
 {
 public:
-  Decoder(std::function<Try<T>(const std::string&)> _deserialize)
-    : state(HEADER), deserialize(_deserialize) {}
+  Decoder() : state(HEADER) {}
 
   /**
    * Decodes another chunk of data from the "Record-IO" stream
@@ -107,13 +86,13 @@ public:
    * TODO(bmahler): Allow the caller to signal EOF, this allows
    * detection of invalid partial data at the end of the input.
    */
-  Try<std::deque<Try<T>>> decode(const std::string& data)
+  Try<std::deque<std::string>> decode(const std::string& data)
   {
     if (state == FAILED) {
       return Error("Decoder is in a FAILED state");
     }
 
-    std::deque<Try<T>> records;
+    std::deque<std::string> records;
 
     foreach (char c, data) {
       if (state == HEADER) {
@@ -140,7 +119,7 @@ public:
 
         // Note that for 0 length records, we immediately decode.
         if (numify.get() <= 0) {
-          records.push_back(deserialize(buffer));
+          records.push_back(buffer);
           state = HEADER;
         }
       } else if (state == RECORD) {
@@ -150,7 +129,7 @@ public:
         buffer += c;
 
         if (buffer.size() == length.get()) {
-          records.push_back(deserialize(buffer));
+          records.push_back(std::move(buffer));
           buffer.clear();
           state = HEADER;
         }
@@ -172,8 +151,6 @@ private:
   // its underlying memory allocation when we clear it.
   std::string buffer;
   Option<size_t> length;
-
-  std::function<Try<T>(const std::string&)> deserialize;
 };
 
 } // namespace recordio {
diff --git a/3rdparty/stout/tests/recordio_tests.cpp b/3rdparty/stout/tests/recordio_tests.cpp
index 1779394..8818b0d 100644
--- a/3rdparty/stout/tests/recordio_tests.cpp
+++ b/3rdparty/stout/tests/recordio_tests.cpp
@@ -71,31 +71,25 @@ bool operator==(deque<T> rhs, deque<T> lhs)
 }
 
 
-TEST(RecordIOTest, Encoder)
+TEST(RecordIOTest, Encode)
 {
-  recordio::Encoder<string> encoder(strings::upper);
-
   string data;
 
-  data += encoder.encode("hello!");
-  data += encoder.encode("");
-  data += encoder.encode(" ");
-  data += encoder.encode("13 characters");
+  data += recordio::encode("hello!");
+  data += recordio::encode("");
+  data += recordio::encode(" ");
+  data += recordio::encode("13 characters");
 
   EXPECT_EQ(
-      "6\nHELLO!"
+      "6\nhello!"
       "0\n"
       "1\n "
-      "13\n13 CHARACTERS",
+      "13\n13 characters",
       data);
 
-  // Make sure these can be decoded.
-  recordio::Decoder<string> decoder(
-      [=](const string& data) {
-        return Try<string>(strings::lower(data));
-      });
+  recordio::Decoder decoder;
 
-  deque<Try<string>> records;
+  deque<string> records;
   records.push_back("hello!");
   records.push_back("");
   records.push_back(" ");
@@ -107,18 +101,9 @@ TEST(RecordIOTest, Encoder)
 
 TEST(RecordIOTest, Decoder)
 {
-  // Deserializing brings to lower case, but add an
-  // error case to test deserialization failures.
-  auto deserialize = [](const string& data) -> Try<string> {
-    if (data == "error") {
-      return Error("error");
-    }
-    return strings::lower(data);
-  };
-
-  recordio::Decoder<string> decoder(deserialize);
+  recordio::Decoder decoder;
 
-  deque<Try<string>> records;
+  deque<string> records;
 
   // Empty data should not result in an error.
   records.clear();
@@ -131,14 +116,7 @@ TEST(RecordIOTest, Decoder)
   records.push_back("");
   records.push_back(" ");
 
-  EXPECT_SOME_EQ(records, decoder.decode("6\nHELLO!0\n1\n "));
-
-  // An entry which cannot be decoded should not
-  // fail the decoder permanently.
-  records.clear();
-  records.push_back(Error("error"));
-
-  EXPECT_SOME_EQ(records, decoder.decode("5\nerror"));
+  EXPECT_SOME_EQ(records, decoder.decode("6\nhello!0\n1\n "));
 
   // Record should only be decoded once complete.
   records.clear();
@@ -146,12 +124,12 @@ TEST(RecordIOTest, Decoder)
   EXPECT_SOME_EQ(records, decoder.decode("1"));
   EXPECT_SOME_EQ(records, decoder.decode("3"));
   EXPECT_SOME_EQ(records, decoder.decode("\n"));
-  EXPECT_SOME_EQ(records, decoder.decode("13 CHARACTER"));
+  EXPECT_SOME_EQ(records, decoder.decode("13 character"));
 
   records.clear();
   records.push_back("13 characters");
 
-  EXPECT_SOME_EQ(records, decoder.decode("S"));
+  EXPECT_SOME_EQ(records, decoder.decode("s"));
 
   // If the format is bad, the decoder should fail permanently.
   EXPECT_ERROR(decoder.decode("not a number\n"));


[mesos] 03/04: Updated StreamingHttpConnection to allow writing serialized records.

Posted by bm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bmahler pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit a15e5a3b5f5620b933ab709839a5ac5714e5c56c
Author: Benjamin Mahler <bm...@apache.org>
AuthorDate: Mon Nov 25 17:43:18 2019 -0500

    Updated StreamingHttpConnection to allow writing serialized records.
    
    In order to more efficiently write the initial payload of the
    master's operator API event stream, we will need to directly
    serialize from the in-memory master state. To enable this, this
    patch updates the streaming http connection to allow writing an
    already serialized record.
    
    Review: https://reviews.apache.org/r/71826
---
 src/common/http.hpp | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/src/common/http.hpp b/src/common/http.hpp
index 534fc26..47a4d6a 100644
--- a/src/common/http.hpp
+++ b/src/common/http.hpp
@@ -168,6 +168,12 @@ struct StreamingHttpConnection
     return writer.write(::recordio::encode(record));
   }
 
+  // Like the above send, but for already serialized data.
+  bool send(const std::string& event)
+  {
+    return writer.write(::recordio::encode(event));
+  }
+
   bool close()
   {
     return writer.close();


[mesos] 04/04: Improved operator api subscribe initial payload performance.

Posted by bm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bmahler pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit d9fe31158b54238e1621a668189d34a793dacd5e
Author: Benjamin Mahler <bm...@apache.org>
AuthorDate: Mon Nov 25 18:44:21 2019 -0500

    Improved operator api subscribe initial payload performance.
    
    This uses the same approach for other GET_ calls in MESOS-10026
    of directly serializing from in-memory state, rather than building
    up the temporary object and evolving it.
    
    There is currently no benchmark but the improvement should closely
    resemble that of the GET_STATE call, for example:
    
        Before:
        v0 '/state' response took 6.55 secs
        v1 'GetState' application/x-protobuf response took 24.08 secs
        v1 'GetState' application/json response took 22.76 secs
    
        After:
        v0 '/state' response took 8.00 secs
        v1 'GetState' application/x-protobuf response took 5.73 secs
        v1 'GetState' application/json response took 9.62 secs
    
    Review: https://reviews.apache.org/r/71827
---
 src/master/http.cpp   | 136 +++++++++++++++++++++++++++++++++++++++++++++++---
 src/master/master.hpp |   5 ++
 2 files changed, 134 insertions(+), 7 deletions(-)

diff --git a/src/master/http.cpp b/src/master/http.cpp
index 6d84856..72587bf 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -438,15 +438,70 @@ Future<Response> Master::Http::subscribe(
           StreamingHttpConnection<v1::master::Event> http(
               pipe.writer(), contentType);
 
-          mesos::master::Event event;
-          event.set_type(mesos::master::Event::SUBSCRIBED);
-          *event.mutable_subscribed()->mutable_get_state() =
-            _getState(approvers);
+          // Serialize the following event:
+          //
+          //   mesos::master::Event event;
+          //   event.set_type(mesos::master::Event::SUBSCRIBED);
+          //   *event.mutable_subscribed()->mutable_get_state() =
+          //     _getState(approvers);
+          //   event.mutable_subscribed()->set_heartbeat_interval_seconds(
+          //       DEFAULT_HEARTBEAT_INTERVAL.secs());
+          //
+          //   http.send(event);
+
+          switch (contentType) {
+            case ContentType::PROTOBUF: {
+              string serialized;
+              google::protobuf::io::StringOutputStream stream(&serialized);
+              google::protobuf::io::CodedOutputStream writer(&stream);
+
+              WireFormatLite::WriteEnum(
+                  mesos::v1::master::Event::kTypeFieldNumber,
+                  mesos::v1::master::Event::SUBSCRIBED,
+                  &writer);
+
+              WireFormatLite::WriteBytes(
+                  mesos::v1::master::Event::kSubscribedFieldNumber,
+                  serializeSubscribe(approvers),
+                  &writer);
+
+              // We must manually trim the unused buffer space since
+              // we use the string before the coded output stream is
+              // destructed.
+              writer.Trim();
+
+              http.send(serialized);
+
+              break;
+            }
+
+            case ContentType::JSON: {
+              string serialized = jsonify([&](JSON::ObjectWriter* writer) {
+                const google::protobuf::Descriptor* descriptor =
+                  v1::master::Event::descriptor();
 
-          event.mutable_subscribed()->set_heartbeat_interval_seconds(
-              DEFAULT_HEARTBEAT_INTERVAL.secs());
+                int field;
 
-          http.send(event);
+                field = v1::master::Event::kTypeFieldNumber;
+                writer->field(
+                    descriptor->FindFieldByNumber(field)->name(),
+                    v1::master::Event::Type_Name(
+                        v1::master::Event::SUBSCRIBED));
+
+                field = v1::master::Event::kSubscribedFieldNumber;
+                writer->field(
+                    descriptor->FindFieldByNumber(field)->name(),
+                    jsonifySubscribe(master, approvers));
+              });
+
+              http.send(serialized);
+
+              break;
+            }
+
+            default:
+              return NotAcceptable("Request must accept json or protobuf");
+          }
 
           mesos::master::Event heartbeatEvent;
           heartbeatEvent.set_type(mesos::master::Event::HEARTBEAT);
@@ -461,6 +516,73 @@ Future<Response> Master::Http::subscribe(
 }
 
 
+function<void(JSON::ObjectWriter*)> Master::Http::jsonifySubscribe(
+    const Master* master,
+    const Owned<ObjectApprovers>& approvers)
+{
+  // Jsonify the following message:
+  //
+  //   mesos::master::Event::Subscribed subscribed;
+  //   *subscribed.mutable_get_state() = _getState(approvers);
+  //   subscribed.set_heartbeat_interval_seconds(
+  //       DEFAULT_HEARTBEAT_INTERVAL.secs());
+
+  // TODO(bmahler): This copies the Owned object approvers.
+  return [=](JSON::ObjectWriter* writer) {
+    const google::protobuf::Descriptor* descriptor =
+      v1::master::Event::Subscribed::descriptor();
+
+    int field;
+
+    field = v1::master::Event::Subscribed::kGetStateFieldNumber;
+    writer->field(
+        descriptor->FindFieldByNumber(field)->name(),
+        jsonifyGetState(master, approvers));
+
+    field = v1::master::Event::Subscribed::kHeartbeatIntervalSecondsFieldNumber;
+    writer->field(
+        descriptor->FindFieldByNumber(field)->name(),
+        DEFAULT_HEARTBEAT_INTERVAL.secs());
+  };
+}
+
+
+string Master::Http::serializeSubscribe(
+    const Owned<ObjectApprovers>& approvers) const
+{
+  // Serialize the following message:
+  //
+  //   mesos::master::Event::Subscribed subscribed;
+  //   *subscribed.mutable_get_state() = _getState(approvers);
+  //   subscribed.set_heartbeat_interval_seconds(
+  //       DEFAULT_HEARTBEAT_INTERVAL.secs());
+
+  string output;
+  google::protobuf::io::StringOutputStream stream(&output);
+  google::protobuf::io::CodedOutputStream writer(&stream);
+
+  WireFormatLite::WriteBytes(
+      mesos::v1::master::Event::Subscribed::kGetStateFieldNumber,
+      serializeGetState(approvers),
+      &writer);
+
+  WireFormatLite::WriteDouble(
+      mesos::v1::master::Event::Subscribed
+        ::kHeartbeatIntervalSecondsFieldNumber,
+      DEFAULT_HEARTBEAT_INTERVAL.secs(),
+      &writer);
+
+  // While an explicit Trim() isn't necessary (since the coded
+  // output stream is destructed before the string is returned),
+  // it's a quite tricky bug to diagnose if Trim() is missed, so
+  // we always do it explicitly to signal the reader about this
+  // subtlety.
+  writer.Trim();
+
+  return output;
+}
+
+
 // TODO(ijimenez): Add some information or pointers to help
 // users understand the HTTP Event/Call API.
 string Master::Http::SCHEDULER_HELP()
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 9363042..f97b085 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -1959,6 +1959,11 @@ private:
     mesos::master::Response::GetState _getState(
         const process::Owned<ObjectApprovers>& approvers) const;
 
+    static std::function<void(JSON::ObjectWriter*)> jsonifySubscribe(
+        const Master* master,
+        const process::Owned<ObjectApprovers>& approvers);
+    std::string serializeSubscribe(
+        const process::Owned<ObjectApprovers>& approvers) const;
     process::Future<process::http::Response> subscribe(
         const mesos::master::Call& call,
         const Option<process::http::authentication::Principal>& principal,


[mesos] 02/04: Updated mesos code to work against recordio encoder/decoder changes.

Posted by bm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bmahler pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit ed39b42034254f6c2e8da65dfea3a35e4d6ddce3
Author: Benjamin Mahler <bm...@apache.org>
AuthorDate: Mon Nov 25 17:37:36 2019 -0500

    Updated mesos code to work against recordio encoder/decoder changes.
    
    The recordio encoder and decoder were updated to operate on records
    as bytes instead of typed T records.
    
    Review: https://reviews.apache.org/r/71825
---
 src/checks/checker_process.cpp                   |  25 +++--
 src/common/http.hpp                              |  13 ++-
 src/common/recordio.hpp                          |  21 +++--
 src/executor/executor.cpp                        |   8 +-
 src/resource_provider/http_connection.hpp        |   5 +-
 src/resource_provider/manager.cpp                |  48 ++--------
 src/scheduler/scheduler.cpp                      |   8 +-
 src/slave/containerizer/mesos/io/switchboard.cpp |  15 +--
 src/slave/http.cpp                               |  25 +++--
 src/tests/api_tests.cpp                          | 111 ++++++++---------------
 src/tests/common/recordio_tests.cpp              |  52 ++++-------
 src/tests/containerizer/io_switchboard_tests.cpp |  39 ++++----
 src/tests/executor_http_api_tests.cpp            |  17 ++--
 src/tests/master/mock_master_api_subscriber.cpp  |   2 +-
 src/tests/resource_provider_manager_tests.cpp    |  15 +--
 src/tests/scheduler_http_api_tests.cpp           |  22 ++---
 16 files changed, 162 insertions(+), 264 deletions(-)

diff --git a/src/checks/checker_process.cpp b/src/checks/checker_process.cpp
index c214bd1..6a9c7fc 100644
--- a/src/checks/checker_process.cpp
+++ b/src/checks/checker_process.cpp
@@ -171,30 +171,29 @@ static Try<tuple<string, string>> decodeProcessIOData(const string& data)
   string stdoutReceived;
   string stderrReceived;
 
-  ::recordio::Decoder<v1::agent::ProcessIO> decoder(
-      lambda::bind(
-          deserialize<v1::agent::ProcessIO>,
-          ContentType::PROTOBUF,
-          lambda::_1));
+  ::recordio::Decoder decoder;
 
-  Try<std::deque<Try<v1::agent::ProcessIO>>> records = decoder.decode(data);
+  Try<std::deque<string>> records = decoder.decode(data);
 
   if (records.isError()) {
     return Error(records.error());
   }
 
   while (!records->empty()) {
-    Try<v1::agent::ProcessIO> record = records->front();
+    string record = std::move(records->front());
     records->pop_front();
 
-    if (record.isError()) {
-      return Error(record.error());
+    Try<v1::agent::ProcessIO> result = deserialize<v1::agent::ProcessIO>(
+        ContentType::PROTOBUF, record);
+
+    if (result.isError()) {
+      return Error(result.error());
     }
 
-    if (record->data().type() == v1::agent::ProcessIO::Data::STDOUT) {
-      stdoutReceived += record->data().data();
-    } else if (record->data().type() == v1::agent::ProcessIO::Data::STDERR) {
-      stderrReceived += record->data().data();
+    if (result->data().type() == v1::agent::ProcessIO::Data::STDOUT) {
+      stdoutReceived += result->data().data();
+    } else if (result->data().type() == v1::agent::ProcessIO::Data::STDERR) {
+      stderrReceived += result->data().data();
     }
   }
 
diff --git a/src/common/http.hpp b/src/common/http.hpp
index b9ab561..534fc26 100644
--- a/src/common/http.hpp
+++ b/src/common/http.hpp
@@ -153,15 +153,19 @@ struct StreamingHttpConnection
       id::UUID _streamId = id::UUID::random())
     : writer(_writer),
       contentType(_contentType),
-      encoder(lambda::bind(serialize, contentType, lambda::_1)),
       streamId(_streamId) {}
 
-  // Converts the message to the templated `Event`, via `evolve()`,
-  // before sending.
   template <typename Message>
   bool send(const Message& message)
   {
-    return writer.write(encoder.encode(evolve(message)));
+    // TODO(bmahler): Remove this evolve(). Could we still
+    // somehow assert that evolve(message) produces a result
+    // of type Event without calling evolve()?
+    Event e = evolve(message);
+
+    std::string record = serialize(contentType, e);
+
+    return writer.write(::recordio::encode(record));
   }
 
   bool close()
@@ -176,7 +180,6 @@ struct StreamingHttpConnection
 
   process::http::Pipe::Writer writer;
   ContentType contentType;
-  ::recordio::Encoder<Event> encoder;
   id::UUID streamId;
 };
 
diff --git a/src/common/recordio.hpp b/src/common/recordio.hpp
index 8cb2e73..ee91a60 100644
--- a/src/common/recordio.hpp
+++ b/src/common/recordio.hpp
@@ -65,10 +65,10 @@ public:
   // We spawn `ReaderProcess` as a managed process to guarantee
   // that it does not wait on itself (this would cause a deadlock!).
   // See comments in `Connection::Data` for further details.
-  Reader(::recordio::Decoder<T>&& decoder,
+  Reader(std::function<Try<T>(const std::string&)> deserialize,
          process::http::Pipe::Reader reader)
     : process(process::spawn(
-        new internal::ReaderProcess<T>(std::move(decoder), reader),
+        new internal::ReaderProcess<T>(std::move(deserialize), reader),
         true)) {}
 
   virtual ~Reader()
@@ -149,10 +149,10 @@ class ReaderProcess : public process::Process<ReaderProcess<T>>
 {
 public:
   ReaderProcess(
-      ::recordio::Decoder<T>&& _decoder,
+      std::function<Try<T>(const std::string&)>&& _deserialize,
       process::http::Pipe::Reader _reader)
     : process::ProcessBase(process::ID::generate("__reader__")),
-      decoder(_decoder),
+      deserialize(_deserialize),
       reader(_reader),
       done(false) {}
 
@@ -235,26 +235,29 @@ private:
       return;
     }
 
-    Try<std::deque<Try<T>>> decode = decoder.decode(read.get());
+    Try<std::deque<std::string>> decode = decoder.decode(read.get());
 
     if (decode.isError()) {
       fail("Decoder failure: " + decode.error());
       return;
     }
 
-    foreach (const Try<T>& record, decode.get()) {
+    foreach (const std::string& record, decode.get()) {
+      Result<T> t = deserialize(record);
+
       if (!waiters.empty()) {
-        waiters.front()->set(Result<T>(std::move(record)));
+        waiters.front()->set(std::move(t));
         waiters.pop();
       } else {
-        records.push(std::move(record));
+        records.push(std::move(t));
       }
     }
 
     consume();
   }
 
-  ::recordio::Decoder<T> decoder;
+  ::recordio::Decoder decoder;
+  std::function<Try<T>(const std::string&)> deserialize;
   process::http::Pipe::Reader reader;
 
   std::queue<process::Owned<process::Promise<Result<T>>>> waiters;
diff --git a/src/executor/executor.cpp b/src/executor/executor.cpp
index b412603..dfa5820 100644
--- a/src/executor/executor.cpp
+++ b/src/executor/executor.cpp
@@ -634,11 +634,9 @@ protected:
 
       Pipe::Reader reader = response->reader.get();
 
-      auto deserializer =
-        lambda::bind(deserialize<Event>, contentType, lambda::_1);
-
-      Owned<Reader<Event>> decoder(
-          new Reader<Event>(Decoder<Event>(deserializer), reader));
+      Owned<Reader<Event>> decoder(new Reader<Event>(
+          lambda::bind(deserialize<Event>, contentType, lambda::_1),
+          reader));
 
       subscribed = SubscribedResponse {reader, decoder};
 
diff --git a/src/resource_provider/http_connection.hpp b/src/resource_provider/http_connection.hpp
index 05863aa..84a2610 100644
--- a/src/resource_provider/http_connection.hpp
+++ b/src/resource_provider/http_connection.hpp
@@ -368,12 +368,9 @@ protected:
 
       process::http::Pipe::Reader reader = response.reader.get();
 
-      auto deserializer =
-        lambda::bind(deserialize<Event>, contentType, lambda::_1);
-
       process::Owned<recordio::Reader<Event>> decoder(
           new recordio::Reader<Event>(
-              ::recordio::Decoder<Event>(deserializer),
+              lambda::bind(deserialize<Event>, contentType, lambda::_1),
               reader));
 
       subscribed = SubscribedResponse(reader, std::move(decoder));
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index 427ce70..5665167 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -113,48 +113,12 @@ createRegistryResourceProvider(const ResourceProviderInfo& resourceProviderInfo)
   return resourceProvider;
 }
 
-// Represents the streaming HTTP connection to a resource provider.
-struct HttpConnection
-{
-  HttpConnection(const http::Pipe::Writer& _writer,
-                 ContentType _contentType,
-                 id::UUID _streamId)
-    : writer(_writer),
-      contentType(_contentType),
-      streamId(_streamId),
-      encoder(lambda::bind(serialize, contentType, lambda::_1)) {}
-
-  // Converts the message to an Event before sending.
-  template <typename Message>
-  bool send(const Message& message)
-  {
-    // We need to evolve the internal 'message' into a
-    // 'v1::resource_provider::Event'.
-    return writer.write(encoder.encode(evolve(message)));
-  }
-
-  bool close()
-  {
-    return writer.close();
-  }
-
-  Future<Nothing> closed() const
-  {
-    return writer.readerClosed();
-  }
-
-  http::Pipe::Writer writer;
-  ContentType contentType;
-  id::UUID streamId;
-  ::recordio::Encoder<v1::resource_provider::Event> encoder;
-};
-
 
 struct ResourceProvider
 {
   ResourceProvider(
       const ResourceProviderInfo& _info,
-      const HttpConnection& _http)
+      const StreamingHttpConnection<v1::resource_provider::Event>& _http)
     : info(_info),
       http(_http) {}
 
@@ -172,7 +136,7 @@ struct ResourceProvider
   }
 
   ResourceProviderInfo info;
-  HttpConnection http;
+  StreamingHttpConnection<v1::resource_provider::Event> http;
   hashmap<id::UUID, Owned<Promise<Nothing>>> publishes;
 };
 
@@ -203,7 +167,7 @@ public:
 
 private:
   void subscribe(
-      const HttpConnection& http,
+      const StreamingHttpConnection<v1::resource_provider::Event>& http,
       const Call::Subscribe& subscribe);
 
   void _subscribe(
@@ -394,7 +358,9 @@ Future<http::Response> ResourceProviderManagerProcess::api(
           id::UUID streamId = id::UUID::random();
           ok.headers["Mesos-Stream-Id"] = streamId.toString();
 
-          HttpConnection http(pipe.writer(), acceptType, streamId);
+          StreamingHttpConnection<v1::resource_provider::Event> http(
+              pipe.writer(), acceptType, streamId);
+
           this->subscribe(http, call.subscribe());
 
           return std::move(ok);
@@ -804,7 +770,7 @@ Future<Nothing> ResourceProviderManagerProcess::publishResources(
 
 
 void ResourceProviderManagerProcess::subscribe(
-    const HttpConnection& http,
+    const StreamingHttpConnection<v1::resource_provider::Event>& http,
     const Call::Subscribe& subscribe)
 {
   const ResourceProviderInfo& resourceProviderInfo =
diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp
index 674483a..48be291 100644
--- a/src/scheduler/scheduler.cpp
+++ b/src/scheduler/scheduler.cpp
@@ -655,11 +655,9 @@ protected:
 
       Pipe::Reader reader = response->reader.get();
 
-      auto deserializer =
-        lambda::bind(deserialize<Event>, contentType, lambda::_1);
-
-      Owned<Reader<Event>> decoder(
-          new Reader<Event>(Decoder<Event>(deserializer), reader));
+      Owned<Reader<Event>> decoder(new Reader<Event>(
+          lambda::bind(deserialize<Event>, contentType, lambda::_1),
+          reader));
 
       subscribed = SubscribedResponse {reader, decoder};
 
diff --git a/src/slave/containerizer/mesos/io/switchboard.cpp b/src/slave/containerizer/mesos/io/switchboard.cpp
index 8e02e51..8d78f7c 100644
--- a/src/slave/containerizer/mesos/io/switchboard.cpp
+++ b/src/slave/containerizer/mesos/io/switchboard.cpp
@@ -977,18 +977,21 @@ public:
   Future<Nothing> unblock();
 
 private:
+  // TODO(bmahler): Replace this with the common StreamingHttpConnection.
   class HttpConnection
   {
   public:
     HttpConnection(
         const http::Pipe::Writer& _writer,
-        const ContentType& contentType)
+        const ContentType& _contentType)
       : writer(_writer),
-        encoder(lambda::bind(serialize, contentType, lambda::_1)) {}
+        contentType(_contentType) {}
 
     bool send(const agent::ProcessIO& message)
     {
-      return writer.write(encoder.encode(message));
+      string record = serialize(contentType, message);
+
+      return writer.write(::recordio::encode(record));
     }
 
     bool close()
@@ -1003,7 +1006,7 @@ private:
 
   private:
     http::Pipe::Writer writer;
-    ::recordio::Encoder<agent::ProcessIO> encoder;
+    ContentType contentType;
   };
 
   // Sit in a heartbeat loop forever.
@@ -1483,10 +1486,10 @@ Future<http::Response> IOSwitchboardServerProcess::handler(
 
     Owned<recordio::Reader<agent::Call>> reader(
         new recordio::Reader<agent::Call>(
-            ::recordio::Decoder<agent::Call>(lambda::bind(
+            lambda::bind(
                 deserialize<agent::Call>,
                 messageContentType.get(),
-                lambda::_1)),
+                lambda::_1),
             request.reader.get()));
 
     return reader->read()
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index 4d68ce7..04ad0d8 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -498,8 +498,8 @@ Future<Response> Http::api(
     CHECK_SOME(mediaTypes.messageContent);
 
     Owned<Reader<mesos::agent::Call>> reader(new Reader<mesos::agent::Call>(
-        Decoder<mesos::agent::Call>(lambda::bind(
-            deserializer, lambda::_1, mediaTypes.messageContent.get())),
+        lambda::bind(
+            deserializer, lambda::_1, mediaTypes.messageContent.get()),
         request.reader.get()));
 
     return reader->read()
@@ -3191,10 +3191,8 @@ Future<Response> Http::_attachContainerInput(
 
   CHECK_SOME(mediaTypes.messageContent);
   auto encoder = [mediaTypes](const mesos::agent::Call& call) {
-    ::recordio::Encoder<mesos::agent::Call> encoder(lambda::bind(
-        serialize, mediaTypes.messageContent.get(), lambda::_1));
-
-    return encoder.encode(call);
+    string record = serialize(mediaTypes.messageContent.get(), call);
+    return ::recordio::encode(record);
   };
 
   // Write the first record. We had extracted it from the `decoder`
@@ -3728,17 +3726,18 @@ Future<Response> Http::_attachContainerOutput(
           CHECK_SOME(response.reader);
           Pipe::Reader reader = response.reader.get();
 
-          auto deserializer = lambda::bind(
-              deserialize<ProcessIO>, messageContentType, lambda::_1);
-
           Owned<Reader<ProcessIO>> decoder(new Reader<ProcessIO>(
-              Decoder<ProcessIO>(deserializer), reader));
+              lambda::bind(
+                  deserialize<ProcessIO>,
+                  messageContentType,
+                  lambda::_1),
+              reader));
 
           auto encoder = [messageContentType](const ProcessIO& processIO) {
-            ::recordio::Encoder<v1::agent::ProcessIO> encoder (lambda::bind(
-                serialize, messageContentType, lambda::_1));
+            v1::agent::ProcessIO evolved = evolve(processIO);
+            string record = serialize(messageContentType, evolved);
 
-            return encoder.encode(evolve(processIO));
+            return ::recordio::encode(record);
           };
 
           recordio::transform<ProcessIO>(std::move(decoder), encoder, writer)
diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index 393f9a2..8755016 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -3073,8 +3073,7 @@ TEST_P(MasterAPITest, EventAuthorizationFiltering)
   auto deserializer =
     lambda::bind(deserialize<v1::master::Event>, contentType, lambda::_1);
 
-  Reader<v1::master::Event> decoder(
-      Decoder<v1::master::Event>(deserializer), reader);
+  Reader<v1::master::Event> decoder(deserializer, reader);
 
   {
     Future<Result<v1::master::Event>> event = decoder.read();
@@ -3334,8 +3333,7 @@ TEST_P(MasterAPITest, EventAuthorizationDelayed)
   auto deserializer =
     lambda::bind(deserialize<v1::master::Event>, contentType, lambda::_1);
 
-  Reader<v1::master::Event> decoder(
-      Decoder<v1::master::Event>(deserializer), reader);
+  Reader<v1::master::Event> decoder(deserializer, reader);
 
   Future<Result<v1::master::Event>> event = decoder.read();
   AWAIT_READY(event);
@@ -3525,8 +3523,7 @@ TEST_P(MasterAPITest, FrameworksEvent)
   auto deserializer =
     lambda::bind(deserialize<v1::master::Event>, contentType, lambda::_1);
 
-  Reader<v1::master::Event> decoder(
-      Decoder<v1::master::Event>(deserializer), reader);
+  Reader<v1::master::Event> decoder(deserializer, reader);
 
   Future<Result<v1::master::Event>> event = decoder.read();
   AWAIT_READY(event);
@@ -3697,8 +3694,7 @@ TEST_P(MasterAPITest, Heartbeat)
   auto deserializer =
     lambda::bind(deserialize<v1::master::Event>, contentType, lambda::_1);
 
-  Reader<v1::master::Event> decoder(
-      Decoder<v1::master::Event>(deserializer), reader);
+  Reader<v1::master::Event> decoder(deserializer, reader);
 
   Future<Result<v1::master::Event>> event = decoder.read();
   AWAIT_READY(event);
@@ -3785,8 +3781,7 @@ TEST_P(MasterAPITest, MaxEventStreamSubscribers)
   ASSERT_SOME(response1->reader);
   http::Pipe::Reader reader1 = response1->reader.get();
 
-  Reader<v1::master::Event> decoder1(
-      Decoder<v1::master::Event>(deserializer), reader1);
+  Reader<v1::master::Event> decoder1(deserializer, reader1);
 
   event = decoder1.read();
   AWAIT_READY(event);
@@ -3807,8 +3802,7 @@ TEST_P(MasterAPITest, MaxEventStreamSubscribers)
   ASSERT_SOME(response2->reader);
   http::Pipe::Reader reader2 = response2->reader.get();
 
-  Reader<v1::master::Event> decoder2(
-      Decoder<v1::master::Event>(deserializer), reader2);
+  Reader<v1::master::Event> decoder2(deserializer, reader2);
 
   event = decoder2.read();
   AWAIT_READY(event);
@@ -3854,8 +3848,7 @@ TEST_P(MasterAPITest, MaxEventStreamSubscribers)
     ASSERT_SOME(response3->reader);
     http::Pipe::Reader reader3 = response3->reader.get();
 
-    Reader<v1::master::Event> decoder3(
-        Decoder<v1::master::Event>(deserializer), reader3);
+    Reader<v1::master::Event> decoder3(deserializer, reader3);
 
     event = decoder3.read();
     AWAIT_READY(event);
@@ -3898,8 +3891,7 @@ TEST_P(MasterAPITest, MaxEventStreamSubscribers)
   ASSERT_SOME(response4->reader);
   http::Pipe::Reader reader4 = response4->reader.get();
 
-  Reader<v1::master::Event> decoder4(
-      Decoder<v1::master::Event>(deserializer), reader4);
+  Reader<v1::master::Event> decoder4(deserializer, reader4);
 
   event = decoder4.read();
   AWAIT_READY(event);
@@ -5675,29 +5667,30 @@ static Future<tuple<string, string>> getProcessIOData(
       string stdoutReceived;
       string stderrReceived;
 
-      ::recordio::Decoder<v1::agent::ProcessIO> decoder(lambda::bind(
-          deserialize<v1::agent::ProcessIO>, contentType, lambda::_1));
+      ::recordio::Decoder decoder;
 
-      Try<std::deque<Try<v1::agent::ProcessIO>>> records =
-        decoder.decode(data);
+      Try<std::deque<string>> records = decoder.decode(data);
 
       if (records.isError()) {
         return process::Failure(records.error());
       }
 
       while(!records->empty()) {
-        Try<v1::agent::ProcessIO> record = records->front();
+        string record = std::move(records->front());
         records->pop_front();
 
-        if (record.isError()) {
-          return process::Failure(record.error());
+        Try<v1::agent::ProcessIO> processIO =
+          deserialize<v1::agent::ProcessIO>(contentType, record);
+
+        if (processIO.isError()) {
+          return process::Failure(processIO.error());
         }
 
-        if (record->data().type() == v1::agent::ProcessIO::Data::STDOUT) {
-          stdoutReceived += record->data().data();
-        } else if (record->data().type() ==
+        if (processIO->data().type() == v1::agent::ProcessIO::Data::STDOUT) {
+          stdoutReceived += processIO->data().data();
+        } else if (processIO->data().type() ==
             v1::agent::ProcessIO::Data::STDERR) {
-          stderrReceived += record->data().data();
+          stderrReceived += processIO->data().data();
         }
       }
 
@@ -7968,9 +7961,6 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
   http::Pipe::Writer writer = pipe.writer();
   http::Pipe::Reader reader = pipe.reader();
 
-  ::recordio::Encoder<v1::agent::Call> encoder(lambda::bind(
-      serialize, contentType, lambda::_1));
-
   {
     v1::agent::Call call;
     call.set_type(v1::agent::Call::ATTACH_CONTAINER_INPUT);
@@ -7981,7 +7971,7 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
     attach->set_type(v1::agent::Call::AttachContainerInput::CONTAINER_ID);
     attach->mutable_container_id()->CopyFrom(containerId);
 
-    writer.write(encoder.encode(call));
+    writer.write(::recordio::encode(serialize(contentType, call)));
   }
 
   const std::string command = "pkill sleep\n";
@@ -8000,7 +7990,7 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
     processIO->mutable_data()->set_type(v1::agent::ProcessIO::Data::STDIN);
     processIO->mutable_data()->set_data(command);
 
-    writer.write(encoder.encode(call));
+    writer.write(::recordio::encode(serialize(contentType, call)));
   }
 
   {
@@ -8150,9 +8140,6 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(AgentAPITest, AttachContainerInputRepeat)
     http::Pipe::Writer writer = pipe.writer();
     http::Pipe::Reader reader = pipe.reader();
 
-    ::recordio::Encoder<v1::agent::Call> encoder(lambda::bind(
-        serialize, contentType, lambda::_1));
-
     {
       v1::agent::Call call;
       call.set_type(v1::agent::Call::ATTACH_CONTAINER_INPUT);
@@ -8163,7 +8150,7 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(AgentAPITest, AttachContainerInputRepeat)
       attach->set_type(v1::agent::Call::AttachContainerInput::CONTAINER_ID);
       attach->mutable_container_id()->CopyFrom(containerId);
 
-      writer.write(encoder.encode(call));
+      writer.write(::recordio::encode(serialize(contentType, call)));
     }
 
     size_t offset = 0;
@@ -8185,7 +8172,7 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(AgentAPITest, AttachContainerInputRepeat)
       processIO->mutable_data()->set_type(v1::agent::ProcessIO::Data::STDIN);
       processIO->mutable_data()->set_data(dataChunk);
 
-      writer.write(encoder.encode(call));
+      writer.write(::recordio::encode(serialize(contentType, call)));
     }
 
     // Signal `EOF` to the 'cat' command.
@@ -8203,7 +8190,7 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(AgentAPITest, AttachContainerInputRepeat)
       processIO->mutable_data()->set_type(v1::agent::ProcessIO::Data::STDIN);
       processIO->mutable_data()->set_data("");
 
-      writer.write(encoder.encode(call));
+      writer.write(::recordio::encode(serialize(contentType, call)));
     }
 
     writer.close();
@@ -8435,9 +8422,6 @@ TEST_F(AgentAPITest, AttachContainerInputFailure)
   http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
   headers[MESSAGE_CONTENT_TYPE] = stringify(messageContentType);
 
-  ::recordio::Encoder<v1::agent::Call> encoder(lambda::bind(
-      serialize, messageContentType, lambda::_1));
-
   EXPECT_CALL(containerizer, attach(_))
     .WillOnce(Return(process::Failure("Unsupported")));
 
@@ -8445,7 +8429,7 @@ TEST_F(AgentAPITest, AttachContainerInputFailure)
     slave.get()->pid,
     "api/v1",
     headers,
-    encoder.encode(call),
+    ::recordio::encode(serialize(messageContentType, call)),
     stringify(contentType));
 
   AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::InternalServerError().status, response);
@@ -8573,9 +8557,6 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
     ContentType contentType = ContentType::RECORDIO;
     ContentType messageContentType = ContentType::PROTOBUF;
 
-    ::recordio::Encoder<v1::agent::Call> encoder(
-        lambda::bind(serialize, messageContentType, lambda::_1));
-
     http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
     headers[MESSAGE_CONTENT_TYPE] = stringify(messageContentType);
 
@@ -8583,7 +8564,7 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
       slave.get()->pid,
       "api/v1",
       headers,
-      encoder.encode(call),
+      ::recordio::encode(serialize(messageContentType, call)),
       stringify(contentType));
 
     AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::Forbidden().status, response);
@@ -8624,14 +8605,11 @@ TEST_F(AgentAPITest, AttachContainerInputValidation)
     call.mutable_attach_container_input()->set_type(
         v1::agent::Call::AttachContainerInput::CONTAINER_ID);
 
-    ::recordio::Encoder<v1::agent::Call> encoder(lambda::bind(
-        serialize, messageContentType, lambda::_1));
-
     Future<http::Response> response = http::post(
         slave.get()->pid,
         "api/v1",
         headers,
-        encoder.encode(call),
+        ::recordio::encode(serialize(messageContentType, call)),
         stringify(contentType));
 
     AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::BadRequest().status, response);
@@ -8646,14 +8624,11 @@ TEST_F(AgentAPITest, AttachContainerInputValidation)
     call.mutable_attach_container_input()->set_type(
         v1::agent::Call::AttachContainerInput::PROCESS_IO);
 
-    ::recordio::Encoder<v1::agent::Call> encoder(lambda::bind(
-        serialize, messageContentType, lambda::_1));
-
     Future<http::Response> response = http::post(
         slave.get()->pid,
         "api/v1",
         headers,
-        encoder.encode(call),
+        ::recordio::encode(serialize(messageContentType, call)),
         stringify(contentType));
 
     AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::BadRequest().status, response);
@@ -8687,14 +8662,11 @@ TEST_F(AgentAPITest, HeaderValidation)
     call.mutable_attach_container_input()->set_type(
         v1::agent::Call::AttachContainerInput::CONTAINER_ID);
 
-    ::recordio::Encoder<v1::agent::Call> encoder(lambda::bind(
-        serialize, ContentType::PROTOBUF, lambda::_1));
-
     Future<http::Response> response = http::post(
         slave.get()->pid,
         "api/v1",
         createBasicAuthHeaders(DEFAULT_CREDENTIAL),
-        encoder.encode(call),
+        ::recordio::encode(serialize(ContentType::PROTOBUF, call)),
         stringify(ContentType::RECORDIO));
 
     AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::BadRequest().status, response);
@@ -8708,9 +8680,6 @@ TEST_F(AgentAPITest, HeaderValidation)
     call.mutable_attach_container_input()->set_type(
         v1::agent::Call::AttachContainerInput::CONTAINER_ID);
 
-    ::recordio::Encoder<v1::agent::Call> encoder(lambda::bind(
-        serialize, ContentType::PROTOBUF, lambda::_1));
-
     http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
     headers[MESSAGE_CONTENT_TYPE] = "unsupported/media-type";
 
@@ -8718,7 +8687,7 @@ TEST_F(AgentAPITest, HeaderValidation)
         slave.get()->pid,
         "api/v1",
         headers,
-        encoder.encode(call),
+        ::recordio::encode(serialize(ContentType::PROTOBUF, call)),
         stringify(ContentType::RECORDIO));
 
     AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::UnsupportedMediaType().status,
@@ -9382,9 +9351,6 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(AgentAPIStreamingTest,
   http::Pipe::Writer writer = pipe.writer();
   http::Pipe::Reader reader = pipe.reader();
 
-  ::recordio::Encoder<v1::agent::Call> encoder(lambda::bind(
-      serialize, messageContentType, lambda::_1));
-
   // Prepare the data that needs to be streamed to the entrypoint
   // of the container.
 
@@ -9398,7 +9364,7 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(AgentAPIStreamingTest,
     attach->set_type(v1::agent::Call::AttachContainerInput::CONTAINER_ID);
     attach->mutable_container_id()->CopyFrom(containerId);
 
-    writer.write(encoder.encode(call));
+    writer.write(::recordio::encode(serialize(messageContentType, call)));
   }
 
   size_t offset = 0;
@@ -9420,7 +9386,7 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(AgentAPIStreamingTest,
     processIO->mutable_data()->set_type(v1::agent::ProcessIO::Data::STDIN);
     processIO->mutable_data()->set_data(dataChunk);
 
-    writer.write(encoder.encode(call));
+    writer.write(::recordio::encode(serialize(messageContentType, call)));
   }
 
   // Signal `EOT` to the terminal so that it sends `EOF` to `cat` command.
@@ -9438,7 +9404,7 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(AgentAPIStreamingTest,
     processIO->mutable_data()->set_type(v1::agent::ProcessIO::Data::STDIN);
     processIO->mutable_data()->set_data("\x04");
 
-    writer.write(encoder.encode(call));
+    writer.write(::recordio::encode(serialize(messageContentType, call)));
   }
 
   writer.close();
@@ -9610,9 +9576,6 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
   http::Pipe::Writer writer = pipe.writer();
   http::Pipe::Reader reader = pipe.reader();
 
-  ::recordio::Encoder<v1::agent::Call> encoder(lambda::bind(
-      serialize, messageContentType, lambda::_1));
-
   {
     v1::agent::Call call;
     call.set_type(v1::agent::Call::ATTACH_CONTAINER_INPUT);
@@ -9623,7 +9586,7 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
     attach->set_type(v1::agent::Call::AttachContainerInput::CONTAINER_ID);
     attach->mutable_container_id()->CopyFrom(containerId);
 
-    writer.write(encoder.encode(call));
+    writer.write(::recordio::encode(serialize(messageContentType, call)));
   }
 
   size_t offset = 0;
@@ -9645,7 +9608,7 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
     processIO->mutable_data()->set_type(v1::agent::ProcessIO::Data::STDIN);
     processIO->mutable_data()->set_data(dataChunk);
 
-    writer.write(encoder.encode(call));
+    writer.write(::recordio::encode(serialize(messageContentType, call)));
   }
 
   // Signal `EOF` to the 'cat' command.
@@ -9663,7 +9626,7 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
     processIO->mutable_data()->set_type(v1::agent::ProcessIO::Data::STDIN);
     processIO->mutable_data()->set_data("");
 
-    writer.write(encoder.encode(call));
+    writer.write(::recordio::encode(serialize(messageContentType, call)));
   }
 
   writer.close();
diff --git a/src/tests/common/recordio_tests.cpp b/src/tests/common/recordio_tests.cpp
index 5dd6880..a79f08b 100644
--- a/src/tests/common/recordio_tests.cpp
+++ b/src/tests/common/recordio_tests.cpp
@@ -69,19 +69,16 @@ TEST(RecordIOReaderTest, EndOfFile)
 {
   // Write some data to the pipe so that records
   // are available before any reads occur.
-  ::recordio::Encoder<string> encoder(strings::upper);
-
   string data;
 
-  data += encoder.encode("hello");
-  data += encoder.encode("world!");
+  data += ::recordio::encode("HELLO");
+  data += ::recordio::encode("WORLD!");
 
   process::http::Pipe pipe;
   pipe.writer().write(data);
 
   mesos::internal::recordio::Reader<string> reader(
-      ::recordio::Decoder<string>(strings::lower),
-      pipe.reader());
+      strings::lower, pipe.reader());
 
   AWAIT_EXPECT_EQ(Result<string>::some("hello"), reader.read());
   AWAIT_EXPECT_EQ(Result<string>::some("world!"), reader.read());
@@ -93,7 +90,7 @@ TEST(RecordIOReaderTest, EndOfFile)
   EXPECT_TRUE(read1.isPending());
   EXPECT_TRUE(read2.isPending());
 
-  pipe.writer().write(encoder.encode("goodbye"));
+  pipe.writer().write(::recordio::encode("goodbye"));
   pipe.writer().close();
 
   AWAIT_EXPECT_EQ(Result<string>::some("goodbye"), read1);
@@ -106,12 +103,10 @@ TEST(RecordIOReaderTest, EndOfFile)
 
 TEST(RecordIOReaderTest, DecodingFailure)
 {
-  ::recordio::Encoder<string> encoder(strings::upper);
   process::http::Pipe pipe;
 
   mesos::internal::recordio::Reader<string> reader(
-      ::recordio::Decoder<string>(strings::lower),
-      pipe.reader());
+      strings::lower, pipe.reader());
 
   // Have multiple outstanding reads before we fail the decoder.
   Future<Result<string>> read1 = reader.read();
@@ -119,7 +114,7 @@ TEST(RecordIOReaderTest, DecodingFailure)
   Future<Result<string>> read3 = reader.read();
 
   // Write non-encoded data to the pipe so that the decoder fails.
-  pipe.writer().write(encoder.encode("encoded"));
+  pipe.writer().write(::recordio::encode("encoded"));
   pipe.writer().write("not encoded!\n");
 
   AWAIT_EXPECT_EQ(Result<string>::some("encoded"), read1);
@@ -128,7 +123,7 @@ TEST(RecordIOReaderTest, DecodingFailure)
 
   // The reader is now in a failed state, subsequent
   // writes will be dropped and all reads will fail.
-  pipe.writer().write(encoder.encode("encoded"));
+  pipe.writer().write(::recordio::encode("encoded"));
 
   AWAIT_EXPECT_FAILED(reader.read());
 }
@@ -136,12 +131,10 @@ TEST(RecordIOReaderTest, DecodingFailure)
 
 TEST(RecordIOReaderTest, PipeFailure)
 {
-  ::recordio::Encoder<string> encoder(strings::upper);
   process::http::Pipe pipe;
 
   mesos::internal::recordio::Reader<string> reader(
-      ::recordio::Decoder<string>(strings::lower),
-      pipe.reader());
+      strings::lower, pipe.reader());
 
   // Have multiple outstanding reads before we fail the writer.
   Future<Result<string>> read1 = reader.read();
@@ -149,7 +142,7 @@ TEST(RecordIOReaderTest, PipeFailure)
   Future<Result<string>> read3 = reader.read();
 
   // Write a record, then fail the pipe writer!
-  pipe.writer().write(encoder.encode("encoded"));
+  pipe.writer().write(::recordio::encode("ENCODED"));
   pipe.writer().fail("failure");
 
   AWAIT_EXPECT_EQ(Result<string>::some("encoded"), read1);
@@ -167,20 +160,17 @@ TEST(RecordIOTransformTest, EndOfFile)
 {
   // Write some data to the pipe so that records
   // are available before any reads occur.
-  ::recordio::Encoder<string> encoder(strings::upper);
-
   string data;
 
-  data += encoder.encode("hello ");
-  data += encoder.encode("world! ");
+  data += ::recordio::encode("HELLO ");
+  data += ::recordio::encode("WORLD! ");
 
   process::http::Pipe pipeA;
   pipeA.writer().write(data);
 
   process::Owned<mesos::internal::recordio::Reader<string>> reader(
     new mesos::internal::recordio::Reader<string>(
-        ::recordio::Decoder<string>(strings::lower),
-        pipeA.reader()));
+        strings::lower, pipeA.reader()));
 
   process::http::Pipe pipeB;
 
@@ -207,20 +197,17 @@ TEST(RecordIOTransformTest, ReaderWriterEndFail)
 {
   // Write some data to the pipe so that records
   // are available before any reads occur.
-  ::recordio::Encoder<string> encoder(strings::upper);
-
   string data;
 
-  data += encoder.encode("hello ");
-  data += encoder.encode("world! ");
+  data += ::recordio::encode("HELLO ");
+  data += ::recordio::encode("WORLD! ");
 
   process::http::Pipe pipeA;
   pipeA.writer().write(data);
 
   process::Owned<mesos::internal::recordio::Reader<string>> reader(
     new mesos::internal::recordio::Reader<string>(
-        ::recordio::Decoder<string>(strings::lower),
-        pipeA.reader()));
+        strings::lower, pipeA.reader()));
 
   process::http::Pipe pipeB;
 
@@ -244,20 +231,17 @@ TEST(RecordIOTransformTest, WriterReadEndFail)
 {
   // Write some data to the pipe so that records
   // are available before any reads occur.
-  ::recordio::Encoder<string> encoder(strings::upper);
-
   string data;
 
-  data += encoder.encode("hello ");
-  data += encoder.encode("world! ");
+  data += ::recordio::encode("HELLO ");
+  data += ::recordio::encode("WORLD! ");
 
   process::http::Pipe pipeA;
   pipeA.writer().write(data);
 
   process::Owned<mesos::internal::recordio::Reader<string>> reader(
     new mesos::internal::recordio::Reader<string>(
-        ::recordio::Decoder<string>(strings::lower),
-        pipeA.reader()));
+        strings::lower, pipeA.reader()));
 
   process::http::Pipe pipeB;
 
diff --git a/src/tests/containerizer/io_switchboard_tests.cpp b/src/tests/containerizer/io_switchboard_tests.cpp
index e443145..1b347eb 100644
--- a/src/tests/containerizer/io_switchboard_tests.cpp
+++ b/src/tests/containerizer/io_switchboard_tests.cpp
@@ -146,27 +146,29 @@ protected:
         string stdoutReceived;
         string stderrReceived;
 
-        ::recordio::Decoder<agent::ProcessIO> decoder(lambda::bind(
-            deserialize<agent::ProcessIO>, ContentType::JSON, lambda::_1));
+        ::recordio::Decoder decoder;
 
-        Try<std::deque<Try<agent::ProcessIO>>> records = decoder.decode(data);
+        Try<std::deque<string>> records = decoder.decode(data);
 
         if (records.isError()) {
           return process::Failure(records.error());
         }
 
         while(!records->empty()) {
-          Try<agent::ProcessIO> record = records->front();
+          string record = std::move(records->front());
           records->pop_front();
 
-          if (record.isError()) {
-            return process::Failure(record.error());
+          Try<agent::ProcessIO> processIO =
+            deserialize<agent::ProcessIO>(ContentType::JSON, record);
+
+          if (processIO.isError()) {
+            return process::Failure(processIO.error());
           }
 
-          if (record->data().type() == agent::ProcessIO::Data::STDOUT) {
-            stdoutReceived += record->data().data();
-          } else if (record->data().type() == agent::ProcessIO::Data::STDERR) {
-            stderrReceived += record->data().data();
+          if (processIO->data().type() == agent::ProcessIO::Data::STDOUT) {
+            stdoutReceived += processIO->data().data();
+          } else if (processIO->data().type() == agent::ProcessIO::Data::STDERR) {
+            stderrReceived += processIO->data().data();
           }
         }
 
@@ -442,8 +444,7 @@ TEST_F(IOSwitchboardServerTest, SendHeartbeat)
   };
 
   recordio::Reader<agent::ProcessIO> responseDecoder(
-      ::recordio::Decoder<agent::ProcessIO>(deserializer),
-      reader.get());
+      deserializer, reader.get());
 
   // Wait for 5 heartbeat messages.
   Clock::pause();
@@ -555,9 +556,6 @@ TEST_F(IOSwitchboardServerTest, AttachInput)
 
   Future<http::Response> response = connection.send(request);
 
-  ::recordio::Encoder<mesos::agent::Call> encoder(lambda::bind(
-        serialize, ContentType::JSON, lambda::_1));
-
   Call call;
   call.set_type(Call::ATTACH_CONTAINER_INPUT);
 
@@ -565,7 +563,7 @@ TEST_F(IOSwitchboardServerTest, AttachInput)
   attach->set_type(Call::AttachContainerInput::CONTAINER_ID);
   attach->mutable_container_id()->set_value(id::UUID::random().toString());
 
-  writer.write(encoder.encode(call));
+  writer.write(::recordio::encode(serialize(ContentType::JSON, call)));
 
   size_t offset = 0;
   size_t chunkSize = 4096;
@@ -584,7 +582,7 @@ TEST_F(IOSwitchboardServerTest, AttachInput)
     message->mutable_data()->set_type(ProcessIO::Data::STDIN);
     message->mutable_data()->set_data(dataChunk);
 
-    writer.write(encoder.encode(call));
+    writer.write(::recordio::encode(serialize(ContentType::JSON, call)));
   }
 
   writer.close();
@@ -667,9 +665,6 @@ TEST_F(IOSwitchboardServerTest, ReceiveHeartbeat)
 
   Future<http::Response> response = connection.send(request);
 
-  ::recordio::Encoder<mesos::agent::Call> encoder(lambda::bind(
-      serialize, ContentType::JSON, lambda::_1));
-
   Call call;
   call.set_type(Call::ATTACH_CONTAINER_INPUT);
 
@@ -677,7 +672,7 @@ TEST_F(IOSwitchboardServerTest, ReceiveHeartbeat)
   attach->set_type(Call::AttachContainerInput::CONTAINER_ID);
   attach->mutable_container_id()->set_value(id::UUID::random().toString());
 
-  writer.write(encoder.encode(call));
+  writer.write(::recordio::encode(serialize(ContentType::JSON, call)));
 
   // Send 5 heartbeat messages.
   Duration heartbeat = Milliseconds(10);
@@ -693,7 +688,7 @@ TEST_F(IOSwitchboardServerTest, ReceiveHeartbeat)
     message->mutable_control()->mutable_heartbeat()
         ->mutable_interval()->set_nanoseconds(heartbeat.ns());
 
-    writer.write(encoder.encode(call));
+    writer.write(::recordio::encode(serialize(ContentType::JSON, call)));
 
     Clock::advance(heartbeat);
   }
diff --git a/src/tests/executor_http_api_tests.cpp b/src/tests/executor_http_api_tests.cpp
index 99bcafb..d111547 100644
--- a/src/tests/executor_http_api_tests.cpp
+++ b/src/tests/executor_http_api_tests.cpp
@@ -936,11 +936,8 @@ TEST_P(ExecutorHttpApiTest, Subscribe)
   Option<Pipe::Reader> reader = response->reader;
   ASSERT_SOME(reader);
 
-  auto deserializer =
-    lambda::bind(deserialize<Event>, contentType, lambda::_1);
-
   Reader<Event> responseDecoder(
-      Decoder<Event>(deserializer),
+      lambda::bind(deserialize<Event>, contentType, lambda::_1),
       reader.get());
 
   Future<Result<Event>> event = responseDecoder.read();
@@ -1047,9 +1044,9 @@ TEST_P(ExecutorHttpApiTest, HeartbeatEvents)
   Option<Pipe::Reader> reader = response->reader;
   ASSERT_SOME(reader);
 
-  auto deserializer =
-    lambda::bind(deserialize<Event>, contentType, lambda::_1);
-  Reader<Event> responseDecoder(Decoder<Event>(deserializer), reader.get());
+  Reader<Event> responseDecoder(
+      lambda::bind(deserialize<Event>, contentType, lambda::_1),
+      reader.get());
 
   Future<Result<Event>> event = responseDecoder.read();
   AWAIT_READY(event);
@@ -1158,10 +1155,8 @@ TEST_F(ExecutorHttpApiTest, HeartbeatCalls)
 
     event.mutable_subscribed()->mutable_container_id()->set_value(":P");
 
-    ::recordio::Encoder<v1::executor::Event> encoder(
-        lambda::bind(serialize, ContentType::PROTOBUF, lambda::_1));
-
-    pipe.writer().write(encoder.encode(event));
+    pipe.writer().write(
+        ::recordio::encode(serialize(ContentType::PROTOBUF, event)));
   }
 
   // Set the expectation for an executor to register with the fake agent.
diff --git a/src/tests/master/mock_master_api_subscriber.cpp b/src/tests/master/mock_master_api_subscriber.cpp
index a0808e8..893d3e3 100644
--- a/src/tests/master/mock_master_api_subscriber.cpp
+++ b/src/tests/master/mock_master_api_subscriber.cpp
@@ -98,7 +98,7 @@ private:
         deserialize<Event>, contentType, lambda::_1);
 
     std::unique_ptr<Reader<Event>> reader(new Reader<Event>(
-        ::recordio::Decoder<Event>(deserializer), response->reader.get()));
+        deserializer, response->reader.get()));
 
     auto decode = lambda::bind(
         [](std::unique_ptr<Reader<Event>>& d) { return d->read(); },
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index 84ec70b..e1fedf7 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -544,8 +544,7 @@ TEST_P(ResourceProviderManagerHttpApiTest, PublishResourcesSuccess)
     ASSERT_SOME(reader);
 
     responseDecoder.reset(new recordio::Reader<Event>(
-        ::recordio::Decoder<Event>(
-            lambda::bind(deserialize<Event>, contentType, lambda::_1)),
+        lambda::bind(deserialize<Event>, contentType, lambda::_1),
         reader.get()));
 
     Future<Result<Event>> event = responseDecoder->read();
@@ -652,8 +651,7 @@ TEST_P(ResourceProviderManagerHttpApiTest, PublishResourcesFailure)
     ASSERT_SOME(reader);
 
     responseDecoder.reset(new recordio::Reader<Event>(
-        ::recordio::Decoder<Event>(
-            lambda::bind(deserialize<Event>, contentType, lambda::_1)),
+        lambda::bind(deserialize<Event>, contentType, lambda::_1),
         reader.get()));
 
     Future<Result<Event>> event = responseDecoder->read();
@@ -753,8 +751,7 @@ TEST_P(ResourceProviderManagerHttpApiTest, PublishResourcesDisconnected)
     ASSERT_SOME(reader);
 
     responseDecoder.reset(new recordio::Reader<Event>(
-        ::recordio::Decoder<Event>(
-            lambda::bind(deserialize<Event>, contentType, lambda::_1)),
+        lambda::bind(deserialize<Event>, contentType, lambda::_1),
         reader.get()));
 
     Future<Result<Event>> event = responseDecoder->read();
@@ -846,8 +843,7 @@ TEST_P(ResourceProviderManagerHttpApiTest, AgentEndpoint)
   ASSERT_SOME(reader);
 
   recordio::Reader<Event> responseDecoder(
-      ::recordio::Decoder<Event>(
-          lambda::bind(deserialize<Event>, contentType, lambda::_1)),
+      lambda::bind(deserialize<Event>, contentType, lambda::_1),
       reader.get());
 
   Future<Result<Event>> event = responseDecoder.read();
@@ -1696,8 +1692,7 @@ TEST_F(ResourceProviderManagerHttpApiTest, RemoveResourceProvider)
     ASSERT_SOME(reader);
 
     recordio::Reader<Event> responseDecoder(
-        ::recordio::Decoder<Event>(
-            lambda::bind(deserialize<Event>, contentType, lambda::_1)),
+        lambda::bind(deserialize<Event>, contentType, lambda::_1),
         reader.get());
 
     // We expect the manager to drop the subscribe call since
diff --git a/src/tests/scheduler_http_api_tests.cpp b/src/tests/scheduler_http_api_tests.cpp
index d5b5eec..66ed9ce 100644
--- a/src/tests/scheduler_http_api_tests.cpp
+++ b/src/tests/scheduler_http_api_tests.cpp
@@ -282,7 +282,7 @@ TEST_P(SchedulerHttpApiTest, Subscribe)
   auto deserializer = lambda::bind(
       &SchedulerHttpApiTest::deserialize, this, contentType, lambda::_1);
 
-  Reader<Event> responseDecoder(Decoder<Event>(deserializer), reader.get());
+  Reader<Event> responseDecoder(deserializer, reader.get());
 
   Future<Result<Event>> event = responseDecoder.read();
   AWAIT_READY(event);
@@ -350,7 +350,7 @@ TEST_P(SchedulerHttpApiTest, RejectFrameworkWithInvalidRole)
   auto deserializer = lambda::bind(
       &SchedulerHttpApiTest::deserialize, this, contentType, lambda::_1);
 
-  Reader<Event> responseDecoder(Decoder<Event>(deserializer), reader.get());
+  Reader<Event> responseDecoder(deserializer, reader.get());
 
   Future<Result<Event>> event = responseDecoder.read();
   AWAIT_READY(event);
@@ -430,7 +430,7 @@ TEST_P(SchedulerHttpApiTest, SubscribedOnRetry)
     Option<Pipe::Reader> reader = response->reader;
     ASSERT_SOME(reader);
 
-    Reader<Event> responseDecoder(Decoder<Event>(deserializer), reader.get());
+    Reader<Event> responseDecoder(deserializer, reader.get());
 
     Future<Result<Event>> event = responseDecoder.read();
     AWAIT_READY(event);
@@ -457,7 +457,7 @@ TEST_P(SchedulerHttpApiTest, SubscribedOnRetry)
     Option<Pipe::Reader> reader = response->reader;
     ASSERT_SOME(reader);
 
-    Reader<Event> responseDecoder(Decoder<Event>(deserializer), reader.get());
+    Reader<Event> responseDecoder(deserializer, reader.get());
 
     // Check if we were successfully able to subscribe after the blip.
     Future<Result<Event>> event = responseDecoder.read();
@@ -544,7 +544,7 @@ TEST_P(SchedulerHttpApiTest, UpdatePidToHttpScheduler)
   auto deserializer = lambda::bind(
       &SchedulerHttpApiTest::deserialize, this, contentType, lambda::_1);
 
-  Reader<Event> responseDecoder(Decoder<Event>(deserializer), reader.get());
+  Reader<Event> responseDecoder(deserializer, reader.get());
 
   Future<Result<Event>> event = responseDecoder.read();
   AWAIT_READY(event);
@@ -608,7 +608,7 @@ TEST_P(SchedulerHttpApiTest, UpdateHttpToPidScheduler)
   auto deserializer = lambda::bind(
       &SchedulerHttpApiTest::deserialize, this, contentType, lambda::_1);
 
-  Reader<Event> responseDecoder(Decoder<Event>(deserializer), reader.get());
+  Reader<Event> responseDecoder(deserializer, reader.get());
 
   Future<Result<Event>> event = responseDecoder.read();
   AWAIT_READY(event);
@@ -705,7 +705,7 @@ TEST_P(SchedulerHttpApiTest, UpdateHttpToPidSchedulerAndBack)
   auto deserializer = lambda::bind(
       &SchedulerHttpApiTest::deserialize, this, contentType, lambda::_1);
 
-  Reader<Event> responseDecoder(Decoder<Event>(deserializer), reader.get());
+  Reader<Event> responseDecoder(deserializer, reader.get());
 
   // Get SUBSCRIBED event and check framework ID.
   Future<Result<Event>> event = responseDecoder.read();
@@ -899,7 +899,7 @@ TEST_P(SchedulerHttpApiTest, TeardownWithoutStreamId)
     auto deserializer = lambda::bind(
         &SchedulerHttpApiTest::deserialize, this, contentType, lambda::_1);
 
-    Reader<Event> responseDecoder(Decoder<Event>(deserializer), reader.get());
+    Reader<Event> responseDecoder(deserializer, reader.get());
 
     Future<Result<Event>> event = responseDecoder.read();
     AWAIT_READY(event);
@@ -974,7 +974,7 @@ TEST_P(SchedulerHttpApiTest, TeardownWrongStreamId)
     auto deserializer = lambda::bind(
         &SchedulerHttpApiTest::deserialize, this, contentType, lambda::_1);
 
-    Reader<Event> responseDecoder(Decoder<Event>(deserializer), reader.get());
+    Reader<Event> responseDecoder(deserializer, reader.get());
 
     Future<Result<Event>> event = responseDecoder.read();
     AWAIT_READY(event);
@@ -1020,7 +1020,7 @@ TEST_P(SchedulerHttpApiTest, TeardownWrongStreamId)
     auto deserializer = lambda::bind(
         &SchedulerHttpApiTest::deserialize, this, contentType, lambda::_1);
 
-    Reader<Event> responseDecoder(Decoder<Event>(deserializer), reader.get());
+    Reader<Event> responseDecoder(deserializer, reader.get());
 
     Future<Result<Event>> event = responseDecoder.read();
     AWAIT_READY(event);
@@ -1092,7 +1092,7 @@ TEST_P(SchedulerHttpApiTest, MalformedUUID)
     auto deserializer = lambda::bind(
         &SchedulerHttpApiTest::deserialize, this, contentType, lambda::_1);
 
-    Reader<Event> responseDecoder(Decoder<Event>(deserializer), reader.get());
+    Reader<Event> responseDecoder(deserializer, reader.get());
 
     Future<Result<Event>> event = responseDecoder.read();
     AWAIT_READY(event);