You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/10/29 09:29:31 UTC
git commit: Fixed bug with snapshot positions when writing log diffs.
Repository: mesos
Updated Branches:
refs/heads/master b8ada87e8 -> 29e4afdd9
Fixed bug with snapshot positions when writing log diffs.
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/29e4afdd
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/29e4afdd
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/29e4afdd
Branch: refs/heads/master
Commit: 29e4afdd922994ff21b91896949311c5a3a948dc
Parents: b8ada87
Author: Benjamin Hindman <be...@gmail.com>
Authored: Wed Oct 29 01:27:13 2014 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Wed Oct 29 01:27:16 2014 -0700
----------------------------------------------------------------------
src/state/log.cpp | 33 +++++++++++++++++++--------------
src/tests/state_tests.cpp | 9 +++++++++
2 files changed, 28 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/29e4afdd/src/state/log.cpp
----------------------------------------------------------------------
diff --git a/src/state/log.cpp b/src/state/log.cpp
index 9033ce4..c0c81d5 100644
--- a/src/state/log.cpp
+++ b/src/state/log.cpp
@@ -98,7 +98,7 @@ private:
Future<bool> ___set(
const state::Entry& entry,
size_t diff,
- const Option<Log::Position>& position);
+ Option<Log::Position> position);
Future<bool> _expunge(const state::Entry& entry);
Future<bool> __expunge(const state::Entry& entry);
@@ -138,9 +138,8 @@ private:
entry(entry),
diffs(diffs) {}
- Try<Snapshot> patch(
- const Log::Position& position,
- const Operation::Diff& diff) const
+ // Returns a snapshot after having applied the specified diff.
+ Try<Snapshot> patch(const Operation::Diff& diff) const
{
if (diff.entry().name() != entry.name()) {
return Error("Attempted to patch the wrong snapshot");
@@ -310,8 +309,7 @@ Future<Nothing> LogStorageProcess::apply(const list<Log::Entry>& entries)
CHECK_SOME(snapshot);
- Try<Snapshot> patched =
- snapshot.get().patch(entry.position, operation.diff());
+ Try<Snapshot> patched = snapshot.get().patch(operation.diff());
if (patched.isError()) {
return Failure("Failed to apply the diff: " + patched.error());
@@ -520,24 +518,31 @@ Future<bool> LogStorageProcess::__set(
Future<bool> LogStorageProcess::___set(
const state::Entry& entry,
size_t diffs,
- const Option<Log::Position>& position)
+ Option<Log::Position> position)
{
if (position.isNone()) {
starting = None(); // Reset 'starting' so we try again.
return false;
}
- // Add (or update) the snapshot for this entry and truncate
- // the log if possible.
- CHECK(!snapshots.contains(entry.name()) ||
- snapshots.get(entry.name()).get().position < position.get());
+ // Update index so we don't bother reading anything before this
+ // position again (if we don't have to).
+ index = max(index, position);
+
+ // Determine the position that represents the snapshot: if we just
+ // wrote a diff then we want to use the existing position of the
+ // snapshot, otherwise we just overwrote the snapshot so we should
+ // use the returned position (i.e., do nothing).
+ if (diffs > 0) {
+ CHECK(snapshots.contains(entry.name()));
+ position = snapshots.get(entry.name()).get().position;
+ }
Snapshot snapshot(position.get(), entry, diffs);
snapshots.put(snapshot.entry.name(), snapshot);
- truncate();
- // Update index so we don't bother with this position again.
- index = max(index, position);
+ // And truncate the log if necessary.
+ truncate();
return true;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/29e4afdd/src/tests/state_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/state_tests.cpp b/src/tests/state_tests.cpp
index f37d606..ef2e96f 100644
--- a/src/tests/state_tests.cpp
+++ b/src/tests/state_tests.cpp
@@ -643,6 +643,15 @@ TEST_F(LogStateTest, Diff)
AWAIT_READY(future2);
ASSERT_SOME(future2.get());
+ // It's possible that we're doing truncation asynchronously which
+ // will cause the test to fail because we'll end up getting a
+ // pending position from Log::Reader::ending which will cause
+ // Log::Reader::read to fail. To remedy this, we pause the clock and
+ // wait for all executing processe to settle.
+ Clock::pause();
+ Clock::settle();
+ Clock::resume();
+
Log::Reader reader(log);
Future<Log::Position> beginning = reader.beginning();