[couchdb] 09/10: [09/10] Clustered Purge: Fabric API

davisp pushed a commit to branch COUCHDB-3326-clustered-purge-pr5-implementation
commit 8c1c4d5ee01d4974d05d44f9bf689123924a2de6
Author: Paul J. Davis <>
AuthorDate: Tue Apr 24 12:27:43 2018 -0500

    [09/10] Clustered Purge: Fabric API
    This commit implements the clustered API for performing purge requests.
    The change should be fairly straightforward for anyone already
    familiar with the general implementation of a fabric coordinator, given
    that the purge API is fairly simple.
    Co-authored-by: Mayya Sharipova <>
    Co-authored-by: jiangphcn <>
 src/fabric/src/fabric.erl           |  27 +-
 src/fabric/src/fabric_db_info.erl   |  29 +-
 src/fabric/src/fabric_db_meta.erl   |  26 +-
 src/fabric/src/fabric_doc_purge.erl | 572 ++++++++++++++++++++++++++++++++++++
 4 files changed, 638 insertions(+), 16 deletions(-)

diff --git a/src/fabric/src/fabric.erl b/src/fabric/src/fabric.erl
index 4a07271..b2e71dc 100644
--- a/src/fabric/src/fabric.erl
+++ b/src/fabric/src/fabric.erl
@@ -21,12 +21,13 @@
     delete_db/2, get_db_info/1, get_doc_count/1, set_revs_limit/3,
     set_security/2, set_security/3, get_revs_limit/1, get_security/1,
     get_security/2, get_all_security/1, get_all_security/2,
+    get_purge_infos_limit/1, set_purge_infos_limit/3,
     compact/1, compact/2]).
 % Documents
 -export([open_doc/3, open_revs/4, get_doc_info/3, get_full_doc_info/3,
     get_missing_revs/2, get_missing_revs/3, update_doc/3, update_docs/3,
-    purge_docs/2, att_receiver/2]).
+    purge_docs/3, att_receiver/2]).
 % Views
 -export([all_docs/4, all_docs/5, changes/4, query_view/3, query_view/4,
@@ -137,6 +138,18 @@ set_security(DbName, SecObj) ->
 set_security(DbName, SecObj, Options) ->
     fabric_db_meta:set_security(dbname(DbName), SecObj, opts(Options)).
+%% @doc sets the upper bound for the number of stored purge requests;
+%%      Limit must be a positive integer (any other value fails with
+%%      function_clause). Returns ok on success, otherwise whatever
+%%      fabric_db_meta:set_purge_infos_limit/3 reports, for example
+%%      {error, timeout}.
+-spec set_purge_infos_limit(dbname(), pos_integer(), [option()]) ->
+    ok | {error, term()}.
+set_purge_infos_limit(DbName, Limit, Options)
+        when is_integer(Limit), Limit > 0 ->
+    fabric_db_meta:set_purge_infos_limit(dbname(DbName), Limit, opts(Options)).
+%% @doc retrieves the upper bound for the number of stored purge requests
+%%      by opening the database with admin context; the handle is closed
+%%      again whether or not the lookup succeeds.
+-spec get_purge_infos_limit(dbname()) -> pos_integer() | no_return().
+get_purge_infos_limit(DbName) ->
+    Name = dbname(DbName),
+    {ok, Db} = fabric_util:get_db(Name, [?ADMIN_CTX]),
+    try
+        couch_db:get_purge_infos_limit(Db)
+    after
+        % Failures while closing are deliberately ignored.
+        catch couch_db:close(Db)
+    end.
 get_security(DbName) ->
     get_security(DbName, [?ADMIN_CTX]).
@@ -267,8 +280,16 @@ update_docs(DbName, Docs, Options) ->
         {aborted, PreCommitFailures}
-purge_docs(_DbName, _IdsRevs) ->
-    not_implemented.
+%% @doc purge revisions for a list of '{Id, Revs}' pairs
+%%      returns {Health, Results} as produced by fabric_doc_purge:go/3:
+%%      Results holds one entry per requested pair, each either
+%%      {ok | accepted, PurgedRevs} or {error, Reason}, and Health is the
+%%      overall summary (ok | accepted | error). An empty request list
+%%      yields {ok, []}.
+-spec purge_docs(dbname(), [{docid(), [revision()]}], [option()]) ->
+    {Health | error, [{Health, [revision()]} | {error, any()}]} when
+    Health :: ok | accepted.
+purge_docs(DbName, IdsRevs, Options) when is_list(IdsRevs) ->
+    % Normalise every {Id, Revs} pair before handing off to the coordinator.
+    IdsRevs2 = [idrevs(IdRs) || IdRs <- IdsRevs],
+    fabric_doc_purge:go(dbname(DbName), IdsRevs2, opts(Options)).
 %% @doc spawns a process to upload attachment data and
 %%      returns a function that shards can use to communicate
diff --git a/src/fabric/src/fabric_db_info.erl b/src/fabric/src/fabric_db_info.erl
index 98e8e52..97a31c2 100644
--- a/src/fabric/src/fabric_db_info.erl
+++ b/src/fabric/src/fabric_db_info.erl
@@ -23,10 +23,12 @@ go(DbName) ->
     RexiMon = fabric_util:create_monitors(Shards),
     Fun = fun handle_message/3,
     {ok, ClusterInfo} = get_cluster_info(Shards),
-    Acc0 = {fabric_dict:init(Workers, nil), [{cluster, ClusterInfo}]},
+    Acc0 = {fabric_dict:init(Workers, nil), [], [{cluster, ClusterInfo}]},
         case fabric_util:recv(Workers, #shard.ref, Fun, Acc0) of
-            {ok, Acc} -> {ok, Acc};
+            {ok, Acc} ->
+                {ok, Acc};
             {timeout, {WorkersDict, _}} ->
                 DefunctWorkers = fabric_util:remove_done_workers(
@@ -37,44 +39,49 @@ go(DbName) ->
                 {error, timeout};
-            {error, Error} -> throw(Error)
+            {error, Error} ->
+                throw(Error)
-handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, {Counters, Acc}) ->
+handle_message({rexi_DOWN,
+        _, {_,NodeRef},_}, _Shard, {Counters, PseqAcc, Acc}) ->
     case fabric_util:remove_down_workers(Counters, NodeRef) of
     {ok, NewCounters} ->
-        {ok, {NewCounters, Acc}};
+        {ok, {NewCounters, PseqAcc, Acc}};
     error ->
         {error, {nodedown, <<"progress not possible">>}}
-handle_message({rexi_EXIT, Reason}, Shard, {Counters, Acc}) ->
+handle_message({rexi_EXIT, Reason}, Shard, {Counters, PseqAcc, Acc}) ->
     NewCounters = fabric_dict:erase(Shard, Counters),
     case fabric_view:is_progress_possible(NewCounters) of
     true ->
-        {ok, {NewCounters, Acc}};
+        {ok, {NewCounters, PseqAcc, Acc}};
     false ->
         {error, Reason}
-handle_message({ok, Info}, #shard{dbname=Name} = Shard, {Counters, Acc}) ->
+handle_message({ok, Info}, #shard{dbname=Name} = Shard, {Counters, PseqAcc, Acc}) ->
     case fabric_dict:lookup_element(Shard, Counters) of
     undefined ->
         % already heard from someone else in this range
-        {ok, {Counters, Acc}};
+        {ok, {Counters, PseqAcc, Acc}};
     nil ->
         Seq = couch_util:get_value(update_seq, Info),
         C1 = fabric_dict:store(Shard, Seq, Counters),
         C2 = fabric_view:remove_overlapping_shards(Shard, C1),
+        PSeq = couch_util:get_value(purge_seq, Info),
+        NewPseqAcc = [{Shard, PSeq}|PseqAcc],
         case fabric_dict:any(nil, C2) of
         true ->
-            {ok, {C2, [Info|Acc]}};
+            {ok, {C2, NewPseqAcc, [Info|Acc]}};
         false ->
             {stop, [
+                {purge_seq, fabric_view_changes:pack_seqs(NewPseqAcc)},
                 {update_seq, fabric_view_changes:pack_seqs(C2)} |
@@ -91,8 +98,6 @@ merge_results(Info) ->
             [{doc_count, lists:sum(X)} | Acc];
         (doc_del_count, X, Acc) ->
             [{doc_del_count, lists:sum(X)} | Acc];
-        (purge_seq, X, Acc) ->
-            [{purge_seq, lists:sum(X)} | Acc];
         (compact_running, X, Acc) ->
             [{compact_running, lists:member(true, X)} | Acc];
         (disk_size, X, Acc) -> % legacy
diff --git a/src/fabric/src/fabric_db_meta.erl b/src/fabric/src/fabric_db_meta.erl
index 367ef06..26e1b37 100644
--- a/src/fabric/src/fabric_db_meta.erl
+++ b/src/fabric/src/fabric_db_meta.erl
@@ -12,7 +12,8 @@
--export([set_revs_limit/3, set_security/3, get_all_security/2]).
+-export([set_revs_limit/3, set_security/3, get_all_security/2,
+    set_purge_infos_limit/3]).
@@ -48,6 +49,29 @@ handle_revs_message(Error, _, _Acc) ->
     {error, Error}.
+%% Submits a set_purge_infos_limit job to the shards returned by
+%% mem3:shards/1 and waits for one ok per worker. Returns ok on success,
+%% {error, timeout} when the receive times out, and otherwise passes
+%% through whatever fabric_util:recv/4 returned.
+set_purge_infos_limit(DbName, Limit, Options) ->
+    RpcArgs = [Limit, Options],
+    Workers = fabric_util:submit_jobs(
+        mem3:shards(DbName), set_purge_infos_limit, RpcArgs),
+    % The handler stops on the ok that arrives while the counter is 0,
+    % hence the "- 1".
+    InitAcc = {Workers, length(Workers) - 1},
+    Result = fabric_util:recv(
+        Workers, #shard.ref, fun handle_purge_message/3, InitAcc),
+    case Result of
+        {ok, ok} ->
+            ok;
+        {timeout, {Pending, _}} ->
+            fabric_util:log_timeout(Pending, "set_purged_docs_limit"),
+            {error, timeout};
+        Other ->
+            Other
+    end.
+%% Receive callback for set_purge_infos_limit/3. Each ok decrements the
+%% outstanding counter and drops the worker; the ok seen while the counter
+%% is 0 stops the receive loop. Any other message aborts with {error, Msg}.
+handle_purge_message(ok, Worker, {Workers, Waiting}) ->
+    case Waiting of
+        0 ->
+            {stop, ok};
+        _ ->
+            {ok, {lists:delete(Worker, Workers), Waiting - 1}}
+    end;
+handle_purge_message(Error, _, _Acc) ->
+    {error, Error}.
 set_security(DbName, SecObj, Options) ->
     Shards = mem3:shards(DbName),
     RexiMon = fabric_util:create_monitors(Shards),
diff --git a/src/fabric/src/fabric_doc_purge.erl b/src/fabric/src/fabric_doc_purge.erl
new file mode 100644
index 0000000..61b84d8
--- /dev/null
+++ b/src/fabric/src/fabric_doc_purge.erl
@@ -0,0 +1,572 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+    go/3
+-record(acc, {
+    worker_uuids,
+    resps,
+    uuid_counts,
+    w
+%% Coordinates a clustered purge of IdsRevs ([{DocId, Revs}]) in DbName.
+%% Every pair gets its own UUID, the requests are grouped per shard and
+%% cast to fabric_rpc:purge_docs on each shard's node. Replies are folded
+%% by handle_message/3 until maybe_stop/1 says to stop or the request
+%% timeout fires. Returns {Health, Results} where Health comes from
+%% resp_health/1; an empty request list short-circuits to {ok, []}.
+go(_, [], _) ->
+    {ok, []};
+go(DbName, IdsRevs, Options) ->
+    % Generate our purge requests of {UUID, DocId, Revs}
+    {UUIDs, Reqs} = create_reqs(IdsRevs, [], []),
+    % Fire off rexi workers for each shard.
+    {Workers, WorkerUUIDs} = dict:fold(fun(Shard, ShardReqs, {Ws, WUUIDs}) ->
+        #shard{name = ShardDbName, node = Node} = Shard,
+        Args = [ShardDbName, ShardReqs, Options],
+        Ref = rexi:cast(Node, {fabric_rpc, purge_docs, Args}),
+        Worker = Shard#shard{ref=Ref},
+        ShardUUIDs = [UUID || {UUID, _Id, _Revs} <- ShardReqs],
+        {[Worker | Ws], [{Worker, ShardUUIDs} | WUUIDs]}
+    end, {[], []}, group_reqs_by_shard(DbName, Reqs)),
+    % Number of workers each UUID was sent to, i.e. how many replies can
+    % arrive for it at most.
+    UUIDCounts = lists:foldl(fun({_Worker, WUUIDs}, CountAcc) ->
+        lists:foldl(fun(UUID, InnerCountAcc) ->
+            dict:update_counter(UUID, 1, InnerCountAcc)
+        end, CountAcc, WUUIDs)
+    end, dict:new(), WorkerUUIDs),
+    RexiMon = fabric_util:create_monitors(Workers),
+    Timeout = fabric_util:request_timeout(),
+    Acc0 = #acc{
+        worker_uuids = WorkerUUIDs,
+        resps = dict:from_list([{UUID, []} || UUID <- UUIDs]),
+        uuid_counts = UUIDCounts,
+        w = w(DbName, Options)
+    },
+    Acc2 = try rexi_utils:recv(Workers, #shard.ref,
+            fun handle_message/3, Acc0, infinity, Timeout) of
+        {ok, Acc1} ->
+            Acc1;
+        {timeout, Acc1} ->
+            % Bind fresh names here: WorkerUUIDs is already bound above, so
+            % matching on it again would assert that no worker had replied
+            % yet and crash with badmatch on a partially answered request.
+            #acc{
+                worker_uuids = PendingWorkerUUIDs,
+                resps = Resps
+            } = Acc1,
+            DefunctWorkers = [Worker || {Worker, _} <- PendingWorkerUUIDs],
+            fabric_util:log_timeout(DefunctWorkers, "purge_docs"),
+            % Only the workers that never answered are recorded as timeouts.
+            NewResps = append_errors(timeout, PendingWorkerUUIDs, Resps),
+            Acc1#acc{worker_uuids = [], resps = NewResps};
+        Else ->
+            Else
+    after
+        rexi_monitor:stop(RexiMon)
+    end,
+    FinalResps = format_resps(UUIDs, Acc2),
+    {resp_health(FinalResps), FinalResps}.
+%% Receive callback that folds one worker message into the #acc{} state.
+%% Node-down and worker-exit messages record internal_server_error for the
+%% UUIDs of the affected workers, ok messages record the per-UUID replies,
+%% and bad_request is rethrown. All non-throwing clauses finish through
+%% maybe_stop/1.
+handle_message({rexi_DOWN, _, {_, Node}, _}, _Worker, Acc) ->
+    #acc{worker_uuids = Pending, resps = Resps0} = Acc,
+    % Every pending worker on the downed node fails at once.
+    OnDownNode = fun({#shard{node = N}, _}) -> N == Node end,
+    {Dead, Alive} = lists:partition(OnDownNode, Pending),
+    Resps1 = append_errors(internal_server_error, Dead, Resps0),
+    maybe_stop(Acc#acc{worker_uuids = Alive, resps = Resps1});
+handle_message({rexi_EXIT, _}, Worker, Acc) ->
+    #acc{worker_uuids = Pending, resps = Resps0} = Acc,
+    {value, Exited, Alive} = lists:keytake(Worker, 1, Pending),
+    Resps1 = append_errors(internal_server_error, [Exited], Resps0),
+    maybe_stop(Acc#acc{worker_uuids = Alive, resps = Resps1});
+handle_message({ok, Replies}, Worker, Acc) ->
+    #acc{worker_uuids = Pending, resps = Resps0} = Acc,
+    {value, {_W, UUIDs}, Alive} = lists:keytake(Worker, 1, Pending),
+    % Replies are paired positionally with the UUIDs sent to this worker.
+    Resps1 = append_resps(UUIDs, Replies, Resps0),
+    maybe_stop(Acc#acc{worker_uuids = Alive, resps = Resps1});
+handle_message({bad_request, Msg}, _, _) ->
+    throw({bad_request, Msg}).
+%% Tags every {Id, Revs} pair with a fresh UUID from couch_uuids:new/0.
+%% UUIDs and Reqs are accumulators in reverse order; the result is
+%% {UUIDs, Reqs} with both lists reversed back, Reqs holding
+%% {UUID, Id, Revs} tuples.
+create_reqs(IdsRevs, UUIDs, Reqs) ->
+    {RevUUIDs, RevReqs} = lists:foldl(fun({Id, Revs}, {UAcc, RAcc}) ->
+        UUID = couch_uuids:new(),
+        {[UUID | UAcc], [{UUID, Id, Revs} | RAcc]}
+    end, {UUIDs, Reqs}, IdsRevs),
+    {lists:reverse(RevUUIDs), lists:reverse(RevReqs)}.
+%% Builds a dict from shard to the list of requests whose document id
+%% maps to that shard according to mem3:shards/2.
+group_reqs_by_shard(DbName, Reqs) ->
+    AddReq = fun({_UUID, DocId, _Revs} = Req, Grouped) ->
+        Shards = mem3:shards(DbName, DocId),
+        lists:foldl(fun(Shard, G) ->
+            dict:append(Shard, Req, G)
+        end, Grouped, Shards)
+    end,
+    lists:foldl(AddReq, dict:new(), Reqs).
+%% Quorum size for this request: the `w` option parsed with
+%% list_to_integer/1, falling back to mem3:quorum/1 when the option is
+%% missing or cannot be parsed.
+w(DbName, Options) ->
+    try
+        WOpt = couch_util:get_value(w, Options),
+        list_to_integer(WOpt)
+    catch _:_ ->
+        mem3:quorum(DbName)
+    end.
+%% Records {error, Type} once for every UUID of every {Worker, UUIDs}
+%% pair in WorkerUUIDs and returns the updated response dict.
+append_errors(Type, WorkerUUIDs, Resps) ->
+    Error = {error, Type},
+    lists:foldl(fun({_Worker, UUIDs}, RespAcc) ->
+        Errors = lists:duplicate(length(UUIDs), Error),
+        append_resps(UUIDs, Errors, RespAcc)
+    end, Resps, WorkerUUIDs).
+%% Appends the N-th reply to the response list stored under the N-th UUID.
+%% Both lists must have the same length.
+append_resps(UUIDs, Replies, Resps) ->
+    lists:foldl(fun({UUID, Reply}, Acc) ->
+        dict:append(UUID, Reply, Acc)
+    end, Resps, lists:zip(UUIDs, Replies)).
+%% Decides whether the coordinator can stop receiving. Stops as soon as
+%% no workers are pending, or when has_quorum/3 holds for every UUID;
+%% otherwise keeps going.
+maybe_stop(#acc{worker_uuids = []} = Acc) ->
+    {stop, Acc};
+maybe_stop(#acc{resps = Resps, uuid_counts = Counts, w = W} = Acc) ->
+    Settled = lists:all(fun({UUID, UUIDResps}) ->
+        has_quorum(UUIDResps, dict:fetch(UUID, Counts), W)
+    end, dict:to_list(Resps)),
+    case Settled of
+        true -> {stop, Acc};
+        false -> {ok, Acc}
+    end.
+%% Collapses the replies collected for every UUID into a single result and
+%% hands the {UUID, Result} pairs to couch_util:reorder_results/2 together
+%% with UUIDs. A UUID without any {ok, Revs} reply yields the first of its
+%% replies in term order (an error); otherwise it yields {Health, AllRevs}
+%% with the sorted union of all reported revisions. Anything that is not
+%% an #acc{} is returned unchanged.
+format_resps(UUIDs, #acc{} = Acc) ->
+    #acc{
+        resps = Resps,
+        w = W
+    } = Acc,
+    FoldFun = fun(UUID, Replies, ReplyAcc) ->
+        OkReplies = [Reply || {ok, Reply} <- Replies],
+        case OkReplies of
+            [] ->
+                [Error | _] = lists:usort(Replies),
+                [{UUID, Error} | ReplyAcc];
+            _ ->
+                AllRevs = lists:usort(lists:flatten(OkReplies)),
+                % ok needs at least W successful replies that all agree;
+                % anything weaker is reported as accepted.
+                IsOk = length(OkReplies) >= W
+                        andalso length(lists:usort(OkReplies)) == 1,
+                Health = if IsOk -> ok; true -> accepted end,
+                [{UUID, {Health, AllRevs}} | ReplyAcc]
+        end
+    end,
+    % Seed the fold with [] so the result is a proper [{UUID, Result}]
+    % list rather than one ending in a stray {ok, []} tail.
+    FinalReplies = dict:fold(FoldFun, [], Resps),
+    couch_util:reorder_results(UUIDs, FinalReplies);
+format_resps(_UUIDs, Else) ->
+    Else.
+%% Summarises a list of {Health, _} results: error if any entry is an
+%% error, else accepted if any entry is accepted, else ok when every entry
+%% is ok. Any other mix, including an empty list, is an error.
+resp_health(Resps) ->
+    Healths = lists:usort([H || {H, _} <- Resps]),
+    HasError = lists:member(error, Healths),
+    HasAccepted = lists:member(accepted, Healths),
+    case {HasError, HasAccepted} of
+        {true, _} -> error;
+        {false, true} -> accepted;
+        {false, false} when Healths == [ok] -> ok;
+        {false, false} -> error
+    end.
+%% True once some identical {ok, _} reply has been seen at least W times,
+%% or once at least Count replies of any kind have arrived.
+has_quorum(Resps, Count, W) ->
+    % Tally how often each distinct successful reply occurred.
+    Tally = lists:foldl(fun
+        ({ok, _} = Reply, Counts) ->
+            dict:update_counter(Reply, 1, Counts);
+        (_NotOk, Counts) ->
+            Counts
+    end, dict:new(), Resps),
+    MaxOk = dict:fold(fun(_Reply, N, Max) ->
+        erlang:max(N, Max)
+    end, 0, Tally),
+    MaxOk >= W orelse length(Resps) >= Count.
+%% EUnit generator: runs every coordinator scenario inside its own
+%% setup/teardown fixture.
+purge_test_() ->
+    Scenarios = [
+        t_w2_ok(),
+        t_w3_ok(),
+        t_w2_mixed_accepted(),
+        t_w3_mixed_accepted(),
+        t_w2_exit1_ok(),
+        t_w2_exit2_accepted(),
+        t_w2_exit3_error(),
+        t_w4_accepted(),
+        t_mixed_ok_accepted(),
+        t_mixed_errors()
+    ],
+    {foreach, fun setup/0, fun teardown/1, Scenarios}.
+%% Fixture setup: mocks couch_log so warning/2 and notice/2 are no-ops.
+setup() ->
+    meck:new(couch_log),
+    Quiet = fun(_, _) -> ok end,
+    meck:expect(couch_log, warning, Quiet),
+    meck:expect(couch_log, notice, Quiet).
+%% Fixture cleanup: unloads the mocks created by setup/0.
+teardown(_SetupResult) ->
+    meck:unload().
+%% W = 2: two identical successful replies reach quorum while one worker
+%% is still pending; the result is ok.
+t_w2_ok() ->
+    ?_test(begin
+        Init = create_init_acc(2),
+        Reply = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]},
+        {ok, After1} = handle_message(Reply, worker(1, Init), Init),
+        ?assertEqual(2, length(After1#acc.worker_uuids)),
+        check_quorum(After1, false),
+        {stop, After2} = handle_message(Reply, worker(2, Init), After1),
+        ?assertEqual(1, length(After2#acc.worker_uuids)),
+        check_quorum(After2, true),
+        Formatted = format_resps([<<"uuid1">>, <<"uuid2">>], After2),
+        ?assertEqual(
+            [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}],
+            Formatted
+        ),
+        ?assertEqual(ok, resp_health(Formatted))
+    end).
+%% W = 3: all three workers must return the same successful reply before
+%% the coordinator stops; the result is ok.
+t_w3_ok() ->
+    ?_test(begin
+        Init = create_init_acc(3),
+        Reply = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]},
+        {ok, After1} = handle_message(Reply, worker(1, Init), Init),
+        check_quorum(After1, false),
+        {ok, After2} = handle_message(Reply, worker(2, Init), After1),
+        ?assertEqual(1, length(After2#acc.worker_uuids)),
+        check_quorum(After2, false),
+        {stop, After3} = handle_message(Reply, worker(3, Init), After2),
+        ?assertEqual(0, length(After3#acc.worker_uuids)),
+        check_quorum(After3, true),
+        Formatted = format_resps([<<"uuid1">>, <<"uuid2">>], After3),
+        ?assertEqual(
+            [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}],
+            Formatted
+        ),
+        ?assertEqual(ok, resp_health(Formatted))
+    end).
+%% W = 2 with disagreeing successful replies: the coordinator waits for
+%% all three workers and reports accepted with the union of revisions.
+t_w2_mixed_accepted() ->
+    ?_test(begin
+        Acc0 = create_init_acc(2),
+        Msg1 = {ok, [{ok, [{1, <<"foo1">>}]}, {ok, [{2, <<"bar1">>}]}]},
+        Msg2 = {ok, [{ok, [{1, <<"foo2">>}]}, {ok, [{2, <<"bar2">>}]}]},
+        {ok, Acc1} = handle_message(Msg1, worker(1, Acc0), Acc0),
+        ?assertEqual(2, length(Acc1#acc.worker_uuids)),
+        check_quorum(Acc1, false),
+        {ok, Acc2} = handle_message(Msg2, worker(2, Acc0), Acc1),
+        ?assertEqual(1, length(Acc2#acc.worker_uuids)),
+        check_quorum(Acc2, false),
+        {stop, Acc3} = handle_message(Msg1, worker(3, Acc0), Acc2),
+        ?assertEqual(0, length(Acc3#acc.worker_uuids)),
+        check_quorum(Acc3, true),
+        Expect = [
+            {accepted, [{1, <<"foo1">>}, {1, <<"foo2">>}]},
+            {accepted, [{2, <<"bar1">>}, {2, <<"bar2">>}]}
+        ],
+        % Format the final accumulator (Acc3), which is what the
+        % coordinator formats after stopping, not the intermediate Acc2.
+        Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3),
+        ?assertEqual(Expect, Resps),
+        ?assertEqual(accepted, resp_health(Resps))
+    end).
+%% W = 3 with disagreeing successful replies: every worker answers, but
+%% the replies do not all agree, so the result is accepted with the union
+%% of revisions.
+t_w3_mixed_accepted() ->
+    ?_test(begin
+        Acc0 = create_init_acc(3),
+        Msg1 = {ok, [{ok, [{1, <<"foo1">>}]}, {ok, [{2, <<"bar1">>}]}]},
+        Msg2 = {ok, [{ok, [{1, <<"foo2">>}]}, {ok, [{2, <<"bar2">>}]}]},
+        {ok, Acc1} = handle_message(Msg1, worker(1, Acc0), Acc0),
+        ?assertEqual(2, length(Acc1#acc.worker_uuids)),
+        check_quorum(Acc1, false),
+        {ok, Acc2} = handle_message(Msg2, worker(2, Acc0), Acc1),
+        ?assertEqual(1, length(Acc2#acc.worker_uuids)),
+        check_quorum(Acc2, false),
+        {stop, Acc3} = handle_message(Msg2, worker(3, Acc0), Acc2),
+        ?assertEqual(0, length(Acc3#acc.worker_uuids)),
+        check_quorum(Acc3, true),
+        Expect = [
+            {accepted, [{1, <<"foo1">>}, {1, <<"foo2">>}]},
+            {accepted, [{2, <<"bar1">>}, {2, <<"bar2">>}]}
+        ],
+        % Format the final accumulator (Acc3), which is what the
+        % coordinator formats after stopping, not the intermediate Acc2.
+        Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3),
+        ?assertEqual(Expect, Resps),
+        ?assertEqual(accepted, resp_health(Resps))
+    end).
+%% W = 2 with one worker exiting: the two remaining identical successes
+%% still give an ok result.
+t_w2_exit1_ok() ->
+    ?_test(begin
+        Init = create_init_acc(2),
+        Reply = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]},
+        Exit = {rexi_EXIT, blargh},
+        {ok, After1} = handle_message(Reply, worker(1, Init), Init),
+        ?assertEqual(2, length(After1#acc.worker_uuids)),
+        check_quorum(After1, false),
+        {ok, After2} = handle_message(Exit, worker(2, Init), After1),
+        ?assertEqual(1, length(After2#acc.worker_uuids)),
+        check_quorum(After2, false),
+        {stop, After3} = handle_message(Reply, worker(3, Init), After2),
+        ?assertEqual(0, length(After3#acc.worker_uuids)),
+        check_quorum(After3, true),
+        Formatted = format_resps([<<"uuid1">>, <<"uuid2">>], After3),
+        ?assertEqual(
+            [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}],
+            Formatted
+        ),
+        ?assertEqual(ok, resp_health(Formatted))
+    end).
+%% W = 2 with two workers exiting: a single success is below W, so the
+%% result is only accepted.
+t_w2_exit2_accepted() ->
+    ?_test(begin
+        Init = create_init_acc(2),
+        Reply = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]},
+        Exit = {rexi_EXIT, blargh},
+        {ok, After1} = handle_message(Reply, worker(1, Init), Init),
+        ?assertEqual(2, length(After1#acc.worker_uuids)),
+        check_quorum(After1, false),
+        {ok, After2} = handle_message(Exit, worker(2, Init), After1),
+        ?assertEqual(1, length(After2#acc.worker_uuids)),
+        check_quorum(After2, false),
+        {stop, After3} = handle_message(Exit, worker(3, Init), After2),
+        ?assertEqual(0, length(After3#acc.worker_uuids)),
+        check_quorum(After3, true),
+        Formatted = format_resps([<<"uuid1">>, <<"uuid2">>], After3),
+        ?assertEqual(
+            [{accepted, [{1, <<"foo">>}]}, {accepted, [{2, <<"bar">>}]}],
+            Formatted
+        ),
+        ?assertEqual(accepted, resp_health(Formatted))
+    end).
+%% W = 2 with every worker exiting: both documents end up as
+%% internal_server_error and the overall health is error.
+t_w2_exit3_error() ->
+    ?_test(begin
+        Init = create_init_acc(2),
+        Exit = {rexi_EXIT, blargh},
+        {ok, After1} = handle_message(Exit, worker(1, Init), Init),
+        ?assertEqual(2, length(After1#acc.worker_uuids)),
+        check_quorum(After1, false),
+        {ok, After2} = handle_message(Exit, worker(2, Init), After1),
+        ?assertEqual(1, length(After2#acc.worker_uuids)),
+        check_quorum(After2, false),
+        {stop, After3} = handle_message(Exit, worker(3, Init), After2),
+        ?assertEqual(0, length(After3#acc.worker_uuids)),
+        check_quorum(After3, true),
+        Failure = {error, internal_server_error},
+        Formatted = format_resps([<<"uuid1">>, <<"uuid2">>], After3),
+        ?assertEqual([Failure, Failure], Formatted),
+        ?assertEqual(error, resp_health(Formatted))
+    end).
+t_w4_accepted() ->
+    % A requested quorum larger than the number of available shards can
+    % never be met. Make sure we return as soon as every worker has
+    % responded instead of waiting around for a timeout.
+    ?_test(begin
+        Init = create_init_acc(4),
+        Reply = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]},
+        {ok, After1} = handle_message(Reply, worker(1, Init), Init),
+        ?assertEqual(2, length(After1#acc.worker_uuids)),
+        check_quorum(After1, false),
+        {ok, After2} = handle_message(Reply, worker(2, Init), After1),
+        ?assertEqual(1, length(After2#acc.worker_uuids)),
+        check_quorum(After2, false),
+        {stop, After3} = handle_message(Reply, worker(3, Init), After2),
+        ?assertEqual(0, length(After3#acc.worker_uuids)),
+        check_quorum(After3, true),
+        Formatted = format_resps([<<"uuid1">>, <<"uuid2">>], After3),
+        ?assertEqual(
+            [{accepted, [{1, <<"foo">>}]}, {accepted, [{2, <<"bar">>}]}],
+            Formatted
+        ),
+        ?assertEqual(accepted, resp_health(Formatted))
+    end).
+%% Two documents on different ranges: uuid1 gets two matching successes
+%% (ok), uuid2 gets two exits and one success (accepted); the overall
+%% health is accepted.
+t_mixed_ok_accepted() ->
+    ?_test(begin
+        % Three copies (nodes a, b, c) per range, one UUID per range.
+        Pairs = [
+            {#shard{node = Node, range = Range}, [UUID]}
+            || {Range, UUID} <- [{[1, 2], <<"uuid1">>}, {[3, 4], <<"uuid2">>}],
+                Node <- [a, b, c]
+        ],
+        Init = #acc{
+            worker_uuids = Pairs,
+            resps = dict:from_list([{<<"uuid1">>, []}, {<<"uuid2">>, []}]),
+            uuid_counts = dict:from_list([{<<"uuid1">>, 3}, {<<"uuid2">>, 3}]),
+            w = 2
+        },
+        Foo = {ok, [{ok, [{1, <<"foo">>}]}]},
+        Bar = {ok, [{ok, [{2, <<"bar">>}]}]},
+        Exit = {rexi_EXIT, blargh},
+        {ok, S1} = handle_message(Foo, worker(1, Init), Init),
+        {ok, S2} = handle_message(Foo, worker(2, Init), S1),
+        {ok, S3} = handle_message(Exit, worker(4, Init), S2),
+        {ok, S4} = handle_message(Exit, worker(5, Init), S3),
+        {stop, S5} = handle_message(Bar, worker(6, Init), S4),
+        Formatted = format_resps([<<"uuid1">>, <<"uuid2">>], S5),
+        ?assertEqual(
+            [{ok, [{1, <<"foo">>}]}, {accepted, [{2, <<"bar">>}]}],
+            Formatted
+        ),
+        ?assertEqual(accepted, resp_health(Formatted))
+    end).
+%% Two documents on different ranges: uuid1 gets two matching successes
+%% (ok), every worker for uuid2 exits (error); the overall health is
+%% error.
+t_mixed_errors() ->
+    ?_test(begin
+        % Three copies (nodes a, b, c) per range, one UUID per range.
+        Pairs = [
+            {#shard{node = Node, range = Range}, [UUID]}
+            || {Range, UUID} <- [{[1, 2], <<"uuid1">>}, {[3, 4], <<"uuid2">>}],
+                Node <- [a, b, c]
+        ],
+        Init = #acc{
+            worker_uuids = Pairs,
+            resps = dict:from_list([{<<"uuid1">>, []}, {<<"uuid2">>, []}]),
+            uuid_counts = dict:from_list([{<<"uuid1">>, 3}, {<<"uuid2">>, 3}]),
+            w = 2
+        },
+        Foo = {ok, [{ok, [{1, <<"foo">>}]}]},
+        Exit = {rexi_EXIT, blargh},
+        {ok, S1} = handle_message(Foo, worker(1, Init), Init),
+        {ok, S2} = handle_message(Foo, worker(2, Init), S1),
+        {ok, S3} = handle_message(Exit, worker(4, Init), S2),
+        {ok, S4} = handle_message(Exit, worker(5, Init), S3),
+        {stop, S5} = handle_message(Exit, worker(6, Init), S4),
+        Formatted = format_resps([<<"uuid1">>, <<"uuid2">>], S5),
+        ?assertEqual(
+            [{ok, [{1, <<"foo">>}]}, {error, internal_server_error}],
+            Formatted
+        ),
+        ?assertEqual(error, resp_health(Formatted))
+    end).
+%% Builds the initial #acc{} for a fake database on three nodes where
+%% every worker is responsible for both test UUIDs, with quorum W.
+create_init_acc(W) ->
+    UUIDs = [<<"uuid1">>, <<"uuid2">>],
+    Shards = mem3_util:create_partition_map(
+        <<"foo">>, 3, 1, [node1, node2, node3]),
+    % We're relying on the fact that this is a fake Q=1 db, so no hashing
+    % is needed to decide which worker gets which UUID.
+    WorkerUUIDs = [
+        {Shard#shard{ref = erlang:make_ref()}, UUIDs} || Shard <- Shards
+    ],
+    #acc{
+        worker_uuids = WorkerUUIDs,
+        resps = dict:from_list([{UUID, []} || UUID <- UUIDs]),
+        uuid_counts = dict:from_list([{UUID, 3} || UUID <- UUIDs]),
+        w = W
+    }.
+%% Returns the shard of the N-th (1-based) entry in the accumulator's
+%% worker list.
+worker(Index, #acc{worker_uuids = Pairs}) ->
+    {Shard, _UUIDs} = lists:nth(Index, Pairs),
+    Shard.
+%% Asserts that has_quorum/3 (with three expected replies) equals Expect
+%% for the replies collected under every UUID in the accumulator.
+check_quorum(Acc, Expect) ->
+    W = Acc#acc.w,
+    Check = fun(_UUID, Replies, _) ->
+        ?assertEqual(Expect, has_quorum(Replies, 3, W))
+    end,
+    dict:fold(Check, nil, Acc#acc.resps).

