You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2014/08/01 11:11:29 UTC
[43/48] mem3 commit: updated refs/heads/windsor-merge to ff02b9a
Refactor mem3_rpc:add_checkpoint/2
This is based on Adam Kocoloski's original add_checkpoint/2 but uses a
body recursive function to avoid the final reverse/filter steps.
BugzId: 21973
Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/e64dd028
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/e64dd028
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/e64dd028
Branch: refs/heads/windsor-merge
Commit: e64dd0281f1d9b9b22511b3625c1fb1f97a42042
Parents: e147621
Author: Paul J. Davis <pa...@gmail.com>
Authored: Mon Dec 9 14:04:39 2013 -0600
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:51:10 2014 +0100
----------------------------------------------------------------------
src/mem3_rpc.erl | 133 +++++++++++++++++++++++++++-----------------------
1 file changed, 71 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/e64dd028/src/mem3_rpc.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rpc.erl b/src/mem3_rpc.erl
index 8d8c832..10294a7 100644
--- a/src/mem3_rpc.erl
+++ b/src/mem3_rpc.erl
@@ -125,68 +125,11 @@ add_checkpoint({Props}, {History}) ->
% any larger update seq than we're currently recording.
FilteredHistory = filter_history(SourceSeq, TargetSeq, SourceHistory),
- % Insert the new entry into the history and trim the history
- % to keep an exponentially increasing delta between checkpoints.
- % We do this by defining logical buckets of exponentially
- % increasing size and then keep the smallest and largest values
- % in each bucket. We keep both min and max points so that
- % we don't end up with empty buckets as new points are added.
- %
- % NB: We're guaranteed to keep the newest entry passed to this
- % function because we filter out all larger update sequences
- % which means it is guaranteed to be the smallest value in the
- % first bucket with a delta of 0.
- WithNewEntry = [{Props} | FilteredHistory],
-
- % Tag each entry with the bucket id
- BucketTagged = lists:map(fun({Entry}) ->
- EntrySourceSeq = couch_util:get_value(<<"source_seq">>, Entry),
- BucketTag = case SourceSeq - EntrySourceSeq of
- 0 ->
- 0;
- N when N > 0 ->
- % This is int(log2(SourceSeq - EntrySourceSeq))
- trunc(math:log(N) / math:log(2))
- end,
- {BucketTag, SourceSeq - EntrySourceSeq, {Entry}}
- end, WithNewEntry),
-
- % Find the min/max entries for each bucket
- Buckets = lists:foldl(fun({Bucket, Delta, Entry}, BucketAcc) ->
- {MinEntry, MaxEntry} = case dict:find(Bucket, BucketAcc) of
- {ok, Value} -> Value;
- error -> {nil, nil}
- end,
- NewMin = case MinEntry of
- {MinDelta, _} when Delta < MinDelta ->
- {Delta, Entry};
- nil ->
- {Delta, Entry};
- _ ->
- MinEntry
- end,
- NewMax = case MaxEntry of
- {MaxDelta, _} when Delta > MaxDelta ->
- {Delta, Entry};
- nil ->
- {Delta, Entry};
- _ ->
- MaxEntry
- end,
- dict:store(Bucket, {NewMin, NewMax}, BucketAcc)
- end, dict:new(), BucketTagged),
-
- % Turn our bucket dict back into a list sorted by increasing
- % deltas (which corresponds to decreasing source_seq values).
- NewSourceHistory = lists:flatmap(fun({_Bucket, {Min, Max}}) ->
- % If there's a single point in a bucket its both the min
- % and max entry so we account for that here.
- if Min == Max ->
- [element(2, Min)];
- true ->
- [element(2, Min), element(2, Max)]
- end
- end, lists:sort(dict:to_list(Buckets))),
+ % Re-bucket our history based on the most recent source
+ % sequence. This is where we drop old checkpoints to
+ % maintain the exponential distribution.
+ {_, RebucketedHistory} = rebucket(FilteredHistory, SourceSeq, 0),
+ NewSourceHistory = [{Props} | RebucketedHistory],
% Finally update the source node history and we're done.
NodeRemoved = lists:keydelete(SourceNode, 1, History),
@@ -206,6 +149,72 @@ filter_history(SourceSeqThresh, TargetSeqThresh, History) ->
lists:filter(TargetFilter, SourceFiltered).
+%% @doc This function adjusts our history to maintain a
+%% history of checkpoints that follow an exponentially
+%% increasing age from the most recent checkpoint.
+%%
+%% The terms newest and oldest used in these comments
+%% refer to the (NewSeq - CurSeq) difference, where smaller
+%% values are considered newer.
+%%
+%% It works by assigning each entry to a bucket (see
+%% find_bucket/3) and keeping the newest and oldest entry
+%% in each bucket. Keeping both the newest and oldest means
+%% that we won't end up with empty buckets as checkpoints
+%% are promoted to new buckets.
+%%
+%% The return value is a two-tuple `{BucketId, History}`
+%% where BucketId is the bucket of the first entry in
+%% History, or Bucket+1 when History is empty. The
+%% recursive calls compare it with the current bucket to
+%% decide whether an entry is the oldest in that bucket.
+%%
+%% This function expects the provided history to be sorted
+%% in descending order of source_seq values.
+rebucket([], _NewSeq, Bucket) ->
+ {Bucket+1, []};
+rebucket([{Entry} | RestHistory], NewSeq, Bucket) ->
+ CurSeq = couch_util:get_value(<<"source_seq">>, Entry),
+ case find_bucket(NewSeq, CurSeq, Bucket) of
+ Bucket ->
+ % This entry is in an existing bucket which means
+ % we will only keep it if it's the oldest value
+ % in the bucket. To detect this we rebucket the
+ % rest of the list and only include Entry if the
+ % rest of the list starts in a bigger bucket (or is empty).
+ case rebucket(RestHistory, NewSeq, Bucket) of
+ {Bucket, NewHistory} ->
+ % There's an older entry in this bucket so we drop the
+ % current entry.
+ {Bucket, NewHistory};
+ {NextBucket, NewHistory} when NextBucket > Bucket ->
+ % The rest of the history was rebucketed into a larger
+ % bucket so this is the oldest entry in the current
+ % bucket.
+ {Bucket, [{Entry} | NewHistory]}
+ end;
+ NextBucket when NextBucket > Bucket ->
+ % This entry is the newest in NextBucket so we add it
+ % to our history and continue rebucketing.
+ {_, NewHistory} = rebucket(RestHistory, NewSeq, NextBucket),
+ {NextBucket, [{Entry} | NewHistory]}
+ end.
+
+
+%% @doc Find the bucket id (never below Bucket) for the given sequence pair.
+find_bucket(NewSeq, CurSeq, Bucket) ->
+ % Returns the smallest id >= Bucket with D + 1 =< 2 bsl Id,
+ % where D = NewSeq - CurSeq, i.e. max(Bucket, floor(log2(D)))
+ % for D >= 1. The +1 limits bucket 0 to D = 1 only, so at most
+ % one entry (update seqs are assumed unique) can land in the
+ % bucket 0 that add_checkpoint starts rebucket/3 from.
+ if (NewSeq - CurSeq + 1) > (2 bsl Bucket) ->
+ find_bucket(NewSeq, CurSeq, Bucket+1);
+ true ->
+ Bucket
+ end.
+
+
rexi_call(Node, MFA) ->
Mon = rexi_monitor:start([rexi_utils:server_pid(Node)]),
Ref = rexi:cast(Node, self(), MFA, [sync]),