Posted to commits@couchdb.apache.org by da...@apache.org on 2018/03/26 20:55:49 UTC

[couchdb] 18/33: WIP - read repair - fabric_rpc.erl

This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch COUCHDB-3326-clustered-purge-davisp-refactor
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 922fce71f90ced2b7f9616cf26628bb541421597
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Mon Mar 26 13:07:04 2018 -0500

    WIP - read repair - fabric_rpc.erl
---
 src/fabric/src/fabric_rpc.erl | 23 ++++++++++++++---------
 1 file changed, 14 insertions(+), 9 deletions(-)

diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index d1d33e9..d493da7 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -321,12 +321,12 @@ with_db(DbName, Options, {M,F,A}) ->
     end.
 
 
-read_repair_filter_docs(DbName, NodeIdRevs, Docs, Options) ->
+read_repair_filter(DbName, NodeIdRevs, Docs, Options) ->
     set_io_priority(DbName, Options),
     case get_or_create_db(DbName, Options) of
         {ok, Db} ->
             try
-                filter_purged_revs(Db, DocsByNode, Docs)
+                filter_purged_revs(Db, NodeIdRevs, Docs)
             after
                 couch_db:close(Db)
             end;
@@ -362,7 +362,7 @@ filter_purged_revs(Db, NodeIdRevs, Docs) ->
         case Id of
             <<?LOCAL_DOC_PREFIX, "/purge-mem3-", _/binary>> ->
                 TargetNodeBin = couch_util:get_value(<<"target_node">>, Props),
-                PurgeSeq = couch_util:get_value(<<"purge_seq">>, Props)
+                PurgeSeq = couch_util:get_value(<<"purge_seq">>, Props),
                 NewAcc = try
                     TargetNode = binary_to_existing_atom(TargetNodeBin, latin1),
                     case lists:member(TargetNode, Nodes) of
@@ -375,7 +375,8 @@ filter_purged_revs(Db, NodeIdRevs, Docs) ->
                     % A really old doc referring to a node that's
                     % no longer in the cluster
                     {ok, Acc}
-                end
+                end,
+                {ok, NewAcc};
             _ ->
                 % We've processed all _local mem3 purge docs
                 {stop, Acc}
@@ -386,8 +387,7 @@ filter_purged_revs(Db, NodeIdRevs, Docs) ->
     {ok, DbPSeq} = couch_db:get_purge_seq(Db),
     Lag = config:get_integer("couchdb", "read_repair_lag", 100),
 
-    {TotesGood, NeedChecking} =
-            lists:foldl(fun({Node, IdRevs}, {GoodToGo, MaybeGood}) ->
+    CheckSeqFun = fun({Node, IdRevs}, {GoodToGo, MaybeGood}) ->
         NodeSeq = case lists:keyfind(Node, 1, NodeSeqs) of
             {Node, PS} -> PS;
             false -> 0
@@ -398,27 +398,32 @@ filter_purged_revs(Db, NodeIdRevs, Docs) ->
                 NewGTG = [{DocId, Rev} || Rev <- Revs] ++ GoodToGo,
                 {NewGTG, MaybeGood};
             _ when NodeSeq >= DbPSeq - Lag ->
-                {GoodToGo, [{NodeSeq, IdRevs} | MaybGood]};
+                {GoodToGo, [{NodeSeq, IdRevs} | MaybeGood]};
             _ ->
                 % The remote node `Node` is so far out of date
                 % we'll just ignore its read-repair updates rather
                 % than scan an unbounded number of purge infos
                 {GoodToGo, MaybeGood}
         end
-    end, {[], []}, NodeIdRevs),
+    end,
+    {TotesGood, NeedChecking} = lists:foldl(CheckSeqFun, {[], []}, NodeIdRevs),
 
     % For any node that's not up to date with internal
     % replication we have to check if any of the revisions
     % have been purged before running our updates
     RestGood = if NeedChecking == [] -> []; true ->
-        StartSeq = lists:min([S || {S, _, _} <- ExpandedChecks]),
         CheckFoldFun = fun({PSeq, _UUID, DocId, Revs}, Acc) ->
             FilterFun = fun({NS, FiltDocId, FiltRev}) ->
+                % The `NS =< PSeq` portion of this translates to the
+                % fact that we haven't yet replicated PSeq to the
+                % target node, hence we would need to filter this read
+                % repair update or risk undoing a purge operation.
                 NS =< PSeq andalso FiltDocId == DocId
                         andalso lists:member(FiltRev, Revs)
             end,
             {ok, lists:filter(FilterFun, Acc)}
         end,
+        StartSeq = lists:min([S || {S, _} <- NeedChecking]),
         InitAcc = lists:flatmap(fun({NodeSeq, {DocId, Revs}}) ->
             [{NodeSeq, DocId, Rev} || Rev <- Revs]
         end, NeedChecking),

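For readers following the hunk above, here is a minimal, self-contained Erlang sketch (not part of the commit) of the two steps it implements: classify/4 mirrors CheckSeqFun, splitting incoming read-repair updates into ones that can be applied directly and ones that must be re-checked against recent purge infos, and drop_purged/2 follows the intent of the inline comment by removing any pending update whose revision appears in a purge info the sending node had not yet replicated. The module name read_repair_sketch, both function names, and the simplified {PSeq, DocId, Revs} purge-info shape are hypothetical illustrations, not CouchDB APIs.

    -module(read_repair_sketch).
    -export([classify/4, drop_purged/2]).

    %% NodeIdRevs :: [{Node, {DocId, Revs}}] - incoming read-repair updates
    %% NodeSeqs   :: [{Node, PurgeSeq}]      - purge seq each node has seen
    %% DbPSeq     :: current purge seq of the local shard
    %% Lag        :: the "read_repair_lag" config value
    classify(NodeIdRevs, NodeSeqs, DbPSeq, Lag) ->
        lists:foldl(fun({Node, {DocId, Revs} = IdRevs}, {GoodToGo, MaybeGood}) ->
            NodeSeq = case lists:keyfind(Node, 1, NodeSeqs) of
                {Node, PS} -> PS;
                false -> 0
            end,
            if
                NodeSeq >= DbPSeq ->
                    %% Node has seen every purge we have; apply directly.
                    {[{DocId, Rev} || Rev <- Revs] ++ GoodToGo, MaybeGood};
                NodeSeq >= DbPSeq - Lag ->
                    %% Slightly behind; re-check against recent purge infos.
                    {GoodToGo, [{NodeSeq, IdRevs} | MaybeGood]};
                true ->
                    %% Too far behind; ignore rather than scan an
                    %% unbounded number of purge infos.
                    {GoodToGo, MaybeGood}
            end
        end, {[], []}, NodeIdRevs).

    %% PurgeInfos :: [{PSeq, DocId, Revs}] - purges since the oldest NodeSeq
    %% (simplified shape; the commit folds 4-tuples that also carry a UUID).
    %% Drops any pending update whose revision was purged after the sending
    %% node's purge seq, so read repair cannot resurrect a purged revision.
    drop_purged(MaybeGood, PurgeInfos) ->
        Pending = lists:flatmap(fun({NodeSeq, {DocId, Revs}}) ->
            [{NodeSeq, DocId, Rev} || Rev <- Revs]
        end, MaybeGood),
        lists:foldl(fun({PSeq, DocId, Revs}, Acc) ->
            lists:filter(fun({NS, FiltDocId, FiltRev}) ->
                not (NS =< PSeq andalso FiltDocId == DocId
                        andalso lists:member(FiltRev, Revs))
            end, Acc)
        end, Pending, PurgeInfos).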