Posted to commits@couchdb.apache.org by da...@apache.org on 2018/03/21 18:26:21 UTC
[couchdb] 03/07: Update fabric_doc_open_revs to handle purges
This is an automated email from the ASF dual-hosted git repository.
davisp pushed a commit to branch COUCHDB-3326-clustered-purge-davisp-refactor
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 57719ecc3635230977db3779f7cd44eefdd1d8d6
Author: Mayya Sharipova <ma...@ca.ibm.com>
AuthorDate: Tue Oct 3 15:56:55 2017 -0400
Update fabric_doc_open_revs to handle purges
We need to account for the possibility that a document is opened while a
purge is still propagating between shards. This means we need to inform
the read-repair algorithms that a difference in revisions may be due to
a purge request in progress. If we don't do this, it's possible that a
read-repair may race the purge request and effectively undo the purge.
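In rough outline (an editorial sketch, not part of the patch itself; the module
and function names below are illustrative only), the fix threads a per-node view
of the replies through to the read-repair write, so the target shard can tell a
purged revision apart from a genuinely missing one:

    -module(read_repair_sketch).
    -export([collect_reply/3, start_read_repair/3]).

    %% Remember which node produced each set of raw replies.
    collect_reply(Node, RawReplies, RepliesByNode) ->
        [{Node, RawReplies} | RepliesByNode].

    %% When repair is needed, flatten the per-node replies into [{Node, Doc}]
    %% and pass them along as a read_repair option, so the RPC layer can drop
    %% updates from nodes that are behind on purge synchronization.
    start_read_repair(Db, Docs, RepliesByNode) ->
        NodeDocs = [{Node, Doc} || {Node, Replies} <- RepliesByNode,
                                   {ok, Doc} <- Replies],
        Opts = [replicated_changes, {read_repair, NodeDocs}],
        %% In CouchDB this would be fabric:update_docs(Db, Docs, Opts); the
        %% sketch just returns the call it would make.
        {fabric, update_docs, [Db, Docs, Opts]}.

The actual changes to fabric_doc_open_revs.erl and fabric_rpc.erl follow.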
---
src/fabric/src/fabric_doc_open_revs.erl | 148 ++++++++++++++++++++++----------
src/fabric/src/fabric_rpc.erl | 67 ++++++++-------
2 files changed, 137 insertions(+), 78 deletions(-)
diff --git a/src/fabric/src/fabric_doc_open_revs.erl b/src/fabric/src/fabric_doc_open_revs.erl
index 096722f..dbe02bf 100644
--- a/src/fabric/src/fabric_doc_open_revs.erl
+++ b/src/fabric/src/fabric_doc_open_revs.erl
@@ -29,7 +29,8 @@
revs,
latest,
replies = [],
- repair = false
+ repair = false,
+ replies_by_node=[] %[{Node, Reply}] used for read_repair
}).
go(DbName, Id, Revs, Options) ->
@@ -82,6 +83,7 @@ handle_message({ok, RawReplies}, Worker, State) ->
worker_count = WorkerCount,
workers = Workers,
replies = PrevReplies,
+ replies_by_node = PrevNReplies,
r = R,
revs = Revs,
latest = Latest,
@@ -92,13 +94,14 @@ handle_message({ok, RawReplies}, Worker, State) ->
IsTree = Revs == all orelse Latest,
% Do not count error replies when checking quorum
-
RealReplyCount = ReplyCount + 1 - ReplyErrorCount,
QuorumReplies = RealReplyCount >= R,
{NewReplies, QuorumMet, Repair} = case IsTree of
true ->
- {NewReplies0, AllInternal, Repair0} =
+ {NewReplies0, AllInternal, Repair00} =
tree_replies(PrevReplies, tree_sort(RawReplies)),
+ % don't set Repair=true on the first reply
+ Repair0 = (ReplyCount > 0) and Repair00,
NumLeafs = couch_key_tree:count_leafs(PrevReplies),
SameNumRevs = length(RawReplies) == NumLeafs,
QMet = AllInternal andalso SameNumRevs andalso QuorumReplies,
@@ -107,6 +110,10 @@ handle_message({ok, RawReplies}, Worker, State) ->
{NewReplies0, MinCount} = dict_replies(PrevReplies, RawReplies),
{NewReplies0, MinCount >= R, false}
end,
+ NewNReplies = case Worker of
+ nil -> PrevNReplies;
+ _ -> [{Worker#shard.node, RawReplies}|PrevNReplies]
+ end,
Complete = (ReplyCount =:= (WorkerCount - 1)),
@@ -117,6 +124,7 @@ handle_message({ok, RawReplies}, Worker, State) ->
DbName,
IsTree,
NewReplies,
+ NewNReplies,
ReplyCount + 1,
InRepair orelse Repair
),
@@ -124,6 +132,7 @@ handle_message({ok, RawReplies}, Worker, State) ->
false ->
{ok, State#state{
replies = NewReplies,
+ replies_by_node = NewNReplies,
reply_count = ReplyCount + 1,
workers = lists:delete(Worker, Workers),
repair = InRepair orelse Repair
@@ -180,7 +189,7 @@ dict_replies(Dict, [Reply | Rest]) ->
dict_replies(NewDict, Rest).
-maybe_read_repair(Db, IsTree, Replies, ReplyCount, DoRepair) ->
+maybe_read_repair(Db, IsTree, Replies, NodeReplies0, ReplyCount, DoRepair) ->
Docs = case IsTree of
true -> tree_repair_docs(Replies, DoRepair);
false -> dict_repair_docs(Replies, ReplyCount)
@@ -189,7 +198,11 @@ maybe_read_repair(Db, IsTree, Replies, ReplyCount, DoRepair) ->
[] ->
ok;
_ ->
- erlang:spawn(fun() -> read_repair(Db, Docs) end)
+ NodeReplies = lists:foldl(fun({Node, NReplies}, Acc) ->
+ NewAcc = [{Node, Doc} || {ok, Doc} <- NReplies],
+ NewAcc ++ Acc
+ end, [], NodeReplies0),
+ erlang:spawn(fun() -> read_repair(Db, Docs, NodeReplies) end)
end.
@@ -208,8 +221,9 @@ dict_repair_docs(Replies, ReplyCount) ->
end.
-read_repair(Db, Docs) ->
- Res = fabric:update_docs(Db, Docs, [replicated_changes, ?ADMIN_CTX]),
+read_repair(Db, Docs, NodeReplies) ->
+ Opts = [replicated_changes, ?ADMIN_CTX, {read_repair, NodeReplies}],
+ Res = fabric:update_docs(Db, Docs, Opts),
case Res of
{ok, []} ->
couch_stats:increment_counter([fabric, read_repairs, success]);
@@ -268,20 +282,24 @@ filter_reply(Replies) ->
setup() ->
config:start_link([]),
meck:new([fabric, couch_stats, couch_log]),
+ meck:new(fabric_util, [passthrough]),
meck:expect(fabric, update_docs, fun(_, _, _) -> {ok, nil} end),
meck:expect(couch_stats, increment_counter, fun(_) -> ok end),
- meck:expect(couch_log, notice, fun(_, _) -> ok end).
+ meck:expect(couch_log, notice, fun(_, _) -> ok end),
+ meck:expect(fabric_util, cleanup, fun(_) -> ok end).
+
teardown(_) ->
- (catch meck:unload([fabric, couch_stats, couch_log])),
+ (catch meck:unload([fabric, couch_stats, couch_log, fabric_util])),
config:stop().
state0(Revs, Latest) ->
#state{
worker_count = 3,
- workers = [w1, w2, w3],
+ workers =
+ [#shard{node='node1'}, #shard{node='node2'}, #shard{node='node3'}],
r = 2,
revs = Revs,
latest = Latest
@@ -334,27 +352,35 @@ open_doc_revs_test_() ->
check_empty_response_not_quorum() ->
% Simple smoke test that we don't think we're
% done with a first empty response
+ W1 = #shard{node='node1'},
+ W2 = #shard{node='node2'},
+ W3 = #shard{node='node3'},
?_assertMatch(
- {ok, #state{workers = [w2, w3]}},
- handle_message({ok, []}, w1, state0(all, false))
+ {ok, #state{workers = [W2, W3]}},
+ handle_message({ok, []}, W1, state0(all, false))
).
check_basic_response() ->
% Check that we've handle a response
+ W1 = #shard{node='node1'},
+ W2 = #shard{node='node2'},
+ W3 = #shard{node='node3'},
?_assertMatch(
- {ok, #state{reply_count = 1, workers = [w2, w3]}},
- handle_message({ok, [foo1(), bar1()]}, w1, state0(all, false))
+ {ok, #state{reply_count = 1, workers = [W2, W3]}},
+ handle_message({ok, [foo1(), bar1()]}, W1, state0(all, false))
).
check_finish_quorum() ->
% Two messages with the same revisions means we're done
?_test(begin
+ W1 = #shard{node='node1'},
+ W2 = #shard{node='node2'},
S0 = state0(all, false),
- {ok, S1} = handle_message({ok, [foo1(), bar1()]}, w1, S0),
+ {ok, S1} = handle_message({ok, [foo1(), bar1()]}, W1, S0),
Expect = {stop, [bar1(), foo1()]},
- ?assertEqual(Expect, handle_message({ok, [foo1(), bar1()]}, w2, S1))
+ ?assertEqual(Expect, handle_message({ok, [foo1(), bar1()]}, W2, S1))
end).
@@ -363,11 +389,13 @@ check_finish_quorum_newer() ->
% foo1 should count for foo2 which means we're finished.
% We also validate that read_repair was triggered.
?_test(begin
+ W1 = #shard{node='node1'},
+ W2 = #shard{node='node2'},
S0 = state0(all, false),
- {ok, S1} = handle_message({ok, [foo1(), bar1()]}, w1, S0),
+ {ok, S1} = handle_message({ok, [foo1(), bar1()]}, W1, S0),
Expect = {stop, [bar1(), foo2()]},
ok = meck:reset(fabric),
- ?assertEqual(Expect, handle_message({ok, [foo2(), bar1()]}, w2, S1)),
+ ?assertEqual(Expect, handle_message({ok, [foo2(), bar1()]}, W2, S1)),
ok = meck:wait(fabric, update_docs, '_', 5000),
?assertMatch(
[{_, {fabric, update_docs, [_, _, _]}, _}],
@@ -380,11 +408,14 @@ check_no_quorum_on_second() ->
% Quorum not yet met for the foo revision so we
% would wait for w3
?_test(begin
+ W1 = #shard{node='node1'},
+ W2 = #shard{node='node2'},
+ W3 = #shard{node='node3'},
S0 = state0(all, false),
- {ok, S1} = handle_message({ok, [foo1(), bar1()]}, w1, S0),
+ {ok, S1} = handle_message({ok, [foo1(), bar1()]}, W1, S0),
?assertMatch(
- {ok, #state{workers = [w3]}},
- handle_message({ok, [bar1()]}, w2, S1)
+ {ok, #state{workers = [W3]}},
+ handle_message({ok, [bar1()]}, W2, S1)
)
end).
@@ -394,11 +425,14 @@ check_done_on_third() ->
% what. Every revision seen in this pattern should be
% included.
?_test(begin
+ W1 = #shard{node='node1'},
+ W2 = #shard{node='node2'},
+ W3 = #shard{node='node3'},
S0 = state0(all, false),
- {ok, S1} = handle_message({ok, [foo1(), bar1()]}, w1, S0),
- {ok, S2} = handle_message({ok, [bar1()]}, w2, S1),
+ {ok, S1} = handle_message({ok, [foo1(), bar1()]}, W1, S0),
+ {ok, S2} = handle_message({ok, [bar1()]}, W2, S1),
Expect = {stop, [bar1(), foo1()]},
- ?assertEqual(Expect, handle_message({ok, [bar1()]}, w3, S2))
+ ?assertEqual(Expect, handle_message({ok, [bar1()]}, W3, S2))
end).
@@ -407,108 +441,128 @@ check_done_on_third() ->
check_specific_revs_first_msg() ->
?_test(begin
+ W1 = #shard{node='node1'},
+ W2 = #shard{node='node2'},
+ W3 = #shard{node='node3'},
S0 = state0(revs(), false),
?assertMatch(
- {ok, #state{reply_count = 1, workers = [w2, w3]}},
- handle_message({ok, [foo1(), bar1(), bazNF()]}, w1, S0)
+ {ok, #state{reply_count = 1, workers = [W2, W3]}},
+ handle_message({ok, [foo1(), bar1(), bazNF()]}, W1, S0)
)
end).
check_revs_done_on_agreement() ->
?_test(begin
+ W1 = #shard{node='node1'},
+ W2 = #shard{node='node2'},
S0 = state0(revs(), false),
Msg = {ok, [foo1(), bar1(), bazNF()]},
- {ok, S1} = handle_message(Msg, w1, S0),
+ {ok, S1} = handle_message(Msg, W1, S0),
Expect = {stop, [bar1(), foo1(), bazNF()]},
- ?assertEqual(Expect, handle_message(Msg, w2, S1))
+ ?assertEqual(Expect, handle_message(Msg, W2, S1))
end).
check_latest_true() ->
?_test(begin
+ W1 = #shard{node='node1'},
+ W2 = #shard{node='node2'},
S0 = state0(revs(), true),
Msg1 = {ok, [foo2(), bar1(), bazNF()]},
Msg2 = {ok, [foo2(), bar1(), bazNF()]},
- {ok, S1} = handle_message(Msg1, w1, S0),
+ {ok, S1} = handle_message(Msg1, W1, S0),
Expect = {stop, [bar1(), foo2(), bazNF()]},
- ?assertEqual(Expect, handle_message(Msg2, w2, S1))
+ ?assertEqual(Expect, handle_message(Msg2, W2, S1))
end).
check_ancestor_counted_in_quorum() ->
?_test(begin
+ W1 = #shard{node='node1'},
+ W2 = #shard{node='node2'},
S0 = state0(revs(), true),
Msg1 = {ok, [foo1(), bar1(), bazNF()]},
Msg2 = {ok, [foo2(), bar1(), bazNF()]},
Expect = {stop, [bar1(), foo2(), bazNF()]},
% Older first
- {ok, S1} = handle_message(Msg1, w1, S0),
- ?assertEqual(Expect, handle_message(Msg2, w2, S1)),
+ {ok, S1} = handle_message(Msg1, W1, S0),
+ ?assertEqual(Expect, handle_message(Msg2, W2, S1)),
% Newer first
- {ok, S2} = handle_message(Msg2, w2, S0),
- ?assertEqual(Expect, handle_message(Msg1, w1, S2))
+ {ok, S2} = handle_message(Msg2, W2, S0),
+ ?assertEqual(Expect, handle_message(Msg1, W1, S2))
end).
check_not_found_counts_for_descendant() ->
?_test(begin
+ W1 = #shard{node='node1'},
+ W2 = #shard{node='node2'},
S0 = state0(revs(), true),
Msg1 = {ok, [foo1(), bar1(), bazNF()]},
Msg2 = {ok, [foo1(), bar1(), baz1()]},
Expect = {stop, [bar1(), baz1(), foo1()]},
% not_found first
- {ok, S1} = handle_message(Msg1, w1, S0),
- ?assertEqual(Expect, handle_message(Msg2, w2, S1)),
+ {ok, S1} = handle_message(Msg1, W1, S0),
+ ?assertEqual(Expect, handle_message(Msg2, W2, S1)),
% not_found second
- {ok, S2} = handle_message(Msg2, w2, S0),
- ?assertEqual(Expect, handle_message(Msg1, w1, S2))
+ {ok, S2} = handle_message(Msg2, W2, S0),
+ ?assertEqual(Expect, handle_message(Msg1, W1, S2))
end).
check_worker_error_skipped() ->
?_test(begin
+ W1 = #shard{node='node1'},
+ W2 = #shard{node='node2'},
+ W3 = #shard{node='node3'},
S0 = state0(revs(), true),
Msg1 = {ok, [foo1(), bar1(), baz1()]},
Msg2 = {rexi_EXIT, reason},
Msg3 = {ok, [foo1(), bar1(), baz1()]},
Expect = {stop, [bar1(), baz1(), foo1()]},
- {ok, S1} = handle_message(Msg1, w1, S0),
- {ok, S2} = handle_message(Msg2, w2, S1),
- ?assertEqual(Expect, handle_message(Msg3, w3, S2))
+ {ok, S1} = handle_message(Msg1, W1, S0),
+ {ok, S2} = handle_message(Msg2, W2, S1),
+ ?assertEqual(Expect, handle_message(Msg3, W3, S2))
end).
check_quorum_only_counts_valid_responses() ->
?_test(begin
+ W1 = #shard{node='node1'},
+ W2 = #shard{node='node2'},
+ W3 = #shard{node='node3'},
S0 = state0(revs(), true),
Msg1 = {rexi_EXIT, reason},
Msg2 = {rexi_EXIT, reason},
Msg3 = {ok, [foo1(), bar1(), baz1()]},
Expect = {stop, [bar1(), baz1(), foo1()]},
- {ok, S1} = handle_message(Msg1, w1, S0),
- {ok, S2} = handle_message(Msg2, w2, S1),
- ?assertEqual(Expect, handle_message(Msg3, w3, S2))
+ {ok, S1} = handle_message(Msg1, W1, S0),
+ {ok, S2} = handle_message(Msg2, W2, S1),
+ ?assertEqual(Expect, handle_message(Msg3, W3, S2))
end).
check_empty_list_when_no_workers_reply() ->
?_test(begin
+ W1 = #shard{node='node1'},
+ W2 = #shard{node='node2'},
+ W3 = #shard{node='node3'},
S0 = state0(revs(), true),
Msg1 = {rexi_EXIT, reason},
Msg2 = {rexi_EXIT, reason},
Msg3 = {rexi_DOWN, nodedown, {nil, node()}, nil},
Expect = {stop, all_workers_died},
- {ok, S1} = handle_message(Msg1, w1, S0),
- {ok, S2} = handle_message(Msg2, w2, S1),
- ?assertEqual(Expect, handle_message(Msg3, w3, S2))
+ {ok, S1} = handle_message(Msg1, W1, S0),
+ {ok, S2} = handle_message(Msg2, W2, S1),
+ ?assertEqual(Expect, handle_message(Msg3, W3, S2))
end).
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index 6e2c05f..2c4d5f4 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -344,48 +344,53 @@ update_docs_read_repair(DbName, DocsByNode, Options) ->
% given [{Node, Doc}] diff revs of the same DocID from diff nodes
% returns [Doc] filtering out purged docs.
-% This is done for read-repair from fabric_doc_open,
+% This is done for read-repair from fabric_doc_open or fabric_doc_open_revs,
% so that not to recreate Docs that have been purged before
% on this node() from Nodes that are out of sync.
filter_purged_revs(Db, DocsByNode) ->
- AllowedPSeqLag = config:get_integer("purge", "allowed_purge_seq_lag", 100),
- {ok, DbPSeq} = couch_db:get_purge_seq(Db),
- PurgeFoldFun = fun({_P,_U, Id, Revs}, Acc) -> [{Id, Revs}|Acc] end,
+ % go through _local/purge-mem3-.. docs
+ % and assemble NodePSeqs = [{Node1, NodePSeq1}, ...]
+ % NodePSeq1 - purge_seq of this node known to Node1
V = "v" ++ config:get("purge", "version", "1") ++ "-",
StartKey = ?l2b(?LOCAL_DOC_PREFIX ++ "purge-" ++ V ++ "mem3-"),
EndKey = ?l2b(?LOCAL_DOC_PREFIX ++ "purge-" ++ V ++ "mem31"),
Opts = [{start_key, StartKey}, {end_key_gt, EndKey}],
- % go through _local/purge-mem3-.. docs
- % find Node that this LDoc corresponds to
- % check if update from Node has not been recently purged on current node
LDocsFoldFun = fun(#doc{body={Props}}, Acc) ->
{VOps} = couch_util:get_value(<<"verify_options">>, Props),
Node = couch_util:get_value(<<"node">>, VOps),
- Result = lists:keyfind(Node, 1, DocsByNode),
- NewAcc = if not Result -> Acc; true ->
- {Node, Doc} = Result,
- NodePSeq = couch_util:get_value(<<"purge_seq">>, Props),
- if NodePSeq == DbPSeq ->
- [Doc|Acc];
- (NodePSeq+AllowedPSeqLag) < DbPSeq ->
- % Node is very out of sync, ignore updates from it
- Acc;
- true -> %(NodePSeq+ClientAllowedPSeqLag) >= DbPSeq
- % if Doc has been purged recently, than ignore it
- {ok, PurgedIdsRevs} = couch_db:fold_purged_docs(Db,
- NodePSeq, PurgeFoldFun, [], []),
- {Start, [FirstRevId|_]} = Doc#doc.revs,
- DocIdRevs = {Doc#doc.id, [{Start, FirstRevId}]},
- case lists:member(DocIdRevs, PurgedIdsRevs) of
- true -> Acc;
- false -> [Doc|Acc]
- end
- end
- end,
- {ok, NewAcc}
+ NodePSeq = couch_util:get_value(<<"purge_seq">>, Props),
+ {ok, [{Node, NodePSeq} | Acc]}
end,
- {ok, Docs} = couch_db_engine:fold_local_docs(Db, LDocsFoldFun, [], Opts),
- Docs.
+ {ok, NodePSeqs} =
+ couch_db_engine:fold_local_docs(Db, LDocsFoldFun, [], Opts),
+
+ % go through all doc_updates and
+ % filter out updates from nodes that are behind in purges synchronization
+ AllowedPSeqLag = config:get_integer("purge", "allowed_purge_seq_lag", 100),
+ {ok, DbPSeq} = couch_db:get_purge_seq(Db),
+ PurgeFoldFun = fun({_P,_U, Id, Revs}, Acc) -> [{Id, Revs}|Acc] end,
+ lists:foldl(fun({Node, Doc}, Docs) ->
+ NodePSeq = case lists:keyfind(Node, 1, NodePSeqs) of
+ {Node, NodePSeq0} -> NodePSeq0;
+ false -> 0
+ end,
+ if NodePSeq == DbPSeq ->
+ [Doc|Docs];
+ (NodePSeq+AllowedPSeqLag) < DbPSeq ->
+ % Node is very out of sync, ignore updates from it
+ Docs;
+ true -> %(NodePSeq+ClientAllowedPSeqLag) >= DbPSeq
+ % if Doc has been purged recently -> ignore it
+ {ok, PurgedIdsRevs} = couch_db:fold_purged_docs(Db,
+ NodePSeq, PurgeFoldFun, [], []),
+ {Start, [FirstRevId|_]} = Doc#doc.revs,
+ DocIdRevs = {Doc#doc.id, [{Start, FirstRevId}]},
+ case lists:member(DocIdRevs, PurgedIdsRevs) of
+ true -> Docs;
+ false -> [Doc|Docs]
+ end
+ end
+ end, [], DocsByNode).
get_or_create_db(DbName, Options) ->
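For readers skimming the fabric_rpc.erl hunk above, the per-update decision that
the reworked filter_purged_revs makes can be condensed into a small standalone
sketch (illustrative names; it assumes the same allowed_purge_seq_lag semantics
as the patch):

    -module(purge_lag_sketch).
    -export([keep_update/4]).

    -define(ALLOWED_PURGE_SEQ_LAG, 100).

    %% NodePSeq      - our purge_seq as last verified by the sending node
    %% DbPSeq        - our current purge_seq
    %% DocIdRevs     - {Id, [{Pos, RevId}]} of the update being repaired
    %% PurgedIdsRevs - [{Id, Revs}] purged locally since NodePSeq
    keep_update(NodePSeq, DbPSeq, DocIdRevs, PurgedIdsRevs) ->
        if
            NodePSeq == DbPSeq ->
                % The sender is fully caught up with our purges: safe to repair.
                true;
            NodePSeq + ?ALLOWED_PURGE_SEQ_LAG < DbPSeq ->
                % The sender is far behind on purge synchronization: ignore it.
                false;
            true ->
                % The sender is slightly behind: only drop the update if the
                % exact revision it sent was purged here since NodePSeq.
                not lists:member(DocIdRevs, PurgedIdsRevs)
        end.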