You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2018/03/23 20:03:19 UTC
[couchdb] branch COUCHDB-3326-clustered-purge-davisp-refactor
updated (7094d61 -> cf07f20)
This is an automated email from the ASF dual-hosted git repository.
davisp pushed a change to branch COUCHDB-3326-clustered-purge-davisp-refactor
in repository https://gitbox.apache.org/repos/asf/couchdb.git.
discard 7094d61 WIP - fabric_doc_purge
new cf07f20 WIP - fabric_doc_purge
This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version. This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:
 * -- * -- B -- O -- O -- O   (7094d61)
            \
             N -- N -- N   refs/heads/COUCHDB-3326-clustered-purge-davisp-refactor (cf07f20)
You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.
Any revisions marked "omit" are not gone; other references still
refer to them. Any revisions marked "discard" are gone forever.
The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
--
To stop receiving notification emails like this one, please contact
davisp@apache.org.
[couchdb] 01/01: WIP - fabric_doc_purge
Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
davisp pushed a commit to branch COUCHDB-3326-clustered-purge-davisp-refactor
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit cf07f20985b14382f214e2011ebe8332d0d273fd
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Fri Mar 23 14:35:48 2018 -0500
WIP - fabric_doc_purge
---
src/fabric/src/fabric_doc_purge.erl | 753 ++++++++++++++++++------------------
1 file changed, 378 insertions(+), 375 deletions(-)
diff --git a/src/fabric/src/fabric_doc_purge.erl b/src/fabric/src/fabric_doc_purge.erl
index 24e8c66..eead154 100644
--- a/src/fabric/src/fabric_doc_purge.erl
+++ b/src/fabric/src/fabric_doc_purge.erl
@@ -12,403 +12,406 @@
-module(fabric_doc_purge).
--export([go/3]).
+
+-export([
+ go/3
+]).
+
-include_lib("fabric/include/fabric.hrl").
-include_lib("mem3/include/mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
+
+
+-record(acc, {
+ worker_uuids, % [{Worker, UUIDs}] pairs; handle_message/3 removes a pair once that worker replies or fails
+ req_count, % request count as returned by create_reqs/4
+ resps, % dict: request UUID -> list of replies collected so far
+ w % number of {ok, _} replies a request needs to count as having quorum
+}).
+
+%% go(DbName, IdsRevs, Options)
+%%
+%% Purge the given [{DocId, Revs}] pairs on every shard copy that holds them.
+%% Each pair is tagged with a fresh UUID, one rexi worker is cast per shard
+%% with only that shard's requests, and replies are collected until
+%% maybe_stop/1 says to stop or the request timeout fires. On timeout every
+%% worker that has not answered contributes {error, timeout} for each of its
+%% UUIDs. The result is whatever format_resps/2 returns.
go(_, [], _) ->
{ok, []};
-go(DbName, AllIdsRevs, Opts) ->
- % tag each purge request with UUId
- {AllUUIDs, AllUUIDsIdsRevs, DocCount} = tag_docs(AllIdsRevs),
-
- Options = lists:delete(all_or_nothing, Opts),
- % Counters -> [{Worker, UUIDs}]
- {Counters, Workers} = dict:fold(fun(Shard, UUIDsIdsRevs, {Cs,Ws}) ->
- UUIDs = [UUID || {UUID, _Id, _Revs} <-UUIDsIdsRevs],
- #shard{name=Name, node=Node} = Shard,
- Ref = rexi:cast(Node,
- {fabric_rpc, purge_docs, [Name, UUIDsIdsRevs, Options]}),
+go(DbName, IdsRevs, Options) ->
+    % Generate our purge requests of {UUID, DocId, Revs}
+    {UUIDs, Reqs, Count} = create_reqs(IdsRevs, [], [], 0),
+
+    % Fire off rexi workers for each shard.
+    {Workers, WorkerUUIDs} = dict:fold(fun(Shard, ShardReqs, {Ws, WUUIDs}) ->
+        % Named ShardUUIDs because UUIDs is already bound in the enclosing
+        % scope; reusing that name here would be a match, not a binding.
+        ShardUUIDs = [UUID || {UUID, _Id, _Revs} <- ShardReqs],
+        #shard{name = Name, node = Node} = Shard,
+        % Each worker only receives the requests that map to its shard.
+        Args = [Name, ShardReqs, Options],
+        Ref = rexi:cast(Node, {fabric_rpc, purge_docs, Args}),
Worker = Shard#shard{ref=Ref},
- {[{Worker, UUIDs}|Cs], [Worker|Ws]}
- end, {[], []}, group_idrevs_by_shard(DbName, AllUUIDsIdsRevs)),
+        {[Worker | Ws], [{Worker, ShardUUIDs} | WUUIDs]}
+    end, {[], []}, group_reqs_by_shard(DbName, Reqs)),
RexiMon = fabric_util:create_monitors(Workers),
- W = couch_util:get_value(w, Options, integer_to_list(mem3:quorum(DbName))),
- Acc = {length(Workers), DocCount, list_to_integer(W), Counters, dict:new()},
Timeout = fabric_util:request_timeout(),
- try rexi_utils:recv(Workers, #shard.ref,
- fun handle_message/3, Acc, infinity, Timeout) of
- {ok, {Health, Results}} when Health =:= ok; Health =:= accepted ->
- % Results-> [{UUID, {ok, Revs}}]
- {Health, [R || R <-
- couch_util:reorder_results(AllUUIDs, Results)]};
- {timeout, Acc1} ->
- {_, _, W1, Counters1, DocReplDict0} = Acc1,
- {DefunctWorkers, _} = lists:unzip(Counters1),
- fabric_util:log_timeout(DefunctWorkers, "purge_docs"),
- DocReplDict = lists:foldl(fun({_W, Docs}, Dict) ->
- Replies = [{error, timeout} || _D <- Docs],
- append_purge_replies(Docs, Replies, Dict)
- end, DocReplDict0, Counters1),
- {Health, _, Resp} = dict:fold(
- fun force_reply/3, {ok, W1, []}, DocReplDict),
- case Health of
- error -> timeout;
- _ -> {Health, [R || R <-
- couch_util:reorder_results(AllUUIDs, Resp)]}
-
- end;
- Else ->
- Else
+    Acc0 = #acc{
+        worker_uuids = WorkerUUIDs,
+        req_count = Count,
+        resps = dict:from_list([{UUID, []} || UUID <- UUIDs]),
+        w = w(DbName, Options)
+    },
+    Acc2 = try rexi_utils:recv(Workers, #shard.ref,
+            fun handle_message/3, Acc0, infinity, Timeout) of
+        {ok, Acc1} ->
+            Acc1;
+        {timeout, Acc1} ->
+            % PendingWorkerUUIDs holds the workers that never answered. It
+            % needs its own name: WorkerUUIDs is already bound above and
+            % matching against it would fail once any worker had replied.
+            #acc{
+                worker_uuids = PendingWorkerUUIDs,
+                resps = Resps
+            } = Acc1,
+            DefunctWorkers = [Worker || {Worker, _} <- PendingWorkerUUIDs],
+            fabric_util:log_timeout(DefunctWorkers, "purge_docs"),
+            NewResps = append_errors(timeout, PendingWorkerUUIDs, Resps),
+            Acc1#acc{worker_uuids = [], resps = NewResps};
Else ->
Else
after
rexi_monitor:stop(RexiMon)
- end.
+    end,
+
+    format_resps(UUIDs, Acc2).
+
+
+%% handle_message(Msg, Worker, Acc)
+%%
+%% rexi_utils:recv/6 callback (see go/3).
+%%   {rexi_DOWN, _, {_, Node}, _}  every pending worker on Node is removed
+%%                                 and gets {error, internal_server_error}
+%%                                 recorded for each of its UUIDs.
+%%   {rexi_EXIT, _}                same, but only for the given Worker.
+%%   {ok, Replies}                 Replies are recorded positionally against
+%%                                 the UUIDs that were sent to Worker.
+%%   {bad_request, Msg}            thrown to the caller.
+%% All clauses except bad_request finish through maybe_stop/1.
+handle_message({rexi_DOWN, _, {_, Node}, _}, _Worker, Acc) ->
+    #acc{
+        worker_uuids = WorkerUUIDs,
+        resps = Resps
+    } = Acc,
+    Pred = fun({#shard{node = N}, _}) -> N == Node end,
+    {Failed, Rest} = lists:partition(Pred, WorkerUUIDs),
+    NewResps = append_errors(internal_server_error, Failed, Resps),
+    maybe_stop(Acc#acc{worker_uuids = Rest, resps = NewResps});
+
+handle_message({rexi_EXIT, _}, Worker, Acc) ->
+    #acc{
+        worker_uuids = WorkerUUIDs,
+        resps = Resps
+    } = Acc,
+    {value, WorkerPair, Rest} = lists:keytake(Worker, 1, WorkerUUIDs),
+    NewResps = append_errors(internal_server_error, [WorkerPair], Resps),
+    maybe_stop(Acc#acc{worker_uuids = Rest, resps = NewResps});
+
+handle_message({ok, Replies}, Worker, Acc) ->
+    #acc{
+        worker_uuids = WorkerUUIDs,
+        resps = Resps
+    } = Acc,
+    {value, {_W, UUIDs}, Rest} = lists:keytake(Worker, 1, WorkerUUIDs),
+    NewResps = append_resps(UUIDs, Replies, Resps),
+    maybe_stop(Acc#acc{worker_uuids = Rest, resps = NewResps});
-handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Worker, Acc0) ->
- {_, DocCount, W, Counters, DocsDict0} = Acc0,
- {FailCounters, NewCounters} = lists:partition(fun({#shard{node=N}, _}) ->
- N == NodeRef
- end, Counters),
- % fill DocsDict with error messages for relevant Docs
- DocsDict = lists:foldl(fun({_W, Docs}, CDocsDict) ->
- Replies = [{error, internal_server_error} || _D <- Docs],
- append_purge_replies(Docs, Replies, CDocsDict)
- end, DocsDict0, FailCounters),
- skip_message({length(NewCounters), DocCount, W, NewCounters, DocsDict});
-handle_message({rexi_EXIT, _}, Worker, Acc0) ->
- {WC, DocCount , W, Counters, DocsDict0} = Acc0,
- % fill DocsDict with error messages for relevant Docs
- {value, {_W, Docs}, NewCounters} = lists:keytake(Worker, 1, Counters),
- Replies = [{error, internal_server_error} || _D <- Docs],
- DocsDict = append_purge_replies(Docs, Replies, DocsDict0),
- skip_message({WC-1, DocCount, W, NewCounters, DocsDict});
-handle_message({ok, Replies0}, Worker, Acc0) ->
- {WCount, DocCount, W, Counters, DocsDict0} = Acc0,
- {value, {_W, Docs}, NewCounters} = lists:keytake(Worker, 1, Counters),
- DocsDict = append_purge_replies(Docs, Replies0, DocsDict0),
- case {WCount, dict:size(DocsDict)} of
- {1, _} ->
- % last message has arrived, we need to conclude things
- {Health, W, Replies} = dict:fold(fun force_reply/3, {ok, W, []},
- DocsDict),
- {stop, {Health, Replies}};
- {_, DocCount} ->
- % we've got at least one reply for each document, let's take a look
- case dict:fold(fun maybe_reply/3, {stop,W,[]}, DocsDict) of
- continue ->
- {ok, {WCount - 1, DocCount, W, NewCounters, DocsDict}};
- {stop, W, Replies} ->
- {stop, {ok, Replies}}
- end;
- _ ->
- {ok, {WCount - 1, DocCount, W, NewCounters, DocsDict}}
- end;
-handle_message({error, purged_docs_limit_exceeded}=Error, Worker, Acc0) ->
- {WC, DocCount , W, Counters, DocsDict0} = Acc0,
- % fill DocsDict with error messages for relevant Docs
- {value, {_W, Docs}, NewCounters} = lists:keytake(Worker, 1, Counters),
- Replies = [Error || _D <- Docs],
- DocsDict = append_purge_replies(Docs, Replies, DocsDict0),
- skip_message({WC-1, DocCount, W, NewCounters, DocsDict});
handle_message({bad_request, Msg}, _, _) ->
throw({bad_request, Msg}).
-tag_docs(AllIdsRevs) ->
- {UUIDs, UUIDsIdsRevs, DocCount} = lists:foldl(fun(
- {Id, Revs}, {UAcc, UIRAcc, C}) ->
- UUID = couch_uuids:new(),
- {[UUID|UAcc], [{UUID, Id, Revs}|UIRAcc], C+1}
- end, {[], [], 0}, AllIdsRevs),
- {lists:reverse(UUIDs), lists:reverse(UUIDsIdsRevs), DocCount}.
-
-
-force_reply(Doc, Replies, {Health, W, Acc}) ->
- case update_quorum_met(W, Replies) of
- {true, FinalReply} ->
- {Health, W, [{Doc, FinalReply} | Acc]};
- false ->
- case [Reply || {ok, Reply} <- Replies] of
- [] ->
- UReplies = lists:usort(Replies),
- case UReplies of
- [{error, internal_server_error}] ->
- {error, W, [{Doc, {error, internal_server_error}} | Acc]};
- [{error, timeout}] ->
- {error, W, [{Doc, {error, timeout}} | Acc]};
- [FirstReply|[]] ->
- % check if all errors are identical, if so inherit health
- {Health, W, [{Doc, FirstReply} | Acc]};
- _ ->
- {error, W, [{Doc, UReplies} | Acc]}
- end;
- AcceptedReplies0 ->
- NewHealth = case Health of ok -> accepted; _ -> Health end,
- AcceptedReplies = lists:usort(lists:flatten(AcceptedReplies0)),
- {NewHealth, W, [{Doc, {accepted, AcceptedReplies}} | Acc]}
- end
- end.
+%% create_reqs(IdsRevs, UUIDs, Reqs, Count)
+%%
+%% Tag every {Id, Revs} pair with a fresh UUID from couch_uuids:new/0.
+%% UUIDs and Reqs are accumulators built in reverse and flipped at the end,
+%% so both returned lists follow the order of IdsRevs. Count is incremented
+%% once per pair. Returns {UUIDs, [{UUID, Id, Revs}], Count}.
+create_reqs([], UUIDs, Reqs, Count) ->
+    {lists:reverse(UUIDs), lists:reverse(Reqs), Count};
+create_reqs([{Id, Revs} | RestIdsRevs], UUIDs, Reqs, Count) ->
+    UUID = couch_uuids:new(),
+    NewUUIDs = [UUID | UUIDs],
+    NewReqs = [{UUID, Id, Revs} | Reqs],
+    create_reqs(RestIdsRevs, NewUUIDs, NewReqs, Count + 1).
-maybe_reply(_, _, continue) ->
- % we didn't meet quorum for all docs, so we're fast-forwarding the fold
- continue;
-maybe_reply(Doc, Replies, {stop, W, Acc}) ->
- case update_quorum_met(W, Replies) of
- {true, Reply} ->
- {stop, W, [{Doc, Reply} | Acc]};
- false ->
- continue
- end.
-update_quorum_met(W, Replies) ->
- OkReplies = lists:foldl(fun(Reply, PrevsAcc) ->
- case Reply of
- {ok, PurgedRevs} -> [PurgedRevs | PrevsAcc];
- _ -> PrevsAcc
- end
- end, [], Replies),
- if length(OkReplies) < W -> false; true ->
- % make a union of PurgedRevs
- FinalReply = {ok, lists:usort(lists:flatten(OkReplies))},
- {true, FinalReply}
- end.
-
-
-group_idrevs_by_shard(DbName, UUIDsIdsRevs) ->
- lists:foldl(fun({_UUID, Id, _Revs} = UUIDIdRevs, D0) ->
+%% group_reqs_by_shard(DbName, Reqs)
+%%
+%% Build a dict keyed by shard whose values are the {UUID, Id, Revs}
+%% requests that belong on that shard. A request is appended under every
+%% shard returned by mem3:shards(DbName, Id).
+group_reqs_by_shard(DbName, Reqs) ->
+    lists:foldl(fun({_UUID, Id, _Revs} = Req, D0) ->
lists:foldl(fun(Shard, D1) ->
- dict:append(Shard, UUIDIdRevs, D1)
+            dict:append(Shard, Req, D1)
end, D0, mem3:shards(DbName, Id))
- end, dict:new(), UUIDsIdsRevs).
+    end, dict:new(), Reqs).
-append_purge_replies([], [], DocReplyDict) ->
- DocReplyDict;
-append_purge_replies([Doc|Rest1], [Reply|Rest2], Dict0) ->
- append_purge_replies(Rest1, Rest2, dict:append(Doc, Reply, Dict0)).
-
-
-skip_message({0, _, W, _, DocsDict}) ->
- {Health, W, Reply} = dict:fold(fun force_reply/3, {ok, W, []}, DocsDict),
- {stop, {Health, Reply}};
-skip_message(Acc0) ->
- {ok, Acc0}.
-
-
-% eunits
-doc_purge_ok_test() ->
- meck:new(couch_log),
- meck:expect(couch_log, warning, fun(_,_) -> ok end),
- meck:expect(couch_log, notice, fun(_,_) -> ok end),
-
- Revs1 = [{1, <<"rev11">>}], UUID1 = <<"3de03c5f4c2cd34cc515a9d1ea000abd">>,
- UUIDIdRevs1 = {UUID1, <<"id1">>, Revs1},
- Revs2 = [{1, <<"rev12">>}], UUID2 = <<"4de03c5f4c2cd34cc515a9d1ea000abc">>,
- UUIDIdRevs2 = {UUID2, <<"id2">>, Revs2},
- UUIDsIDdsRevs = [UUIDIdRevs1, UUIDIdRevs2],
- Shards =
- mem3_util:create_partition_map("foo",3,1,["node1","node2","node3"]),
- Counters = dict:to_list(
- group_idrevs_by_shard_hack(<<"foo">>, Shards, UUIDsIDdsRevs)),
- DocsDict = dict:new(),
-
- % ***test for W = 2
- AccW2 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("2"),
- Counters, DocsDict},
- {ok, {WaitingCountW2_1,_,_,_,_} = AccW2_1} =
- handle_message({ok,[{ok, Revs1}, {ok, Revs2}]}, hd(Shards), AccW2),
- ?assertEqual(2, WaitingCountW2_1),
- {stop, FinalReplyW2 } =
- handle_message({ok, [{ok, Revs1}, {ok, Revs2}]},
- lists:nth(2,Shards), AccW2_1),
- ?assertEqual(
- {ok, [{UUID1, {ok, Revs1}}, {UUID2, {ok, Revs2}}]},
- FinalReplyW2
- ),
-
- % ***test for W = 3
- AccW3 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("3"),
- Counters, DocsDict},
- {ok, {WaitingCountW3_1,_,_,_,_} = AccW3_1} =
- handle_message({ok, [{ok, Revs1}, {ok, Revs2}]}, hd(Shards), AccW3),
- ?assertEqual(2, WaitingCountW3_1),
- {ok, {WaitingCountW3_2,_,_,_,_} = AccW3_2} =
- handle_message({ok,[{ok, Revs1}, {ok, Revs2}]},
- lists:nth(2,Shards), AccW3_1),
- ?assertEqual(1, WaitingCountW3_2),
- {stop, FinalReplyW3 } =
- handle_message({ok, [{ok, Revs1}, {ok, Revs2}]},
- lists:nth(3,Shards), AccW3_2),
- ?assertEqual(
- {ok, [{UUID1, {ok, Revs1}}, {UUID2, {ok, Revs2}}]},
- FinalReplyW3
- ),
-
- % *** test rexi_exit on 1 node
- Acc0 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("2"),
- Counters, DocsDict},
- {ok, {WaitingCount1,_,_,_,_} = Acc1} =
- handle_message({ok, [{ok, Revs1}, {ok, Revs2}]}, hd(Shards), Acc0),
- ?assertEqual(2, WaitingCount1),
- {ok, {WaitingCount2,_,_,_,_} = Acc2} =
- handle_message({rexi_EXIT, nil}, lists:nth(2,Shards), Acc1),
- ?assertEqual(1, WaitingCount2),
- {stop, Reply} =
- handle_message({ok, [{ok, Revs1}, {ok, Revs2}]},
- lists:nth(3,Shards), Acc2),
- ?assertEqual(
- {ok,[{UUID1, {ok, Revs1}}, {UUID2, {ok, Revs2}}]},
- Reply
- ),
-
- % *** test {error, purge_during_compaction_exceeded_limit} on all nodes
- % *** still should return ok reply for the request
- ErrPDCEL = {error, purge_during_compaction_exceeded_limit},
- Acc20 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("3"),
- Counters, DocsDict},
- {ok, {WaitingCount21,_,_,_,_} = Acc21} =
- handle_message({ok, [ErrPDCEL, ErrPDCEL]}, hd(Shards), Acc20),
- ?assertEqual(2, WaitingCount21),
- {ok, {WaitingCount22,_,_,_,_} = Acc22} =
- handle_message({ok, [ErrPDCEL, ErrPDCEL]}, lists:nth(2,Shards), Acc21),
- ?assertEqual(1, WaitingCount22),
- {stop, Reply2 } =
- handle_message({ok, [ErrPDCEL, ErrPDCEL]}, lists:nth(3,Shards), Acc22),
- ?assertEqual(
- {ok, [{UUID1, ErrPDCEL}, {UUID2, ErrPDCEL}]},
- Reply2
- ),
-
- % *** test {error, purged_docs_limit_exceeded} on all nodes
- % *** still should return ok reply for the request
- ErrPDLE = {error, purged_docs_limit_exceeded},
- Acc30 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("3"),
- Counters, DocsDict},
- {ok, {WaitingCount31,_,_,_,_} = Acc31} =
- handle_message({ok, [ErrPDLE, ErrPDLE]}, hd(Shards), Acc30),
- ?assertEqual(2, WaitingCount31),
- {ok, {WaitingCount32,_,_,_,_} = Acc32} =
- handle_message({ok, [ErrPDLE, ErrPDLE]}, lists:nth(2,Shards), Acc31),
- ?assertEqual(1, WaitingCount32),
- {stop, Reply3 } =
- handle_message({ok, [ErrPDLE, ErrPDLE]},lists:nth(3,Shards), Acc32),
- ?assertEqual(
- {ok, [{UUID1, ErrPDLE}, {UUID2, ErrPDLE}]},
- Reply3
- ),
- meck:unload(couch_log).
-
-
-doc_purge_accepted_test() ->
- meck:new(couch_log),
- meck:expect(couch_log, warning, fun(_,_) -> ok end),
- meck:expect(couch_log, notice, fun(_,_) -> ok end),
-
- Revs1 = [{1, <<"rev11">>}], UUID1 = <<"3de03c5f4c2cd34cc515a9d1ea000abd">>,
- UUIDIdRevs1 = {UUID1, <<"id1">>, Revs1},
- Revs2 = [{1, <<"rev12">>}], UUID2 = <<"4de03c5f4c2cd34cc515a9d1ea000abc">>,
- UUIDIdRevs2 = {UUID2, <<"id2">>, Revs2},
- UUIDsIDdsRevs = [UUIDIdRevs1, UUIDIdRevs2],
- Shards =
- mem3_util:create_partition_map("foo",3,1,["node1","node2","node3"]),
- Counters = dict:to_list(
- group_idrevs_by_shard_hack(<<"foo">>, Shards, UUIDsIDdsRevs)),
- DocsDict = dict:new(),
-
- % *** test rexi_exit on 2 nodes
- Acc0 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("2"),
- Counters, DocsDict},
- {ok, {WaitingCount1,_,_,_,_} = Acc1} =
- handle_message({ok, [{ok, Revs1}, {ok, Revs2}]}, hd(Shards), Acc0),
- ?assertEqual(2, WaitingCount1),
- {ok, {WaitingCount2,_,_,_,_} = Acc2} =
- handle_message({rexi_EXIT, nil}, lists:nth(2, Shards), Acc1),
- ?assertEqual(1, WaitingCount2),
- {stop, Reply} =
- handle_message({rexi_EXIT, nil}, lists:nth(3, Shards), Acc2),
- ?assertEqual(
- {accepted, [{UUID1, {accepted, Revs1}}, {UUID2, {accepted, Revs2}}]},
- Reply
- ),
- meck:unload(couch_log).
-
-
-doc_purge_error_test() ->
- meck:new(couch_log),
- meck:expect(couch_log, warning, fun(_,_) -> ok end),
- meck:expect(couch_log, notice, fun(_,_) -> ok end),
-
- Revs1 = [{1, <<"rev11">>}], UUID1 = <<"3de03c5f4c2cd34cc515a9d1ea000abd">>,
- UUIDIdRevs1 = {UUID1, <<"id1">>, Revs1},
- Revs2 = [{1, <<"rev12">>}], UUID2 = <<"4de03c5f4c2cd34cc515a9d1ea000abc">>,
- UUIDIdRevs2 = {UUID2, <<"id2">>, Revs2},
- UUIDsIDdsRevs = [UUIDIdRevs1, UUIDIdRevs2],
- Shards =
- mem3_util:create_partition_map("foo",3,1,["node1","node2","node3"]),
- Counters = dict:to_list(
- group_idrevs_by_shard_hack(<<"foo">>, Shards, UUIDsIDdsRevs)),
- DocsDict = dict:new(),
-
- % *** test rexi_exit on all 3 nodes
- Acc0 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("2"),
- Counters, DocsDict},
- {ok, {WaitingCount1,_,_,_,_} = Acc1} =
- handle_message({rexi_EXIT, nil}, hd(Shards), Acc0),
- ?assertEqual(2, WaitingCount1),
- {ok, {WaitingCount2,_,_,_,_} = Acc2} =
- handle_message({rexi_EXIT, nil}, lists:nth(2,Shards), Acc1),
- ?assertEqual(1, WaitingCount2),
- {stop, Reply} =
- handle_message({rexi_EXIT, nil}, lists:nth(3,Shards), Acc2),
- ?assertEqual(
- {error, [{UUID1, {error, internal_server_error}},
- {UUID2, {error, internal_server_error}}]},
- Reply
- ),
-
- % ***test w quorum > # shards, which should fail immediately
- Shards2 = mem3_util:create_partition_map("foo",1,1,["node1"]),
- Counters2 = dict:to_list(
- group_idrevs_by_shard_hack(<<"foo">>, Shards2, UUIDsIDdsRevs)),
- AccW4 = {length(Shards2), length(UUIDsIDdsRevs), list_to_integer("2"),
- Counters2, DocsDict},
- Bool =
- case handle_message({ok, [{ok, Revs1}, {ok, Revs2}]},
- hd(Shards), AccW4) of
- {stop, _Reply} ->
- true;
- _ -> false
- end,
- ?assertEqual(true, Bool),
-
- % *** test Docs with no replies should end up as {error, internal_server_error}
- SA1 = #shard{node = a, range = [1]},
- SA2 = #shard{node = a, range = [2]},
- SB1 = #shard{node = b, range = [1]},
- SB2 = #shard{node = b, range = [2]},
- Counters3 = [{SA1,[UUID1]}, {SB1,[UUID1]},
- {SA2,[UUID2]}, {SB2,[UUID2]}],
- Acc30 = {length(Counters3), length(UUIDsIDdsRevs), 2, Counters3, DocsDict},
- {ok, Acc31} = handle_message({ok, [{ok, Revs1}]}, SA1, Acc30),
- {ok, Acc32} = handle_message({rexi_EXIT, nil}, SB1, Acc31),
- {ok, Acc33} = handle_message({rexi_EXIT, nil}, SA2, Acc32),
- {stop, Acc34} = handle_message({rexi_EXIT, nil}, SB2, Acc33),
- ?assertEqual(
- {error, [{UUID1, {accepted, Revs1}},
- {UUID2, {error, internal_server_error}}]},
- Acc34
- ),
- meck:unload(couch_log).
-
-
-% needed for testing to avoid having to start the mem3 application
-group_idrevs_by_shard_hack(_DbName, Shards, UUIDsIdsRevs) ->
- lists:foldl(fun({UUID, _Id, _Revs}, Dict0) ->
- lists:foldl(fun(Shard, Dict1) ->
- dict:append(Shard, UUID, Dict1)
- end, Dict0, Shards)
- end, dict:new(), UUIDsIdsRevs).
+%% w(DbName, Options)
+%%
+%% Return the write quorum as an integer. The `w` option, when present, is
+%% converted with list_to_integer/1. If that conversion raises badarg
+%% (presumably because the option is absent or not a numeric string) the
+%% value of mem3:quorum(DbName) is used instead. Only badarg is caught so
+%% that unrelated failures are not silently turned into the default.
+w(DbName, Options) ->
+    try
+        list_to_integer(couch_util:get_value(w, Options))
+    catch error:badarg ->
+        mem3:quorum(DbName)
+    end.
+
+
+%% append_errors(Type, WorkerUUIDs, Resps)
+%%
+%% For every {Worker, UUIDs} pair, append {error, Type} to the reply list
+%% of each UUID in the Resps dict. Returns the updated dict.
+append_errors(Type, WorkerUUIDs, Resps) ->
+    Error = {error, Type},
+    AddError = fun(UUID, Acc) -> dict:append(UUID, Error, Acc) end,
+    lists:foldl(fun({_Worker, UUIDs}, Acc) ->
+        lists:foldl(AddError, Acc, UUIDs)
+    end, Resps, WorkerUUIDs).
+
+
+%% append_resps(UUIDs, Replies, Resps)
+%%
+%% Walk UUIDs and Replies in lock step, appending the Nth reply to the
+%% reply list stored under the Nth UUID. The two lists must have the same
+%% length; no clause matches otherwise.
+append_resps([], [], Resps) ->
+    Resps;
+append_resps([UUID | UUIDs], [Reply | Replies], Resps) ->
+    append_resps(UUIDs, Replies, dict:append(UUID, Reply, Resps)).
+
+
+%% maybe_stop(Acc)
+%%
+%% Decide whether reply collection can end. Returns {stop, Acc} when no
+%% workers remain, or when has_quorum/2 holds for the replies of every
+%% request UUID; otherwise {ok, Acc} to keep waiting.
+maybe_stop(#acc{worker_uuids = []} = Acc) ->
+    {stop, Acc};
+maybe_stop(#acc{resps = Resps, w = W} = Acc) ->
+    % andalso short-circuits, so has_quorum/2 is no longer evaluated once
+    % one request is known to be short of quorum.
+    AllHaveQuorum = dict:fold(fun(_UUID, UUIDResps, Met) ->
+        Met andalso has_quorum(UUIDResps, W)
+    end, true, Resps),
+    case AllHaveQuorum of
+        true -> {stop, Acc};
+        false -> {ok, Acc}
+    end.
+
+
+%% format_resps(UUIDs, AccOrElse)
+%%
+%% Turn the collected replies into the final result of go/3.
+%% For an #acc{}: each UUID's {ok, Revs} replies are flattened and
+%% de-duplicated into {UUID, {ok, AllRevs}}. Overall health is `ok` only if
+%% every UUID has at least W ok replies, otherwise `accepted`. The reply
+%% list is ordered by couch_util:reorder_results(UUIDs, ...).
+%% A UUID with no ok reply aborts the call by throwing the first of its
+%% replies in term order.
+%% Anything that is not an #acc{} is returned unchanged.
+format_resps(UUIDs, #acc{} = Acc) ->
+    #acc{
+        resps = Resps,
+        w = W
+    } = Acc,
+    FoldFun = fun(UUID, Replies, {Health, ReplyAcc}) ->
+        OkReplies = [Reply || {ok, Reply} <- Replies],
+        case OkReplies of
+            [] ->
+                [Error | _] = lists:usort(Replies),
+                throw(Error);
+            _ ->
+                AllRevs = lists:usort(lists:flatten(OkReplies)),
+                NewReplyAcc = [{UUID, {ok, AllRevs}} | ReplyAcc],
+                H = if length(OkReplies) >= W -> ok; true -> accepted end,
+                {update_health(H, Health), NewReplyAcc}
+        end
+    end,
+    {FinalHealth, FinalReplies} = dict:fold(FoldFun, {ok, []}, Resps),
+    {FinalHealth, couch_util:reorder_results(UUIDs, FinalReplies)};
+
+format_resps(_UUIDs, Else) ->
+    Else.
+
+
+%% has_quorum(Resps, W)
+%%
+%% True when Resps contains at least W replies of the form {ok, _}.
+%% Replies of any other shape (for example the {error, _} entries written
+%% by append_errors/3) are skipped rather than counted, so a mixed reply
+%% list no longer fails with function_clause.
+has_quorum(_Resps, W) when W =< 0 ->
+    true;
+has_quorum([], _W) ->
+    false;
+has_quorum([{ok, _} | Rest], W) ->
+    has_quorum(Rest, W - 1);
+has_quorum([_NotOk | Rest], W) ->
+    has_quorum(Rest, W).
+
+
+%% update_health(New, Current)
+%%
+%% Combine two health values: `accepted` wins over whatever is current,
+%% while `ok` leaves the current value untouched.
+update_health(accepted, _Current) ->
+    accepted;
+update_health(ok, Current) ->
+    Current.
+
+
+%% % eunits
+%% doc_purge_ok_test() ->
+%% meck:new(couch_log),
+%% meck:expect(couch_log, warning, fun(_,_) -> ok end),
+%% meck:expect(couch_log, notice, fun(_,_) -> ok end),
+%%
+%% Revs1 = [{1, <<"rev11">>}], UUID1 = <<"3de03c5f4c2cd34cc515a9d1ea000abd">>,
+%% UUIDIdRevs1 = {UUID1, <<"id1">>, Revs1},
+%% Revs2 = [{1, <<"rev12">>}], UUID2 = <<"4de03c5f4c2cd34cc515a9d1ea000abc">>,
+%% UUIDIdRevs2 = {UUID2, <<"id2">>, Revs2},
+%% UUIDsIDdsRevs = [UUIDIdRevs1, UUIDIdRevs2],
+%% Shards =
+%% mem3_util:create_partition_map("foo",3,1,["node1","node2","node3"]),
+%% Counters = dict:to_list(
+%% group_idrevs_by_shard_hack(<<"foo">>, Shards, UUIDsIDdsRevs)),
+%% DocsDict = dict:new(),
+%%
+%% % ***test for W = 2
+%% AccW2 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("2"),
+%% Counters, DocsDict},
+%% {ok, {WaitingCountW2_1,_,_,_,_} = AccW2_1} =
+%% handle_message({ok,[{ok, Revs1}, {ok, Revs2}]}, hd(Shards), AccW2),
+%% ?assertEqual(2, WaitingCountW2_1),
+%% {stop, FinalReplyW2 } =
+%% handle_message({ok, [{ok, Revs1}, {ok, Revs2}]},
+%% lists:nth(2,Shards), AccW2_1),
+%% ?assertEqual(
+%% {ok, [{UUID1, {ok, Revs1}}, {UUID2, {ok, Revs2}}]},
+%% FinalReplyW2
+%% ),
+%%
+%% % ***test for W = 3
+%% AccW3 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("3"),
+%% Counters, DocsDict},
+%% {ok, {WaitingCountW3_1,_,_,_,_} = AccW3_1} =
+%% handle_message({ok, [{ok, Revs1}, {ok, Revs2}]}, hd(Shards), AccW3),
+%% ?assertEqual(2, WaitingCountW3_1),
+%% {ok, {WaitingCountW3_2,_,_,_,_} = AccW3_2} =
+%% handle_message({ok,[{ok, Revs1}, {ok, Revs2}]},
+%% lists:nth(2,Shards), AccW3_1),
+%% ?assertEqual(1, WaitingCountW3_2),
+%% {stop, FinalReplyW3 } =
+%% handle_message({ok, [{ok, Revs1}, {ok, Revs2}]},
+%% lists:nth(3,Shards), AccW3_2),
+%% ?assertEqual(
+%% {ok, [{UUID1, {ok, Revs1}}, {UUID2, {ok, Revs2}}]},
+%% FinalReplyW3
+%% ),
+%%
+%% % *** test rexi_exit on 1 node
+%% Acc0 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("2"),
+%% Counters, DocsDict},
+%% {ok, {WaitingCount1,_,_,_,_} = Acc1} =
+%% handle_message({ok, [{ok, Revs1}, {ok, Revs2}]}, hd(Shards), Acc0),
+%% ?assertEqual(2, WaitingCount1),
+%% {ok, {WaitingCount2,_,_,_,_} = Acc2} =
+%% handle_message({rexi_EXIT, nil}, lists:nth(2,Shards), Acc1),
+%% ?assertEqual(1, WaitingCount2),
+%% {stop, Reply} =
+%% handle_message({ok, [{ok, Revs1}, {ok, Revs2}]},
+%% lists:nth(3,Shards), Acc2),
+%% ?assertEqual(
+%% {ok,[{UUID1, {ok, Revs1}}, {UUID2, {ok, Revs2}}]},
+%% Reply
+%% ),
+%%
+%% % *** test {error, purge_during_compaction_exceeded_limit} on all nodes
+%% % *** still should return ok reply for the request
+%% ErrPDCEL = {error, purge_during_compaction_exceeded_limit},
+%% Acc20 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("3"),
+%% Counters, DocsDict},
+%% {ok, {WaitingCount21,_,_,_,_} = Acc21} =
+%% handle_message({ok, [ErrPDCEL, ErrPDCEL]}, hd(Shards), Acc20),
+%% ?assertEqual(2, WaitingCount21),
+%% {ok, {WaitingCount22,_,_,_,_} = Acc22} =
+%% handle_message({ok, [ErrPDCEL, ErrPDCEL]}, lists:nth(2,Shards), Acc21),
+%% ?assertEqual(1, WaitingCount22),
+%% {stop, Reply2 } =
+%% handle_message({ok, [ErrPDCEL, ErrPDCEL]}, lists:nth(3,Shards), Acc22),
+%% ?assertEqual(
+%% {ok, [{UUID1, ErrPDCEL}, {UUID2, ErrPDCEL}]},
+%% Reply2
+%% ),
+%%
+%% % *** test {error, purged_docs_limit_exceeded} on all nodes
+%% % *** still should return ok reply for the request
+%% ErrPDLE = {error, purged_docs_limit_exceeded},
+%% Acc30 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("3"),
+%% Counters, DocsDict},
+%% {ok, {WaitingCount31,_,_,_,_} = Acc31} =
+%% handle_message({ok, [ErrPDLE, ErrPDLE]}, hd(Shards), Acc30),
+%% ?assertEqual(2, WaitingCount31),
+%% {ok, {WaitingCount32,_,_,_,_} = Acc32} =
+%% handle_message({ok, [ErrPDLE, ErrPDLE]}, lists:nth(2,Shards), Acc31),
+%% ?assertEqual(1, WaitingCount32),
+%% {stop, Reply3 } =
+%% handle_message({ok, [ErrPDLE, ErrPDLE]},lists:nth(3,Shards), Acc32),
+%% ?assertEqual(
+%% {ok, [{UUID1, ErrPDLE}, {UUID2, ErrPDLE}]},
+%% Reply3
+%% ),
+%% meck:unload(couch_log).
+%%
+%%
+%% doc_purge_accepted_test() ->
+%% meck:new(couch_log),
+%% meck:expect(couch_log, warning, fun(_,_) -> ok end),
+%% meck:expect(couch_log, notice, fun(_,_) -> ok end),
+%%
+%% Revs1 = [{1, <<"rev11">>}], UUID1 = <<"3de03c5f4c2cd34cc515a9d1ea000abd">>,
+%% UUIDIdRevs1 = {UUID1, <<"id1">>, Revs1},
+%% Revs2 = [{1, <<"rev12">>}], UUID2 = <<"4de03c5f4c2cd34cc515a9d1ea000abc">>,
+%% UUIDIdRevs2 = {UUID2, <<"id2">>, Revs2},
+%% UUIDsIDdsRevs = [UUIDIdRevs1, UUIDIdRevs2],
+%% Shards =
+%% mem3_util:create_partition_map("foo",3,1,["node1","node2","node3"]),
+%% Counters = dict:to_list(
+%% group_idrevs_by_shard_hack(<<"foo">>, Shards, UUIDsIDdsRevs)),
+%% DocsDict = dict:new(),
+%%
+%% % *** test rexi_exit on 2 nodes
+%% Acc0 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("2"),
+%% Counters, DocsDict},
+%% {ok, {WaitingCount1,_,_,_,_} = Acc1} =
+%% handle_message({ok, [{ok, Revs1}, {ok, Revs2}]}, hd(Shards), Acc0),
+%% ?assertEqual(2, WaitingCount1),
+%% {ok, {WaitingCount2,_,_,_,_} = Acc2} =
+%% handle_message({rexi_EXIT, nil}, lists:nth(2, Shards), Acc1),
+%% ?assertEqual(1, WaitingCount2),
+%% {stop, Reply} =
+%% handle_message({rexi_EXIT, nil}, lists:nth(3, Shards), Acc2),
+%% ?assertEqual(
+%% {accepted, [{UUID1, {accepted, Revs1}}, {UUID2, {accepted, Revs2}}]},
+%% Reply
+%% ),
+%% meck:unload(couch_log).
+%%
+%%
+%% doc_purge_error_test() ->
+%% meck:new(couch_log),
+%% meck:expect(couch_log, warning, fun(_,_) -> ok end),
+%% meck:expect(couch_log, notice, fun(_,_) -> ok end),
+%%
+%% Revs1 = [{1, <<"rev11">>}], UUID1 = <<"3de03c5f4c2cd34cc515a9d1ea000abd">>,
+%% UUIDIdRevs1 = {UUID1, <<"id1">>, Revs1},
+%% Revs2 = [{1, <<"rev12">>}], UUID2 = <<"4de03c5f4c2cd34cc515a9d1ea000abc">>,
+%% UUIDIdRevs2 = {UUID2, <<"id2">>, Revs2},
+%% UUIDsIDdsRevs = [UUIDIdRevs1, UUIDIdRevs2],
+%% Shards =
+%% mem3_util:create_partition_map("foo",3,1,["node1","node2","node3"]),
+%% Counters = dict:to_list(
+%% group_idrevs_by_shard_hack(<<"foo">>, Shards, UUIDsIDdsRevs)),
+%% DocsDict = dict:new(),
+%%
+%% % *** test rexi_exit on all 3 nodes
+%% Acc0 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("2"),
+%% Counters, DocsDict},
+%% {ok, {WaitingCount1,_,_,_,_} = Acc1} =
+%% handle_message({rexi_EXIT, nil}, hd(Shards), Acc0),
+%% ?assertEqual(2, WaitingCount1),
+%% {ok, {WaitingCount2,_,_,_,_} = Acc2} =
+%% handle_message({rexi_EXIT, nil}, lists:nth(2,Shards), Acc1),
+%% ?assertEqual(1, WaitingCount2),
+%% {stop, Reply} =
+%% handle_message({rexi_EXIT, nil}, lists:nth(3,Shards), Acc2),
+%% ?assertEqual(
+%% {error, [{UUID1, {error, internal_server_error}},
+%% {UUID2, {error, internal_server_error}}]},
+%% Reply
+%% ),
+%%
+%% % ***test w quorum > # shards, which should fail immediately
+%% Shards2 = mem3_util:create_partition_map("foo",1,1,["node1"]),
+%% Counters2 = dict:to_list(
+%% group_idrevs_by_shard_hack(<<"foo">>, Shards2, UUIDsIDdsRevs)),
+%% AccW4 = {length(Shards2), length(UUIDsIDdsRevs), list_to_integer("2"),
+%% Counters2, DocsDict},
+%% Bool =
+%% case handle_message({ok, [{ok, Revs1}, {ok, Revs2}]},
+%% hd(Shards), AccW4) of
+%% {stop, _Reply} ->
+%% true;
+%% _ -> false
+%% end,
+%% ?assertEqual(true, Bool),
+%%
+%% % *** test Docs with no replies should end up as {error, internal_server_error}
+%% SA1 = #shard{node = a, range = [1]},
+%% SA2 = #shard{node = a, range = [2]},
+%% SB1 = #shard{node = b, range = [1]},
+%% SB2 = #shard{node = b, range = [2]},
+%% Counters3 = [{SA1,[UUID1]}, {SB1,[UUID1]},
+%% {SA2,[UUID2]}, {SB2,[UUID2]}],
+%% Acc30 = {length(Counters3), length(UUIDsIDdsRevs), 2, Counters3, DocsDict},
+%% {ok, Acc31} = handle_message({ok, [{ok, Revs1}]}, SA1, Acc30),
+%% {ok, Acc32} = handle_message({rexi_EXIT, nil}, SB1, Acc31),
+%% {ok, Acc33} = handle_message({rexi_EXIT, nil}, SA2, Acc32),
+%% {stop, Acc34} = handle_message({rexi_EXIT, nil}, SB2, Acc33),
+%% ?assertEqual(
+%% {error, [{UUID1, {accepted, Revs1}},
+%% {UUID2, {error, internal_server_error}}]},
+%% Acc34
+%% ),
+%% meck:unload(couch_log).
+%%
+%%
+%% % needed for testing to avoid having to start the mem3 application
+%% group_idrevs_by_shard_hack(_DbName, Shards, UUIDsIdsRevs) ->
+%% lists:foldl(fun({UUID, _Id, _Revs}, Dict0) ->
+%% lists:foldl(fun(Shard, Dict1) ->
+%% dict:append(Shard, UUID, Dict1)
+%% end, Dict0, Shards)
+%% end, dict:new(), UUIDsIdsRevs).
--
To stop receiving notification emails like this one, please contact
davisp@apache.org.