You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2018/03/22 20:39:36 UTC
[couchdb] branch COUCHDB-3326-clustered-purge-davisp-refactor
updated: WIP - Updating read repair for a myriad edge cases
This is an automated email from the ASF dual-hosted git repository.
davisp pushed a commit to branch COUCHDB-3326-clustered-purge-davisp-refactor
in repository https://gitbox.apache.org/repos/asf/couchdb.git
The following commit(s) were added to refs/heads/COUCHDB-3326-clustered-purge-davisp-refactor by this push:
new 05d9a3c WIP - Updating read repair for a myriad edge cases
05d9a3c is described below
commit 05d9a3c2ada452690f8be0f4a7745ddf53c710ad
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Thu Mar 22 15:39:22 2018 -0500
WIP - Updating read repair for a myriad edge cases
---
src/fabric/src/fabric_rpc.erl | 182 ++++++++++++++++++++++++++----------------
1 file changed, 112 insertions(+), 70 deletions(-)
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index 2c4d5f4..91fbb9e 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -244,15 +244,16 @@ update_docs(DbName, Docs0, Options) ->
true -> replicated_changes;
_ -> interactive_edit
end,
- DocsByNode = couch_util:get_value(read_repair, Options),
- case {X, DocsByNode} of
- {_, undefined} ->
- Docs = make_att_readers(Docs0),
- with_db(DbName, Options,
- {couch_db, update_docs, [Docs, Options, X]});
- {replicated_changes, _} ->
- update_docs_read_repair(DbName, DocsByNode, Options)
- end.
+ NodeIdRevs = couch_util:get_value(read_repair, Options),
+ Docs1 = case {X, is_list(NodeIdRevs)} of
+ {replicated_changes, true} ->
+ read_repair_filter(DbName, NodeIdRevs, Docs0, Options);
+ _ ->
+ Docs0
+ end,
+ Docs2 = make_att_readers(Docs1),
+ with_db(DbName, Options, {couch_db, update_docs, [Docs2, Options, X]}).
+
get_purge_seq(DbName, Options) ->
with_db(DbName, Options, {couch_db, get_purge_seq, []}).
@@ -320,77 +321,118 @@ with_db(DbName, Options, {M,F,A}) ->
end.
-update_docs_read_repair(DbName, DocsByNode, Options) ->
+read_repair_filter_docs(DbName, NodeIdRevs, Docs, Options) ->
     set_io_priority(DbName, Options),
     case get_or_create_db(DbName, Options) of
-    {ok, Db} ->
-        % omit Revisions that have been purged
-        Docs = filter_purged_revs(Db, DocsByNode),
-        Docs2 = make_att_readers(Docs),
-        {M,F,A} = {couch_db, update_docs, [Docs2, Options, replicated_changes]},
-        rexi:reply(try
-            apply(M, F, [Db | A])
-        catch Exception ->
-            Exception;
-        error:Reason ->
-            couch_log:error("rpc ~p:~p/~p ~p ~p", [M, F, length(A)+1, Reason,
-                clean_stack()]),
-            {error, Reason}
-        end);
-    Error ->
-        rexi:reply(Error)
+        {ok, Db} ->
+            try
+                filter_purged_revs(Db, NodeIdRevs, Docs)
+            after
+                couch_db:close(Db)  % runs whether or not filtering throws
+            end;
+        Error ->
+            rexi:reply(Error)
     end.
-% given [{Node, Doc}] diff revs of the same DocID from diff nodes
-% returns [Doc] filtering out purged docs.
-% This is done for read-repair from fabric_doc_open or fabric_doc_open_revs,
-% so that not to recreate Docs that have been purged before
-% on this node() from Nodes that are out of sync.
-filter_purged_revs(Db, DocsByNode) ->
- % go through _local/purge-mem3-.. docs
- % and assemble NodePSeqs = [{Node1, NodePSeq1}, ...]
- % NodePSeq1 - purge_seq of this node known to Node1
- V = "v" ++ config:get("purge", "version", "1") ++ "-",
- StartKey = ?l2b(?LOCAL_DOC_PREFIX ++ "purge-" ++ V ++ "mem3-"),
- EndKey = ?l2b(?LOCAL_DOC_PREFIX ++ "purge-" ++ V ++ "mem31"),
- Opts = [{start_key, StartKey}, {end_key_gt, EndKey}],
- LDocsFoldFun = fun(#doc{body={Props}}, Acc) ->
- {VOps} = couch_util:get_value(<<"verify_options">>, Props),
- Node = couch_util:get_value(<<"node">>, VOps),
- NodePSeq = couch_util:get_value(<<"purge_seq">>, Props),
- {ok, [{Node, NodePSeq} | Acc]}
+% A read repair operation may have been triggered by a node
+% that was out of sync with the local node. Thus, any time
+% we receive a read repair request we need to check if we
+% may have recently purged any of the given revisions and
+% ignore them if so.
+%
+% This is accomplished by looking at the purge infos that we
+% have locally that have not been replicated to the remote
+% node. The logic here is that we may have received the purge
+% request before the remote shard copy. So to check that we
+% need to look at the purge infos that we have locally but
+% have not yet sent to the remote copy.
+%
+% NodeIdRevs is the list of {node(), {docid(), [rev()]}}
+% tuples passed as the read_repair option to update_docs.
+filter_purged_revs(Db, NodeIdRevs, Docs) ->
+    Nodes = lists:usort([Node || {Node, _IdRevs} <- NodeIdRevs]),
+
+    % Gather the list of {Node, PurgeSeq} pairs for all nodes
+    % that are present in our read repair group
+    StartKey = <<?LOCAL_DOC_PREFIX, "/purge-mem3-">>,
+    Opts = [{start_key, StartKey}],
+    FoldFun = fun(#doc{id = Id, body = {Props}}, Acc) ->
+        case Id of
+            <<?LOCAL_DOC_PREFIX, "/purge-mem3-", _/binary>> ->
+                TargetNodeBin = couch_util:get_value(<<"target_node">>, Props),
+                PurgeSeq = couch_util:get_value(<<"purge_seq">>, Props),
+                try
+                    TargetNode = binary_to_existing_atom(TargetNodeBin, latin1),
+                    case lists:member(TargetNode, Nodes) of
+                        true ->
+                            {ok, [{TargetNode, PurgeSeq} | Acc]};
+                        false ->
+                            {ok, Acc}
+                    end
+                catch error:badarg ->
+                    % A really old doc referring to a node that's
+                    % no longer in the cluster
+                    {ok, Acc}
+                end;
+            _ ->
+                % We've processed all _local mem3 purge docs
+                {stop, Acc}
+        end
     end,
-    {ok, NodePSeqs} =
-            couch_db_engine:fold_local_docs(Db, LDocsFoldFun, [], Opts),
+    {ok, NodeSeqs} = couch_db:fold_local_docs(Db, FoldFun, [], Opts),
-    % go through all doc_updates and
-    % filter out updates from nodes that are behind in purges synchronization
-    AllowedPSeqLag = config:get_integer("purge", "allowed_purge_seq_lag", 100),
     {ok, DbPSeq} = couch_db:get_purge_seq(Db),
-    PurgeFoldFun = fun({_P,_U, Id, Revs}, Acc) -> [{Id, Revs}|Acc] end,
-    lists:foldl(fun({Node, Doc}, Docs) ->
-        NodePSeq = case lists:keyfind(Node, 1, NodePSeqs) of
-            {Node, NodePSeq0} -> NodePSeq0;
-            false -> 0
+    Lag = config:get_integer("couchdb", "read_repair_lag", 100),
+
+    {TotesGood, NeedChecking} =
+            lists:foldl(fun({Node, IdRevs}, {GoodToGo, MaybeGood}) ->
+        NodeSeq = case lists:keyfind(Node, 1, NodeSeqs) of
+            {Node, PS} -> PS;
+            false -> 0
         end,
-        if NodePSeq == DbPSeq ->
-            [Doc|Docs];
-        (NodePSeq+AllowedPSeqLag) < DbPSeq ->
-            % Node is very out of sync, ignore updates from it
-            Docs;
-        true -> %(NodePSeq+ClientAllowedPSeqLag) >= DbPSeq
-            % if Doc has been purged recently -> ignore it
-            {ok, PurgedIdsRevs} = couch_db:fold_purged_docs(Db,
-                    NodePSeq, PurgeFoldFun, [], []),
-            {Start, [FirstRevId|_]} = Doc#doc.revs,
-            DocIdRevs = {Doc#doc.id, [{Start, FirstRevId}]},
-            case lists:member(DocIdRevs, PurgedIdsRevs) of
-                true -> Docs;
-                false -> [Doc|Docs]
-            end
+        case NodeSeq of
+            DbPSeq ->
+                {DocId, Revs} = IdRevs,
+                NewGTG = [{DocId, Rev} || Rev <- Revs] ++ GoodToGo,
+                {NewGTG, MaybeGood};
+            _ when NodeSeq >= DbPSeq - Lag ->
+                {GoodToGo, [{NodeSeq, IdRevs} | MaybeGood]};
+            _ ->
+                % The remote node `Node` is so far out of date
+                % we'll just ignore its read-repair updates rather
+                % than scan an unbounded number of purge infos
+                {GoodToGo, MaybeGood}
         end
-    end, [], DocsByNode).
+    end, {[], []}, NodeIdRevs),
+
+    % For nodes that lag behind on internal replication, scan our
+    % purge infos from the oldest lagging seq and drop every revision
+    % purged by an info the node has not received (NodeSeq =< PSeq)
+    RestGood = if NeedChecking == [] -> []; true ->
+        StartSeq = lists:min([S || {S, _IdRevs} <- NeedChecking]),
+        CheckFoldFun = fun({PSeq, _UUID, DocId, Revs}, Acc) ->
+            FilterFun = fun({NS, FiltDocId, FiltRev}) ->
+                not (NS =< PSeq andalso FiltDocId == DocId
+                    andalso lists:member(FiltRev, Revs))
+            end,
+            {ok, lists:filter(FilterFun, Acc)}
+        end,
+        InitAcc = lists:flatmap(fun({NodeSeq, {DocId, Revs}}) ->
+            [{NodeSeq, DocId, Rev} || Rev <- Revs]
+        end, NeedChecking),
+        {ok, Result} =
+            couch_db:fold_purge_infos(Db, StartSeq, CheckFoldFun, InitAcc),
+        [{DocId, Rev} || {_NSeq, DocId, Rev} <- Result]
+    end,
+
+    % Finally, only return docs that have a revision that
+    % has not been filtered out of the initial set
+    AllGood = lists:usort(TotesGood ++ RestGood),
+    DocFiltFun = fun(#doc{id = Id, revs = {Pos, [Rev | _]}}) ->
+        lists:member({Id, {Pos, Rev}}, AllGood)
+    end,
+    lists:filter(DocFiltFun, Docs).
get_or_create_db(DbName, Options) ->
--
To stop receiving notification emails like this one, please contact
davisp@apache.org.