You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2018/03/26 20:55:49 UTC
[couchdb] 18/33: WIP - read repair - fabric_rpc.erl
This is an automated email from the ASF dual-hosted git repository.
davisp pushed a commit to branch COUCHDB-3326-clustered-purge-davisp-refactor
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 922fce71f90ced2b7f9616cf26628bb541421597
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Mon Mar 26 13:07:04 2018 -0500
WIP - read repair - fabric_rpc.erl
---
src/fabric/src/fabric_rpc.erl | 23 ++++++++++++++---------
1 file changed, 14 insertions(+), 9 deletions(-)
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index d1d33e9..d493da7 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -321,12 +321,12 @@ with_db(DbName, Options, {M,F,A}) ->
end.
-read_repair_filter_docs(DbName, NodeIdRevs, Docs, Options) ->
+read_repair_filter(DbName, NodeIdRevs, Docs, Options) ->
set_io_priority(DbName, Options),
case get_or_create_db(DbName, Options) of
{ok, Db} ->
try
- filter_purged_revs(Db, DocsByNode, Docs)
+ filter_purged_revs(Db, NodeIdRevs, Docs)
after
couch_db:close(Db)
end;
@@ -362,7 +362,7 @@ filter_purged_revs(Db, NodeIdRevs, Docs) ->
case Id of
<<?LOCAL_DOC_PREFIX, "/purge-mem3-", _/binary>> ->
TargetNodeBin = couch_util:get_value(<<"target_node">>, Props),
- PurgeSeq = couch_util:get_value(<<"purge_seq">>, Props)
+ PurgeSeq = couch_util:get_value(<<"purge_seq">>, Props),
NewAcc = try
TargetNode = binary_to_existing_atom(TargetNodeBin, latin1),
case lists:member(TargetNode, Nodes) of
@@ -375,7 +375,8 @@ filter_purged_revs(Db, NodeIdRevs, Docs) ->
% A really old doc referring to a node that's
% no longer in the cluster
{ok, Acc}
- end
+ end,
+ {ok, NewAcc};
_ ->
% We've processed all _local mem3 purge docs
{stop, Acc}
@@ -386,8 +387,7 @@ filter_purged_revs(Db, NodeIdRevs, Docs) ->
{ok, DbPSeq} = couch_db:get_purge_seq(Db),
Lag = config:get_integer("couchdb", "read_repair_lag", 100),
- {TotesGood, NeedChecking} =
- lists:foldl(fun({Node, IdRevs}, {GoodToGo, MaybeGood}) ->
+ CheckSeqFun = fun({Node, IdRevs}, {GoodToGo, MaybeGood}) ->
NodeSeq = case lists:keyfind(Node, 1, NodeSeqs) of
{Node, PS} -> PS;
false -> 0
@@ -398,27 +398,32 @@ filter_purged_revs(Db, NodeIdRevs, Docs) ->
NewGTG = [{DocId, Rev} || Rev <- Revs] ++ GoodToGo,
{NewGTG, MaybeGood};
_ when NodeSeq >= DbPSeq - Lag ->
- {GoodToGo, [{NodeSeq, IdRevs} | MaybGood]};
+ {GoodToGo, [{NodeSeq, IdRevs} | MaybeGood]};
_ ->
% The remote node `Node` is so far out of date
% we'll just ignore its read-repair updates rather
% than scan an unbounded number of purge infos
{GoodToGo, MaybeGood}
end
- end, {[], []}, NodeIdRevs),
+ end,
+ {TotesGood, NeedChecking} = lists:foldl(CheckSeqFun, {[], []}, NodeIdRevs),
% For any node that's not up to date with internal
% replication we have to check if any of the revisions
% have been purged before running our updates
RestGood = if NeedChecking == [] -> []; true ->
- StartSeq = lists:min([S || {S, _, _} <- ExpandedChecks]),
CheckFoldFun = fun({PSeq, _UUID, DocId, Revs}, Acc) ->
FilterFun = fun({NS, FiltDocId, FiltRev}) ->
+ % `NS =< PSeq` means that purge request PSeq has not yet
+ % been replicated to the target node, so this read
+ % repair update has to be filtered out; otherwise we
+ % would risk undoing that purge operation.
NS =< PSeq andalso FiltDocId == DocId
andalso lists:member(FiltRev, Revs)
end,
{ok, lists:filter(FilterFun, Acc)}
end,
+ StartSeq = lists:min([S || {S, _} <- NeedChecking]),
InitAcc = lists:flatmap(fun({NodeSeq, {DocId, Revs}}) ->
[{NodeSeq, DocId, Rev} || Rev <- Revs]
end, NeedChecking),
--
To stop receiving notification emails like this one, please contact
davisp@apache.org.