You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2014/08/28 14:23:02 UTC

[42/50] mem3 commit: updated refs/heads/master to 64c0c74

Refactor mem3_rpc:add_checkpoint/2

This is based on Adam Kocoloski's original add_checkpoint/2 but uses a
body recursive function to avoid the final reverse/filter steps.

BugzId: 21973


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/e64dd028
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/e64dd028
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/e64dd028

Branch: refs/heads/master
Commit: e64dd0281f1d9b9b22511b3625c1fb1f97a42042
Parents: e147621
Author: Paul J. Davis <pa...@gmail.com>
Authored: Mon Dec 9 14:04:39 2013 -0600
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:51:10 2014 +0100

----------------------------------------------------------------------
 src/mem3_rpc.erl | 133 +++++++++++++++++++++++++++-----------------------
 1 file changed, 71 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/e64dd028/src/mem3_rpc.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rpc.erl b/src/mem3_rpc.erl
index 8d8c832..10294a7 100644
--- a/src/mem3_rpc.erl
+++ b/src/mem3_rpc.erl
@@ -125,68 +125,11 @@ add_checkpoint({Props}, {History}) ->
     % any larger update seq than we're currently recording.
     FilteredHistory = filter_history(SourceSeq, TargetSeq, SourceHistory),
 
-    % Insert the new entry into the history and trim the history
-    % to keep an exponentially increasing delta between checkpoints.
-    % We do this by defining logical buckets of exponentially
-    % increasing size and then keep the smallest and largest values
-    % in each bucket. We keep both min and max points so that
-    % we don't end up with empty buckets as new points are added.
-    %
-    % NB: We're guaranteed to keep the newest entry passed to this
-    % function because we filter out all larger update sequences
-    % which means it is guaranteed to be the smallest value in the
-    % first bucket with a delta of 0.
-    WithNewEntry = [{Props} | FilteredHistory],
-
-    % Tag each entry with the bucket id
-    BucketTagged = lists:map(fun({Entry}) ->
-        EntrySourceSeq = couch_util:get_value(<<"source_seq">>, Entry),
-        BucketTag = case SourceSeq - EntrySourceSeq of
-            0 ->
-                0;
-            N when N > 0 ->
-                % This is int(log2(SourceSeq - EntrySourceSeq))
-                trunc(math:log(N) / math:log(2))
-        end,
-        {BucketTag, SourceSeq - EntrySourceSeq, {Entry}}
-    end, WithNewEntry),
-
-    % Find the min/max entries for each bucket
-    Buckets = lists:foldl(fun({Bucket, Delta, Entry}, BucketAcc) ->
-        {MinEntry, MaxEntry} = case dict:find(Bucket, BucketAcc) of
-            {ok, Value} -> Value;
-            error -> {nil, nil}
-        end,
-        NewMin = case MinEntry of
-            {MinDelta, _} when Delta < MinDelta ->
-                {Delta, Entry};
-            nil ->
-                {Delta, Entry};
-            _ ->
-                MinEntry
-        end,
-        NewMax = case MaxEntry of
-            {MaxDelta, _} when Delta > MaxDelta ->
-                {Delta, Entry};
-            nil ->
-                {Delta, Entry};
-            _ ->
-                MaxEntry
-        end,
-        dict:store(Bucket, {NewMin, NewMax}, BucketAcc)
-    end, dict:new(), BucketTagged),
-
-    % Turn our bucket dict back into a list sorted by increasing
-    % deltas (which corresponds to decreasing source_seq values).
-    NewSourceHistory = lists:flatmap(fun({_Bucket, {Min, Max}}) ->
-        % If there's a single point in a bucket its both the min
-        % and max entry so we account for that here.
-        if Min == Max ->
-            [element(2, Min)];
-        true ->
-            [element(2, Min), element(2, Max)]
-        end
-    end, lists:sort(dict:to_list(Buckets))),
+    % Re-bucket our history based on the most recent source
+    % sequence. This is where we drop old checkpoints to
+    % maintain the exponential distribution.
+    {_, RebucketedHistory} = rebucket(FilteredHistory, SourceSeq, 0),
+    NewSourceHistory = [{Props} | RebucketedHistory],
 
     % Finally update the source node history and we're done.
     NodeRemoved = lists:keydelete(SourceNode, 1, History),
@@ -206,6 +149,72 @@ filter_history(SourceSeqThresh, TargetSeqThresh, History) ->
     lists:filter(TargetFilter, SourceFiltered).
 
 
+%% @doc This function adjusts our history to maintain a
+%% history of checkpoints that follow an exponentially
+%% increasing age from the most recent checkpoint.
+%%
+%% The terms newest and oldest used in these comments
+%% refers to the (NewSeq - CurSeq) difference where smaller
+%% values are considered newer.
+%%
+%% It works by assigning each entry to a bucket and keeping
+%% the newest and oldest entry in each bucket. Keeping
+%% both the newest and oldest means that we won't end up
+%% with empty buckets as checkpoints are promoted to new
+%% buckets.
+%%
+%% The return value of this function is a two-tuple of the
+%% form `{BucketId, History}` where BucketId is the id of
+%% the bucket for the first entry in History. This is used
+%% when recursing to detect the oldest value in a given
+%% bucket.
+%%
+%% This function expects the provided history to be sorted
+%% in descending order of source_seq values.
+rebucket([], _NewSeq, Bucket) ->
+    {Bucket+1, []};
+rebucket([{Entry} | RestHistory], NewSeq, Bucket) ->
+    CurSeq = couch_util:get_value(<<"source_seq">>, Entry),
+    case find_bucket(NewSeq, CurSeq, Bucket) of
+        Bucket ->
+            % This entry is in an existing bucket which means
+            % we will only keep it if its the oldest value
+            % in the bucket. To detect this we rebucket the
+            % rest of the list and only include Entry if the
+            % rest of the list is in a bigger bucket.
+            case rebucket(RestHistory, NewSeq, Bucket) of
+                {Bucket, NewHistory} ->
+                    % There's another entry in this bucket so we drop the
+                    % current entry.
+                    {Bucket, NewHistory};
+                {NextBucket, NewHistory} when NextBucket > Bucket ->
+                    % The rest of the history was rebucketed into a larger
+                    % bucket so this is the oldest entry in the current
+                    % bucket.
+                    {Bucket, [{Entry} | NewHistory]}
+            end;
+        NextBucket when NextBucket > Bucket ->
+            % This entry is the newest in NextBucket so we add it
+            % to our history and continue rebucketing.
+            {_, NewHistory} = rebucket(RestHistory, NewSeq, NextBucket),
+            {NextBucket, [{Entry} | NewHistory]}
+    end.
+
+
+%% @doc Find the bucket id for the given sequence pair.
+find_bucket(NewSeq, CurSeq, Bucket) ->
+    % The +1 constant in this comparison is a bit subtle. The
+    % reason for it is to make sure that the first entry in
+    % the history is guaranteed to have a BucketId of 1. This
+    % also relies on never having a duplicated update
+    % sequence so adding 1 here guarantees a difference >= 2.
+    if (NewSeq - CurSeq + 1) > (2 bsl Bucket) ->
+        find_bucket(NewSeq, CurSeq, Bucket+1);
+    true ->
+        Bucket
+    end.
+
+
 rexi_call(Node, MFA) ->
     Mon = rexi_monitor:start([rexi_utils:server_pid(Node)]),
     Ref = rexi:cast(Node, self(), MFA, [sync]),