You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2015/07/24 23:44:57 UTC
[1/2] mesos git commit: Added recordio_tests.cpp to libprocess
Makefile.
Repository: mesos
Updated Branches:
refs/heads/master 1e83bdac0 -> ab68f9d72
Added recordio_tests.cpp to libprocess Makefile.
Review: https://reviews.apache.org/r/36680
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ab68f9d7
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ab68f9d7
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ab68f9d7
Branch: refs/heads/master
Commit: ab68f9d72680e1001d0d7f8b868e9502bbc4f161
Parents: d9a81ed
Author: Benjamin Mahler <be...@gmail.com>
Authored: Tue Jul 21 23:32:04 2015 -0700
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Fri Jul 24 14:44:37 2015 -0700
----------------------------------------------------------------------
3rdparty/libprocess/3rdparty/Makefile.am | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/ab68f9d7/3rdparty/libprocess/3rdparty/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/Makefile.am b/3rdparty/libprocess/3rdparty/Makefile.am
index 856c2b2..bd95fe1 100644
--- a/3rdparty/libprocess/3rdparty/Makefile.am
+++ b/3rdparty/libprocess/3rdparty/Makefile.am
@@ -182,6 +182,7 @@ stout_tests_SOURCES = \
$(STOUT)/tests/protobuf_tests.pb.cc \
$(STOUT)/tests/protobuf_tests.pb.h \
$(STOUT)/tests/protobuf_tests.proto \
+ $(STOUT)/tests/recordio_tests.cpp \
$(STOUT)/tests/result_tests.cpp \
$(STOUT)/tests/os/sendfile_tests.cpp \
$(STOUT)/tests/os/signals_tests.cpp \
[2/2] mesos git commit: Introduced 'recordio' encoding facilities to
stout.
Posted by bm...@apache.org.
Introduced 'recordio' encoding facilities to stout.
Note that most "Record-IO" encodings are used for file I/O
and consequently use a fixed-size header to encode the record
length. However, decoding a base-10 integer is more
straightforward to implement in most languages, and so this
was chosen instead. (Note that the Twitter streaming API
uses the same technique for portability).
Review: https://reviews.apache.org/r/36677
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d9a81ede
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d9a81ede
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d9a81ede
Branch: refs/heads/master
Commit: d9a81edee140fc43f56c4370cae1696b726fd66e
Parents: 1e83bda
Author: Benjamin Mahler <be...@gmail.com>
Authored: Tue Jul 21 22:54:02 2015 -0700
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Fri Jul 24 14:44:37 2015 -0700
----------------------------------------------------------------------
.../3rdparty/stout/include/Makefile.am | 1 +
.../3rdparty/stout/include/stout/protobuf.hpp | 5 +
.../3rdparty/stout/include/stout/recordio.hpp | 168 +++++++++++++++++++
.../3rdparty/stout/tests/recordio_tests.cpp | 161 ++++++++++++++++++
4 files changed, 335 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/d9a81ede/3rdparty/libprocess/3rdparty/stout/include/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/include/Makefile.am b/3rdparty/libprocess/3rdparty/stout/include/Makefile.am
index 2394b95..5c19e3e 100644
--- a/3rdparty/libprocess/3rdparty/stout/include/Makefile.am
+++ b/3rdparty/libprocess/3rdparty/stout/include/Makefile.am
@@ -73,6 +73,7 @@ nobase_include_HEADERS = \
stout/preprocessor.hpp \
stout/proc.hpp \
stout/protobuf.hpp \
+ stout/recordio.hpp \
stout/result.hpp \
stout/set.hpp \
stout/some.hpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/d9a81ede/3rdparty/libprocess/3rdparty/stout/include/stout/protobuf.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/include/stout/protobuf.hpp b/3rdparty/libprocess/3rdparty/stout/include/stout/protobuf.hpp
index 8c75f6b..a7de91f 100644
--- a/3rdparty/libprocess/3rdparty/stout/include/stout/protobuf.hpp
+++ b/3rdparty/libprocess/3rdparty/stout/include/stout/protobuf.hpp
@@ -45,6 +45,11 @@
namespace protobuf {
+// TODO(bmahler): Re-use stout's 'recordio' facilities here. Note
+// that the functions below use a fixed-size length header, whereas
+// 'recordio' currently uses a base-10, newline-delimited header for
+// language portability, which makes changing these a bit tricky.
+
// Write out the given protobuf to the specified file descriptor by
// first writing out the length of the protobuf followed by the
// contents.
http://git-wip-us.apache.org/repos/asf/mesos/blob/d9a81ede/3rdparty/libprocess/3rdparty/stout/include/stout/recordio.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/include/stout/recordio.hpp b/3rdparty/libprocess/3rdparty/stout/include/stout/recordio.hpp
new file mode 100644
index 0000000..e8a6217
--- /dev/null
+++ b/3rdparty/libprocess/3rdparty/stout/include/stout/recordio.hpp
@@ -0,0 +1,168 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __STOUT_RECORDIO_HPP__
+#define __STOUT_RECORDIO_HPP__
+
+#include <stdlib.h>
+
+#include <deque>
+#include <functional>
+#include <string>
+
+#include <stout/check.hpp>
+#include <stout/foreach.hpp>
+#include <stout/numify.hpp>
+#include <stout/option.hpp>
+#include <stout/stringify.hpp>
+#include <stout/try.hpp>
+
+/**
+ * Provides facilities for "Record-IO" encoding of data.
+ * "Record-IO" encoding allows one to encode a sequence
+ * of variable-length records by prefixing each record
+ * with its size in bytes:
+ *
+ * 5\n
+ * hello
+ * 6\n
+ * world!
+ *
+ * Note that this currently only supports record lengths
+ * encoded as base 10 integer values with newlines as a
+ * delimiter. This is to provide better language
+ * portability: parsing a base 10 integer is simple. Most
+ * other "Record-IO" implementations use a fixed-size header
+ * of 4 bytes to directly encode an unsigned 32 bit length.
+ *
+ * TODO(bmahler): Move this to libprocess and support async
+ * consumption of data.
+ */
+namespace recordio {
+
+/**
+ * Given an encoding function for individual records, this
+ * provides encoding from typed records into "Record-IO" data.
+ *
+ * The encoder holds no state besides the serialization function:
+ * each call to 'encode' produces one self-contained record of the
+ * form "<size>\n<bytes>", and a stream is formed by concatenating
+ * the results of successive calls.
+ */
+template <typename T>
+class Encoder
+{
+public:
+ Encoder(std::function<std::string(const T&)> _serialize)
+ : serialize(_serialize) {}
+
+ /**
+ * Returns the "Record-IO" encoded record: the size in bytes of
+ * the serialized record, a newline, and then the serialized
+ * bytes themselves (e.g. "5\nhello"). An empty serialization is
+ * encoded as just "0\n".
+ */
+ std::string encode(const T& record) const
+ {
+ std::string s = serialize(record);
+ return stringify(s.size()) + "\n" + s; // Length header, '\n', payload.
+ }
+
+private:
+ std::function<std::string(const T&)> serialize; // Record -> payload bytes.
+};
+
+
+/**
+ * Given a decoding function for individual records, this
+ * provides decoding from "Record-IO" data into typed records.
+ *
+ * The decoder is stateful: a partially received length header or
+ * record body is buffered between calls to 'decode', so the data
+ * may be fed in chunks that split records at arbitrary positions.
+ */
+template <typename T>
+class Decoder
+{
+public:
+ Decoder(std::function<Try<T>(const std::string&)> _deserialize)
+ : state(HEADER), deserialize(_deserialize) {}
+
+ /**
+ * Decodes another chunk of data from the "Record-IO" stream
+ * and returns the attempted decoding of any additional
+ * complete records.
+ *
+ * Each element of the returned deque is the result of calling
+ * the deserialization function on one complete record; a record
+ * that fails to deserialize yields an Error element but does
+ * not prevent decoding of the records that follow it.
+ *
+ * Returns an Error if the data contains an invalid length
+ * header, at which point the decoder will return Error for
+ * all subsequent calls. Records that were completed earlier in
+ * the same chunk are not returned in that case.
+ *
+ * NOTE(review): Header bytes are buffered until a newline is
+ * seen, with no upper bound on the header length; confirm this
+ * is acceptable before decoding untrusted data.
+ */
+ Try<std::deque<Try<T>>> decode(const std::string& data)
+ {
+ if (state == FAILED) {
+ return Error("Decoder is in a FAILED state");
+ }
+
+ std::deque<Try<T>> records;
+
+ foreach (char c, data) {
+ if (state == HEADER) {
+ // Keep buffering bytes until the '\n' that ends the header.
+ if (c != '\n') {
+ buffer += c;
+ continue;
+ }
+
+ Try<size_t> numify = ::numify<size_t>(buffer);
+
+ // If we were unable to decode the length header, do not
+ // continue decoding since we cannot determine where to
+ // pick up the next length header!
+ if (numify.isError()) {
+ state = FAILED;
+ return Error("Failed to decode length '" + buffer + "': " +
+ numify.error());
+ }
+
+ length = numify.get();
+ buffer.clear(); // From here on 'buffer' accumulates record bytes.
+ state = RECORD;
+
+ // A 0 length record has no bytes to wait for, so decode it now.
+ if (numify.get() <= 0) { // size_t is unsigned: true only for 0.
+ records.push_back(deserialize(buffer)); // 'buffer' is empty here.
+ state = HEADER;
+ }
+ } else if (state == RECORD) {
+ CHECK_SOME(length);
+ CHECK_LT(buffer.size(), length.get());
+
+ buffer += c;
+
+ if (buffer.size() == length.get()) {
+ records.push_back(deserialize(buffer)); // May be an Error element.
+ buffer.clear();
+ state = HEADER;
+ }
+ }
+ }
+
+ return records;
+ }
+
+private:
+ enum {
+ HEADER, // Accumulating length header bytes until '\n'.
+ RECORD, // Accumulating 'length' bytes of record data.
+ FAILED // An invalid header was seen; 'decode' now always errors.
+ } state;
+
+ // TODO(bmahler): Avoid string here as it will not free
+ // its underlying memory allocation when we clear it.
+ std::string buffer; // Holds either header bytes or record bytes.
+ Option<size_t> length; // Assigned each time a header is parsed.
+
+ std::function<Try<T>(const std::string&)> deserialize;
+};
+
+} // namespace recordio {
+
+#endif // __STOUT_RECORDIO_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/d9a81ede/3rdparty/libprocess/3rdparty/stout/tests/recordio_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/tests/recordio_tests.cpp b/3rdparty/libprocess/3rdparty/stout/tests/recordio_tests.cpp
new file mode 100644
index 0000000..d036785
--- /dev/null
+++ b/3rdparty/libprocess/3rdparty/stout/tests/recordio_tests.cpp
@@ -0,0 +1,161 @@
+/**
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include <deque>
+#include <string>
+
+#include <gtest/gtest.h>
+
+#include <stout/error.hpp>
+#include <stout/gtest.hpp>
+#include <stout/recordio.hpp>
+#include <stout/some.hpp>
+#include <stout/strings.hpp>
+#include <stout/try.hpp>
+
+using std::deque;
+using std::string;
+
+
+template <typename T>
+bool operator == (Try<T> lhs, Try<T> rhs) // Test helper: Try<T> equality.
+{
+ if (lhs.isSome() != rhs.isSome()) { // Only one of the two holds a value.
+ return false;
+ }
+
+ if (lhs.isSome()) {
+ return lhs.get() == rhs.get(); // Both hold values: compare them.
+ }
+
+ return lhs.error() == rhs.error(); // Neither holds a value: compare errors.
+}
+
+
+template <typename T>
+bool operator != (Try<T> lhs, Try<T> rhs) // Test helper: Try<T> inequality.
+{
+ return !(lhs == rhs); // Defined as the negation of operator == on Try<T>.
+}
+
+
+template <typename T>
+bool operator == (deque<T> rhs, deque<T> lhs) // Test helper: deque equality.
+{
+ if (rhs.size() != lhs.size()) {
+ return false;
+ }
+
+ auto it1 = rhs.begin();
+ auto it2 = lhs.begin();
+
+ while (it1 != rhs.end()) { // Sizes match, so 'it2' stays in range too.
+ if (*it1 != *it2) { // Element-wise comparison via T's operator !=.
+ return false;
+ }
+
+ ++it1;
+ ++it2;
+ }
+
+ return true;
+}
+
+
+TEST(RecordIOTest, Encoder)
+{
+ recordio::Encoder<string> encoder(strings::upper); // "Serialize" = upper-case.
+
+ string data;
+
+ data += encoder.encode("hello!");
+ data += encoder.encode(""); // Empty record: encoded as a bare "0\n" header.
+ data += encoder.encode(" ");
+ data += encoder.encode("13 characters");
+
+ EXPECT_EQ(
+ "6\nHELLO!"
+ "0\n"
+ "1\n "
+ "13\n13 CHARACTERS",
+ data);
+
+ // Make sure these can be decoded; lower-casing restores the inputs.
+ recordio::Decoder<string> decoder(
+ [=](const string& data) {
+ return Try<string>(strings::lower(data));
+ });
+
+ deque<Try<string>> records;
+ records.push_back("hello!");
+ records.push_back("");
+ records.push_back(" ");
+ records.push_back("13 characters");
+
+ EXPECT_SOME_EQ(records, decoder.decode(data));
+}
+
+
+TEST(RecordIOTest, Decoder)
+{
+ // Deserializing brings to lower case, but add an
+ // error case to test deserialization failures.
+ auto deserialize = [](const string& data) -> Try<string> {
+ if (data == "error") {
+ return Error("error");
+ }
+ return strings::lower(data);
+ };
+
+ recordio::Decoder<string> decoder(deserialize);
+
+ deque<Try<string>> records; // Expected result of the next decode() call.
+
+ // Empty data should not result in an error.
+ records.clear();
+
+ EXPECT_SOME_EQ(records, decoder.decode(""));
+
+ // Should decode more than 1 record when possible.
+ records.clear();
+ records.push_back("hello!");
+ records.push_back("");
+ records.push_back(" ");
+
+ EXPECT_SOME_EQ(records, decoder.decode("6\nHELLO!0\n1\n "));
+
+ // An entry which cannot be decoded should not
+ // fail the decoder permanently.
+ records.clear();
+ records.push_back(Error("error"));
+
+ EXPECT_SOME_EQ(records, decoder.decode("5\nerror"));
+
+ // Record should only be decoded once complete.
+ records.clear();
+
+ EXPECT_SOME_EQ(records, decoder.decode("1")); // Header "13\n" arrives in
+ EXPECT_SOME_EQ(records, decoder.decode("3")); // three separate chunks.
+ EXPECT_SOME_EQ(records, decoder.decode("\n"));
+ EXPECT_SOME_EQ(records, decoder.decode("13 CHARACTER")); // 12 of 13 bytes.
+
+ records.clear();
+ records.push_back("13 characters");
+
+ EXPECT_SOME_EQ(records, decoder.decode("S")); // Final byte completes it.
+
+ // If the format is bad, the decoder should fail permanently.
+ EXPECT_ERROR(decoder.decode("not a number\n"));
+ EXPECT_ERROR(decoder.decode("1\n")); // Valid header, but decoder stays failed.
+}