You are viewing a plain-text version of this content. The link to its canonical version was not preserved in this plain-text copy.
Posted to commits@couchdb.apache.org by da...@apache.org on 2018/03/22 20:39:36 UTC

[couchdb] branch COUCHDB-3326-clustered-purge-davisp-refactor updated: WIP - Updating read repair for a myriad edge cases

This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch COUCHDB-3326-clustered-purge-davisp-refactor
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/COUCHDB-3326-clustered-purge-davisp-refactor by this push:
     new 05d9a3c  WIP - Updating read repair for a myriad edge cases
05d9a3c is described below

commit 05d9a3c2ada452690f8be0f4a7745ddf53c710ad
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Thu Mar 22 15:39:22 2018 -0500

    WIP - Updating read repair for a myriad edge cases
---
 src/fabric/src/fabric_rpc.erl | 182 ++++++++++++++++++++++++++----------------
 1 file changed, 112 insertions(+), 70 deletions(-)

diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index 2c4d5f4..91fbb9e 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -244,15 +244,16 @@ update_docs(DbName, Docs0, Options) ->
         true -> replicated_changes;
         _ -> interactive_edit
     end,
-    DocsByNode = couch_util:get_value(read_repair, Options),
-    case {X, DocsByNode} of
-        {_, undefined} ->
-            Docs = make_att_readers(Docs0),
-            with_db(DbName, Options,
-                {couch_db, update_docs, [Docs, Options, X]});
-        {replicated_changes, _} ->
-            update_docs_read_repair(DbName, DocsByNode, Options)
-    end.
+    NodeIdRevs = couch_util:get_value(read_repair, Options),
+    Docs1 = case {X, is_list(NodeIdRevs)} of
+        {replicated_changes, true} ->
+            read_repair_filter(DbName, NodeIdRevs, Docs0, Options);
+        _ ->
+            Docs0
+    end,
+    Docs2 = make_att_readers(Docs1),
+    with_db(DbName, Options, {couch_db, update_docs, [Docs2, Options, X]}).
+
 
 get_purge_seq(DbName, Options) ->
     with_db(DbName, Options, {couch_db, get_purge_seq, []}).
@@ -320,77 +321,118 @@ with_db(DbName, Options, {M,F,A}) ->
     end.
 
 
-update_docs_read_repair(DbName, DocsByNode, Options) ->
+read_repair_filter(DbName, NodeIdRevs, Docs, Options) ->
     set_io_priority(DbName, Options),
     case get_or_create_db(DbName, Options) of
-    {ok, Db} ->
-        % omit Revisions that have been purged
-        Docs = filter_purged_revs(Db, DocsByNode),
-        Docs2 = make_att_readers(Docs),
-        {M,F,A} = {couch_db, update_docs, [Docs2, Options, replicated_changes]},
-        rexi:reply(try
-            apply(M, F, [Db | A])
-        catch Exception ->
-            Exception;
-        error:Reason ->
-            couch_log:error("rpc ~p:~p/~p ~p ~p", [M, F, length(A)+1, Reason,
-                clean_stack()]),
-            {error, Reason}
-        end);
-    Error ->
-        rexi:reply(Error)
+        {ok, Db} ->
+            try
+                filter_purged_revs(Db, NodeIdRevs, Docs)
+            after
+                couch_db:close(Db)
+            end;
+        _Error ->
+            []  % NOTE(review): assumes with_db/3 then replies with the open error
     end.
 
 
-% given [{Node, Doc}] diff revs of the same DocID from diff nodes
-% returns [Doc] filtering out purged docs.
-% This is done for read-repair from fabric_doc_open or fabric_doc_open_revs,
-% so that not to recreate Docs that have been purged before
-% on this node() from Nodes that are out of sync.
-filter_purged_revs(Db, DocsByNode) ->
-    % go through _local/purge-mem3-.. docs
-    % and assemble NodePSeqs = [{Node1, NodePSeq1}, ...]
-    % NodePSeq1 - purge_seq of this node known to Node1
-    V = "v" ++ config:get("purge", "version", "1") ++ "-",
-    StartKey = ?l2b(?LOCAL_DOC_PREFIX ++ "purge-" ++  V ++ "mem3-"),
-    EndKey = ?l2b(?LOCAL_DOC_PREFIX ++ "purge-" ++ V ++ "mem31"),
-    Opts = [{start_key, StartKey}, {end_key_gt, EndKey}],
-    LDocsFoldFun = fun(#doc{body={Props}}, Acc) ->
-        {VOps} = couch_util:get_value(<<"verify_options">>, Props),
-        Node = couch_util:get_value(<<"node">>, VOps),
-        NodePSeq = couch_util:get_value(<<"purge_seq">>, Props),
-        {ok, [{Node, NodePSeq} | Acc]}
+% A read repair operation may have been triggered by a node
+% that was out of sync with the local node. Thus, any time
+% we receive a read repair request we need to check if we
+% may have recently purged any of the given revisions and
+% ignore them if so.
+%
+% This is accomplished by looking at the purge infos that we
+% have locally that have not been replicated to the remote
+% node. The logic here is that we may have received the purge
+% request before the remote shard copy. So to check that we
+% need to look at the purge infos that we have locally but
+% have not yet sent to the remote copy.
+%
+% NodeIdRevs are the list of {node(), {docid(), [rev()]}}
+% tuples passed as the read_repair option to update_docs.
+filter_purged_revs(Db, NodeIdRevs, Docs) ->
+    Nodes = lists:usort([Node || {Node, _IdRevs} <- NodeIdRevs]),
+
+    % Gather the list of {Node, PurgeSeq} pairs for all nodes
+    % that are present in our read repair group
+    StartKey = <<?LOCAL_DOC_PREFIX, "purge-mem3-">>,
+    Opts = [{start_key, StartKey}],
+    FoldFun = fun(#doc{id = Id, body = {Props}}, Acc) ->
+        case Id of
+            <<?LOCAL_DOC_PREFIX, "purge-mem3-", _/binary>> ->
+                TargetNodeBin = couch_util:get_value(<<"target_node">>, Props),
+                PurgeSeq = couch_util:get_value(<<"purge_seq">>, Props),
+                try
+                    TargetNode = binary_to_existing_atom(TargetNodeBin, latin1),
+                    case lists:member(TargetNode, Nodes) of
+                        true ->
+                            {ok, [{TargetNode, PurgeSeq} | Acc]};
+                        false ->
+                            {ok, Acc}
+                    end
+                catch error:badarg ->
+                    % A really old doc referring to a node that's
+                    % no longer in the cluster
+                    {ok, Acc}
+                end;
+            _ ->
+                % We've processed all _local mem3 purge docs
+                {stop, Acc}
+        end
     end,
-    {ok, NodePSeqs} =
-        couch_db_engine:fold_local_docs(Db, LDocsFoldFun, [], Opts),
+    {ok, NodeSeqs} = couch_db:fold_local_docs(Db, FoldFun, [], Opts),

-    % go through all doc_updates and
-    % filter out updates from nodes that are behind in purges synchronization
-    AllowedPSeqLag = config:get_integer("purge", "allowed_purge_seq_lag", 100),
     {ok, DbPSeq} = couch_db:get_purge_seq(Db),
-    PurgeFoldFun = fun({_P,_U, Id, Revs}, Acc) ->  [{Id, Revs}|Acc]  end,
-    lists:foldl(fun({Node, Doc}, Docs) ->
-        NodePSeq = case lists:keyfind(Node, 1, NodePSeqs) of
-           {Node, NodePSeq0} -> NodePSeq0;
-           false -> 0
+    Lag = config:get_integer("couchdb", "read_repair_lag", 100),
+
+    {TotesGood, NeedChecking} =
+            lists:foldl(fun({Node, IdRevs}, {GoodToGo, MaybeGood}) ->
+        NodeSeq = case lists:keyfind(Node, 1, NodeSeqs) of
+            {Node, PS} -> PS;
+            false -> 0
         end,
-        if  NodePSeq == DbPSeq ->
-            [Doc|Docs];
-        (NodePSeq+AllowedPSeqLag) < DbPSeq ->
-            % Node is very out of sync, ignore updates from it
-            Docs;
-        true -> %(NodePSeq+ClientAllowedPSeqLag) >= DbPSeq
-            % if Doc has been purged recently -> ignore it
-            {ok, PurgedIdsRevs} = couch_db:fold_purged_docs(Db,
-                    NodePSeq, PurgeFoldFun, [], []),
-            {Start, [FirstRevId|_]} = Doc#doc.revs,
-            DocIdRevs = {Doc#doc.id, [{Start, FirstRevId}]},
-            case lists:member(DocIdRevs, PurgedIdsRevs) of
-                true -> Docs;
-                false -> [Doc|Docs]
-            end
+        case NodeSeq of
+            DbPSeq ->
+                {DocId, Revs} = IdRevs,
+                NewGTG = [{DocId, Rev} || Rev <- Revs] ++ GoodToGo,
+                {NewGTG, MaybeGood};
+            _ when NodeSeq >= DbPSeq - Lag ->
+                {GoodToGo, [{NodeSeq, IdRevs} | MaybeGood]};
+            _ ->
+                % The remote node `Node` is so far out of date
+                % we'll just ignore its read-repair updates rather
+                % than scan an unbounded number of purge infos
+                {GoodToGo, MaybeGood}
         end
-    end, [], DocsByNode).
+    end, {[], []}, NodeIdRevs),
+
+    % For any node that's not up to date with internal
+    % replication we have to check if any of the revisions
+    % have been purged before running our updates
+    RestGood = if NeedChecking == [] -> []; true ->
+        StartSeq = lists:min([S || {S, _} <- NeedChecking]),
+        CheckFoldFun = fun({PSeq, _UUID, DocId, Revs}, Acc) ->
+            FilterFun = fun({NS, FiltDocId, FiltRev}) ->
+                NS > PSeq orelse FiltDocId /= DocId
+                        orelse not lists:member(FiltRev, Revs)
+            end,
+            {ok, lists:filter(FilterFun, Acc)}
+        end,
+        InitAcc = lists:flatmap(fun({NodeSeq, {DocId, Revs}}) ->
+            [{NodeSeq, DocId, Rev} || Rev <- Revs]
+        end, NeedChecking),
+        {ok, Result} =
+                couch_db:fold_purge_infos(Db, StartSeq, CheckFoldFun, InitAcc),
+        [{DocId, Rev} || {_NSeq, DocId, Rev} <- Result]
+    end,
+
+    % Finally, only return docs that have a revision that
+    % has not been filtered out of the initial set
+    AllGood = lists:usort(TotesGood ++ RestGood),
+    DocFiltFun = fun(#doc{id = Id, revs = {Pos, [Rev | _]}}) ->
+        lists:member({Id, {Pos, Rev}}, AllGood)
+    end,
+    lists:filter(DocFiltFun, Docs).
 
 
 get_or_create_db(DbName, Options) ->

-- 
To stop receiving notification emails like this one, please contact
davisp@apache.org.