You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2016/11/29 06:15:35 UTC

[6/6] mesos git commit: Added `recordio::transform` helper.

Added `recordio::transform` helper.

This helper lets us transform raw bytes read from a RecordIO
reader and write the result to a pipe.

Review: https://reviews.apache.org/r/54039


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/95030c59
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/95030c59
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/95030c59

Branch: refs/heads/master
Commit: 95030c59d169621027c3c1e055ae1a6d0b3d78b5
Parents: e0d8057
Author: Vinod Kone <vi...@gmail.com>
Authored: Tue Nov 22 16:58:16 2016 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Mon Nov 28 22:14:01 2016 -0800

----------------------------------------------------------------------
 src/common/recordio.hpp             |  41 +++++++++++
 src/tests/common/recordio_tests.cpp | 113 +++++++++++++++++++++++++++++++
 2 files changed, 154 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/95030c59/src/common/recordio.hpp
----------------------------------------------------------------------
diff --git a/src/common/recordio.hpp b/src/common/recordio.hpp
index 4ecefd8..0f6b47b 100644
--- a/src/common/recordio.hpp
+++ b/src/common/recordio.hpp
@@ -31,6 +31,7 @@
 #include <process/process.hpp>
 
 #include <stout/lambda.hpp>
+#include <stout/nothing.hpp>
 #include <stout/recordio.hpp>
 #include <stout/result.hpp>
 
@@ -95,6 +96,46 @@ private:
 };
 
 
+/**
+ * Reads records from `reader` one at a time, applies `func` to each record
+ * and writes the resulting string to `writer`.
+ *
+ * Returns a failed future if there are any errors reading or writing. The
+ * future is satisfied when we get an EOF; `writer` is not closed here.
+ *
+ * TODO(vinod): Split this method into primitives that can transform a
+ * stream of bytes to a stream of typed records that can be further transformed.
+ * See the TODO above in `Reader` for further details.
+ */
+template <typename T>
+process::Future<Nothing> transform(
+    process::Owned<Reader<T>>&& reader,
+    const std::function<std::string(const T&)>& func,
+    process::http::Pipe::Writer writer)
+{
+  return reader->read()
+    .then([=](const Result<T>& record) mutable -> process::Future<Nothing> {
+      // This could happen if EOF is sent on the pipe `reader` reads from.
+      if (record.isNone()) {
+        return Nothing();
+      }
+
+      // This could happen if there is a de-serialization error.
+      if (record.isError()) {
+        return process::Failure(record.error());
+      }
+
+      // TODO(vinod): Instead of detecting that the reader went away only
+      // after attempting a write, leverage `writer.readerClosed` future.
+      if (!writer.write(func(record.get()))) {
+        return process::Failure("Write failed to the pipe");
+      }
+
+      return transform(std::move(reader), func, writer);
+  });
+}
+
+
 namespace internal {
 
 template <typename T>

http://git-wip-us.apache.org/repos/asf/mesos/blob/95030c59/src/tests/common/recordio_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/common/recordio_tests.cpp b/src/tests/common/recordio_tests.cpp
index 86a1117..872a1dc 100644
--- a/src/tests/common/recordio_tests.cpp
+++ b/src/tests/common/recordio_tests.cpp
@@ -159,3 +159,116 @@ TEST(RecordIOReaderTest, PipeFailure)
   // Subsequent reads should return a failure.
   AWAIT_EXPECT_FAILED(reader.read());
 }
+
+
+// This test verifies that when an EOF is received by the `reader` used
+// in `transform`, the future returned to the caller is satisfied.
+TEST(RecordIOTransformTest, EndOfFile)
+{
+  // Write two encoded records to the pipe up front so that
+  // they are available before any reads occur.
+  ::recordio::Encoder<string> encoder(strings::upper);
+
+  string data;
+
+  data += encoder.encode("hello ");
+  data += encoder.encode("world! ");
+
+  process::http::Pipe pipeA;
+  pipeA.writer().write(data);
+
+  process::Owned<internal::recordio::Reader<string>> reader(
+    new internal::recordio::Reader<string>(
+        ::recordio::Decoder<string>(strings::lower),
+        pipeA.reader()));
+
+  process::http::Pipe pipeB;
+
+  auto trim = [](const string& str) { return strings::trim(str); };
+
+  Future<Nothing> transform = internal::recordio::transform<string>(
+      std::move(reader), trim, pipeB.writer());
+
+  Future<string> future = pipeB.reader().readAll();
+
+  pipeA.writer().close();
+
+  AWAIT_READY(transform);
+
+  pipeB.writer().close();
+
+  AWAIT_ASSERT_EQ("helloworld!", future);
+}
+
+
+// This test verifies that when the write end of the pipe backing the
+// `reader` used in `transform` fails, a failure is returned to the caller.
+TEST(RecordIOTransformTest, ReaderWriterEndFail)
+{
+  // Write two encoded records to the pipe up front so that
+  // they are available before any reads occur.
+  ::recordio::Encoder<string> encoder(strings::upper);
+
+  string data;
+
+  data += encoder.encode("hello ");
+  data += encoder.encode("world! ");
+
+  process::http::Pipe pipeA;
+  pipeA.writer().write(data);
+
+  process::Owned<internal::recordio::Reader<string>> reader(
+    new internal::recordio::Reader<string>(
+        ::recordio::Decoder<string>(strings::lower),
+        pipeA.reader()));
+
+  process::http::Pipe pipeB;
+
+  auto trim = [](const string& str) { return strings::trim(str); };
+
+  Future<Nothing> transform = internal::recordio::transform<string>(
+      std::move(reader), trim, pipeB.writer());
+
+  Future<string> future = pipeB.reader().readAll();
+
+  pipeA.writer().fail("Writer failure");
+
+  AWAIT_FAILED(transform);
+  ASSERT_TRUE(future.isPending());
+}
+
+
+// This test verifies that when the read end of the pipe backing the
+// `writer` used in `transform` is closed, a failure is returned to the caller.
+TEST(RecordIOTransformTest, WriterReadEndFail)
+{
+  // Write two encoded records to the pipe up front so that
+  // they are available before any reads occur.
+  ::recordio::Encoder<string> encoder(strings::upper);
+
+  string data;
+
+  data += encoder.encode("hello ");
+  data += encoder.encode("world! ");
+
+  process::http::Pipe pipeA;
+  pipeA.writer().write(data);
+
+  process::Owned<internal::recordio::Reader<string>> reader(
+    new internal::recordio::Reader<string>(
+        ::recordio::Decoder<string>(strings::lower),
+        pipeA.reader()));
+
+  process::http::Pipe pipeB;
+
+  auto trim = [](const string& str) { return strings::trim(str); };
+
+  pipeB.reader().close();
+
+  Future<Nothing> transform = internal::recordio::transform<string>(
+      std::move(reader), trim, pipeB.writer());
+
+  pipeA.writer().close();
+
+  AWAIT_FAILED(transform);
+}