You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2018/03/23 20:03:19 UTC

[couchdb] branch COUCHDB-3326-clustered-purge-davisp-refactor updated (7094d61 -> cf07f20)

This is an automated email from the ASF dual-hosted git repository.

davisp pushed a change to branch COUCHDB-3326-clustered-purge-davisp-refactor
in repository https://gitbox.apache.org/repos/asf/couchdb.git.


 discard 7094d61  WIP - fabric_doc_purge
     new cf07f20  WIP - fabric_doc_purge

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (7094d61)
            \
             N -- N -- N   refs/heads/COUCHDB-3326-clustered-purge-davisp-refactor (cf07f20)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:

-- 
To stop receiving notification emails like this one, please contact
davisp@apache.org.

[couchdb] 01/01: WIP - fabric_doc_purge

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch COUCHDB-3326-clustered-purge-davisp-refactor
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit cf07f20985b14382f214e2011ebe8332d0d273fd
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Fri Mar 23 14:35:48 2018 -0500

    WIP - fabric_doc_purge
---
 src/fabric/src/fabric_doc_purge.erl | 753 ++++++++++++++++++------------------
 1 file changed, 378 insertions(+), 375 deletions(-)

diff --git a/src/fabric/src/fabric_doc_purge.erl b/src/fabric/src/fabric_doc_purge.erl
index 24e8c66..eead154 100644
--- a/src/fabric/src/fabric_doc_purge.erl
+++ b/src/fabric/src/fabric_doc_purge.erl
@@ -12,403 +12,406 @@
 
 -module(fabric_doc_purge).
 
--export([go/3]).
+
+-export([
+    go/3
+]).
+
 
 -include_lib("fabric/include/fabric.hrl").
 -include_lib("mem3/include/mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
+
+
+-record(acc, {
+    worker_uuids, % [{Worker, UUIDs}] for workers still outstanding; empty means stop
+    req_count, % number of purge requests created for this call
+    resps, % dict: request UUID -> list of replies collected so far
+    w % write quorum: ok replies needed per request
+}).
+
 
 go(_, [], _) ->
     {ok, []};
-go(DbName, AllIdsRevs, Opts) ->
-    % tag each purge request with UUId
-    {AllUUIDs, AllUUIDsIdsRevs, DocCount} = tag_docs(AllIdsRevs),
-
-    Options = lists:delete(all_or_nothing, Opts),
-    % Counters -> [{Worker, UUIDs}]
-    {Counters, Workers} = dict:fold(fun(Shard, UUIDsIdsRevs, {Cs,Ws}) ->
-        UUIDs = [UUID || {UUID, _Id, _Revs} <-UUIDsIdsRevs],
-        #shard{name=Name, node=Node} = Shard,
-        Ref = rexi:cast(Node,
-            {fabric_rpc, purge_docs, [Name, UUIDsIdsRevs, Options]}),
+% Purge the revisions listed in IdsRevs ([{DocId, Revs}]) from DbName.
+%
+% Each {DocId, Revs} pair is tagged with a UUID, the requests are grouped
+% by shard, and one rexi worker is cast per shard copy. Replies are
+% collected per UUID until maybe_stop/1 says to stop or the request times
+% out; workers still pending at timeout are recorded as {error, timeout}.
+% The collected state is handed to format_resps/2, which also passes any
+% non-#acc{} result from rexi_utils:recv/6 through unchanged.
+go(DbName, IdsRevs, Options) ->
+    % Generate our purge requests of {UUID, DocId, Revs}
+    {UUIDs, Reqs, Count} = create_reqs(IdsRevs, [], [], 0),
+
+    % Fire off rexi workers for each shard.
+    {Workers, WorkerUUIDs} = dict:fold(fun(Shard, ShardReqs, {Ws, WUUIDs}) ->
+        % Use a fresh name here: UUIDs is already bound in the enclosing
+        % scope, so re-using it would be a match, not a new binding.
+        ShardUUIDs = [UUID || {UUID, _Id, _Revs} <- ShardReqs],
+        #shard{name = ShardDbName, node = Node} = Shard,
+        % Only send the requests that belong to this shard so the
+        % worker's replies line up with ShardUUIDs.
+        Args = [ShardDbName, ShardReqs, Options],
+        Ref = rexi:cast(Node, {fabric_rpc, purge_docs, Args}),
         Worker = Shard#shard{ref=Ref},
-        {[{Worker, UUIDs}|Cs], [Worker|Ws]}
-    end, {[], []}, group_idrevs_by_shard(DbName, AllUUIDsIdsRevs)),
+        {[Worker | Ws], [{Worker, ShardUUIDs} | WUUIDs]}
+    end, {[], []}, group_reqs_by_shard(DbName, Reqs)),
 
     RexiMon = fabric_util:create_monitors(Workers),
-    W = couch_util:get_value(w, Options, integer_to_list(mem3:quorum(DbName))),
-    Acc = {length(Workers), DocCount, list_to_integer(W), Counters, dict:new()},
     Timeout = fabric_util:request_timeout(),
-    try rexi_utils:recv(Workers, #shard.ref,
-        fun handle_message/3, Acc, infinity, Timeout) of
-    {ok, {Health, Results}} when Health =:= ok; Health =:= accepted ->
-        % Results-> [{UUID, {ok, Revs}}]
-        {Health, [R || R <-
-            couch_util:reorder_results(AllUUIDs, Results)]};
-    {timeout, Acc1} ->
-        {_, _, W1, Counters1, DocReplDict0} = Acc1,
-        {DefunctWorkers, _} = lists:unzip(Counters1),
-        fabric_util:log_timeout(DefunctWorkers, "purge_docs"),
-        DocReplDict = lists:foldl(fun({_W, Docs}, Dict) ->
-            Replies = [{error, timeout} || _D <- Docs],
-            append_purge_replies(Docs, Replies, Dict)
-        end, DocReplDict0, Counters1),
-        {Health, _, Resp} = dict:fold(
-            fun force_reply/3, {ok, W1, []}, DocReplDict),
-        case Health of
-            error -> timeout;
-            _ -> {Health, [R || R <-
-                couch_util:reorder_results(AllUUIDs, Resp)]}
-
-        end;
-    Else ->
-        Else
+    Acc0 = #acc{
+        worker_uuids = WorkerUUIDs,
+        req_count = Count,
+        resps = dict:from_list([{UUID, []} || UUID <- UUIDs]),
+        w = w(DbName, Options)
+    },
+    Acc2 = try rexi_utils:recv(Workers, #shard.ref,
+            fun handle_message/3, Acc0, infinity, Timeout) of
+        {ok, Acc1} ->
+            Acc1;
+        {timeout, Acc1} ->
+            % PendingUUIDs must be a fresh variable: WorkerUUIDs is
+            % already bound to the full initial list above.
+            #acc{
+                worker_uuids = PendingUUIDs,
+                resps = Resps
+            } = Acc1,
+            DefunctWorkers = [Worker || {Worker, _} <- PendingUUIDs],
+            fabric_util:log_timeout(DefunctWorkers, "purge_docs"),
+            NewResps = append_errors(timeout, PendingUUIDs, Resps),
+            Acc1#acc{worker_uuids = [], resps = NewResps};
+        Else ->
+            Else
     after
         rexi_monitor:stop(RexiMon)
-    end.
+    end,
+
+    format_resps(UUIDs, Acc2).
+
+
+% rexi_utils:recv/6 callback. Every clause except bad_request removes the
+% affected worker(s) from #acc.worker_uuids, records one reply per UUID
+% that worker was responsible for, and lets maybe_stop/1 decide whether
+% to keep receiving.
+%
+%   {rexi_DOWN, _, {_, Node}, _} - every worker on Node is failed with
+%                                  {error, internal_server_error}
+%   {rexi_EXIT, _}               - this worker is failed the same way
+%   {ok, Replies}                - Replies are paired positionally with
+%                                  the UUIDs sent to this worker
+%   {bad_request, Msg}           - rethrown to the caller
+handle_message({rexi_DOWN, _, {_, Node}, _}, _Worker, Acc) ->
+    #acc{
+        worker_uuids = WorkerUUIDs,
+        resps = Resps
+    } = Acc,
+    Pred = fun({#shard{node = N}, _}) -> N == Node end,
+    {Failed, Rest} = lists:partition(Pred, WorkerUUIDs),
+    NewResps = append_errors(internal_server_error, Failed, Resps),
+    maybe_stop(Acc#acc{worker_uuids = Rest, resps = NewResps});
+
+handle_message({rexi_EXIT, _}, Worker, Acc) ->
+    #acc{
+        worker_uuids = WorkerUUIDs,
+        resps = Resps
+    } = Acc,
+    {value, WorkerPair, Rest} = lists:keytake(Worker, 1, WorkerUUIDs),
+    NewResps = append_errors(internal_server_error, [WorkerPair], Resps),
+    maybe_stop(Acc#acc{worker_uuids = Rest, resps = NewResps});
+
+handle_message({ok, Replies}, Worker, Acc) ->
+    #acc{
+        worker_uuids = WorkerUUIDs,
+        resps = Resps
+    } = Acc,
+    {value, {_W, UUIDs}, Rest} = lists:keytake(Worker, 1, WorkerUUIDs),
+    NewResps = append_resps(UUIDs, Replies, Resps),
+    maybe_stop(Acc#acc{worker_uuids = Rest, resps = NewResps});
 
-handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Worker, Acc0) ->
-    {_, DocCount, W, Counters, DocsDict0} = Acc0,
-    {FailCounters, NewCounters} = lists:partition(fun({#shard{node=N}, _}) ->
-        N == NodeRef
-    end, Counters),
-    % fill DocsDict with error messages for relevant Docs
-    DocsDict = lists:foldl(fun({_W, Docs}, CDocsDict) ->
-        Replies = [{error, internal_server_error} || _D <- Docs],
-        append_purge_replies(Docs, Replies, CDocsDict)
-    end, DocsDict0, FailCounters),
-    skip_message({length(NewCounters), DocCount, W, NewCounters, DocsDict});
-handle_message({rexi_EXIT, _}, Worker, Acc0) ->
-    {WC, DocCount , W, Counters, DocsDict0} = Acc0,
-    % fill DocsDict with error messages for relevant Docs
-    {value, {_W, Docs}, NewCounters} = lists:keytake(Worker, 1, Counters),
-    Replies = [{error, internal_server_error} || _D <- Docs],
-    DocsDict = append_purge_replies(Docs, Replies, DocsDict0),
-    skip_message({WC-1, DocCount, W, NewCounters, DocsDict});
-handle_message({ok, Replies0}, Worker, Acc0) ->
-    {WCount, DocCount, W, Counters, DocsDict0} = Acc0,
-    {value, {_W, Docs}, NewCounters} = lists:keytake(Worker, 1, Counters),
-    DocsDict = append_purge_replies(Docs, Replies0, DocsDict0),
-    case {WCount, dict:size(DocsDict)} of
-    {1, _} ->
-        % last message has arrived, we need to conclude things
-        {Health, W, Replies} = dict:fold(fun force_reply/3, {ok, W, []},
-           DocsDict),
-        {stop, {Health, Replies}};
-    {_, DocCount} ->
-        % we've got at least one reply for each document, let's take a look
-        case dict:fold(fun maybe_reply/3, {stop,W,[]}, DocsDict) of
-        continue ->
-            {ok, {WCount - 1, DocCount, W, NewCounters, DocsDict}};
-        {stop, W, Replies} ->
-            {stop, {ok, Replies}}
-        end;
-    _ ->
-        {ok, {WCount - 1, DocCount, W, NewCounters, DocsDict}}
-    end;
-handle_message({error, purged_docs_limit_exceeded}=Error, Worker, Acc0) ->
-    {WC, DocCount , W, Counters, DocsDict0} = Acc0,
-    % fill DocsDict with error messages for relevant Docs
-    {value, {_W, Docs}, NewCounters} = lists:keytake(Worker, 1, Counters),
-    Replies = [Error || _D <- Docs],
-    DocsDict = append_purge_replies(Docs, Replies, DocsDict0),
-    skip_message({WC-1, DocCount, W, NewCounters, DocsDict});
 handle_message({bad_request, Msg}, _, _) ->
     throw({bad_request, Msg}).
 
 
-tag_docs(AllIdsRevs) ->
-    {UUIDs, UUIDsIdsRevs, DocCount} = lists:foldl(fun(
-        {Id, Revs}, {UAcc, UIRAcc, C}) ->
-        UUID = couch_uuids:new(),
-        {[UUID|UAcc], [{UUID, Id, Revs}|UIRAcc], C+1}
-    end, {[], [], 0}, AllIdsRevs),
-    {lists:reverse(UUIDs), lists:reverse(UUIDsIdsRevs), DocCount}.
-
-
-force_reply(Doc, Replies, {Health, W, Acc}) ->
-    case update_quorum_met(W, Replies) of
-    {true, FinalReply} ->
-        {Health, W, [{Doc, FinalReply} | Acc]};
-    false ->
-        case [Reply || {ok, Reply} <- Replies] of
-        [] ->
-            UReplies = lists:usort(Replies),
-            case UReplies of
-                [{error, internal_server_error}] ->
-                    {error, W, [{Doc, {error, internal_server_error}} | Acc]};
-                [{error, timeout}] ->
-                    {error, W, [{Doc, {error, timeout}} | Acc]};
-                [FirstReply|[]] ->
-                    % check if all errors are identical, if so inherit health
-                    {Health, W, [{Doc, FirstReply} | Acc]};
-                _ ->
-                    {error, W, [{Doc, UReplies} | Acc]}
-             end;
-        AcceptedReplies0 ->
-            NewHealth = case Health of ok -> accepted; _ -> Health end,
-            AcceptedReplies = lists:usort(lists:flatten(AcceptedReplies0)),
-            {NewHealth, W, [{Doc, {accepted, AcceptedReplies}} | Acc]}
-        end
-    end.
+% Tag each {DocId, Revs} pair with a fresh UUID.
+%
+% Returns {UUIDs, Reqs, Count}: UUIDs and Reqs ([{UUID, DocId, Revs}]) are
+% in the same order as the input list, and Count is the initial Count
+% plus the number of pairs consumed. The accumulators are built in
+% reverse and flipped once in the base clause.
+create_reqs([], UUIDs, Reqs, Count) ->
+    {lists:reverse(UUIDs), lists:reverse(Reqs), Count};
 
+create_reqs([{Id, Revs} | RestIdsRevs], UUIDs, Reqs, Count) ->
+    UUID = couch_uuids:new(),
+    NewUUIDs = [UUID | UUIDs],
+    NewReqs = [{UUID, Id, Revs} | Reqs],
+    create_reqs(RestIdsRevs, NewUUIDs, NewReqs, Count + 1).
 
-maybe_reply(_, _, continue) ->
-    % we didn't meet quorum for all docs, so we're fast-forwarding the fold
-    continue;
-maybe_reply(Doc, Replies, {stop, W, Acc}) ->
-    case update_quorum_met(W, Replies) of
-    {true, Reply} ->
-        {stop, W, [{Doc, Reply} | Acc]};
-    false ->
-        continue
-    end.
 
-update_quorum_met(W, Replies) ->
-    OkReplies = lists:foldl(fun(Reply, PrevsAcc) ->
-        case Reply of
-            {ok, PurgedRevs} -> [PurgedRevs | PrevsAcc];
-            _ -> PrevsAcc
-        end
-    end, [], Replies),
-    if length(OkReplies) < W -> false; true ->
-        % make a union of PurgedRevs
-        FinalReply = {ok, lists:usort(lists:flatten(OkReplies))},
-        {true, FinalReply}
-    end.
-
-
-group_idrevs_by_shard(DbName, UUIDsIdsRevs) ->
-    lists:foldl(fun({_UUID, Id, _Revs} = UUIDIdRevs, D0) ->
+% Group purge requests by the shards that host each document.
+%
+% Returns a dict mapping every shard returned by mem3:shards(DbName, Id)
+% to the list of {UUID, DocId, Revs} requests whose DocId lives on it.
+% A request appears once under each shard returned for its document.
+group_reqs_by_shard(DbName, Reqs) ->
+    lists:foldl(fun({_UUID, Id, _Revs} = Req, D0) ->
         lists:foldl(fun(Shard, D1) ->
-            dict:append(Shard, UUIDIdRevs, D1)
+            dict:append(Shard, Req, D1)
         end, D0, mem3:shards(DbName, Id))
-    end, dict:new(), UUIDsIdsRevs).
+    end, dict:new(), Reqs).
 
 
-append_purge_replies([], [], DocReplyDict) ->
-    DocReplyDict;
-append_purge_replies([Doc|Rest1], [Reply|Rest2], Dict0) ->
-    append_purge_replies(Rest1, Rest2, dict:append(Doc, Reply, Dict0)).
-
-
-skip_message({0, _, W, _, DocsDict}) ->
-    {Health, W, Reply} = dict:fold(fun force_reply/3, {ok, W, []}, DocsDict),
-    {stop, {Health, Reply}};
-skip_message(Acc0) ->
-    {ok, Acc0}.
-
-
-% eunits
-doc_purge_ok_test() ->
-    meck:new(couch_log),
-    meck:expect(couch_log, warning, fun(_,_) -> ok end),
-    meck:expect(couch_log, notice, fun(_,_) -> ok end),
-
-    Revs1 = [{1, <<"rev11">>}], UUID1 = <<"3de03c5f4c2cd34cc515a9d1ea000abd">>,
-    UUIDIdRevs1 = {UUID1, <<"id1">>, Revs1},
-    Revs2 = [{1, <<"rev12">>}], UUID2 = <<"4de03c5f4c2cd34cc515a9d1ea000abc">>,
-    UUIDIdRevs2 = {UUID2, <<"id2">>, Revs2},
-    UUIDsIDdsRevs = [UUIDIdRevs1, UUIDIdRevs2],
-    Shards =
-        mem3_util:create_partition_map("foo",3,1,["node1","node2","node3"]),
-    Counters = dict:to_list(
-        group_idrevs_by_shard_hack(<<"foo">>, Shards, UUIDsIDdsRevs)),
-    DocsDict = dict:new(),
-
-    % ***test for W = 2
-    AccW2 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("2"),
-        Counters, DocsDict},
-    {ok, {WaitingCountW2_1,_,_,_,_} = AccW2_1} =
-        handle_message({ok,[{ok, Revs1}, {ok, Revs2}]}, hd(Shards), AccW2),
-    ?assertEqual(2, WaitingCountW2_1),
-    {stop, FinalReplyW2 } =
-        handle_message({ok, [{ok, Revs1}, {ok, Revs2}]},
-            lists:nth(2,Shards), AccW2_1),
-    ?assertEqual(
-        {ok, [{UUID1, {ok, Revs1}}, {UUID2, {ok, Revs2}}]},
-        FinalReplyW2
-    ),
-
-    % ***test for W = 3
-    AccW3 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("3"),
-        Counters, DocsDict},
-    {ok, {WaitingCountW3_1,_,_,_,_} = AccW3_1} =
-        handle_message({ok, [{ok, Revs1}, {ok, Revs2}]}, hd(Shards), AccW3),
-    ?assertEqual(2, WaitingCountW3_1),
-    {ok, {WaitingCountW3_2,_,_,_,_} = AccW3_2} =
-        handle_message({ok,[{ok, Revs1}, {ok, Revs2}]},
-            lists:nth(2,Shards), AccW3_1),
-    ?assertEqual(1, WaitingCountW3_2),
-    {stop, FinalReplyW3 } =
-        handle_message({ok, [{ok, Revs1}, {ok, Revs2}]},
-            lists:nth(3,Shards), AccW3_2),
-    ?assertEqual(
-        {ok, [{UUID1, {ok, Revs1}}, {UUID2, {ok, Revs2}}]},
-        FinalReplyW3
-    ),
-
-    % *** test rexi_exit on 1 node
-    Acc0 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("2"),
-        Counters, DocsDict},
-    {ok, {WaitingCount1,_,_,_,_} = Acc1} =
-        handle_message({ok, [{ok, Revs1}, {ok, Revs2}]}, hd(Shards), Acc0),
-    ?assertEqual(2, WaitingCount1),
-    {ok, {WaitingCount2,_,_,_,_} = Acc2} =
-        handle_message({rexi_EXIT, nil}, lists:nth(2,Shards), Acc1),
-    ?assertEqual(1, WaitingCount2),
-    {stop, Reply} =
-        handle_message({ok, [{ok, Revs1}, {ok, Revs2}]},
-            lists:nth(3,Shards), Acc2),
-    ?assertEqual(
-        {ok,[{UUID1, {ok, Revs1}}, {UUID2, {ok, Revs2}}]},
-        Reply
-    ),
-
-    % *** test {error, purge_during_compaction_exceeded_limit} on all nodes
-    % *** still should return ok reply for the request
-    ErrPDCEL = {error, purge_during_compaction_exceeded_limit},
-    Acc20 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("3"),
-        Counters, DocsDict},
-    {ok, {WaitingCount21,_,_,_,_} = Acc21} =
-        handle_message({ok, [ErrPDCEL, ErrPDCEL]}, hd(Shards), Acc20),
-    ?assertEqual(2, WaitingCount21),
-    {ok, {WaitingCount22,_,_,_,_} = Acc22} =
-        handle_message({ok, [ErrPDCEL, ErrPDCEL]}, lists:nth(2,Shards), Acc21),
-    ?assertEqual(1, WaitingCount22),
-    {stop, Reply2 } =
-        handle_message({ok, [ErrPDCEL, ErrPDCEL]}, lists:nth(3,Shards), Acc22),
-    ?assertEqual(
-        {ok, [{UUID1, ErrPDCEL}, {UUID2, ErrPDCEL}]},
-        Reply2
-    ),
-
-    % *** test {error, purged_docs_limit_exceeded} on all nodes
-    % *** still should return ok reply for the request
-    ErrPDLE = {error, purged_docs_limit_exceeded},
-    Acc30 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("3"),
-        Counters, DocsDict},
-    {ok, {WaitingCount31,_,_,_,_} = Acc31} =
-        handle_message({ok, [ErrPDLE, ErrPDLE]}, hd(Shards), Acc30),
-    ?assertEqual(2, WaitingCount31),
-    {ok, {WaitingCount32,_,_,_,_} = Acc32} =
-        handle_message({ok, [ErrPDLE, ErrPDLE]}, lists:nth(2,Shards), Acc31),
-    ?assertEqual(1, WaitingCount32),
-    {stop, Reply3 } =
-        handle_message({ok, [ErrPDLE, ErrPDLE]},lists:nth(3,Shards), Acc32),
-    ?assertEqual(
-        {ok, [{UUID1, ErrPDLE}, {UUID2, ErrPDLE}]},
-        Reply3
-    ),
-    meck:unload(couch_log).
-
-
-doc_purge_accepted_test() ->
-    meck:new(couch_log),
-    meck:expect(couch_log, warning, fun(_,_) -> ok end),
-    meck:expect(couch_log, notice, fun(_,_) -> ok end),
-
-    Revs1 = [{1, <<"rev11">>}], UUID1 = <<"3de03c5f4c2cd34cc515a9d1ea000abd">>,
-    UUIDIdRevs1 = {UUID1, <<"id1">>, Revs1},
-    Revs2 = [{1, <<"rev12">>}], UUID2 = <<"4de03c5f4c2cd34cc515a9d1ea000abc">>,
-    UUIDIdRevs2 = {UUID2, <<"id2">>, Revs2},
-    UUIDsIDdsRevs = [UUIDIdRevs1, UUIDIdRevs2],
-    Shards =
-        mem3_util:create_partition_map("foo",3,1,["node1","node2","node3"]),
-    Counters = dict:to_list(
-        group_idrevs_by_shard_hack(<<"foo">>, Shards, UUIDsIDdsRevs)),
-    DocsDict = dict:new(),
-
-    % *** test rexi_exit on 2 nodes
-    Acc0 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("2"),
-        Counters, DocsDict},
-    {ok, {WaitingCount1,_,_,_,_} = Acc1} =
-        handle_message({ok, [{ok, Revs1}, {ok, Revs2}]}, hd(Shards), Acc0),
-    ?assertEqual(2, WaitingCount1),
-    {ok, {WaitingCount2,_,_,_,_} = Acc2} =
-        handle_message({rexi_EXIT, nil}, lists:nth(2, Shards), Acc1),
-    ?assertEqual(1, WaitingCount2),
-    {stop, Reply} =
-        handle_message({rexi_EXIT, nil}, lists:nth(3, Shards), Acc2),
-    ?assertEqual(
-        {accepted, [{UUID1, {accepted, Revs1}}, {UUID2, {accepted, Revs2}}]},
-        Reply
-    ),
-    meck:unload(couch_log).
-
-
-doc_purge_error_test() ->
-    meck:new(couch_log),
-    meck:expect(couch_log, warning, fun(_,_) -> ok end),
-    meck:expect(couch_log, notice, fun(_,_) -> ok end),
-
-    Revs1 = [{1, <<"rev11">>}], UUID1 = <<"3de03c5f4c2cd34cc515a9d1ea000abd">>,
-    UUIDIdRevs1 = {UUID1, <<"id1">>, Revs1},
-    Revs2 = [{1, <<"rev12">>}], UUID2 = <<"4de03c5f4c2cd34cc515a9d1ea000abc">>,
-    UUIDIdRevs2 = {UUID2, <<"id2">>, Revs2},
-    UUIDsIDdsRevs = [UUIDIdRevs1, UUIDIdRevs2],
-    Shards =
-        mem3_util:create_partition_map("foo",3,1,["node1","node2","node3"]),
-    Counters = dict:to_list(
-        group_idrevs_by_shard_hack(<<"foo">>, Shards, UUIDsIDdsRevs)),
-    DocsDict = dict:new(),
-
-    % *** test rexi_exit on all 3 nodes
-    Acc0 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("2"),
-        Counters, DocsDict},
-    {ok, {WaitingCount1,_,_,_,_} = Acc1} =
-        handle_message({rexi_EXIT, nil}, hd(Shards), Acc0),
-    ?assertEqual(2, WaitingCount1),
-    {ok, {WaitingCount2,_,_,_,_} = Acc2} =
-        handle_message({rexi_EXIT, nil}, lists:nth(2,Shards), Acc1),
-    ?assertEqual(1, WaitingCount2),
-    {stop, Reply} =
-        handle_message({rexi_EXIT, nil}, lists:nth(3,Shards), Acc2),
-    ?assertEqual(
-        {error, [{UUID1, {error, internal_server_error}},
-            {UUID2, {error, internal_server_error}}]},
-        Reply
-    ),
-
-    % ***test w quorum > # shards, which should fail immediately
-    Shards2 = mem3_util:create_partition_map("foo",1,1,["node1"]),
-    Counters2 = dict:to_list(
-        group_idrevs_by_shard_hack(<<"foo">>, Shards2, UUIDsIDdsRevs)),
-    AccW4 = {length(Shards2), length(UUIDsIDdsRevs), list_to_integer("2"),
-        Counters2, DocsDict},
-    Bool =
-        case handle_message({ok, [{ok, Revs1}, {ok, Revs2}]},
-                hd(Shards), AccW4) of
-            {stop, _Reply} ->
-                true;
-            _ -> false
-        end,
-    ?assertEqual(true, Bool),
-
-    % *** test Docs with no replies should end up as {error, internal_server_error}
-    SA1 = #shard{node = a, range = [1]},
-    SA2 = #shard{node = a, range = [2]},
-    SB1 = #shard{node = b, range = [1]},
-    SB2 = #shard{node = b, range = [2]},
-    Counters3 = [{SA1,[UUID1]}, {SB1,[UUID1]},
-        {SA2,[UUID2]}, {SB2,[UUID2]}],
-    Acc30 = {length(Counters3), length(UUIDsIDdsRevs), 2, Counters3, DocsDict},
-    {ok, Acc31} = handle_message({ok, [{ok, Revs1}]}, SA1, Acc30),
-    {ok, Acc32} = handle_message({rexi_EXIT, nil}, SB1, Acc31),
-    {ok, Acc33} = handle_message({rexi_EXIT, nil}, SA2, Acc32),
-    {stop, Acc34} = handle_message({rexi_EXIT, nil}, SB2, Acc33),
-    ?assertEqual(
-        {error, [{UUID1, {accepted, Revs1}},
-            {UUID2, {error, internal_server_error}}]},
-        Acc34
-    ),
-    meck:unload(couch_log).
-
-
-% needed for testing to avoid having to start the mem3 application
-group_idrevs_by_shard_hack(_DbName, Shards, UUIDsIdsRevs) ->
-    lists:foldl(fun({UUID, _Id, _Revs}, Dict0) ->
-        lists:foldl(fun(Shard, Dict1) ->
-            dict:append(Shard, UUID, Dict1)
-        end, Dict0, Shards)
-    end, dict:new(), UUIDsIdsRevs).
+% Resolve the write quorum for this request.
+%
+% Uses the `w` entry of Options when it converts cleanly with
+% list_to_integer/1; on any failure of that lookup/conversion (missing
+% key, non-numeric value, ...) falls back to mem3:quorum(DbName).
+w(DbName, Options) ->
+    try list_to_integer(couch_util:get_value(w, Options)) of
+        Quorum -> Quorum
+    catch _:_ ->
+        mem3:quorum(DbName)
+    end.
+
+
+% Record an {error, Type} reply for every UUID owned by each of the
+% given {Worker, UUIDs} pairs and return the updated reply dict.
+append_errors(Type, WorkerUUIDs, Resps) ->
+    Error = {error, Type},
+    AddErrors = fun({_Worker, UUIDs}, RespAcc) ->
+        Errors = lists:duplicate(length(UUIDs), Error),
+        append_resps(UUIDs, Errors, RespAcc)
+    end,
+    lists:foldl(AddErrors, Resps, WorkerUUIDs).
+
+
+% Append each reply to the reply list stored under the UUID at the same
+% position. UUIDs and Replies must have equal length; a mismatch crashes
+% with function_clause.
+append_resps(UUIDs, Replies, Resps) ->
+    lists:foldl(fun({UUID, Reply}, Acc) ->
+        dict:append(UUID, Reply, Acc)
+    end, Resps, lists:zip(UUIDs, Replies)).
+
+
+% Decide whether rexi_utils:recv/6 should keep waiting for workers.
+%
+% Returns {stop, Acc} when no workers remain outstanding, or when every
+% request UUID already satisfies has_quorum/2 for the configured W;
+% otherwise returns {ok, Acc} to continue receiving.
+maybe_stop(#acc{worker_uuids = []} = Acc) ->
+    {stop, Acc};
+maybe_stop(#acc{resps = Resps, w = W} = Acc) ->
+    % andalso skips the quorum check once one UUID has fallen short.
+    AllMet = dict:fold(fun(_UUID, UUIDResps, Met) ->
+        Met andalso has_quorum(UUIDResps, W)
+    end, true, Resps),
+    case AllMet of
+        true -> {stop, Acc};
+        false -> {ok, Acc}
+    end.
+
+
+% Turn the collected per-UUID replies into the final result of go/3.
+%
+% For an #acc{}: returns {Health, Results} where Results is produced by
+% couch_util:reorder_results(UUIDs, [{UUID, {ok, AllRevs}}]) and AllRevs
+% is the sorted, de-duplicated union of the revisions from every
+% {ok, Revs} reply. Health is `ok` only if every UUID collected at least
+% W ok replies, otherwise `accepted`.
+%
+% Anything that is not an #acc{} is returned unchanged.
+format_resps(UUIDs, #acc{} = Acc) ->
+    #acc{
+        resps = Resps,
+        w = W
+    } = Acc,
+    FoldFun = fun(UUID, Replies, {Health, ReplyAcc}) ->
+        OkReplies = [Reply || {ok, Reply} <- Replies],
+        case OkReplies of
+            [] ->
+                % NOTE(review): a request with no successful reply aborts
+                % the whole call by throwing its smallest error term -
+                % confirm callers expect a throw rather than a per-UUID
+                % error entry in the result list.
+                [Error | _] = lists:usort(Replies),
+                throw(Error);
+            _ ->
+                AllRevs = lists:usort(lists:flatten(OkReplies)),
+                NewReplyAcc = [{UUID, {ok, AllRevs}} | ReplyAcc],
+                H = if length(OkReplies) >= W -> ok; true -> accepted end,
+                {update_health(H, Health), NewReplyAcc}
+        end
+    end,
+    {FinalHealth, FinalReplies} = dict:fold(FoldFun, {ok, []}, Resps),
+    {FinalHealth, couch_util:reorder_results(UUIDs, FinalReplies)};
+
+format_resps(_UUIDs, Else) ->
+    Else.
+
+
+% True when the reply list contains at least W {ok, _} replies.
+%
+% Replies that are not {ok, _} (for example the {error, Type} entries
+% recorded for failed or timed-out workers) are skipped: they neither
+% count toward the quorum nor crash the check.
+has_quorum([], W) when W > 0 ->
+    false;
+has_quorum(_, W) when W =< 0 ->
+    true;
+has_quorum([{ok, _} | Rest], W) when W > 0 ->
+    has_quorum(Rest, W - 1);
+has_quorum([_ | Rest], W) when W > 0 ->
+    has_quorum(Rest, W).
+
+
+% Fold one request's health into the running overall health: a single
+% `accepted` makes the overall result `accepted`; `ok` leaves the
+% running value untouched.
+update_health(accepted, _Current) ->
+    accepted;
+update_health(ok, Current) ->
+    Current.
+
+
+%% % eunits
+%% doc_purge_ok_test() ->
+%%     meck:new(couch_log),
+%%     meck:expect(couch_log, warning, fun(_,_) -> ok end),
+%%     meck:expect(couch_log, notice, fun(_,_) -> ok end),
+%%
+%%     Revs1 = [{1, <<"rev11">>}], UUID1 = <<"3de03c5f4c2cd34cc515a9d1ea000abd">>,
+%%     UUIDIdRevs1 = {UUID1, <<"id1">>, Revs1},
+%%     Revs2 = [{1, <<"rev12">>}], UUID2 = <<"4de03c5f4c2cd34cc515a9d1ea000abc">>,
+%%     UUIDIdRevs2 = {UUID2, <<"id2">>, Revs2},
+%%     UUIDsIDdsRevs = [UUIDIdRevs1, UUIDIdRevs2],
+%%     Shards =
+%%         mem3_util:create_partition_map("foo",3,1,["node1","node2","node3"]),
+%%     Counters = dict:to_list(
+%%         group_idrevs_by_shard_hack(<<"foo">>, Shards, UUIDsIDdsRevs)),
+%%     DocsDict = dict:new(),
+%%
+%%     % ***test for W = 2
+%%     AccW2 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("2"),
+%%         Counters, DocsDict},
+%%     {ok, {WaitingCountW2_1,_,_,_,_} = AccW2_1} =
+%%         handle_message({ok,[{ok, Revs1}, {ok, Revs2}]}, hd(Shards), AccW2),
+%%     ?assertEqual(2, WaitingCountW2_1),
+%%     {stop, FinalReplyW2 } =
+%%         handle_message({ok, [{ok, Revs1}, {ok, Revs2}]},
+%%             lists:nth(2,Shards), AccW2_1),
+%%     ?assertEqual(
+%%         {ok, [{UUID1, {ok, Revs1}}, {UUID2, {ok, Revs2}}]},
+%%         FinalReplyW2
+%%     ),
+%%
+%%     % ***test for W = 3
+%%     AccW3 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("3"),
+%%         Counters, DocsDict},
+%%     {ok, {WaitingCountW3_1,_,_,_,_} = AccW3_1} =
+%%         handle_message({ok, [{ok, Revs1}, {ok, Revs2}]}, hd(Shards), AccW3),
+%%     ?assertEqual(2, WaitingCountW3_1),
+%%     {ok, {WaitingCountW3_2,_,_,_,_} = AccW3_2} =
+%%         handle_message({ok,[{ok, Revs1}, {ok, Revs2}]},
+%%             lists:nth(2,Shards), AccW3_1),
+%%     ?assertEqual(1, WaitingCountW3_2),
+%%     {stop, FinalReplyW3 } =
+%%         handle_message({ok, [{ok, Revs1}, {ok, Revs2}]},
+%%             lists:nth(3,Shards), AccW3_2),
+%%     ?assertEqual(
+%%         {ok, [{UUID1, {ok, Revs1}}, {UUID2, {ok, Revs2}}]},
+%%         FinalReplyW3
+%%     ),
+%%
+%%     % *** test rexi_exit on 1 node
+%%     Acc0 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("2"),
+%%         Counters, DocsDict},
+%%     {ok, {WaitingCount1,_,_,_,_} = Acc1} =
+%%         handle_message({ok, [{ok, Revs1}, {ok, Revs2}]}, hd(Shards), Acc0),
+%%     ?assertEqual(2, WaitingCount1),
+%%     {ok, {WaitingCount2,_,_,_,_} = Acc2} =
+%%         handle_message({rexi_EXIT, nil}, lists:nth(2,Shards), Acc1),
+%%     ?assertEqual(1, WaitingCount2),
+%%     {stop, Reply} =
+%%         handle_message({ok, [{ok, Revs1}, {ok, Revs2}]},
+%%             lists:nth(3,Shards), Acc2),
+%%     ?assertEqual(
+%%         {ok,[{UUID1, {ok, Revs1}}, {UUID2, {ok, Revs2}}]},
+%%         Reply
+%%     ),
+%%
+%%     % *** test {error, purge_during_compaction_exceeded_limit} on all nodes
+%%     % *** still should return ok reply for the request
+%%     ErrPDCEL = {error, purge_during_compaction_exceeded_limit},
+%%     Acc20 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("3"),
+%%         Counters, DocsDict},
+%%     {ok, {WaitingCount21,_,_,_,_} = Acc21} =
+%%         handle_message({ok, [ErrPDCEL, ErrPDCEL]}, hd(Shards), Acc20),
+%%     ?assertEqual(2, WaitingCount21),
+%%     {ok, {WaitingCount22,_,_,_,_} = Acc22} =
+%%         handle_message({ok, [ErrPDCEL, ErrPDCEL]}, lists:nth(2,Shards), Acc21),
+%%     ?assertEqual(1, WaitingCount22),
+%%     {stop, Reply2 } =
+%%         handle_message({ok, [ErrPDCEL, ErrPDCEL]}, lists:nth(3,Shards), Acc22),
+%%     ?assertEqual(
+%%         {ok, [{UUID1, ErrPDCEL}, {UUID2, ErrPDCEL}]},
+%%         Reply2
+%%     ),
+%%
+%%     % *** test {error, purged_docs_limit_exceeded} on all nodes
+%%     % *** still should return ok reply for the request
+%%     ErrPDLE = {error, purged_docs_limit_exceeded},
+%%     Acc30 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("3"),
+%%         Counters, DocsDict},
+%%     {ok, {WaitingCount31,_,_,_,_} = Acc31} =
+%%         handle_message({ok, [ErrPDLE, ErrPDLE]}, hd(Shards), Acc30),
+%%     ?assertEqual(2, WaitingCount31),
+%%     {ok, {WaitingCount32,_,_,_,_} = Acc32} =
+%%         handle_message({ok, [ErrPDLE, ErrPDLE]}, lists:nth(2,Shards), Acc31),
+%%     ?assertEqual(1, WaitingCount32),
+%%     {stop, Reply3 } =
+%%         handle_message({ok, [ErrPDLE, ErrPDLE]},lists:nth(3,Shards), Acc32),
+%%     ?assertEqual(
+%%         {ok, [{UUID1, ErrPDLE}, {UUID2, ErrPDLE}]},
+%%         Reply3
+%%     ),
+%%     meck:unload(couch_log).
+%%
+%%
+%% doc_purge_accepted_test() ->
+%%     meck:new(couch_log),
+%%     meck:expect(couch_log, warning, fun(_,_) -> ok end),
+%%     meck:expect(couch_log, notice, fun(_,_) -> ok end),
+%%
+%%     Revs1 = [{1, <<"rev11">>}], UUID1 = <<"3de03c5f4c2cd34cc515a9d1ea000abd">>,
+%%     UUIDIdRevs1 = {UUID1, <<"id1">>, Revs1},
+%%     Revs2 = [{1, <<"rev12">>}], UUID2 = <<"4de03c5f4c2cd34cc515a9d1ea000abc">>,
+%%     UUIDIdRevs2 = {UUID2, <<"id2">>, Revs2},
+%%     UUIDsIDdsRevs = [UUIDIdRevs1, UUIDIdRevs2],
+%%     Shards =
+%%         mem3_util:create_partition_map("foo",3,1,["node1","node2","node3"]),
+%%     Counters = dict:to_list(
+%%         group_idrevs_by_shard_hack(<<"foo">>, Shards, UUIDsIDdsRevs)),
+%%     DocsDict = dict:new(),
+%%
+%%     % *** test rexi_exit on 2 nodes
+%%     Acc0 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("2"),
+%%         Counters, DocsDict},
+%%     {ok, {WaitingCount1,_,_,_,_} = Acc1} =
+%%         handle_message({ok, [{ok, Revs1}, {ok, Revs2}]}, hd(Shards), Acc0),
+%%     ?assertEqual(2, WaitingCount1),
+%%     {ok, {WaitingCount2,_,_,_,_} = Acc2} =
+%%         handle_message({rexi_EXIT, nil}, lists:nth(2, Shards), Acc1),
+%%     ?assertEqual(1, WaitingCount2),
+%%     {stop, Reply} =
+%%         handle_message({rexi_EXIT, nil}, lists:nth(3, Shards), Acc2),
+%%     ?assertEqual(
+%%         {accepted, [{UUID1, {accepted, Revs1}}, {UUID2, {accepted, Revs2}}]},
+%%         Reply
+%%     ),
+%%     meck:unload(couch_log).
+%%
+%%
+%% doc_purge_error_test() ->
+%%     meck:new(couch_log),
+%%     meck:expect(couch_log, warning, fun(_,_) -> ok end),
+%%     meck:expect(couch_log, notice, fun(_,_) -> ok end),
+%%
+%%     Revs1 = [{1, <<"rev11">>}], UUID1 = <<"3de03c5f4c2cd34cc515a9d1ea000abd">>,
+%%     UUIDIdRevs1 = {UUID1, <<"id1">>, Revs1},
+%%     Revs2 = [{1, <<"rev12">>}], UUID2 = <<"4de03c5f4c2cd34cc515a9d1ea000abc">>,
+%%     UUIDIdRevs2 = {UUID2, <<"id2">>, Revs2},
+%%     UUIDsIDdsRevs = [UUIDIdRevs1, UUIDIdRevs2],
+%%     Shards =
+%%         mem3_util:create_partition_map("foo",3,1,["node1","node2","node3"]),
+%%     Counters = dict:to_list(
+%%         group_idrevs_by_shard_hack(<<"foo">>, Shards, UUIDsIDdsRevs)),
+%%     DocsDict = dict:new(),
+%%
+%%     % *** test rexi_exit on all 3 nodes
+%%     Acc0 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("2"),
+%%         Counters, DocsDict},
+%%     {ok, {WaitingCount1,_,_,_,_} = Acc1} =
+%%         handle_message({rexi_EXIT, nil}, hd(Shards), Acc0),
+%%     ?assertEqual(2, WaitingCount1),
+%%     {ok, {WaitingCount2,_,_,_,_} = Acc2} =
+%%         handle_message({rexi_EXIT, nil}, lists:nth(2,Shards), Acc1),
+%%     ?assertEqual(1, WaitingCount2),
+%%     {stop, Reply} =
+%%         handle_message({rexi_EXIT, nil}, lists:nth(3,Shards), Acc2),
+%%     ?assertEqual(
+%%         {error, [{UUID1, {error, internal_server_error}},
+%%             {UUID2, {error, internal_server_error}}]},
+%%         Reply
+%%     ),
+%%
+%%     % ***test w quorum > # shards, which should fail immediately
+%%     Shards2 = mem3_util:create_partition_map("foo",1,1,["node1"]),
+%%     Counters2 = dict:to_list(
+%%         group_idrevs_by_shard_hack(<<"foo">>, Shards2, UUIDsIDdsRevs)),
+%%     AccW4 = {length(Shards2), length(UUIDsIDdsRevs), list_to_integer("2"),
+%%         Counters2, DocsDict},
+%%     Bool =
+%%         case handle_message({ok, [{ok, Revs1}, {ok, Revs2}]},
+%%                 hd(Shards), AccW4) of
+%%             {stop, _Reply} ->
+%%                 true;
+%%             _ -> false
+%%         end,
+%%     ?assertEqual(true, Bool),
+%%
+%%     % *** test Docs with no replies should end up as {error, internal_server_error}
+%%     SA1 = #shard{node = a, range = [1]},
+%%     SA2 = #shard{node = a, range = [2]},
+%%     SB1 = #shard{node = b, range = [1]},
+%%     SB2 = #shard{node = b, range = [2]},
+%%     Counters3 = [{SA1,[UUID1]}, {SB1,[UUID1]},
+%%         {SA2,[UUID2]}, {SB2,[UUID2]}],
+%%     Acc30 = {length(Counters3), length(UUIDsIDdsRevs), 2, Counters3, DocsDict},
+%%     {ok, Acc31} = handle_message({ok, [{ok, Revs1}]}, SA1, Acc30),
+%%     {ok, Acc32} = handle_message({rexi_EXIT, nil}, SB1, Acc31),
+%%     {ok, Acc33} = handle_message({rexi_EXIT, nil}, SA2, Acc32),
+%%     {stop, Acc34} = handle_message({rexi_EXIT, nil}, SB2, Acc33),
+%%     ?assertEqual(
+%%         {error, [{UUID1, {accepted, Revs1}},
+%%             {UUID2, {error, internal_server_error}}]},
+%%         Acc34
+%%     ),
+%%     meck:unload(couch_log).
+%%
+%%
+%% % needed for testing to avoid having to start the mem3 application
+%% group_idrevs_by_shard_hack(_DbName, Shards, UUIDsIdsRevs) ->
+%%     lists:foldl(fun({UUID, _Id, _Revs}, Dict0) ->
+%%         lists:foldl(fun(Shard, Dict1) ->
+%%             dict:append(Shard, UUID, Dict1)
+%%         end, Dict0, Shards)
+%%     end, dict:new(), UUIDsIdsRevs).

-- 
To stop receiving notification emails like this one, please contact
davisp@apache.org.