You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2015/07/24 23:44:57 UTC

[1/2] mesos git commit: Added recordio_test.cpp to libprocess Makefile.

Repository: mesos
Updated Branches:
  refs/heads/master 1e83bdac0 -> ab68f9d72


Added recordio_test.cpp to libprocess Makefile.

Review: https://reviews.apache.org/r/36680


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ab68f9d7
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ab68f9d7
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ab68f9d7

Branch: refs/heads/master
Commit: ab68f9d72680e1001d0d7f8b868e9502bbc4f161
Parents: d9a81ed
Author: Benjamin Mahler <be...@gmail.com>
Authored: Tue Jul 21 23:32:04 2015 -0700
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Fri Jul 24 14:44:37 2015 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/3rdparty/Makefile.am | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ab68f9d7/3rdparty/libprocess/3rdparty/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/Makefile.am b/3rdparty/libprocess/3rdparty/Makefile.am
index 856c2b2..bd95fe1 100644
--- a/3rdparty/libprocess/3rdparty/Makefile.am
+++ b/3rdparty/libprocess/3rdparty/Makefile.am
@@ -182,6 +182,7 @@ stout_tests_SOURCES =				\
   $(STOUT)/tests/protobuf_tests.pb.cc		\
   $(STOUT)/tests/protobuf_tests.pb.h		\
   $(STOUT)/tests/protobuf_tests.proto		\
+  $(STOUT)/tests/recordio_tests.cpp		\
   $(STOUT)/tests/result_tests.cpp		\
   $(STOUT)/tests/os/sendfile_tests.cpp		\
   $(STOUT)/tests/os/signals_tests.cpp		\


[2/2] mesos git commit: Introduced 'recordio' encoding facilities to stout.

Posted by bm...@apache.org.
Introduced 'recordio' encoding facilities to stout.

Note that most "Record-IO" encodings are used for file I/O
and consequently use a fixed-size header to encode the record
length. However, decoding a base-10 integer is more
straightforward to implement in most languages, and so this
was chosen instead. (Note that the Twitter streaming API
uses the same technique for portability).

Review: https://reviews.apache.org/r/36677


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d9a81ede
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d9a81ede
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d9a81ede

Branch: refs/heads/master
Commit: d9a81edee140fc43f56c4370cae1696b726fd66e
Parents: 1e83bda
Author: Benjamin Mahler <be...@gmail.com>
Authored: Tue Jul 21 22:54:02 2015 -0700
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Fri Jul 24 14:44:37 2015 -0700

----------------------------------------------------------------------
 .../3rdparty/stout/include/Makefile.am          |   1 +
 .../3rdparty/stout/include/stout/protobuf.hpp   |   5 +
 .../3rdparty/stout/include/stout/recordio.hpp   | 168 +++++++++++++++++++
 .../3rdparty/stout/tests/recordio_tests.cpp     | 161 ++++++++++++++++++
 4 files changed, 335 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/d9a81ede/3rdparty/libprocess/3rdparty/stout/include/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/include/Makefile.am b/3rdparty/libprocess/3rdparty/stout/include/Makefile.am
index 2394b95..5c19e3e 100644
--- a/3rdparty/libprocess/3rdparty/stout/include/Makefile.am
+++ b/3rdparty/libprocess/3rdparty/stout/include/Makefile.am
@@ -73,6 +73,7 @@ nobase_include_HEADERS =		\
   stout/preprocessor.hpp		\
   stout/proc.hpp			\
   stout/protobuf.hpp			\
+  stout/recordio.hpp			\
   stout/result.hpp			\
   stout/set.hpp				\
   stout/some.hpp			\

http://git-wip-us.apache.org/repos/asf/mesos/blob/d9a81ede/3rdparty/libprocess/3rdparty/stout/include/stout/protobuf.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/include/stout/protobuf.hpp b/3rdparty/libprocess/3rdparty/stout/include/stout/protobuf.hpp
index 8c75f6b..a7de91f 100644
--- a/3rdparty/libprocess/3rdparty/stout/include/stout/protobuf.hpp
+++ b/3rdparty/libprocess/3rdparty/stout/include/stout/protobuf.hpp
@@ -45,6 +45,11 @@
 
 namespace protobuf {
 
+// TODO(bmahler): Re-use stout's 'recordio' facilities here. Note
+// that these use a fixed-size length header, whereas 'recordio'
+// currently uses a base-10, newline-delimited header for language
+// portability, which makes changing these a bit tricky.
+
 // Write out the given protobuf to the specified file descriptor by
 // first writing out the length of the protobuf followed by the
 // contents.

http://git-wip-us.apache.org/repos/asf/mesos/blob/d9a81ede/3rdparty/libprocess/3rdparty/stout/include/stout/recordio.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/include/stout/recordio.hpp b/3rdparty/libprocess/3rdparty/stout/include/stout/recordio.hpp
new file mode 100644
index 0000000..e8a6217
--- /dev/null
+++ b/3rdparty/libprocess/3rdparty/stout/include/stout/recordio.hpp
@@ -0,0 +1,168 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __STOUT_RECORDIO_HPP__
+#define __STOUT_RECORDIO_HPP__
+
+#include <stdlib.h>
+
+#include <deque>
+#include <functional>
+#include <string>
+
+#include <stout/check.hpp>
+#include <stout/foreach.hpp>
+#include <stout/numify.hpp>
+#include <stout/option.hpp>
+#include <stout/stringify.hpp>
+#include <stout/try.hpp>
+
+/**
+ * Provides facilities for "Record-IO" encoding of data.
+ * "Record-IO" encoding allows one to encode a sequence
+ * of variable-length records by prefixing each record
+ * with its size in bytes:
+ *
+ * 5\n
+ * hello
+ * 6\n
+ * world!
+ *
+ * Note that this currently only supports record lengths
+ * encoded as base 10 integer values with newlines as a
+ * delimiter. This is to provide better language
+ * portability: parsing a base 10 integer is simple. Most
+ * other "Record-IO" implementations use a fixed-size header
+ * of 4 bytes to directly encode an unsigned 32 bit length.
+ *
+ * TODO(bmahler): Move this to libprocess and support async
+ * consumption of data.
+ */
+namespace recordio {
+
+/**
+ * Encodes typed records into "Record-IO" data, using the
+ * supplied function to serialize each individual record.
+ */
+template <typename T>
+class Encoder
+{
+public:
+  Encoder(std::function<std::string(const T&)> _serialize)
+    : serialize(_serialize) {}
+
+  /**
+   * Serializes 'record' and returns it prefixed by its size in
+   * bytes, written as a base-10 integer followed by '\n'.
+   */
+  std::string encode(const T& record) const
+  {
+    const std::string payload = serialize(record);
+
+    std::string encoded = stringify(payload.size());
+    encoded.push_back('\n');
+    encoded.append(payload);
+    return encoded;
+  }
+
+private:
+  // Converts a single record into its byte representation.
+  std::function<std::string(const T&)> serialize;
+};
+
+
+/**
+ * Given a decoding function for individual records, this
+ * provides decoding from "Record-IO" data into typed records.
+ *
+ * The decoder is incremental: data may be supplied in arbitrary
+ * chunks, and a partial header or record is buffered until the
+ * remainder arrives in a later call to 'decode'.
+ */
+template <typename T>
+class Decoder
+{
+public:
+  Decoder(std::function<Try<T>(const std::string&)> _deserialize)
+    : state(HEADER), deserialize(_deserialize) {}
+
+  /**
+   * Decodes another chunk of data from the "Record-IO" stream
+   * and returns the attempted decoding of any additional
+   * complete records.
+   *
+   * A record that fails to deserialize is returned as an Error
+   * element of the deque; it does not fail the decoder.
+   *
+   * Returns an Error if the data contains an invalid length
+   * header (anything other than one or more base-10 digits
+   * terminated by a newline), at which point the decoder will
+   * return Error for all subsequent calls.
+   */
+  Try<std::deque<Try<T>>> decode(const std::string& data)
+  {
+    if (state == FAILED) {
+      return Error("Decoder is in a FAILED state");
+    }
+
+    std::deque<Try<T>> records;
+
+    foreach (char c, data) {
+      if (state == HEADER) {
+        // Keep reading until we have the entire header.
+        if (c != '\n') {
+          buffer += c;
+          continue;
+        }
+
+        // The header must be a plain base-10 integer, which is exactly
+        // what 'Encoder' produces. This is checked explicitly rather
+        // than relying on 'numify' alone, since 'numify' may accept
+        // other spellings (e.g., a sign or a hexadecimal prefix); if a
+        // negative value were accepted it could wrap around to a huge
+        // 'size_t' length and stall the decoder indefinitely.
+        if (buffer.empty() ||
+            buffer.find_first_not_of("0123456789") != std::string::npos) {
+          state = FAILED;
+          return Error("Failed to decode length '" + buffer +
+                       "': Expected a non-empty base-10 integer");
+        }
+
+        Try<size_t> parsed = ::numify<size_t>(buffer);
+
+        // If we were unable to decode the length header (presumably
+        // only possible here if it does not fit in a 'size_t'), do
+        // not continue decoding since we cannot determine where to
+        // pick up the next length header!
+        if (parsed.isError()) {
+          state = FAILED;
+          return Error("Failed to decode length '" + buffer + "': " +
+                       parsed.error());
+        }
+
+        length = parsed.get();
+        buffer.clear();
+        state = RECORD;
+
+        // Note that for 0 length records, we immediately decode,
+        // since no payload byte will arrive to complete them below.
+        if (length.get() == 0) {
+          records.push_back(deserialize(buffer));
+          state = HEADER;
+        }
+      } else if (state == RECORD) {
+        CHECK_SOME(length);
+        CHECK_LT(buffer.size(), length.get());
+
+        buffer += c;
+
+        if (buffer.size() == length.get()) {
+          records.push_back(deserialize(buffer));
+          buffer.clear();
+          state = HEADER;
+        }
+      }
+    }
+
+    return records;
+  }
+
+private:
+  enum {
+    HEADER,
+    RECORD,
+    FAILED
+  } state;
+
+  // Holds the partially read length header while in HEADER state,
+  // and the partially read record payload while in RECORD state.
+  //
+  // TODO(bmahler): Avoid string here as it will not free
+  // its underlying memory allocation when we clear it.
+  std::string buffer;
+
+  // Length of the record currently being read, set once its
+  // header has been parsed.
+  Option<size_t> length;
+
+  std::function<Try<T>(const std::string&)> deserialize;
+};
+
+} // namespace recordio {
+
+#endif // __STOUT_RECORDIO_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/d9a81ede/3rdparty/libprocess/3rdparty/stout/tests/recordio_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/tests/recordio_tests.cpp b/3rdparty/libprocess/3rdparty/stout/tests/recordio_tests.cpp
new file mode 100644
index 0000000..d036785
--- /dev/null
+++ b/3rdparty/libprocess/3rdparty/stout/tests/recordio_tests.cpp
@@ -0,0 +1,161 @@
+/**
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License
+*/
+
+#include <deque>
+#include <string>
+
+#include <gtest/gtest.h>
+
+#include <stout/error.hpp>
+#include <stout/gtest.hpp>
+#include <stout/recordio.hpp>
+#include <stout/some.hpp>
+#include <stout/strings.hpp>
+#include <stout/try.hpp>
+
+using std::deque;
+using std::string;
+
+
+// Two Trys are equal if both hold equal values, or if both hold
+// errors with identical messages. Taken by const reference to
+// avoid copying the operands on every comparison.
+template <typename T>
+bool operator == (const Try<T>& lhs, const Try<T>& rhs)
+{
+  if (lhs.isSome() != rhs.isSome()) {
+    return false;
+  }
+
+  if (lhs.isSome()) {
+    return lhs.get() == rhs.get();
+  }
+
+  return lhs.error() == rhs.error();
+}
+
+
+// Negation of the Try equality defined in this file.
+template <typename T>
+bool operator != (const Try<T>& lhs, const Try<T>& rhs)
+{
+  return !(lhs == rhs);
+}
+
+
+// Element-wise deque equality; requires 'operator !=' for T.
+// Taken by const reference to avoid copying both deques (and
+// every element in them) on each comparison.
+template <typename T>
+bool operator == (const deque<T>& lhs, const deque<T>& rhs)
+{
+  if (lhs.size() != rhs.size()) {
+    return false;
+  }
+
+  auto it1 = lhs.begin();
+  auto it2 = rhs.begin();
+
+  while (it1 != lhs.end()) {
+    if (*it1 != *it2) {
+      return false;
+    }
+
+    ++it1;
+    ++it2;
+  }
+
+  return true;
+}
+
+
+// Encoding upper-cases each record and decoding lower-cases it, so
+// a round trip of lower-case inputs should reproduce the inputs.
+TEST(RecordIOTest, Encoder)
+{
+  recordio::Encoder<string> encoder(strings::upper);
+
+  string data;
+
+  data += encoder.encode("hello!");
+  data += encoder.encode("");
+  data += encoder.encode(" ");
+  data += encoder.encode("13 characters");
+
+  EXPECT_EQ(
+      "6\nHELLO!"
+      "0\n"
+      "1\n "
+      "13\n13 CHARACTERS",
+      data);
+
+  // Make sure these can be decoded. The parameter is named 'record'
+  // so it does not shadow the encoded 'data' above, and the lambda
+  // needs no captures.
+  recordio::Decoder<string> decoder(
+      [](const string& record) {
+        return Try<string>(strings::lower(record));
+      });
+
+  deque<Try<string>> records;
+  records.push_back("hello!");
+  records.push_back("");
+  records.push_back(" ");
+  records.push_back("13 characters");
+
+  EXPECT_SOME_EQ(records, decoder.decode(data));
+}
+
+
+TEST(RecordIOTest, Decoder)
+{
+  // Deserializing brings to lower case, but add an
+  // error case to test deserialization failures.
+  auto deserialize = [](const string& data) -> Try<string> {
+    if (data == "error") {
+      return Error("error");
+    }
+    return strings::lower(data);
+  };
+
+  // NOTE: A single decoder is shared by every step below, so the
+  // steps depend on the state left behind by earlier steps and must
+  // run in this order (e.g., the final "bad format" step is what
+  // moves the decoder into its FAILED state).
+  recordio::Decoder<string> decoder(deserialize);
+
+  // Expected records for the current step; reset before each step.
+  deque<Try<string>> records;
+
+  // Empty data should not result in an error.
+  records.clear();
+
+  EXPECT_SOME_EQ(records, decoder.decode(""));
+
+  // Should decode more than 1 record when possible.
+  records.clear();
+  records.push_back("hello!");
+  records.push_back("");
+  records.push_back(" ");
+
+  EXPECT_SOME_EQ(records, decoder.decode("6\nHELLO!0\n1\n "));
+
+  // An entry which cannot be decoded should not
+  // fail the decoder permanently.
+  records.clear();
+  records.push_back(Error("error"));
+
+  EXPECT_SOME_EQ(records, decoder.decode("5\nerror"));
+
+  // Record should only be decoded once complete.
+  records.clear();
+
+  // Feed the header "13\n" and then 12 of the 13 payload bytes in
+  // separate chunks; nothing is complete yet, so no records appear.
+  EXPECT_SOME_EQ(records, decoder.decode("1"));
+  EXPECT_SOME_EQ(records, decoder.decode("3"));
+  EXPECT_SOME_EQ(records, decoder.decode("\n"));
+  EXPECT_SOME_EQ(records, decoder.decode("13 CHARACTER"));
+
+  records.clear();
+  records.push_back("13 characters");
+
+  // The 13th payload byte completes the buffered record.
+  EXPECT_SOME_EQ(records, decoder.decode("S"));
+
+  // If the format is bad, the decoder should fail permanently.
+  EXPECT_ERROR(decoder.decode("not a number\n"));
+  // Even a well-formed header is rejected once the decoder has failed.
+  EXPECT_ERROR(decoder.decode("1\n"));
+}