You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2016/11/29 06:15:35 UTC
[6/6] mesos git commit: Added `recordio::transform` helper.
Added `recordio::transform` helper.
This helper lets us transform raw bytes read from a RecordIO
reader and write the result to a pipe.
Review: https://reviews.apache.org/r/54039
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/95030c59
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/95030c59
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/95030c59
Branch: refs/heads/master
Commit: 95030c59d169621027c3c1e055ae1a6d0b3d78b5
Parents: e0d8057
Author: Vinod Kone <vi...@gmail.com>
Authored: Tue Nov 22 16:58:16 2016 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Mon Nov 28 22:14:01 2016 -0800
----------------------------------------------------------------------
src/common/recordio.hpp | 41 +++++++++++
src/tests/common/recordio_tests.cpp | 113 +++++++++++++++++++++++++++++++
2 files changed, 154 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/95030c59/src/common/recordio.hpp
----------------------------------------------------------------------
diff --git a/src/common/recordio.hpp b/src/common/recordio.hpp
index 4ecefd8..0f6b47b 100644
--- a/src/common/recordio.hpp
+++ b/src/common/recordio.hpp
@@ -31,6 +31,7 @@
#include <process/process.hpp>
#include <stout/lambda.hpp>
+#include <stout/nothing.hpp>
#include <stout/recordio.hpp>
#include <stout/result.hpp>
@@ -95,6 +96,46 @@ private:
};
+/**
+ * This is a helper function that reads records from a `Reader`, applies
+ * `func` to each record and writes the resulting string to the pipe.
+ *
+ * Returns a failed future if reading, de-serializing or writing fails.
+ * The future is satisfied once the reader returns EOF.
+ *
+ * TODO(vinod): Split this method into primitives that can transform a
+ * stream of bytes to a stream of typed records that can be further transformed.
+ * See the TODO above in `Reader` for further details.
+ */
+template <typename T>
+process::Future<Nothing> transform(
+ process::Owned<Reader<T>>&& reader,
+ const std::function<std::string(const T&)>& func,
+ process::http::Pipe::Writer writer)
+{
+ return reader->read()
+ .then([=](const Result<T>& record) mutable -> process::Future<Nothing> {
+ // A `None` record means EOF was sent by the writer; we are done.
+ if (record.isNone()) {
+ return Nothing();
+ }
+
+ // This could happen if there is a de-serialization error.
+ if (record.isError()) {
+ return process::Failure(record.error());
+ }
+
+ // TODO(vinod): Instead of detecting that the reader went away only
+ // after attempting a write, leverage `writer.readerClosed` future.
+ if (!writer.write(func(record.get()))) {
+ return process::Failure("Write failed to the pipe");
+ }
+
+ return transform(std::move(reader), func, writer);
+ });
+}
+
+
namespace internal {
template <typename T>
http://git-wip-us.apache.org/repos/asf/mesos/blob/95030c59/src/tests/common/recordio_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/common/recordio_tests.cpp b/src/tests/common/recordio_tests.cpp
index 86a1117..872a1dc 100644
--- a/src/tests/common/recordio_tests.cpp
+++ b/src/tests/common/recordio_tests.cpp
@@ -159,3 +159,116 @@ TEST(RecordIOReaderTest, PipeFailure)
// Subsequent reads should return a failure.
AWAIT_EXPECT_FAILED(reader.read());
}
+
+
+// This test verifies that when an EOF is received by the `reader` used
+// in `transform`, the future returned to the caller is satisfied.
+TEST(RecordIOTransformTest, EndOfFile)
+{
+ // Write some data to the pipe so that records
+ // are available before any reads occur.
+ ::recordio::Encoder<string> encoder(strings::upper);
+
+ string data;
+
+ data += encoder.encode("hello ");
+ data += encoder.encode("world! ");
+
+ process::http::Pipe pipeA;
+ pipeA.writer().write(data);
+
+ process::Owned<internal::recordio::Reader<string>> reader(
+ new internal::recordio::Reader<string>(
+ ::recordio::Decoder<string>(strings::lower),
+ pipeA.reader()));
+
+ process::http::Pipe pipeB;
+
+ auto trim = [](const string& str) { return strings::trim(str); };
+
+ Future<Nothing> transform = internal::recordio::transform<string>(
+ std::move(reader), trim, pipeB.writer());
+
+ Future<string> future = pipeB.reader().readAll();
+
+ pipeA.writer().close();
+
+ AWAIT_READY(transform);
+
+ pipeB.writer().close();
+
+ AWAIT_ASSERT_EQ("helloworld!", future);
+}
+
+
+// This test verifies that when the write end of the pipe backing the
+// `reader` used in `transform` fails, a failure is returned to the caller.
+TEST(RecordIOTransformTest, ReaderWriterEndFail)
+{
+ // Write some data to the pipe so that records
+ // are available before any reads occur.
+ ::recordio::Encoder<string> encoder(strings::upper);
+
+ string data;
+
+ data += encoder.encode("hello ");
+ data += encoder.encode("world! ");
+
+ process::http::Pipe pipeA;
+ pipeA.writer().write(data);
+
+ process::Owned<internal::recordio::Reader<string>> reader(
+ new internal::recordio::Reader<string>(
+ ::recordio::Decoder<string>(strings::lower),
+ pipeA.reader()));
+
+ process::http::Pipe pipeB;
+
+ auto trim = [](const string& str) { return strings::trim(str); };
+
+ Future<Nothing> transform = internal::recordio::transform<string>(
+ std::move(reader), trim, pipeB.writer());
+
+ Future<string> future = pipeB.reader().readAll();
+
+ pipeA.writer().fail("Writer failure");
+
+ AWAIT_FAILED(transform);
+ ASSERT_TRUE(future.isPending());
+}
+
+
+// This test verifies that when the read end of the pipe backing the
+// `writer` used in `transform` is closed, a failure is returned to the caller.
+TEST(RecordIOTransformTest, WriterReadEndFail)
+{
+ // Write some data to the pipe so that records
+ // are available before any reads occur.
+ ::recordio::Encoder<string> encoder(strings::upper);
+
+ string data;
+
+ data += encoder.encode("hello ");
+ data += encoder.encode("world! ");
+
+ process::http::Pipe pipeA;
+ pipeA.writer().write(data);
+
+ process::Owned<internal::recordio::Reader<string>> reader(
+ new internal::recordio::Reader<string>(
+ ::recordio::Decoder<string>(strings::lower),
+ pipeA.reader()));
+
+ process::http::Pipe pipeB;
+
+ auto trim = [](const string& str) { return strings::trim(str); };
+
+ pipeB.reader().close();
+
+ Future<Nothing> transform = internal::recordio::transform<string>(
+ std::move(reader), trim, pipeB.writer());
+
+ pipeA.writer().close();
+
+ AWAIT_FAILED(transform);
+}