You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/01/17 01:58:26 UTC
[06/10] git commit: Refactored log tools and added a tool to
initialize the log.
Refactored log tools and added a tool to initialize the log.
Also pulled storage related code out (I haven't changed them) from
replica.cpp.
From: Jie Yu <yu...@gmail.com>
Review: https://reviews.apache.org/r/16433
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e2fe5860
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e2fe5860
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e2fe5860
Branch: refs/heads/master
Commit: e2fe5860bc8542e5408bc86ac7322002326d41b3
Parents: f9b60c4
Author: Benjamin Hindman <be...@gmail.com>
Authored: Thu Jan 16 16:55:00 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Jan 16 16:55:00 2014 -0800
----------------------------------------------------------------------
src/Makefile.am | 14 +-
src/log/leveldb.cpp | 422 +++++++++++++++++++++++++++++++++++++
src/log/leveldb.hpp | 51 +++++
src/log/main.cpp | 132 +++++-------
src/log/replica.cpp | 439 +--------------------------------------
src/log/replica.hpp | 6 +-
src/log/storage.hpp | 61 ++++++
src/log/tool.hpp | 51 +++++
src/log/tool/initialize.cpp | 148 +++++++++++++
src/log/tool/initialize.hpp | 63 ++++++
src/log/tool/read.cpp | 188 +++++++++++++++++
src/log/tool/read.hpp | 65 ++++++
12 files changed, 1120 insertions(+), 520 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/e2fe5860/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 60fcb31..d58b46e 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -301,17 +301,25 @@ liblog_la_SOURCES = \
log/catchup.cpp \
log/consensus.cpp \
log/coordinator.cpp \
+ log/leveldb.cpp \
log/log.cpp \
log/recover.cpp \
- log/replica.cpp
+ log/replica.cpp \
+ log/tool/initialize.cpp \
+ log/tool/read.cpp
liblog_la_SOURCES += \
log/catchup.hpp \
log/consensus.hpp \
log/coordinator.hpp \
- log/recover.hpp \
- log/replica.hpp \
+ log/leveldb.hpp \
log/log.hpp \
log/network.hpp \
+ log/recover.hpp \
+ log/replica.hpp \
+ log/storage.hpp \
+ log/tool.hpp \
+ log/tool/initialize.hpp \
+ log/tool/read.hpp \
messages/log.hpp \
messages/log.proto
nodist_liblog_la_SOURCES = $(LOG_PROTOS)
http://git-wip-us.apache.org/repos/asf/mesos/blob/e2fe5860/src/log/leveldb.cpp
----------------------------------------------------------------------
diff --git a/src/log/leveldb.cpp b/src/log/leveldb.cpp
new file mode 100644
index 0000000..7819963
--- /dev/null
+++ b/src/log/leveldb.cpp
@@ -0,0 +1,422 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <google/protobuf/io/zero_copy_stream_impl.h>
+
+#include <leveldb/comparator.h>
+#include <leveldb/write_batch.h>
+
+#include <stout/check.hpp>
+#include <stout/error.hpp>
+#include <stout/numify.hpp>
+#include <stout/stopwatch.hpp>
+#include <stout/strings.hpp>
+
+#include "log/leveldb.hpp"
+
+using std::string;
+
+namespace mesos {
+namespace internal {
+namespace log {
+
+class Varint64Comparator : public leveldb::Comparator
+{
+public:
+ virtual int Compare(
+ const leveldb::Slice& a,
+ const leveldb::Slice& b) const
+ {
+ // TODO(benh): Use varint comparator.
+ LOG(FATAL) << "Unimplemented";
+ // uint64_t left = position(a);
+ // uint64_t right = position(b);
+ // if (left < right) return -1;
+ // if (left == right) return 0;
+ // if (left > right) return 1;
+ }
+
+ virtual const char* Name() const
+ {
+ // Note that this name MUST NOT CHANGE across uses of this
+ // comparator with the same DB (the semantics of doing so are
+ // undefined if the database doesn't catch this first).
+ return "varint64";
+ }
+
+ virtual void FindShortestSeparator(
+ string* start,
+ const leveldb::Slice& limit) const
+ {
+ // Intentional no-op.
+ }
+
+ virtual void FindShortSuccessor(string* key) const
+ {
+ // Intentional no-op.
+ }
+};
+
+
+// TODO(benh): Use varint comparator.
+// static Varint64Comparator comparator;
+
+
+// Returns a string representing the specified position. Note that we
+// adjust the actual position by incrementing it by 1 because we
+// reserve 0 for storing the promise record (Record::Promise,
+// DEPRECATED!), or the metadata (Record::Metadata).
+static string encode(uint64_t position, bool adjust = true)
+{
+ // Adjusted stringified represenation is plus 1 of actual position.
+ position = adjust ? position + 1 : position;
+
+ // TODO(benh): Use varint encoding for VarInt64Comparator!
+ // string s;
+ // google::protobuf::io::StringOutputStream _stream(&s);
+ // google::protobuf::io::CodedOutputStream stream(&_stream);
+ // position = adjust ? position + 1 : position;
+ // stream.WriteVarint64(position);
+ // return s;
+
+ Try<string> s = strings::format("%.*d", 10, position);
+ CHECK_SOME(s);
+ return s.get();
+}
+
+
+// Returns the position as represented in the specified slice
+// (performing a decrement as necessary to determine the actual
+// position represented).
+static uint64_t decode(const leveldb::Slice& s)
+{
+ // TODO(benh): Use varint decoding for VarInt64Comparator!
+ // uint64_t position;
+ // google::protobuf::io::ArrayInputStream _stream(s.data(), s.size());
+ // google::protobuf::io::CodedInputStream stream(&_stream);
+ // bool success = stream.ReadVarint64(&position);
+ // CHECK(success);
+ // return position - 1; // Actual position is less 1 of stringified.
+ Try<uint64_t> position = numify<uint64_t>(string(s.data(), s.size()));
+ CHECK_SOME(position);
+ return position.get() - 1; // Actual position is less 1 of stringified.
+}
+
+
+LevelDBStorage::LevelDBStorage()
+ : db(NULL), first(0)
+{
+ // Nothing to see here.
+}
+
+
+LevelDBStorage::~LevelDBStorage()
+{
+ delete db; // Might be null if open failed in LevelDBStorage::recover.
+}
+
+
+Try<LevelDBStorage::State> LevelDBStorage::restore(const string& path)
+{
+ leveldb::Options options;
+ options.create_if_missing = true;
+
+ // TODO(benh): Can't use varint comparator until bug discussed at
+ // groups.google.com/group/leveldb/browse_thread/thread/17eac39168909ba7
+ // gets fixed. For now, we are using the default byte-wise
+ // comparator and *assuming* that the encoding from unsigned long to
+ // string produces a stable ordering. Checks below.
+ // options.comparator = &comparator;
+
+ const string& one = encode(1);
+ const string& two = encode(2);
+ const string& ten = encode(10);
+
+ CHECK(leveldb::BytewiseComparator()->Compare(one, two) < 0);
+ CHECK(leveldb::BytewiseComparator()->Compare(two, one) > 0);
+ CHECK(leveldb::BytewiseComparator()->Compare(one, ten) < 0);
+ CHECK(leveldb::BytewiseComparator()->Compare(ten, two) > 0);
+ CHECK(leveldb::BytewiseComparator()->Compare(ten, ten) == 0);
+
+ Stopwatch stopwatch;
+ stopwatch.start();
+
+ leveldb::Status status = leveldb::DB::Open(options, path, &db);
+
+ if (!status.ok()) {
+ // TODO(benh): Consider trying to repair the DB.
+ return Error(status.ToString());
+ }
+
+ LOG(INFO) << "Opened db in " << stopwatch.elapsed();
+
+ stopwatch.start(); // Restart the stopwatch.
+
+ // TODO(benh): Conditionally compact to avoid long recovery times?
+ db->CompactRange(NULL, NULL);
+
+ LOG(INFO) << "Compacted db in " << stopwatch.elapsed();
+
+ State state;
+ state.begin = 0;
+ state.end = 0;
+
+ // TODO(benh): Consider just reading the "promise" record (e.g.,
+ // 'encode(0, false)') and then iterating over the rest of the
+ // records and confirming that they are all indeed of type
+ // Record::Action.
+
+ stopwatch.start(); // Restart the stopwatch.
+
+ leveldb::Iterator* iterator = db->NewIterator(leveldb::ReadOptions());
+
+ LOG(INFO) << "Created db iterator in " << stopwatch.elapsed();
+
+ stopwatch.start(); // Restart the stopwatch.
+
+ iterator->SeekToFirst();
+
+ LOG(INFO) << "Seeked to beginning of db in " << stopwatch.elapsed();
+
+ stopwatch.start(); // Restart the stopwatch.
+
+ uint64_t keys = 0;
+
+ while (iterator->Valid()) {
+ keys++;
+ const leveldb::Slice& slice = iterator->value();
+
+ google::protobuf::io::ArrayInputStream stream(slice.data(), slice.size());
+
+ Record record;
+
+ if (!record.ParseFromZeroCopyStream(&stream)) {
+ return Error("Failed to deserialize record");
+ }
+
+ switch (record.type()) {
+ case Record::METADATA: {
+ CHECK(record.has_metadata());
+ state.metadata.CopyFrom(record.metadata());
+ break;
+ }
+
+ // DEPRECATED!
+ case Record::PROMISE: {
+ CHECK(record.has_promise());
+ // This replica is in old format. Set its status to VOTING
+ // since there is no catch-up logic in the old code and this
+ // replica is obviously not empty.
+ state.metadata.set_status(Metadata::VOTING);
+ state.metadata.set_promised(record.promise().proposal());
+ break;
+ }
+
+ case Record::ACTION: {
+ CHECK(record.has_action());
+ const Action& action = record.action();
+ if (action.has_learned() && action.learned()) {
+ state.learned.insert(action.position());
+ state.unlearned.erase(action.position());
+ if (action.has_type() && action.type() == Action::TRUNCATE) {
+ state.begin = std::max(state.begin, action.truncate().to());
+ }
+ } else {
+ state.learned.erase(action.position());
+ state.unlearned.insert(action.position());
+ }
+ state.end = std::max(state.end, action.position());
+ break;
+ }
+
+ default: {
+ return Error("Bad record");
+ }
+ }
+
+ iterator->Next();
+ }
+
+ LOG(INFO) << "Iterated through " << keys
+ << " keys in the db in " << stopwatch.elapsed();
+
+ // Determine the first position still in leveldb so during a
+ // truncation we can attempt to delete all positions from the first
+ // position up to the truncate position. Note that this is not the
+ // beginning position of the log, but rather the first position that
+ // remains (i.e., hasn't been deleted) in leveldb.
+ iterator->Seek(encode(0));
+
+ if (iterator->Valid()) {
+ first = decode(iterator->key());
+ }
+
+ delete iterator;
+
+ return state;
+}
+
+
+Try<Nothing> LevelDBStorage::persist(const Metadata& metadata)
+{
+ Stopwatch stopwatch;
+ stopwatch.start();
+
+ leveldb::WriteOptions options;
+ options.sync = true;
+
+ Record record;
+ record.set_type(Record::METADATA);
+ record.mutable_metadata()->CopyFrom(metadata);
+
+ string value;
+
+ if (!record.SerializeToString(&value)) {
+ return Error("Failed to serialize record");
+ }
+
+ leveldb::Status status = db->Put(options, encode(0, false), value);
+
+ if (!status.ok()) {
+ return Error(status.ToString());
+ }
+
+ LOG(INFO) << "Persisting metadata (" << value.size()
+ << " bytes) to leveldb took " << stopwatch.elapsed();
+
+ return Nothing();
+}
+
+
+Try<Nothing> LevelDBStorage::persist(const Action& action)
+{
+ Stopwatch stopwatch;
+ stopwatch.start();
+
+ Record record;
+ record.set_type(Record::ACTION);
+ record.mutable_action()->MergeFrom(action);
+
+ string value;
+
+ if (!record.SerializeToString(&value)) {
+ return Error("Failed to serialize record");
+ }
+
+ leveldb::WriteOptions options;
+ options.sync = true;
+
+ leveldb::Status status = db->Put(options, encode(action.position()), value);
+
+ if (!status.ok()) {
+ return Error(status.ToString());
+ }
+
+ LOG(INFO) << "Persisting action (" << value.size()
+ << " bytes) to leveldb took " << stopwatch.elapsed();
+
+ // Delete positions if a truncate action has been *learned*. Note
+ // that we do this in a best-effort fashion (i.e., we ignore any
+ // failures to the database since we can always try again).
+ if (action.has_type() && action.type() == Action::TRUNCATE &&
+ action.has_learned() && action.learned()) {
+ CHECK(action.has_truncate());
+
+ stopwatch.start(); // Restart the stopwatch.
+
+ // To actually perform the truncation in leveldb we need to remove
+ // all the keys that represent positions no longer in the log. We
+ // do this by attempting to delete all keys that represent the
+ // first position we know is still in leveldb up to (but
+ // excluding) the truncate position. Note that this works because
+ // the semantics of WriteBatch are such that even if the position
+ // doesn't exist (which is possible because this replica has some
+ // holes), we can attempt to delete the key that represents it and
+ // it will just ignore that key. This is *much* cheaper than
+ // actually iterating through the entire database instead (which
+ // was, for posterity, the original implementation). In addition,
+ // caching the "first" position we know is in the database is
+ // cheaper than using an iterator to determine the first position
+ // (which was, for posterity, the second implementation).
+
+ leveldb::WriteBatch batch;
+
+ // Add positions up to (but excluding) the truncate position to
+ // the batch starting at the first position still in leveldb.
+ uint64_t index = 0;
+ while ((first + index) < action.truncate().to()) {
+ batch.Delete(encode(first + index));
+ index++;
+ }
+
+ // If we added any positions, attempt to delete them!
+ if (index > 0) {
+ // We do this write asynchronously (e.g., using default options).
+ leveldb::Status status = db->Write(leveldb::WriteOptions(), &batch);
+
+ if (!status.ok()) {
+ LOG(WARNING) << "Ignoring leveldb batch delete failure: "
+ << status.ToString();
+ } else {
+ first = action.truncate().to(); // Save the new first position!
+
+ LOG(INFO) << "Deleting ~" << index
+ << " keys from leveldb took " << stopwatch.elapsed();
+ }
+ }
+ }
+
+ return Nothing();
+}
+
+
+Try<Action> LevelDBStorage::read(uint64_t position)
+{
+ Stopwatch stopwatch;
+ stopwatch.start();
+
+ leveldb::ReadOptions options;
+
+ string value;
+
+ leveldb::Status status = db->Get(options, encode(position), &value);
+
+ if (!status.ok()) {
+ return Error(status.ToString());
+ }
+
+ google::protobuf::io::ArrayInputStream stream(value.data(), value.size());
+
+ Record record;
+
+ if (!record.ParseFromZeroCopyStream(&stream)) {
+ return Error("Failed to deserialize record");
+ }
+
+ if (record.type() != Record::ACTION) {
+ return Error("Bad record");
+ }
+
+ LOG(INFO) << "Reading position from leveldb took " << stopwatch.elapsed();
+
+ return record.action();
+}
+
+} // namespace log {
+} // namespace internal {
+} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/e2fe5860/src/log/leveldb.hpp
----------------------------------------------------------------------
diff --git a/src/log/leveldb.hpp b/src/log/leveldb.hpp
new file mode 100644
index 0000000..7eb51be
--- /dev/null
+++ b/src/log/leveldb.hpp
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __LOG_LEVELDB_HPP__
+#define __LOG_LEVELDB_HPP__
+
+#include <leveldb/db.h>
+
+#include "log/storage.hpp"
+
+namespace mesos {
+namespace internal {
+namespace log {
+
+// Concrete implementation of the storage interface using leveldb.
+class LevelDBStorage : public Storage
+{
+public:
+ LevelDBStorage();
+ virtual ~LevelDBStorage();
+
+ virtual Try<State> restore(const std::string& path);
+ virtual Try<Nothing> persist(const Metadata& metadata);
+ virtual Try<Nothing> persist(const Action& action);
+ virtual Try<Action> read(uint64_t position);
+
+private:
+ leveldb::DB* db;
+ uint64_t first; // First position still in leveldb, used during truncation.
+};
+
+} // namespace log {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __LOG_LEVELDB_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/e2fe5860/src/log/main.cpp
----------------------------------------------------------------------
diff --git a/src/log/main.cpp b/src/log/main.cpp
index f07bd10..c37dd6f 100644
--- a/src/log/main.cpp
+++ b/src/log/main.cpp
@@ -16,119 +16,89 @@
* limitations under the License.
*/
+#include <string.h>
+
#include <iostream>
-#include <list>
#include <string>
-#include <process/process.hpp>
+#include <process/owned.hpp>
-#include <stout/check.hpp>
-#include <stout/flags.hpp>
#include <stout/foreach.hpp>
-#include <stout/none.hpp>
-#include <stout/option.hpp>
-#include <stout/os.hpp>
-
-#include "log/replica.hpp"
+#include <stout/hashmap.hpp>
-#include "logging/flags.hpp"
-#include "logging/logging.hpp"
+#include "log/tool.hpp"
+#include "log/tool/initialize.hpp"
+#include "log/tool/read.hpp"
using namespace mesos;
using namespace mesos::internal;
using namespace mesos::internal::log;
+using namespace process;
+
using std::cerr;
-using std::cout;
using std::endl;
using std::string;
+// All the registered tools.
+static hashmap<string, Owned<tool::Tool> > tools;
-void usage(const char* argv0, const flags::FlagsBase& flags)
+
+static void add(const Owned<tool::Tool>& tool)
{
- cerr << "Usage: " << os::basename(argv0).get() << " [...] path/to/log"
- << endl
- << "Supported options:" << endl
- << flags.usage();
+ tools[tool->name()] = tool;
}
-int main(int argc, char** argv)
+static void usage(const char* argv0)
{
- flags::Flags<logging::Flags> flags;
-
- Option<uint64_t> from;
- flags.add(&from,
- "from",
- "Position from which to start reading in the log");
-
- Option<uint64_t> to;
- flags.add(&to,
- "to",
- "Position from which to stop reading in the log");
-
- bool help;
- flags.add(&help,
- "help",
- "Prints this help message",
- false);
-
- Try<Nothing> load = flags.load(None(), argc, argv);
-
- if (load.isError()) {
- cerr << load.error() << endl;
- usage(argv[0], flags);
- exit(1);
- }
+ cerr << "Usage: " << argv0 << " <command> [OPTIONS]" << endl
+ << endl
+ << "Available commands:" << endl
+ << " help" << endl;
- if (help) {
- usage(argv[0], flags);
- exit(1);
+ // Get a list of available tools.
+ foreachkey (const string& name, tools) {
+ cerr << " " << name << endl;
}
+}
- process::initialize();
-
- logging::initialize(argv[0], flags);
-
- string path = argv[argc - 1];
-
- Replica replica(path);
-
- process::Future<uint64_t> begin = replica.beginning();
- process::Future<uint64_t> end = replica.ending();
-
- begin.await();
- end.await();
-
- CHECK(begin.isReady());
- CHECK(end.isReady());
- if (!from.isSome()) {
- from = begin.get();
- }
+int main(int argc, char** argv)
+{
+ // Register log tools.
+ add(Owned<tool::Tool>(new tool::Initialize()));
+ add(Owned<tool::Tool>(new tool::Read()));
- if (!to.isSome()) {
- to = end.get();
+ if (argc < 2) {
+ usage(argv[0]);
+ return 1;
}
- CHECK_SOME(from);
- CHECK_SOME(to);
-
- cerr << endl << "Attempting to read the log from "
- << from.get() << " to " << to.get() << endl << endl;
-
- process::Future<std::list<Action> > actions =
- replica.read(from.get(), to.get());
+ if (!strcmp(argv[1], "help")) {
+ if (argc == 2) {
+ usage(argv[0]);
+ return 0;
+ }
- actions.await();
+ // 'mesos-log help command' => 'mesos-log command --help'
+ argv[1] = argv[2];
+ argv[2] = (char*) "--help";
+ }
- CHECK(!actions.isFailed()) << actions.failure();
+ string command = argv[1];
- CHECK(actions.isReady());
+ if (!tools.contains(command)) {
+ cerr << "Cannot find command '" << command << "'" << endl << endl;
+ usage(argv[0]);
+ return 1;
+ }
- foreach (const Action& action, actions.get()) {
- cout << "----------------------------------------------" << endl;
- action.PrintDebugString();
+ // Execute the command.
+ Try<Nothing> execute = tools[command]->execute(argc, argv);
+ if (execute.isError()) {
+ cerr << execute.error() << endl;
+ return 1;
}
return 0;
http://git-wip-us.apache.org/repos/asf/mesos/blob/e2fe5860/src/log/replica.cpp
----------------------------------------------------------------------
diff --git a/src/log/replica.cpp b/src/log/replica.cpp
index da9310f..ec6e38c 100644
--- a/src/log/replica.cpp
+++ b/src/log/replica.cpp
@@ -16,12 +16,6 @@
* limitations under the License.
*/
-#include <google/protobuf/io/zero_copy_stream_impl.h>
-
-#include <leveldb/comparator.h>
-#include <leveldb/db.h>
-#include <leveldb/write_batch.h>
-
#include <algorithm>
#include <process/dispatch.hpp>
@@ -32,22 +26,18 @@
#include <stout/foreach.hpp>
#include <stout/none.hpp>
#include <stout/nothing.hpp>
-#include <stout/numify.hpp>
-#include <stout/stopwatch.hpp>
+#include <stout/result.hpp>
+#include <stout/try.hpp>
#include <stout/utils.hpp>
#include "common/type_utils.hpp"
+#include "log/leveldb.hpp"
#include "log/replica.hpp"
-
-#include "logging/logging.hpp"
-
-#include "messages/log.hpp"
+#include "log/storage.hpp"
using namespace process;
-using process::wait; // Necessary on some OS's to disambiguate.
-
using std::list;
using std::set;
using std::string;
@@ -66,425 +56,6 @@ Protocol<RecoverRequest, RecoverResponse> recover;
} // namespace protocol {
-struct State
-{
- Metadata metadata; // The metadata for the replica.
- uint64_t begin; // Beginning position of the log.
- uint64_t end; // Ending position of the log.
- set<uint64_t> learned; // Positions present and learned
- set<uint64_t> unlearned; // Positions present but unlearned.
-};
-
-
-// Abstract interface for reading and writing records.
-class Storage
-{
-public:
- virtual ~Storage() {}
- virtual Try<State> restore(const string& path) = 0;
- virtual Try<Nothing> persist(const Metadata& metadata) = 0;
- virtual Try<Nothing> persist(const Action& action) = 0;
- virtual Try<Action> read(uint64_t position) = 0;
-};
-
-
-// Concrete implementation of the storage interface using leveldb.
-class LevelDBStorage : public Storage
-{
-public:
- LevelDBStorage();
- virtual ~LevelDBStorage();
-
- virtual Try<State> restore(const string& path);
- virtual Try<Nothing> persist(const Metadata& metadata);
- virtual Try<Nothing> persist(const Action& action);
- virtual Try<Action> read(uint64_t position);
-
-private:
- class Varint64Comparator : public leveldb::Comparator
- {
- public:
- virtual int Compare(
- const leveldb::Slice& a,
- const leveldb::Slice& b) const
- {
- // TODO(benh): Use varint comparator.
- LOG(FATAL) << "Unimplemented";
- // uint64_t left = position(a);
- // uint64_t right = position(b);
- // if (left < right) return -1;
- // if (left == right) return 0;
- // if (left > right) return 1;
- }
-
- virtual const char* Name() const
- {
- // Note that this name MUST NOT CHANGE across uses of this
- // comparator with the same DB (the semantics of doing so are
- // undefined if the database doesn't catch this first).
- return "varint64";
- }
-
- virtual void FindShortestSeparator(
- string* start,
- const leveldb::Slice& limit) const
- {
- // Intentional no-op.
- }
-
- virtual void FindShortSuccessor(string* key) const
- {
- // Intentional no-op.
- }
- };
-
- // Returns a string representing the specified position. Note that
- // we adjust the actual position by incrementing it by 1 because we
- // reserve 0 for storing the promise record (Record::Promise,
- // DEPRECATED!), or the metadata (Record::Metadata).
- static string encode(uint64_t position, bool adjust = true)
- {
- // Adjusted stringified represenation is plus 1 of actual position.
- position = adjust ? position + 1 : position;
-
- // TODO(benh): Use varint encoding for VarInt64Comparator!
- // string s;
- // google::protobuf::io::StringOutputStream _stream(&s);
- // google::protobuf::io::CodedOutputStream stream(&_stream);
- // position = adjust ? position + 1 : position;
- // stream.WriteVarint64(position);
- // return s;
-
- Try<string> s = strings::format("%.*d", 10, position);
- CHECK_SOME(s);
- return s.get();
- }
-
- // Returns the position as represented in the specified slice
- // (performing a decrement as necessary to determine the actual
- // position represented).
- static uint64_t decode(const leveldb::Slice& s)
- {
- // TODO(benh): Use varint decoding for VarInt64Comparator!
- // uint64_t position;
- // google::protobuf::io::ArrayInputStream _stream(s.data(), s.size());
- // google::protobuf::io::CodedInputStream stream(&_stream);
- // bool success = stream.ReadVarint64(&position);
- // CHECK(success);
- // return position - 1; // Actual position is less 1 of stringified.
- Try<uint64_t> position = numify<uint64_t>(string(s.data(), s.size()));
- CHECK_SOME(position);
- return position.get() - 1; // Actual position is less 1 of stringified.
- }
-
- // Varint64Comparator comparator; // TODO(benh): Use varint comparator.
-
- leveldb::DB* db;
-
- uint64_t first; // First position still in leveldb, used during truncation.
-};
-
-
-LevelDBStorage::LevelDBStorage()
- : db(NULL), first(0)
-{
- // Nothing to see here.
-}
-
-
-LevelDBStorage::~LevelDBStorage()
-{
- delete db; // Might be null if open failed in LevelDBStorage::recover.
-}
-
-
-Try<State> LevelDBStorage::restore(const string& path)
-{
- leveldb::Options options;
- options.create_if_missing = true;
-
- // TODO(benh): Can't use varint comparator until bug discussed at
- // groups.google.com/group/leveldb/browse_thread/thread/17eac39168909ba7
- // gets fixed. For now, we are using the default byte-wise
- // comparator and *assuming* that the encoding from unsigned long to
- // string produces a stable ordering. Checks below.
- // options.comparator = &comparator;
-
- const string& one = encode(1);
- const string& two = encode(2);
- const string& ten = encode(10);
-
- CHECK(leveldb::BytewiseComparator()->Compare(one, two) < 0);
- CHECK(leveldb::BytewiseComparator()->Compare(two, one) > 0);
- CHECK(leveldb::BytewiseComparator()->Compare(one, ten) < 0);
- CHECK(leveldb::BytewiseComparator()->Compare(ten, two) > 0);
- CHECK(leveldb::BytewiseComparator()->Compare(ten, ten) == 0);
-
- Stopwatch stopwatch;
- stopwatch.start();
-
- leveldb::Status status = leveldb::DB::Open(options, path, &db);
-
- if (!status.ok()) {
- // TODO(benh): Consider trying to repair the DB.
- return Error(status.ToString());
- }
-
- LOG(INFO) << "Opened db in " << stopwatch.elapsed();
-
- stopwatch.start(); // Restart the stopwatch.
-
- // TODO(benh): Conditionally compact to avoid long recovery times?
- db->CompactRange(NULL, NULL);
-
- LOG(INFO) << "Compacted db in " << stopwatch.elapsed();
-
- State state;
- state.begin = 0;
- state.end = 0;
-
- // TODO(benh): Consider just reading the "promise" record (e.g.,
- // 'encode(0, false)') and then iterating over the rest of the
- // records and confirming that they are all indeed of type
- // Record::Action.
-
- stopwatch.start(); // Restart the stopwatch.
-
- leveldb::Iterator* iterator = db->NewIterator(leveldb::ReadOptions());
-
- LOG(INFO) << "Created db iterator in " << stopwatch.elapsed();
-
- stopwatch.start(); // Restart the stopwatch.
-
- iterator->SeekToFirst();
-
- LOG(INFO) << "Seeked to beginning of db in " << stopwatch.elapsed();
-
- stopwatch.start(); // Restart the stopwatch.
-
- uint64_t keys = 0;
-
- while (iterator->Valid()) {
- keys++;
- const leveldb::Slice& slice = iterator->value();
-
- google::protobuf::io::ArrayInputStream stream(slice.data(), slice.size());
-
- Record record;
-
- if (!record.ParseFromZeroCopyStream(&stream)) {
- return Error("Failed to deserialize record");
- }
-
- switch (record.type()) {
- case Record::METADATA: {
- CHECK(record.has_metadata());
- state.metadata.CopyFrom(record.metadata());
- break;
- }
-
- // DEPRECATED!
- case Record::PROMISE: {
- CHECK(record.has_promise());
- // This replica is in old format. Set its status to VOTING
- // since there is no catch-up logic in the old code and this
- // replica is obviously not empty.
- state.metadata.set_status(Metadata::VOTING);
- state.metadata.set_promised(record.promise().proposal());
- break;
- }
-
- case Record::ACTION: {
- CHECK(record.has_action());
- const Action& action = record.action();
- if (action.has_learned() && action.learned()) {
- state.learned.insert(action.position());
- state.unlearned.erase(action.position());
- if (action.has_type() && action.type() == Action::TRUNCATE) {
- state.begin = std::max(state.begin, action.truncate().to());
- }
- } else {
- state.learned.erase(action.position());
- state.unlearned.insert(action.position());
- }
- state.end = std::max(state.end, action.position());
- break;
- }
-
- default: {
- return Error("Bad record");
- }
- }
-
- iterator->Next();
- }
-
- LOG(INFO) << "Iterated through " << keys
- << " keys in the db in " << stopwatch.elapsed();
-
- // Determine the first position still in leveldb so during a
- // truncation we can attempt to delete all positions from the first
- // position up to the truncate position. Note that this is not the
- // beginning position of the log, but rather the first position that
- // remains (i.e., hasn't been deleted) in leveldb.
- iterator->Seek(encode(0));
-
- if (iterator->Valid()) {
- first = decode(iterator->key());
- }
-
- delete iterator;
-
- return state;
-}
-
-
-Try<Nothing> LevelDBStorage::persist(const Metadata& metadata)
-{
- Stopwatch stopwatch;
- stopwatch.start();
-
- leveldb::WriteOptions options;
- options.sync = true;
-
- Record record;
- record.set_type(Record::METADATA);
- record.mutable_metadata()->CopyFrom(metadata);
-
- string value;
-
- if (!record.SerializeToString(&value)) {
- return Error("Failed to serialize record");
- }
-
- leveldb::Status status = db->Put(options, encode(0, false), value);
-
- if (!status.ok()) {
- return Error(status.ToString());
- }
-
- LOG(INFO) << "Persisting metadata (" << value.size()
- << " bytes) to leveldb took " << stopwatch.elapsed();
-
- return Nothing();
-}
-
-
-Try<Nothing> LevelDBStorage::persist(const Action& action)
-{
- Stopwatch stopwatch;
- stopwatch.start();
-
- Record record;
- record.set_type(Record::ACTION);
- record.mutable_action()->MergeFrom(action);
-
- string value;
-
- if (!record.SerializeToString(&value)) {
- return Error("Failed to serialize record");
- }
-
- leveldb::WriteOptions options;
- options.sync = true;
-
- leveldb::Status status = db->Put(options, encode(action.position()), value);
-
- if (!status.ok()) {
- return Error(status.ToString());
- }
-
- LOG(INFO) << "Persisting action (" << value.size()
- << " bytes) to leveldb took " << stopwatch.elapsed();
-
- // Delete positions if a truncate action has been *learned*. Note
- // that we do this in a best-effort fashion (i.e., we ignore any
- // failures to the database since we can always try again).
- if (action.has_type() && action.type() == Action::TRUNCATE &&
- action.has_learned() && action.learned()) {
- CHECK(action.has_truncate());
-
- stopwatch.start(); // Restart the stopwatch.
-
- // To actually perform the truncation in leveldb we need to remove
- // all the keys that represent positions no longer in the log. We
- // do this by attempting to delete all keys that represent the
- // first position we know is still in leveldb up to (but
- // excluding) the truncate position. Note that this works because
- // the semantics of WriteBatch are such that even if the position
- // doesn't exist (which is possible because this replica has some
- // holes), we can attempt to delete the key that represents it and
- // it will just ignore that key. This is *much* cheaper than
- // actually iterating through the entire database instead (which
- // was, for posterity, the original implementation). In addition,
- // caching the "first" position we know is in the database is
- // cheaper than using an iterator to determine the first position
- // (which was, for posterity, the second implementation).
-
- leveldb::WriteBatch batch;
-
- // Add positions up to (but excluding) the truncate position to
- // the batch starting at the first position still in leveldb.
- uint64_t index = 0;
- while ((first + index) < action.truncate().to()) {
- batch.Delete(encode(first + index));
- index++;
- }
-
- // If we added any positions, attempt to delete them!
- if (index > 0) {
- // We do this write asynchronously (e.g., using default options).
- leveldb::Status status = db->Write(leveldb::WriteOptions(), &batch);
-
- if (!status.ok()) {
- LOG(WARNING) << "Ignoring leveldb batch delete failure: "
- << status.ToString();
- } else {
- first = action.truncate().to(); // Save the new first position!
-
- LOG(INFO) << "Deleting ~" << index
- << " keys from leveldb took " << stopwatch.elapsed();
- }
- }
- }
-
- return Nothing();
-}
-
-
-Try<Action> LevelDBStorage::read(uint64_t position)
-{
- Stopwatch stopwatch;
- stopwatch.start();
-
- leveldb::ReadOptions options;
-
- string value;
-
- leveldb::Status status = db->Get(options, encode(position), &value);
-
- if (!status.ok()) {
- return Error(status.ToString());
- }
-
- google::protobuf::io::ArrayInputStream stream(value.data(), value.size());
-
- Record record;
-
- if (!record.ParseFromZeroCopyStream(&stream)) {
- return Error("Failed to deserialize record");
- }
-
- if (record.type() != Record::ACTION) {
- return Error("Bad record");
- }
-
- LOG(INFO) << "Reading position from leveldb took " << stopwatch.elapsed();
-
- return record.action();
-}
-
-
class ReplicaProcess : public ProtobufProcess<ReplicaProcess>
{
public:
@@ -1140,7 +711,7 @@ bool ReplicaProcess::persist(const Action& action)
void ReplicaProcess::restore(const string& path)
{
- Try<State> state = storage->restore(path);
+ Try<Storage::State> state = storage->restore(path);
CHECK_SOME(state) << "Failed to recover the log";
http://git-wip-us.apache.org/repos/asf/mesos/blob/e2fe5860/src/log/replica.hpp
----------------------------------------------------------------------
diff --git a/src/log/replica.hpp b/src/log/replica.hpp
index ecb126d..467d0d9 100644
--- a/src/log/replica.hpp
+++ b/src/log/replica.hpp
@@ -19,14 +19,16 @@
#ifndef __LOG_REPLICA_HPP__
#define __LOG_REPLICA_HPP__
+#include <stdint.h>
+
#include <list>
#include <set>
#include <string>
+#include <process/future.hpp>
+#include <process/pid.hpp>
#include <process/protobuf.hpp>
-#include <stout/result.hpp>
-
#include "messages/log.hpp"
namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/e2fe5860/src/log/storage.hpp
----------------------------------------------------------------------
diff --git a/src/log/storage.hpp b/src/log/storage.hpp
new file mode 100644
index 0000000..663146f
--- /dev/null
+++ b/src/log/storage.hpp
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __LOG_STORAGE_HPP__
+#define __LOG_STORAGE_HPP__
+
+#include <stdint.h>
+
+#include <set>
+#include <string>
+
+#include <stout/nothing.hpp>
+#include <stout/try.hpp>
+
+#include "messages/log.hpp"
+
+namespace mesos {
+namespace internal {
+namespace log {
+
+// Abstract interface for reading and writing records.
+class Storage
+{
+public:
+ struct State
+ {
+ Metadata metadata; // The metadata for the replica.
+ uint64_t begin; // Beginning position of the log.
+ uint64_t end; // Ending position of the log.
+ std::set<uint64_t> learned; // Positions present and learned
+ std::set<uint64_t> unlearned; // Positions present but unlearned.
+ };
+
+ virtual ~Storage() {}
+
+ virtual Try<State> restore(const std::string& path) = 0;
+ virtual Try<Nothing> persist(const Metadata& metadata) = 0;
+ virtual Try<Nothing> persist(const Action& action) = 0;
+ virtual Try<Action> read(uint64_t position) = 0;
+};
+
+} // namespace log {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __LOG_STORAGE_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/e2fe5860/src/log/tool.hpp
----------------------------------------------------------------------
diff --git a/src/log/tool.hpp b/src/log/tool.hpp
new file mode 100644
index 0000000..656d3f6
--- /dev/null
+++ b/src/log/tool.hpp
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __LOG_TOOL_HPP__
+#define __LOG_TOOL_HPP__
+
+#include <string>
+
+#include <stout/nothing.hpp>
+#include <stout/try.hpp>
+
+namespace mesos {
+namespace internal {
+namespace log {
+namespace tool {
+
+// Represents a tool for processing a log file.
+class Tool
+{
+public:
+ virtual ~Tool() {}
+
+ virtual std::string name() const = 0;
+
+ // Executes the tool. The tool can be configured by passing in
+ // command line arguments. If command line arguments are not
+ // specified, the default configuration will be used.
+ virtual Try<Nothing> execute(int argc, char** argv) = 0;
+};
+
+} // namespace tool {
+} // namespace log {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __LOG_TOOL_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/e2fe5860/src/log/tool/initialize.cpp
----------------------------------------------------------------------
diff --git a/src/log/tool/initialize.cpp b/src/log/tool/initialize.cpp
new file mode 100644
index 0000000..ccda7fb
--- /dev/null
+++ b/src/log/tool/initialize.cpp
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <iostream>
+#include <sstream>
+
+#include <process/process.hpp>
+#include <process/timeout.hpp>
+
+#include <stout/error.hpp>
+
+#include "log/replica.hpp"
+#include "log/tool/initialize.hpp"
+
+#include "logging/logging.hpp"
+
+using namespace process;
+
+using std::endl;
+using std::ostringstream;
+using std::string;
+
+namespace mesos {
+namespace internal {
+namespace log {
+namespace tool {
+
+Initialize::Flags::Flags()
+{
+ add(&Flags::path,
+ "path",
+ "Path to the log");
+
+ add(&Flags::timeout,
+ "timeout",
+ "Maximum time allowed for the command to finish\n"
+ "(e.g., 500ms, 1sec, etc.)");
+
+ add(&Flags::help,
+ "help",
+ "Prints the help message",
+ false);
+}
+
+
+string Initialize::usage(const string& argv0) const
+{
+ ostringstream out;
+
+ out << "Usage: " << argv0 << " " << name() << " [OPTIONS]" << endl
+ << endl
+ << "This command is used to initialize the log" << endl
+ << endl
+ << "Supported OPTIONS:" << endl
+ << flags.usage();
+
+ return out.str();
+}
+
+
+Try<Nothing> Initialize::execute(int argc, char** argv)
+{
+ // Configure the tool by parsing command line arguments.
+ if (argc > 0 && argv != NULL) {
+ Try<Nothing> load = flags.load(None(), argc, argv);
+ if (load.isError()) {
+ return Error(load.error() + "\n\n" + usage(argv[0]));
+ }
+
+ if (flags.help) {
+ return Error(usage(argv[0]));
+ }
+
+ process::initialize();
+ logging::initialize(argv[0], flags);
+ }
+
+ if (flags.path.isNone()) {
+ return Error("Missing flag: '--path'");
+ }
+
+ // Setup the timeout if specified.
+ Option<Timeout> timeout = None();
+ if (flags.timeout.isSome()) {
+ timeout = Timeout::in(flags.timeout.get());
+ }
+
+ Replica replica(flags.path.get());
+
+ // Get the current status of the replica.
+ Future<Metadata::Status> status = replica.status();
+ if (timeout.isSome()) {
+ status.await(timeout.get().remaining());
+ } else {
+ status.await();
+ }
+
+ if (status.isPending()) {
+ return Error("Timed out while getting replica status");
+ } else if (status.isDiscarded()) {
+ return Error("Failed to get status of replica (discarded future)");
+ } else if (status.isFailed()) {
+ return Error(status.failure());
+ }
+
+ // We only initialize a log if it is empty.
+ if (status.get() != Metadata::EMPTY) {
+ return Error("The log is not empty");
+ }
+
+ // Update the status of the replica to VOTING.
+ Future<bool> update = replica.update(Metadata::VOTING);
+ if (timeout.isSome()) {
+ update.await(timeout.get().remaining());
+ } else {
+ update.await();
+ }
+
+ if (update.isPending()) {
+ return Error("Timed out while setting replica status");
+ } else if (update.isDiscarded()) {
+ return Error("Failed to set replica status (discarded future)");
+ } else if (update.isFailed()) {
+ return Error(update.failure());
+ }
+
+ return Nothing();
+}
+
+} // namespace tool {
+} // namespace log {
+} // namespace internal {
+} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/e2fe5860/src/log/tool/initialize.hpp
----------------------------------------------------------------------
diff --git a/src/log/tool/initialize.hpp b/src/log/tool/initialize.hpp
new file mode 100644
index 0000000..10ac269
--- /dev/null
+++ b/src/log/tool/initialize.hpp
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __LOG_TOOL_INITIALIZE_HPP__
+#define __LOG_TOOL_INITIALIZE_HPP__
+
+#include <stout/duration.hpp>
+#include <stout/flags.hpp>
+#include <stout/option.hpp>
+
+#include "log/tool.hpp"
+
+#include "logging/flags.hpp"
+
+namespace mesos {
+namespace internal {
+namespace log {
+namespace tool {
+
+class Initialize : public Tool
+{
+public:
+ class Flags : public logging::Flags
+ {
+ public:
+ Flags();
+
+ Option<std::string> path;
+ Option<Duration> timeout;
+ bool help;
+ };
+
+ virtual std::string name() const { return "initialize"; }
+ virtual Try<Nothing> execute(int argc = 0, char** argv = NULL);
+
+ // Users can change the default configuration by setting this flags.
+ Flags flags;
+
+private:
+ std::string usage(const std::string& argv0) const;
+};
+
+} // namespace tool {
+} // namespace log {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __LOG_TOOL_INITIALIZE_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/e2fe5860/src/log/tool/read.cpp
----------------------------------------------------------------------
diff --git a/src/log/tool/read.cpp b/src/log/tool/read.cpp
new file mode 100644
index 0000000..ab6068d
--- /dev/null
+++ b/src/log/tool/read.cpp
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <iostream>
+#include <sstream>
+
+#include <process/process.hpp>
+#include <process/timeout.hpp>
+
+#include <stout/error.hpp>
+
+#include "log/replica.hpp"
+#include "log/tool/read.hpp"
+
+#include "logging/logging.hpp"
+
+using namespace process;
+
+using std::cout;
+using std::endl;
+using std::list;
+using std::ostringstream;
+using std::string;
+
+namespace mesos {
+namespace internal {
+namespace log {
+namespace tool {
+
+Read::Flags::Flags()
+{
+ add(&Flags::path,
+ "path",
+ "Path to the log");
+
+ add(&Flags::from,
+ "from",
+ "Position from which to start reading the log");
+
+ add(&Flags::to,
+ "to",
+ "Position from which to stop reading the log");
+
+ add(&Flags::timeout,
+ "timeout",
+ "Maximum time allowed for the command to finish\n"
+ "(e.g., 500ms, 1sec, etc.)");
+
+ add(&Flags::help,
+ "help",
+ "Prints the help message",
+ false);
+}
+
+
+string Read::usage(const string& argv0) const
+{
+ ostringstream out;
+
+ out << "Usage: " << argv0 << " " << name() << " [OPTIONS]" << endl
+ << endl
+ << "This command is used to read the log" << endl
+ << endl
+ << "Supported OPTIONS:" << endl
+ << flags.usage();
+
+ return out.str();
+}
+
+
+Try<Nothing> Read::execute(int argc, char** argv)
+{
+ // Configure the tool by parsing command line arguments.
+ if (argc > 0 && argv != NULL) {
+ Try<Nothing> load = flags.load(None(), argc, argv);
+ if (load.isError()) {
+ return Error(load.error() + "\n\n" + usage(argv[0]));
+ }
+
+ if (flags.help) {
+ return Error(usage(argv[0]));
+ }
+
+ process::initialize();
+ logging::initialize(argv[0], flags);
+ }
+
+ if (flags.path.isNone()) {
+ return Error("Missing flag '--path'");
+ }
+
+ // Setup the timeout if specified.
+ Option<Timeout> timeout = None();
+ if (flags.timeout.isSome()) {
+ timeout = Timeout::in(flags.timeout.get());
+ }
+
+ Replica replica(flags.path.get());
+
+ // Get the beginning of the replica.
+ Future<uint64_t> begin = replica.beginning();
+ if (timeout.isSome()) {
+ begin.await(timeout.get().remaining());
+ } else {
+ begin.await();
+ }
+
+ if (begin.isPending()) {
+ return Error("Timed out while getting the beginning of the replica");
+ } else if (begin.isDiscarded()) {
+ return Error(
+ "Failed to get the beginning of the replica (discarded future)");
+ } else if (begin.isFailed()) {
+ return Error(begin.failure());
+ }
+
+ // Get the ending of the replica.
+ Future<uint64_t> end = replica.ending();
+ if (timeout.isSome()) {
+ end.await(timeout.get().remaining());
+ } else {
+ end.await();
+ }
+
+ if (end.isPending()) {
+ return Error("Timed out while getting the ending of the replica");
+ } else if (end.isDiscarded()) {
+ return Error(
+ "Failed to get the ending of the replica (discarded future)");
+ } else if (end.isFailed()) {
+ return Error(end.failure());
+ }
+
+ Option<uint64_t> from = flags.from;
+ if (from.isNone()) {
+ from = begin.get();
+ }
+
+ Option<uint64_t> to = flags.to;
+ if (to.isNone()) {
+ to = end.get();
+ }
+
+ LOG(INFO) << "Attempting to read the log from "
+ << from.get() << " to " << to.get() << endl;
+
+ Future<list<Action> > actions = replica.read(from.get(), to.get());
+ if (timeout.isSome()) {
+ actions.await(timeout.get().remaining());
+ } else {
+ actions.await();
+ }
+
+ if (actions.isPending()) {
+ return Error("Timed out while reading the replica");
+ } else if (actions.isDiscarded()) {
+ return Error("Failed to read the replica (discarded future)");
+ } else if (actions.isFailed()) {
+ return Error(actions.failure());
+ }
+
+ foreach (const Action& action, actions.get()) {
+ cout << "----------------------------------------------" << endl;
+ action.PrintDebugString();
+ }
+
+ return Nothing();
+}
+
+} // namespace tool {
+} // namespace log {
+} // namespace internal {
+} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/e2fe5860/src/log/tool/read.hpp
----------------------------------------------------------------------
diff --git a/src/log/tool/read.hpp b/src/log/tool/read.hpp
new file mode 100644
index 0000000..74faec0
--- /dev/null
+++ b/src/log/tool/read.hpp
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __LOG_TOOL_READ_HPP__
+#define __LOG_TOOL_READ_HPP__
+
+#include <stout/duration.hpp>
+#include <stout/flags.hpp>
+#include <stout/option.hpp>
+
+#include "log/tool.hpp"
+
+#include "logging/flags.hpp"
+
+namespace mesos {
+namespace internal {
+namespace log {
+namespace tool {
+
+class Read : public Tool
+{
+public:
+ class Flags : public logging::Flags
+ {
+ public:
+ Flags();
+
+ Option<std::string> path;
+ Option<uint64_t> from;
+ Option<uint64_t> to;
+ Option<Duration> timeout;
+ bool help;
+ };
+
+ virtual std::string name() const { return "read"; }
+ virtual Try<Nothing> execute(int argc = 0, char** argv = NULL);
+
+ // Users can change the default configuration by setting this flags.
+ Flags flags;
+
+private:
+ std::string usage(const std::string& argv0) const;
+};
+
+} // namespace tool {
+} // namespace log {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __LOG_TOOL_READ_HPP__