You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/01/17 01:58:26 UTC

[06/10] git commit: Refactored log tools and added a tool to initialize the log.

Refactored log tools and added a tool to initialize the log.

Also pulled the storage-related code out of replica.cpp (the code
itself is unchanged).

From: Jie Yu <yu...@gmail.com>
Review: https://reviews.apache.org/r/16433


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e2fe5860
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e2fe5860
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e2fe5860

Branch: refs/heads/master
Commit: e2fe5860bc8542e5408bc86ac7322002326d41b3
Parents: f9b60c4
Author: Benjamin Hindman <be...@gmail.com>
Authored: Thu Jan 16 16:55:00 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Jan 16 16:55:00 2014 -0800

----------------------------------------------------------------------
 src/Makefile.am             |  14 +-
 src/log/leveldb.cpp         | 422 +++++++++++++++++++++++++++++++++++++
 src/log/leveldb.hpp         |  51 +++++
 src/log/main.cpp            | 132 +++++-------
 src/log/replica.cpp         | 439 +--------------------------------------
 src/log/replica.hpp         |   6 +-
 src/log/storage.hpp         |  61 ++++++
 src/log/tool.hpp            |  51 +++++
 src/log/tool/initialize.cpp | 148 +++++++++++++
 src/log/tool/initialize.hpp |  63 ++++++
 src/log/tool/read.cpp       | 188 +++++++++++++++++
 src/log/tool/read.hpp       |  65 ++++++
 12 files changed, 1120 insertions(+), 520 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e2fe5860/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 60fcb31..d58b46e 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -301,17 +301,25 @@ liblog_la_SOURCES =							\
   log/catchup.cpp							\
   log/consensus.cpp							\
   log/coordinator.cpp							\
+  log/leveldb.cpp							\
   log/log.cpp								\
   log/recover.cpp							\
-  log/replica.cpp
+  log/replica.cpp							\
+  log/tool/initialize.cpp						\
+  log/tool/read.cpp
 liblog_la_SOURCES +=							\
   log/catchup.hpp							\
   log/consensus.hpp							\
   log/coordinator.hpp							\
-  log/recover.hpp							\
-  log/replica.hpp							\
+  log/leveldb.hpp							\
   log/log.hpp								\
   log/network.hpp							\
+  log/recover.hpp							\
+  log/replica.hpp							\
+  log/storage.hpp							\
+  log/tool.hpp								\
+  log/tool/initialize.hpp						\
+  log/tool/read.hpp							\
   messages/log.hpp							\
   messages/log.proto
 nodist_liblog_la_SOURCES = $(LOG_PROTOS)

http://git-wip-us.apache.org/repos/asf/mesos/blob/e2fe5860/src/log/leveldb.cpp
----------------------------------------------------------------------
diff --git a/src/log/leveldb.cpp b/src/log/leveldb.cpp
new file mode 100644
index 0000000..7819963
--- /dev/null
+++ b/src/log/leveldb.cpp
@@ -0,0 +1,422 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <google/protobuf/io/zero_copy_stream_impl.h>
+
+#include <leveldb/comparator.h>
+#include <leveldb/write_batch.h>
+
+#include <stout/check.hpp>
+#include <stout/error.hpp>
+#include <stout/numify.hpp>
+#include <stout/stopwatch.hpp>
+#include <stout/strings.hpp>
+
+#include "log/leveldb.hpp"
+
+using std::string;
+
+namespace mesos {
+namespace internal {
+namespace log {
+
+class Varint64Comparator : public leveldb::Comparator
+{
+public:
+  virtual int Compare(
+      const leveldb::Slice& a,
+      const leveldb::Slice& b) const
+  {
+    // TODO(benh): Use varint comparator.
+    LOG(FATAL) << "Unimplemented";
+    // uint64_t left = position(a);
+    // uint64_t right = position(b);
+    // if (left < right) return -1;
+    // if (left == right) return 0;
+    // if (left > right) return 1;
+  }
+
+  virtual const char* Name() const
+  {
+    // Note that this name MUST NOT CHANGE across uses of this
+    // comparator with the same DB (the semantics of doing so are
+    // undefined if the database doesn't catch this first).
+    return "varint64";
+  }
+
+  virtual void FindShortestSeparator(
+      string* start,
+      const leveldb::Slice& limit) const
+  {
+    // Intentional no-op.
+  }
+
+  virtual void FindShortSuccessor(string* key) const
+  {
+    // Intentional no-op.
+  }
+};
+
+
+// TODO(benh): Use varint comparator.
+// static Varint64Comparator comparator;
+
+
+// Returns a string representing the specified position. Note that we
+// adjust the actual position by incrementing it by 1 because we
+// reserve 0 for storing the promise record (Record::Promise,
+// DEPRECATED!), or the metadata (Record::Metadata).
+static string encode(uint64_t position, bool adjust = true)
+{
+  // Adjusted stringified representation is plus 1 of actual position.
+  position = adjust ? position + 1 : position;
+
+  // TODO(benh): Use varint encoding for Varint64Comparator!
+  // string s;
+  // google::protobuf::io::StringOutputStream _stream(&s);
+  // google::protobuf::io::CodedOutputStream stream(&_stream);
+  // position = adjust ? position + 1 : position;
+  // stream.WriteVarint64(position);
+  // return s;
+
+  Try<string> s = strings::format("%.*d", 10, position);
+  CHECK_SOME(s);
+  return s.get();
+}
+
+
+// Returns the position as represented in the specified slice
+// (performing a decrement as necessary to determine the actual
+// position represented).
+static uint64_t decode(const leveldb::Slice& s)
+{
+  // TODO(benh): Use varint decoding for Varint64Comparator!
+  // uint64_t position;
+  // google::protobuf::io::ArrayInputStream _stream(s.data(), s.size());
+  // google::protobuf::io::CodedInputStream stream(&_stream);
+  // bool success = stream.ReadVarint64(&position);
+  // CHECK(success);
+  // return position - 1; // Actual position is less 1 of stringified.
+  Try<uint64_t> position = numify<uint64_t>(string(s.data(), s.size()));
+  CHECK_SOME(position);
+  return position.get() - 1; // Actual position is less 1 of stringified.
+}
+
+
+LevelDBStorage::LevelDBStorage()
+  : db(NULL), first(0)
+{
+  // Nothing to see here.
+}
+
+
+LevelDBStorage::~LevelDBStorage()
+{
+  delete db; // Might be null if open failed in LevelDBStorage::restore.
+}
+
+
+Try<LevelDBStorage::State> LevelDBStorage::restore(const string& path)
+{
+  leveldb::Options options;
+  options.create_if_missing = true;
+
+  // TODO(benh): Can't use varint comparator until bug discussed at
+  // groups.google.com/group/leveldb/browse_thread/thread/17eac39168909ba7
+  // gets fixed. For now, we are using the default byte-wise
+  // comparator and *assuming* that the encoding from unsigned long to
+  // string produces a stable ordering. Checks below.
+  // options.comparator = &comparator;
+
+  const string& one = encode(1);
+  const string& two = encode(2);
+  const string& ten = encode(10);
+
+  CHECK(leveldb::BytewiseComparator()->Compare(one, two) < 0);
+  CHECK(leveldb::BytewiseComparator()->Compare(two, one) > 0);
+  CHECK(leveldb::BytewiseComparator()->Compare(one, ten) < 0);
+  CHECK(leveldb::BytewiseComparator()->Compare(ten, two) > 0);
+  CHECK(leveldb::BytewiseComparator()->Compare(ten, ten) == 0);
+
+  Stopwatch stopwatch;
+  stopwatch.start();
+
+  leveldb::Status status = leveldb::DB::Open(options, path, &db);
+
+  if (!status.ok()) {
+    // TODO(benh): Consider trying to repair the DB.
+    return Error(status.ToString());
+  }
+
+  LOG(INFO) << "Opened db in " << stopwatch.elapsed();
+
+  stopwatch.start(); // Restart the stopwatch.
+
+  // TODO(benh): Conditionally compact to avoid long recovery times?
+  db->CompactRange(NULL, NULL);
+
+  LOG(INFO) << "Compacted db in " << stopwatch.elapsed();
+
+  State state;
+  state.begin = 0;
+  state.end = 0;
+
+  // TODO(benh): Consider just reading the "promise" record (e.g.,
+  // 'encode(0, false)') and then iterating over the rest of the
+  // records and confirming that they are all indeed of type
+  // Record::Action.
+
+  stopwatch.start(); // Restart the stopwatch.
+
+  leveldb::Iterator* iterator = db->NewIterator(leveldb::ReadOptions());
+
+  LOG(INFO) << "Created db iterator in " << stopwatch.elapsed();
+
+  stopwatch.start(); // Restart the stopwatch.
+
+  iterator->SeekToFirst();
+
+  LOG(INFO) << "Seeked to beginning of db in " << stopwatch.elapsed();
+
+  stopwatch.start(); // Restart the stopwatch.
+
+  uint64_t keys = 0;
+
+  while (iterator->Valid()) {
+    keys++;
+    const leveldb::Slice& slice = iterator->value();
+
+    google::protobuf::io::ArrayInputStream stream(slice.data(), slice.size());
+
+    Record record;
+
+    if (!record.ParseFromZeroCopyStream(&stream)) {
+      return Error("Failed to deserialize record");
+    }
+
+    switch (record.type()) {
+      case Record::METADATA: {
+        CHECK(record.has_metadata());
+        state.metadata.CopyFrom(record.metadata());
+        break;
+      }
+
+      // DEPRECATED!
+      case Record::PROMISE: {
+        CHECK(record.has_promise());
+        // This replica is in old format. Set its status to VOTING
+        // since there is no catch-up logic in the old code and this
+        // replica is obviously not empty.
+        state.metadata.set_status(Metadata::VOTING);
+        state.metadata.set_promised(record.promise().proposal());
+        break;
+      }
+
+      case Record::ACTION: {
+        CHECK(record.has_action());
+        const Action& action = record.action();
+        if (action.has_learned() && action.learned()) {
+          state.learned.insert(action.position());
+          state.unlearned.erase(action.position());
+          if (action.has_type() && action.type() == Action::TRUNCATE) {
+            state.begin = std::max(state.begin, action.truncate().to());
+          }
+        } else {
+          state.learned.erase(action.position());
+          state.unlearned.insert(action.position());
+        }
+        state.end = std::max(state.end, action.position());
+        break;
+      }
+
+      default: {
+        return Error("Bad record");
+      }
+    }
+
+    iterator->Next();
+  }
+
+  LOG(INFO) << "Iterated through " << keys
+            << " keys in the db in " << stopwatch.elapsed();
+
+  // Determine the first position still in leveldb so during a
+  // truncation we can attempt to delete all positions from the first
+  // position up to the truncate position. Note that this is not the
+  // beginning position of the log, but rather the first position that
+  // remains (i.e., hasn't been deleted) in leveldb.
+  iterator->Seek(encode(0));
+
+  if (iterator->Valid()) {
+    first = decode(iterator->key());
+  }
+
+  delete iterator;
+
+  return state;
+}
+
+
+Try<Nothing> LevelDBStorage::persist(const Metadata& metadata)
+{
+  Stopwatch stopwatch;
+  stopwatch.start();
+
+  leveldb::WriteOptions options;
+  options.sync = true;
+
+  Record record;
+  record.set_type(Record::METADATA);
+  record.mutable_metadata()->CopyFrom(metadata);
+
+  string value;
+
+  if (!record.SerializeToString(&value)) {
+    return Error("Failed to serialize record");
+  }
+
+  leveldb::Status status = db->Put(options, encode(0, false), value);
+
+  if (!status.ok()) {
+    return Error(status.ToString());
+  }
+
+  LOG(INFO) << "Persisting metadata (" << value.size()
+            << " bytes) to leveldb took " << stopwatch.elapsed();
+
+  return Nothing();
+}
+
+
+Try<Nothing> LevelDBStorage::persist(const Action& action)
+{
+  Stopwatch stopwatch;
+  stopwatch.start();
+
+  Record record;
+  record.set_type(Record::ACTION);
+  record.mutable_action()->MergeFrom(action);
+
+  string value;
+
+  if (!record.SerializeToString(&value)) {
+    return Error("Failed to serialize record");
+  }
+
+  leveldb::WriteOptions options;
+  options.sync = true;
+
+  leveldb::Status status = db->Put(options, encode(action.position()), value);
+
+  if (!status.ok()) {
+    return Error(status.ToString());
+  }
+
+  LOG(INFO) << "Persisting action (" << value.size()
+            << " bytes) to leveldb took " << stopwatch.elapsed();
+
+  // Delete positions if a truncate action has been *learned*. Note
+  // that we do this in a best-effort fashion (i.e., we ignore any
+  // failures to the database since we can always try again).
+  if (action.has_type() && action.type() == Action::TRUNCATE &&
+      action.has_learned() && action.learned()) {
+    CHECK(action.has_truncate());
+
+    stopwatch.start(); // Restart the stopwatch.
+
+    // To actually perform the truncation in leveldb we need to remove
+    // all the keys that represent positions no longer in the log. We
+    // do this by attempting to delete all keys that represent the
+    // first position we know is still in leveldb up to (but
+    // excluding) the truncate position. Note that this works because
+    // the semantics of WriteBatch are such that even if the position
+    // doesn't exist (which is possible because this replica has some
+    // holes), we can attempt to delete the key that represents it and
+    // it will just ignore that key. This is *much* cheaper than
+    // actually iterating through the entire database instead (which
+    // was, for posterity, the original implementation). In addition,
+    // caching the "first" position we know is in the database is
+    // cheaper than using an iterator to determine the first position
+    // (which was, for posterity, the second implementation).
+
+    leveldb::WriteBatch batch;
+
+    // Add positions up to (but excluding) the truncate position to
+    // the batch starting at the first position still in leveldb.
+    uint64_t index = 0;
+    while ((first + index) < action.truncate().to()) {
+      batch.Delete(encode(first + index));
+      index++;
+    }
+
+    // If we added any positions, attempt to delete them!
+    if (index > 0) {
+      // We do this write asynchronously (e.g., using default options).
+      leveldb::Status status = db->Write(leveldb::WriteOptions(), &batch);
+
+      if (!status.ok()) {
+        LOG(WARNING) << "Ignoring leveldb batch delete failure: "
+                     << status.ToString();
+      } else {
+        first = action.truncate().to(); // Save the new first position!
+
+        LOG(INFO) << "Deleting ~" << index
+                  << " keys from leveldb took " << stopwatch.elapsed();
+      }
+    }
+  }
+
+  return Nothing();
+}
+
+
+Try<Action> LevelDBStorage::read(uint64_t position)
+{
+  Stopwatch stopwatch;
+  stopwatch.start();
+
+  leveldb::ReadOptions options;
+
+  string value;
+
+  leveldb::Status status = db->Get(options, encode(position), &value);
+
+  if (!status.ok()) {
+    return Error(status.ToString());
+  }
+
+  google::protobuf::io::ArrayInputStream stream(value.data(), value.size());
+
+  Record record;
+
+  if (!record.ParseFromZeroCopyStream(&stream)) {
+    return Error("Failed to deserialize record");
+  }
+
+  if (record.type() != Record::ACTION) {
+    return Error("Bad record");
+  }
+
+  LOG(INFO) << "Reading position from leveldb took " << stopwatch.elapsed();
+
+  return record.action();
+}
+
+} // namespace log {
+} // namespace internal {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/e2fe5860/src/log/leveldb.hpp
----------------------------------------------------------------------
diff --git a/src/log/leveldb.hpp b/src/log/leveldb.hpp
new file mode 100644
index 0000000..7eb51be
--- /dev/null
+++ b/src/log/leveldb.hpp
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __LOG_LEVELDB_HPP__
+#define __LOG_LEVELDB_HPP__
+
+#include <leveldb/db.h>
+
+#include "log/storage.hpp"
+
+namespace mesos {
+namespace internal {
+namespace log {
+
+// Concrete implementation of the storage interface using leveldb.
+class LevelDBStorage : public Storage
+{
+public:
+  LevelDBStorage();
+  virtual ~LevelDBStorage();
+
+  virtual Try<State> restore(const std::string& path);
+  virtual Try<Nothing> persist(const Metadata& metadata);
+  virtual Try<Nothing> persist(const Action& action);
+  virtual Try<Action> read(uint64_t position);
+
+private:
+  leveldb::DB* db;
+  uint64_t first; // First position still in leveldb, used during truncation.
+};
+
+} // namespace log {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __LOG_LEVELDB_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/e2fe5860/src/log/main.cpp
----------------------------------------------------------------------
diff --git a/src/log/main.cpp b/src/log/main.cpp
index f07bd10..c37dd6f 100644
--- a/src/log/main.cpp
+++ b/src/log/main.cpp
@@ -16,119 +16,89 @@
  * limitations under the License.
  */
 
+#include <string.h>
+
 #include <iostream>
-#include <list>
 #include <string>
 
-#include <process/process.hpp>
+#include <process/owned.hpp>
 
-#include <stout/check.hpp>
-#include <stout/flags.hpp>
 #include <stout/foreach.hpp>
-#include <stout/none.hpp>
-#include <stout/option.hpp>
-#include <stout/os.hpp>
-
-#include "log/replica.hpp"
+#include <stout/hashmap.hpp>
 
-#include "logging/flags.hpp"
-#include "logging/logging.hpp"
+#include "log/tool.hpp"
+#include "log/tool/initialize.hpp"
+#include "log/tool/read.hpp"
 
 using namespace mesos;
 using namespace mesos::internal;
 using namespace mesos::internal::log;
 
+using namespace process;
+
 using std::cerr;
-using std::cout;
 using std::endl;
 using std::string;
 
+// All the registered tools.
+static hashmap<string, Owned<tool::Tool> > tools;
 
-void usage(const char* argv0, const flags::FlagsBase& flags)
+
+static void add(const Owned<tool::Tool>& tool)
 {
-  cerr << "Usage: " << os::basename(argv0).get() << " [...] path/to/log"
-       << endl
-       << "Supported options:" << endl
-       << flags.usage();
+  tools[tool->name()] = tool;
 }
 
 
-int main(int argc, char** argv)
+static void usage(const char* argv0)
 {
-  flags::Flags<logging::Flags> flags;
-
-  Option<uint64_t> from;
-  flags.add(&from,
-            "from",
-            "Position from which to start reading in the log");
-
-  Option<uint64_t> to;
-  flags.add(&to,
-            "to",
-            "Position from which to stop reading in the log");
-
-  bool help;
-  flags.add(&help,
-            "help",
-            "Prints this help message",
-            false);
-
-  Try<Nothing> load = flags.load(None(), argc, argv);
-
-  if (load.isError()) {
-    cerr << load.error() << endl;
-    usage(argv[0], flags);
-    exit(1);
-  }
+  cerr << "Usage: " << argv0 << " <command> [OPTIONS]" << endl
+       << endl
+       << "Available commands:" << endl
+       << "    help" << endl;
 
-  if (help) {
-    usage(argv[0], flags);
-    exit(1);
+  // Get a list of available tools.
+  foreachkey (const string& name, tools) {
+    cerr << "    " << name << endl;
   }
+}
 
-  process::initialize();
-
-  logging::initialize(argv[0], flags);
-
-  string path = argv[argc - 1];
-
-  Replica replica(path);
-
-  process::Future<uint64_t> begin = replica.beginning();
-  process::Future<uint64_t> end = replica.ending();
-
-  begin.await();
-  end.await();
-
-  CHECK(begin.isReady());
-  CHECK(end.isReady());
 
-  if (!from.isSome()) {
-    from = begin.get();
-  }
+int main(int argc, char** argv)
+{
+  // Register log tools.
+  add(Owned<tool::Tool>(new tool::Initialize()));
+  add(Owned<tool::Tool>(new tool::Read()));
 
-  if (!to.isSome()) {
-    to = end.get();
+  if (argc < 2) {
+    usage(argv[0]);
+    return 1;
   }
 
-  CHECK_SOME(from);
-  CHECK_SOME(to);
-
-  cerr << endl << "Attempting to read the log from "
-       << from.get() << " to " << to.get() << endl << endl;
-
-  process::Future<std::list<Action> > actions =
-    replica.read(from.get(), to.get());
+  if (!strcmp(argv[1], "help")) {
+    if (argc == 2) {
+      usage(argv[0]);
+      return 0;
+    }
 
-  actions.await();
+    // 'mesos-log help command' => 'mesos-log command --help'
+    argv[1] = argv[2];
+    argv[2] = (char*) "--help";
+  }
 
-  CHECK(!actions.isFailed()) << actions.failure();
+  string command = argv[1];
 
-  CHECK(actions.isReady());
+  if (!tools.contains(command)) {
+    cerr << "Cannot find command '" << command << "'" << endl << endl;
+    usage(argv[0]);
+    return 1;
+  }
 
-  foreach (const Action& action, actions.get()) {
-    cout << "----------------------------------------------" << endl;
-    action.PrintDebugString();
+  // Execute the command.
+  Try<Nothing> execute = tools[command]->execute(argc, argv);
+  if (execute.isError()) {
+    cerr << execute.error() << endl;
+    return 1;
   }
 
   return 0;

http://git-wip-us.apache.org/repos/asf/mesos/blob/e2fe5860/src/log/replica.cpp
----------------------------------------------------------------------
diff --git a/src/log/replica.cpp b/src/log/replica.cpp
index da9310f..ec6e38c 100644
--- a/src/log/replica.cpp
+++ b/src/log/replica.cpp
@@ -16,12 +16,6 @@
  * limitations under the License.
  */
 
-#include <google/protobuf/io/zero_copy_stream_impl.h>
-
-#include <leveldb/comparator.h>
-#include <leveldb/db.h>
-#include <leveldb/write_batch.h>
-
 #include <algorithm>
 
 #include <process/dispatch.hpp>
@@ -32,22 +26,18 @@
 #include <stout/foreach.hpp>
 #include <stout/none.hpp>
 #include <stout/nothing.hpp>
-#include <stout/numify.hpp>
-#include <stout/stopwatch.hpp>
+#include <stout/result.hpp>
+#include <stout/try.hpp>
 #include <stout/utils.hpp>
 
 #include "common/type_utils.hpp"
 
+#include "log/leveldb.hpp"
 #include "log/replica.hpp"
-
-#include "logging/logging.hpp"
-
-#include "messages/log.hpp"
+#include "log/storage.hpp"
 
 using namespace process;
 
-using process::wait; // Necessary on some OS's to disambiguate.
-
 using std::list;
 using std::set;
 using std::string;
@@ -66,425 +56,6 @@ Protocol<RecoverRequest, RecoverResponse> recover;
 } // namespace protocol {
 
 
-struct State
-{
-  Metadata metadata; // The metadata for the replica.
-  uint64_t begin; // Beginning position of the log.
-  uint64_t end; // Ending position of the log.
-  set<uint64_t> learned; // Positions present and learned
-  set<uint64_t> unlearned; // Positions present but unlearned.
-};
-
-
-// Abstract interface for reading and writing records.
-class Storage
-{
-public:
-  virtual ~Storage() {}
-  virtual Try<State> restore(const string& path) = 0;
-  virtual Try<Nothing> persist(const Metadata& metadata) = 0;
-  virtual Try<Nothing> persist(const Action& action) = 0;
-  virtual Try<Action> read(uint64_t position) = 0;
-};
-
-
-// Concrete implementation of the storage interface using leveldb.
-class LevelDBStorage : public Storage
-{
-public:
-  LevelDBStorage();
-  virtual ~LevelDBStorage();
-
-  virtual Try<State> restore(const string& path);
-  virtual Try<Nothing> persist(const Metadata& metadata);
-  virtual Try<Nothing> persist(const Action& action);
-  virtual Try<Action> read(uint64_t position);
-
-private:
-  class Varint64Comparator : public leveldb::Comparator
-  {
-  public:
-    virtual int Compare(
-        const leveldb::Slice& a,
-        const leveldb::Slice& b) const
-    {
-      // TODO(benh): Use varint comparator.
-      LOG(FATAL) << "Unimplemented";
-      // uint64_t left = position(a);
-      // uint64_t right = position(b);
-      // if (left < right) return -1;
-      // if (left == right) return 0;
-      // if (left > right) return 1;
-    }
-
-    virtual const char* Name() const
-    {
-      // Note that this name MUST NOT CHANGE across uses of this
-      // comparator with the same DB (the semantics of doing so are
-      // undefined if the database doesn't catch this first).
-      return "varint64";
-    }
-
-    virtual void FindShortestSeparator(
-        string* start,
-        const leveldb::Slice& limit) const
-    {
-      // Intentional no-op.
-    }
-
-    virtual void FindShortSuccessor(string* key) const
-    {
-      // Intentional no-op.
-    }
-  };
-
-  // Returns a string representing the specified position. Note that
-  // we adjust the actual position by incrementing it by 1 because we
-  // reserve 0 for storing the promise record (Record::Promise,
-  // DEPRECATED!), or the metadata (Record::Metadata).
-  static string encode(uint64_t position, bool adjust = true)
-  {
-    // Adjusted stringified represenation is plus 1 of actual position.
-    position = adjust ? position + 1 : position;
-
-    // TODO(benh): Use varint encoding for VarInt64Comparator!
-    // string s;
-    // google::protobuf::io::StringOutputStream _stream(&s);
-    // google::protobuf::io::CodedOutputStream stream(&_stream);
-    // position = adjust ? position + 1 : position;
-    // stream.WriteVarint64(position);
-    // return s;
-
-    Try<string> s = strings::format("%.*d", 10, position);
-    CHECK_SOME(s);
-    return s.get();
-  }
-
-  // Returns the position as represented in the specified slice
-  // (performing a decrement as necessary to determine the actual
-  // position represented).
-  static uint64_t decode(const leveldb::Slice& s)
-  {
-    // TODO(benh): Use varint decoding for VarInt64Comparator!
-    // uint64_t position;
-    // google::protobuf::io::ArrayInputStream _stream(s.data(), s.size());
-    // google::protobuf::io::CodedInputStream stream(&_stream);
-    // bool success = stream.ReadVarint64(&position);
-    // CHECK(success);
-    // return position - 1; // Actual position is less 1 of stringified.
-    Try<uint64_t> position = numify<uint64_t>(string(s.data(), s.size()));
-    CHECK_SOME(position);
-    return position.get() - 1; // Actual position is less 1 of stringified.
-  }
-
-  // Varint64Comparator comparator; // TODO(benh): Use varint comparator.
-
-  leveldb::DB* db;
-
-  uint64_t first; // First position still in leveldb, used during truncation.
-};
-
-
-LevelDBStorage::LevelDBStorage()
-  : db(NULL), first(0)
-{
-  // Nothing to see here.
-}
-
-
-LevelDBStorage::~LevelDBStorage()
-{
-  delete db; // Might be null if open failed in LevelDBStorage::recover.
-}
-
-
-Try<State> LevelDBStorage::restore(const string& path)
-{
-  leveldb::Options options;
-  options.create_if_missing = true;
-
-  // TODO(benh): Can't use varint comparator until bug discussed at
-  // groups.google.com/group/leveldb/browse_thread/thread/17eac39168909ba7
-  // gets fixed. For now, we are using the default byte-wise
-  // comparator and *assuming* that the encoding from unsigned long to
-  // string produces a stable ordering. Checks below.
-  // options.comparator = &comparator;
-
-  const string& one = encode(1);
-  const string& two = encode(2);
-  const string& ten = encode(10);
-
-  CHECK(leveldb::BytewiseComparator()->Compare(one, two) < 0);
-  CHECK(leveldb::BytewiseComparator()->Compare(two, one) > 0);
-  CHECK(leveldb::BytewiseComparator()->Compare(one, ten) < 0);
-  CHECK(leveldb::BytewiseComparator()->Compare(ten, two) > 0);
-  CHECK(leveldb::BytewiseComparator()->Compare(ten, ten) == 0);
-
-  Stopwatch stopwatch;
-  stopwatch.start();
-
-  leveldb::Status status = leveldb::DB::Open(options, path, &db);
-
-  if (!status.ok()) {
-    // TODO(benh): Consider trying to repair the DB.
-    return Error(status.ToString());
-  }
-
-  LOG(INFO) << "Opened db in " << stopwatch.elapsed();
-
-  stopwatch.start(); // Restart the stopwatch.
-
-  // TODO(benh): Conditionally compact to avoid long recovery times?
-  db->CompactRange(NULL, NULL);
-
-  LOG(INFO) << "Compacted db in " << stopwatch.elapsed();
-
-  State state;
-  state.begin = 0;
-  state.end = 0;
-
-  // TODO(benh): Consider just reading the "promise" record (e.g.,
-  // 'encode(0, false)') and then iterating over the rest of the
-  // records and confirming that they are all indeed of type
-  // Record::Action.
-
-  stopwatch.start(); // Restart the stopwatch.
-
-  leveldb::Iterator* iterator = db->NewIterator(leveldb::ReadOptions());
-
-  LOG(INFO) << "Created db iterator in " << stopwatch.elapsed();
-
-  stopwatch.start(); // Restart the stopwatch.
-
-  iterator->SeekToFirst();
-
-  LOG(INFO) << "Seeked to beginning of db in " << stopwatch.elapsed();
-
-  stopwatch.start(); // Restart the stopwatch.
-
-  uint64_t keys = 0;
-
-  while (iterator->Valid()) {
-    keys++;
-    const leveldb::Slice& slice = iterator->value();
-
-    google::protobuf::io::ArrayInputStream stream(slice.data(), slice.size());
-
-    Record record;
-
-    if (!record.ParseFromZeroCopyStream(&stream)) {
-      return Error("Failed to deserialize record");
-    }
-
-    switch (record.type()) {
-      case Record::METADATA: {
-        CHECK(record.has_metadata());
-        state.metadata.CopyFrom(record.metadata());
-        break;
-      }
-
-      // DEPRECATED!
-      case Record::PROMISE: {
-        CHECK(record.has_promise());
-        // This replica is in old format. Set its status to VOTING
-        // since there is no catch-up logic in the old code and this
-        // replica is obviously not empty.
-        state.metadata.set_status(Metadata::VOTING);
-        state.metadata.set_promised(record.promise().proposal());
-        break;
-      }
-
-      case Record::ACTION: {
-        CHECK(record.has_action());
-        const Action& action = record.action();
-        if (action.has_learned() && action.learned()) {
-          state.learned.insert(action.position());
-          state.unlearned.erase(action.position());
-          if (action.has_type() && action.type() == Action::TRUNCATE) {
-            state.begin = std::max(state.begin, action.truncate().to());
-          }
-        } else {
-          state.learned.erase(action.position());
-          state.unlearned.insert(action.position());
-        }
-        state.end = std::max(state.end, action.position());
-        break;
-      }
-
-      default: {
-        return Error("Bad record");
-      }
-    }
-
-    iterator->Next();
-  }
-
-  LOG(INFO) << "Iterated through " << keys
-            << " keys in the db in " << stopwatch.elapsed();
-
-  // Determine the first position still in leveldb so during a
-  // truncation we can attempt to delete all positions from the first
-  // position up to the truncate position. Note that this is not the
-  // beginning position of the log, but rather the first position that
-  // remains (i.e., hasn't been deleted) in leveldb.
-  iterator->Seek(encode(0));
-
-  if (iterator->Valid()) {
-    first = decode(iterator->key());
-  }
-
-  delete iterator;
-
-  return state;
-}
-
-
-Try<Nothing> LevelDBStorage::persist(const Metadata& metadata)
-{
-  Stopwatch stopwatch;
-  stopwatch.start();
-
-  leveldb::WriteOptions options;
-  options.sync = true;
-
-  Record record;
-  record.set_type(Record::METADATA);
-  record.mutable_metadata()->CopyFrom(metadata);
-
-  string value;
-
-  if (!record.SerializeToString(&value)) {
-    return Error("Failed to serialize record");
-  }
-
-  leveldb::Status status = db->Put(options, encode(0, false), value);
-
-  if (!status.ok()) {
-    return Error(status.ToString());
-  }
-
-  LOG(INFO) << "Persisting metadata (" << value.size()
-            << " bytes) to leveldb took " << stopwatch.elapsed();
-
-  return Nothing();
-}
-
-
-Try<Nothing> LevelDBStorage::persist(const Action& action)
-{
-  Stopwatch stopwatch;
-  stopwatch.start();
-
-  Record record;
-  record.set_type(Record::ACTION);
-  record.mutable_action()->MergeFrom(action);
-
-  string value;
-
-  if (!record.SerializeToString(&value)) {
-    return Error("Failed to serialize record");
-  }
-
-  leveldb::WriteOptions options;
-  options.sync = true;
-
-  leveldb::Status status = db->Put(options, encode(action.position()), value);
-
-  if (!status.ok()) {
-    return Error(status.ToString());
-  }
-
-  LOG(INFO) << "Persisting action (" << value.size()
-            << " bytes) to leveldb took " << stopwatch.elapsed();
-
-  // Delete positions if a truncate action has been *learned*. Note
-  // that we do this in a best-effort fashion (i.e., we ignore any
-  // failures to the database since we can always try again).
-  if (action.has_type() && action.type() == Action::TRUNCATE &&
-      action.has_learned() && action.learned()) {
-    CHECK(action.has_truncate());
-
-    stopwatch.start(); // Restart the stopwatch.
-
-    // To actually perform the truncation in leveldb we need to remove
-    // all the keys that represent positions no longer in the log. We
-    // do this by attempting to delete all keys that represent the
-    // first position we know is still in leveldb up to (but
-    // excluding) the truncate position. Note that this works because
-    // the semantics of WriteBatch are such that even if the position
-    // doesn't exist (which is possible because this replica has some
-    // holes), we can attempt to delete the key that represents it and
-    // it will just ignore that key. This is *much* cheaper than
-    // actually iterating through the entire database instead (which
-    // was, for posterity, the original implementation). In addition,
-    // caching the "first" position we know is in the database is
-    // cheaper than using an iterator to determine the first position
-    // (which was, for posterity, the second implementation).
-
-    leveldb::WriteBatch batch;
-
-    // Add positions up to (but excluding) the truncate position to
-    // the batch starting at the first position still in leveldb.
-    uint64_t index = 0;
-    while ((first + index) < action.truncate().to()) {
-      batch.Delete(encode(first + index));
-      index++;
-    }
-
-    // If we added any positions, attempt to delete them!
-    if (index > 0) {
-      // We do this write asynchronously (e.g., using default options).
-      leveldb::Status status = db->Write(leveldb::WriteOptions(), &batch);
-
-      if (!status.ok()) {
-        LOG(WARNING) << "Ignoring leveldb batch delete failure: "
-                     << status.ToString();
-      } else {
-        first = action.truncate().to(); // Save the new first position!
-
-        LOG(INFO) << "Deleting ~" << index
-                  << " keys from leveldb took " << stopwatch.elapsed();
-      }
-    }
-  }
-
-  return Nothing();
-}
-
-
-Try<Action> LevelDBStorage::read(uint64_t position)
-{
-  Stopwatch stopwatch;
-  stopwatch.start();
-
-  leveldb::ReadOptions options;
-
-  string value;
-
-  leveldb::Status status = db->Get(options, encode(position), &value);
-
-  if (!status.ok()) {
-    return Error(status.ToString());
-  }
-
-  google::protobuf::io::ArrayInputStream stream(value.data(), value.size());
-
-  Record record;
-
-  if (!record.ParseFromZeroCopyStream(&stream)) {
-    return Error("Failed to deserialize record");
-  }
-
-  if (record.type() != Record::ACTION) {
-    return Error("Bad record");
-  }
-
-  LOG(INFO) << "Reading position from leveldb took " << stopwatch.elapsed();
-
-  return record.action();
-}
-
-
 class ReplicaProcess : public ProtobufProcess<ReplicaProcess>
 {
 public:
@@ -1140,7 +711,7 @@ bool ReplicaProcess::persist(const Action& action)
 
 void ReplicaProcess::restore(const string& path)
 {
-  Try<State> state = storage->restore(path);
+  Try<Storage::State> state = storage->restore(path);
 
   CHECK_SOME(state) << "Failed to recover the log";
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/e2fe5860/src/log/replica.hpp
----------------------------------------------------------------------
diff --git a/src/log/replica.hpp b/src/log/replica.hpp
index ecb126d..467d0d9 100644
--- a/src/log/replica.hpp
+++ b/src/log/replica.hpp
@@ -19,14 +19,16 @@
 #ifndef __LOG_REPLICA_HPP__
 #define __LOG_REPLICA_HPP__
 
+#include <stdint.h>
+
 #include <list>
 #include <set>
 #include <string>
 
+#include <process/future.hpp>
+#include <process/pid.hpp>
 #include <process/protobuf.hpp>
 
-#include <stout/result.hpp>
-
 #include "messages/log.hpp"
 
 namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/e2fe5860/src/log/storage.hpp
----------------------------------------------------------------------
diff --git a/src/log/storage.hpp b/src/log/storage.hpp
new file mode 100644
index 0000000..663146f
--- /dev/null
+++ b/src/log/storage.hpp
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __LOG_STORAGE_HPP__
+#define __LOG_STORAGE_HPP__
+
+#include <stdint.h>
+
+#include <set>
+#include <string>
+
+#include <stout/nothing.hpp>
+#include <stout/try.hpp>
+
+#include "messages/log.hpp"
+
+namespace mesos {
+namespace internal {
+namespace log {
+
+// Abstract interface for restoring, persisting and reading log records.
+class Storage
+{
+public:
+  struct State // What restore() hands back on success.
+  {
+    Metadata metadata; // The metadata for the replica.
+    uint64_t begin; // Beginning position of the log.
+    uint64_t end; // Ending position of the log.
+    std::set<uint64_t> learned; // Positions present and learned.
+    std::set<uint64_t> unlearned; // Positions present but unlearned.
+  };
+
+  virtual ~Storage() {}
+
+  virtual Try<State> restore(const std::string& path) = 0; // Error if failed.
+  virtual Try<Nothing> persist(const Metadata& metadata) = 0;
+  virtual Try<Nothing> persist(const Action& action) = 0;
+  virtual Try<Action> read(uint64_t position) = 0; // Action at 'position'.
+};
+
+} // namespace log {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __LOG_STORAGE_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/e2fe5860/src/log/tool.hpp
----------------------------------------------------------------------
diff --git a/src/log/tool.hpp b/src/log/tool.hpp
new file mode 100644
index 0000000..656d3f6
--- /dev/null
+++ b/src/log/tool.hpp
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __LOG_TOOL_HPP__
+#define __LOG_TOOL_HPP__
+
+#include <string>
+
+#include <stout/nothing.hpp>
+#include <stout/try.hpp>
+
+namespace mesos {
+namespace internal {
+namespace log {
+namespace tool {
+
+// Represents a tool for processing a log file.
+class Tool
+{
+public:
+  virtual ~Tool() {}
+
+  virtual std::string name() const = 0; // Name printed in the usage line.
+
+  // Executes the tool. Passing command line arguments configures the
+  // tool; if none are given, the default configuration is used.
+  // A failure is reported through the returned Try as an Error.
+  virtual Try<Nothing> execute(int argc, char** argv) = 0;
+};
+
+} // namespace tool {
+} // namespace log {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __LOG_TOOL_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/e2fe5860/src/log/tool/initialize.cpp
----------------------------------------------------------------------
diff --git a/src/log/tool/initialize.cpp b/src/log/tool/initialize.cpp
new file mode 100644
index 0000000..ccda7fb
--- /dev/null
+++ b/src/log/tool/initialize.cpp
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <iostream>
+#include <sstream>
+
+#include <process/process.hpp>
+#include <process/timeout.hpp>
+
+#include <stout/error.hpp>
+
+#include "log/replica.hpp"
+#include "log/tool/initialize.hpp"
+
+#include "logging/logging.hpp"
+
+using namespace process;
+
+using std::endl;
+using std::ostringstream;
+using std::string;
+
+namespace mesos {
+namespace internal {
+namespace log {
+namespace tool {
+
+Initialize::Flags::Flags() // Registers the flags of the 'initialize' tool.
+{
+  add(&Flags::path, // execute() returns an Error if this stays unset.
+      "path",
+      "Path to the log");
+
+  add(&Flags::timeout, // Optional; without it execute() sets no deadline.
+      "timeout",
+      "Maximum time allowed for the command to finish\n"
+      "(e.g., 500ms, 1sec, etc.)");
+
+  add(&Flags::help,
+      "help",
+      "Prints the help message",
+      false); // NOTE(review): presumably the default value -- confirm.
+}
+
+// Returns the help text; 'argv0' is the program name shown in the usage line.
+string Initialize::usage(const string& argv0) const
+{
+  ostringstream out;
+
+  out << "Usage: " << argv0 << " " << name() << " [OPTIONS]" << endl
+      << endl
+      << "This command is used to initialize the log" << endl
+      << endl
+      << "Supported OPTIONS:" << endl
+      << flags.usage(); // Appends whatever the flags report as usage.
+
+  return out.str();
+}
+
+// Sets the status of the replica at 'flags.path' to VOTING if it is EMPTY.
+Try<Nothing> Initialize::execute(int argc, char** argv)
+{
+  // Configure the tool by parsing command line arguments (if any).
+  if (argc > 0 && argv != NULL) {
+    Try<Nothing> load = flags.load(None(), argc, argv);
+    if (load.isError()) {
+      return Error(load.error() + "\n\n" + usage(argv[0]));
+    }
+
+    if (flags.help) {
+      return Error(usage(argv[0])); // The help text is returned as an Error.
+    }
+
+    process::initialize();
+    logging::initialize(argv[0], flags);
+  }
+
+  if (flags.path.isNone()) {
+    return Error("Missing flag: '--path'");
+  }
+
+  // Set up the timeout if specified; both waits below share it.
+  Option<Timeout> timeout = None();
+  if (flags.timeout.isSome()) {
+    timeout = Timeout::in(flags.timeout.get());
+  }
+
+  Replica replica(flags.path.get());
+
+  // Get the current status of the replica.
+  Future<Metadata::Status> status = replica.status();
+  if (timeout.isSome()) {
+    status.await(timeout.get().remaining());
+  } else {
+    status.await();
+  }
+
+  if (status.isPending()) { // Still pending after the wait.
+    return Error("Timed out while getting replica status");
+  } else if (status.isDiscarded()) {
+    return Error("Failed to get status of replica (discarded future)");
+  } else if (status.isFailed()) {
+    return Error(status.failure());
+  }
+
+  // We only initialize a log if it is empty.
+  if (status.get() != Metadata::EMPTY) {
+    return Error("The log is not empty");
+  }
+
+  // Update the status of the replica to VOTING.
+  Future<bool> update = replica.update(Metadata::VOTING);
+  if (timeout.isSome()) {
+    update.await(timeout.get().remaining());
+  } else {
+    update.await();
+  }
+
+  if (update.isPending()) { // Still pending after the wait.
+    return Error("Timed out while setting replica status");
+  } else if (update.isDiscarded()) {
+    return Error("Failed to set replica status (discarded future)");
+  } else if (update.isFailed()) {
+    return Error(update.failure());
+  }
+
+  return Nothing();
+}
+
+} // namespace tool {
+} // namespace log {
+} // namespace internal {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/e2fe5860/src/log/tool/initialize.hpp
----------------------------------------------------------------------
diff --git a/src/log/tool/initialize.hpp b/src/log/tool/initialize.hpp
new file mode 100644
index 0000000..10ac269
--- /dev/null
+++ b/src/log/tool/initialize.hpp
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __LOG_TOOL_INITIALIZE_HPP__
+#define __LOG_TOOL_INITIALIZE_HPP__
+
+#include <stout/duration.hpp>
+#include <stout/flags.hpp>
+#include <stout/option.hpp>
+
+#include "log/tool.hpp"
+
+#include "logging/flags.hpp"
+
+namespace mesos {
+namespace internal {
+namespace log {
+namespace tool {
+// Tool named "initialize", configured through the 'flags' member.
+class Initialize : public Tool
+{
+public:
+  class Flags : public logging::Flags
+  {
+  public:
+    Flags();
+
+    Option<std::string> path; // Path to the log.
+    Option<Duration> timeout; // Maximum time allowed for the command.
+    bool help; // Whether to print the help message.
+  };
+
+  virtual std::string name() const { return "initialize"; }
+  virtual Try<Nothing> execute(int argc = 0, char** argv = NULL);
+
+  // Users can change the default configuration by setting these flags.
+  Flags flags;
+
+private:
+  std::string usage(const std::string& argv0) const;
+};
+
+} // namespace tool {
+} // namespace log {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __LOG_TOOL_INITIALIZE_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/e2fe5860/src/log/tool/read.cpp
----------------------------------------------------------------------
diff --git a/src/log/tool/read.cpp b/src/log/tool/read.cpp
new file mode 100644
index 0000000..ab6068d
--- /dev/null
+++ b/src/log/tool/read.cpp
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <iostream>
+#include <sstream>
+
+#include <process/process.hpp>
+#include <process/timeout.hpp>
+
+#include <stout/error.hpp>
+
+#include "log/replica.hpp"
+#include "log/tool/read.hpp"
+
+#include "logging/logging.hpp"
+
+using namespace process;
+
+using std::cout;
+using std::endl;
+using std::list;
+using std::ostringstream;
+using std::string;
+
+namespace mesos {
+namespace internal {
+namespace log {
+namespace tool {
+
+Read::Flags::Flags() // Registers the flags of the 'read' tool.
+{
+  add(&Flags::path, // execute() returns an Error if this stays unset.
+      "path",
+      "Path to the log");
+
+  add(&Flags::from, // Optional; execute() falls back to the beginning.
+      "from",
+      "Position from which to start reading the log");
+
+  add(&Flags::to, // Optional; execute() falls back to the ending.
+      "to",
+      "Position from which to stop reading the log");
+
+  add(&Flags::timeout, // Optional; without it execute() sets no deadline.
+      "timeout",
+      "Maximum time allowed for the command to finish\n"
+      "(e.g., 500ms, 1sec, etc.)");
+
+  add(&Flags::help,
+      "help",
+      "Prints the help message",
+      false); // NOTE(review): presumably the default value -- confirm.
+}
+
+// Returns the help text; 'argv0' is the program name shown in the usage line.
+string Read::usage(const string& argv0) const
+{
+  ostringstream out;
+
+  out << "Usage: " << argv0 << " " << name() << " [OPTIONS]" << endl
+      << endl
+      << "This command is used to read the log" << endl
+      << endl
+      << "Supported OPTIONS:" << endl
+      << flags.usage(); // Appends whatever the flags report as usage.
+
+  return out.str();
+}
+
+// Reads the replica at 'flags.path' and prints every action that was read.
+Try<Nothing> Read::execute(int argc, char** argv)
+{
+  // Configure the tool by parsing command line arguments (if any).
+  if (argc > 0 && argv != NULL) {
+    Try<Nothing> load = flags.load(None(), argc, argv);
+    if (load.isError()) {
+      return Error(load.error() + "\n\n" + usage(argv[0]));
+    }
+
+    if (flags.help) {
+      return Error(usage(argv[0])); // The help text is returned as an Error.
+    }
+
+    process::initialize();
+    logging::initialize(argv[0], flags);
+  }
+
+  if (flags.path.isNone()) {
+    return Error("Missing flag '--path'");
+  }
+
+  // Set up the timeout if specified; every wait below shares it.
+  Option<Timeout> timeout = None();
+  if (flags.timeout.isSome()) {
+    timeout = Timeout::in(flags.timeout.get());
+  }
+
+  Replica replica(flags.path.get());
+
+  // Get the beginning of the replica.
+  Future<uint64_t> begin = replica.beginning();
+  if (timeout.isSome()) {
+    begin.await(timeout.get().remaining());
+  } else {
+    begin.await();
+  }
+
+  if (begin.isPending()) { // Still pending after the wait.
+    return Error("Timed out while getting the beginning of the replica");
+  } else if (begin.isDiscarded()) {
+    return Error(
+        "Failed to get the beginning of the replica (discarded future)");
+  } else if (begin.isFailed()) {
+    return Error(begin.failure());
+  }
+
+  // Get the ending of the replica.
+  Future<uint64_t> end = replica.ending();
+  if (timeout.isSome()) {
+    end.await(timeout.get().remaining());
+  } else {
+    end.await();
+  }
+
+  if (end.isPending()) { // Still pending after the wait.
+    return Error("Timed out while getting the ending of the replica");
+  } else if (end.isDiscarded()) {
+    return Error(
+        "Failed to get the ending of the replica (discarded future)");
+  } else if (end.isFailed()) {
+    return Error(end.failure());
+  }
+
+  Option<uint64_t> from = flags.from; // Defaults to the beginning.
+  if (from.isNone()) {
+    from = begin.get();
+  }
+
+  Option<uint64_t> to = flags.to; // Defaults to the ending.
+  if (to.isNone()) {
+    to = end.get();
+  }
+
+  LOG(INFO) << "Attempting to read the log from "
+            << from.get() << " to " << to.get() << endl;
+
+  Future<list<Action> > actions = replica.read(from.get(), to.get());
+  if (timeout.isSome()) {
+    actions.await(timeout.get().remaining());
+  } else {
+    actions.await();
+  }
+
+  if (actions.isPending()) { // Still pending after the wait.
+    return Error("Timed out while reading the replica");
+  } else if (actions.isDiscarded()) {
+    return Error("Failed to read the replica (discarded future)");
+  } else if (actions.isFailed()) {
+    return Error(actions.failure());
+  }
+
+  foreach (const Action& action, actions.get()) { // Separator, then action.
+    cout << "----------------------------------------------" << endl;
+    action.PrintDebugString();
+  }
+
+  return Nothing();
+}
+
+} // namespace tool {
+} // namespace log {
+} // namespace internal {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/e2fe5860/src/log/tool/read.hpp
----------------------------------------------------------------------
diff --git a/src/log/tool/read.hpp b/src/log/tool/read.hpp
new file mode 100644
index 0000000..74faec0
--- /dev/null
+++ b/src/log/tool/read.hpp
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __LOG_TOOL_READ_HPP__
+#define __LOG_TOOL_READ_HPP__
+
+#include <stout/duration.hpp>
+#include <stout/flags.hpp>
+#include <stout/option.hpp>
+
+#include "log/tool.hpp"
+
+#include "logging/flags.hpp"
+
+namespace mesos {
+namespace internal {
+namespace log {
+namespace tool {
+// Tool named "read", configured through the 'flags' member.
+class Read : public Tool
+{
+public:
+  class Flags : public logging::Flags
+  {
+  public:
+    Flags();
+
+    Option<std::string> path; // Path to the log.
+    Option<uint64_t> from; // Position to start reading from.
+    Option<uint64_t> to; // Position to stop reading at.
+    Option<Duration> timeout; // Maximum time allowed for the command.
+    bool help; // Whether to print the help message.
+  };
+
+  virtual std::string name() const { return "read"; }
+  virtual Try<Nothing> execute(int argc = 0, char** argv = NULL);
+
+  // Users can change the default configuration by setting these flags.
+  Flags flags;
+
+private:
+  std::string usage(const std::string& argv0) const;
+};
+
+} // namespace tool {
+} // namespace log {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __LOG_TOOL_READ_HPP__