You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2020/01/15 22:56:40 UTC

[couchdb] 01/01: Fix fabric worker failures for partition requests

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch degraded-mode-fixes
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 6052dadee1bb6bf093f2ced8476bae0228c690ba
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Wed Jan 15 12:55:19 2020 -0500

    Fix fabric worker failures for partition requests
    
    Previously, any failed node or rexi worker error caused partition requests to
    fail immediately, even though other workers were still available to handle the
    request. This was because the progress check function didn't account for the
    fact that partition requests use only a handful of shards which, by design, do
    not cover the full ring.
    
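    Here we fix both partition info queries and dreyfus search requests (search,
    group1 and group2). We follow the pattern already used in fabric and pass
    through a set of "ring options" that let the progress function know it is
    dealing with a partition's shards instead of the full ring.
    
    The `fabric_view:is_progress_possible/1` wrapper is removed; callers now use
    `fabric_ring:is_progress_possible/1,2` directly.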
---
 src/dreyfus/src/dreyfus_fabric.erl          | 133 ++++++++++++++++++++++++----
 src/dreyfus/src/dreyfus_fabric_group1.erl   |   9 +-
 src/dreyfus/src/dreyfus_fabric_group2.erl   |   9 +-
 src/dreyfus/src/dreyfus_fabric_info.erl     |   6 +-
 src/dreyfus/src/dreyfus_fabric_search.erl   |  18 ++--
 src/dreyfus/src/dreyfus_util.erl            |  22 ++++-
 src/fabric/src/fabric_db_partition_info.erl |  84 +++++++++++++++---
 src/fabric/src/fabric_util.erl              |   7 +-
 src/fabric/src/fabric_view.erl              |  36 +-------
 9 files changed, 239 insertions(+), 85 deletions(-)

diff --git a/src/dreyfus/src/dreyfus_fabric.erl b/src/dreyfus/src/dreyfus_fabric.erl
index a953b6a..0b25a6c 100644
--- a/src/dreyfus/src/dreyfus_fabric.erl
+++ b/src/dreyfus/src/dreyfus_fabric.erl
@@ -14,7 +14,7 @@
 %% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
 
 -module(dreyfus_fabric).
--export([get_json_docs/2, handle_error_message/6]).
+-export([get_json_docs/2, handle_error_message/7]).
 
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("mem3/include/mem3.hrl").
@@ -36,40 +36,42 @@ callback(timeout, _Acc) ->
     {error, timeout}.
 
 handle_error_message({rexi_DOWN, _, {_, NodeRef}, _}, _Worker,
-                     Counters, _Replacements, _StartFun, _StartArgs) ->
-    case fabric_util:remove_down_workers(Counters, NodeRef) of
+        Counters, _Replacements, _StartFun, _StartArgs, RingOpts) ->
+    case fabric_util:remove_down_workers(Counters, NodeRef, RingOpts) of
     {ok, NewCounters} ->
         {ok, NewCounters};
     error ->
         {error, {nodedown, <<"progress not possible">>}}
     end;
 handle_error_message({rexi_EXIT, {maintenance_mode, _}}, Worker,
-                     Counters, Replacements, StartFun, StartArgs) ->
-    handle_replacement(Worker, Counters, Replacements, StartFun, StartArgs);
+        Counters, Replacements, StartFun, StartArgs, RingOpts) ->
+    handle_replacement(Worker, Counters, Replacements, StartFun, StartArgs,
+        RingOpts);
 handle_error_message({rexi_EXIT, Reason}, Worker,
-                     Counters, _Replacements, _StartFun, _StartArgs) ->
-    handle_error(Reason, Worker, Counters);
+        Counters, _Replacements, _StartFun, _StartArgs, RingOpts) ->
+    handle_error(Reason, Worker, Counters, RingOpts);
 handle_error_message({error, Reason}, Worker,
-                     Counters, _Replacements, _StartFun, _StartArgs) ->
-    handle_error(Reason, Worker, Counters);
+        Counters, _Replacements, _StartFun, _StartArgs, RingOpts) ->
+    handle_error(Reason, Worker, Counters, RingOpts);
 handle_error_message({'EXIT', Reason}, Worker,
-                     Counters, _Replacements, _StartFun, _StartArgs) ->
-    handle_error({exit, Reason}, Worker, Counters);
+        Counters, _Replacements, _StartFun, _StartArgs, RingOpts) ->
+    handle_error({exit, Reason}, Worker, Counters, RingOpts);
 handle_error_message(Reason, Worker, Counters,
-                     _Replacements, _StartFun, _StartArgs) ->
+        _Replacements, _StartFun, _StartArgs, RingOpts) ->
     couch_log:error("Unexpected error during request: ~p", [Reason]),
-    handle_error(Reason, Worker, Counters).
+    handle_error(Reason, Worker, Counters, RingOpts).
 
-handle_error(Reason, Worker, Counters0) ->
+handle_error(Reason, Worker, Counters0, RingOpts) ->
     Counters = fabric_dict:erase(Worker, Counters0),
-    case fabric_view:is_progress_possible(Counters) of
+    case fabric_ring:is_progress_possible(Counters, RingOpts) of
     true ->
         {ok, Counters};
     false ->
         {error, Reason}
     end.
 
-handle_replacement(Worker, OldCntrs0, OldReplacements, StartFun, StartArgs) ->
+handle_replacement(Worker, OldCntrs0, OldReplacements, StartFun, StartArgs,
+        RingOpts) ->
     OldCounters = lists:filter(fun({#shard{ref=R}, _}) ->
         R /= Worker#shard.ref
     end, OldCntrs0),
@@ -79,12 +81,12 @@ handle_replacement(Worker, OldCntrs0, OldReplacements, StartFun, StartArgs) ->
                 NewCounter = start_replacement(StartFun, StartArgs, Repl),
                 fabric_dict:store(NewCounter, nil, CounterAcc)
             end, OldCounters, Replacements),
-            true = fabric_view:is_progress_possible(NewCounters),
+            true = fabric_ring:is_progress_possible(NewCounters, RingOpts),
             NewRefs = fabric_dict:fetch_keys(NewCounters),
             {new_refs, NewRefs, NewCounters, NewReplacements};
         false ->
             handle_error({nodedown, <<"progress not possible">>},
-                         Worker, OldCounters)
+                         Worker, OldCounters, RingOpts)
     end.
 
 start_replacement(StartFun, StartArgs, Shard) ->
@@ -106,3 +108,98 @@ start_replacement(StartFun, StartArgs, Shard) ->
                     {dreyfus_rpc, StartFun,
                      [Shard#shard.name|StartArgs1]}),
     Shard#shard{ref = Ref}.
+
+
+-ifdef(TEST).
+
+-include_lib("eunit/include/eunit.hrl").
+
+
+node_down_test() ->
+    [S1, S2, S3] = [
+        mk_shard("n1", [0, 4]),
+        mk_shard("n1", [5, ?RING_END]),
+        mk_shard("n2", [0, ?RING_END])
+    ],
+    [W1, W2, W3] = [
+         S1#shard{ref = make_ref()},
+         S2#shard{ref = make_ref()},
+         S3#shard{ref = make_ref()}
+    ],
+    Counters1 = fabric_dict:init([W1, W2, W3], nil),
+
+    N1 = S1#shard.node,
+    Msg1 = {rexi_DOWN, nil, {nil, N1}, nil},
+    Res1 =  handle_error_message(Msg1, nil, Counters1, nil, nil, nil, []),
+    ?assertEqual({ok, [{W3, nil}]}, Res1),
+
+    {ok, Counters2} = Res1,
+    N2 = S3#shard.node,
+    Msg2 = {rexi_DOWN, nil, {nil, N2}, nil},
+    Res2 =  handle_error_message(Msg2, nil, Counters2, nil, nil, nil, []),
+    ?assertEqual({error, {nodedown, <<"progress not possible">>}}, Res2).
+
+
+worker_error_test() ->
+    [S1, S2] = [
+        mk_shard("n1", [0, ?RING_END]),
+        mk_shard("n2", [0, ?RING_END])
+    ],
+    [W1, W2] = [S1#shard{ref = make_ref()}, S2#shard{ref = make_ref()}],
+    Counters1 = fabric_dict:init([W1, W2], nil),
+
+    Res1 = handle_error(bam, W1, Counters1, []),
+    ?assertEqual({ok, [{W2, nil}]}, Res1),
+
+    {ok, Counters2} = Res1,
+    ?assertEqual({error, boom}, handle_error(boom, W2, Counters2, [])).
+
+
+node_down_with_partitions_test() ->
+    [S1, S2] = [
+        mk_shard("n1", [0, 4]),
+        mk_shard("n2", [0, 8])
+    ],
+    [W1, W2] = [
+        S1#shard{ref = make_ref()},
+        S2#shard{ref = make_ref()}
+    ],
+    Counters1 = fabric_dict:init([W1, W2], nil),
+    RingOpts = [{any, [S1, S2]}],
+
+    N1 = S1#shard.node,
+    Msg1 = {rexi_DOWN, nil, {nil, N1}, nil},
+    Res1 =  handle_error_message(Msg1, nil, Counters1, nil, nil, nil, RingOpts),
+    ?assertEqual({ok, [{W2, nil}]}, Res1),
+
+    {ok, Counters2} = Res1,
+    N2 = S2#shard.node,
+    Msg2 = {rexi_DOWN, nil, {nil, N2}, nil},
+    Res2 =  handle_error_message(Msg2, nil, Counters2, nil, nil, nil, RingOpts),
+    ?assertEqual({error, {nodedown, <<"progress not possible">>}}, Res2).
+
+
+worker_error_with_partitions_test() ->
+    [S1, S2] = [
+        mk_shard("n1", [0, 4]),
+        mk_shard("n2", [0, 8])],
+    [W1, W2] = [
+        S1#shard{ref = make_ref()},
+        S2#shard{ref = make_ref()}
+    ],
+    Counters1 = fabric_dict:init([W1, W2], nil),
+    RingOpts = [{any, [S1, S2]}],
+
+    Res1 = handle_error(bam, W1, Counters1, RingOpts),
+    ?assertEqual({ok, [{W2, nil}]}, Res1),
+
+    {ok, Counters2} = Res1,
+    ?assertEqual({error, boom}, handle_error(boom, W2, Counters2, RingOpts)).
+
+
+mk_shard(Name, Range) ->
+    Node = list_to_atom(Name),
+    BName = list_to_binary(Name),
+    #shard{name = BName, node = Node, range = Range}.
+
+-endif.
diff --git a/src/dreyfus/src/dreyfus_fabric_group1.erl b/src/dreyfus/src/dreyfus_fabric_group1.erl
index 2d530ca..bdae6f0 100644
--- a/src/dreyfus/src/dreyfus_fabric_group1.erl
+++ b/src/dreyfus/src/dreyfus_fabric_group1.erl
@@ -27,7 +27,8 @@
     top_groups,
     counters,
     start_args,
-    replacements
+    replacements,
+    ring_opts
 }).
 
 go(DbName, GroupId, IndexName, QueryArgs) when is_binary(GroupId) ->
@@ -39,6 +40,7 @@ go(DbName, DDoc, IndexName, #index_query_args{}=QueryArgs) ->
     DesignName = dreyfus_util:get_design_docid(DDoc),
     dreyfus_util:maybe_deny_index(DbName, DesignName, IndexName),
     Shards = dreyfus_util:get_shards(DbName, QueryArgs),
+    RingOpts = dreyfus_util:get_ring_opts(QueryArgs, Shards),
     Workers = fabric_util:submit_jobs(Shards, dreyfus_rpc, group1, [DDoc,
          IndexName, dreyfus_util:export(QueryArgs)]),
     Replacements = fabric_view:get_shard_replacements(DbName, Workers),
@@ -50,7 +52,8 @@ go(DbName, DDoc, IndexName, #index_query_args{}=QueryArgs) ->
         top_groups = [],
         counters = Counters,
         start_args = [DDoc, IndexName, QueryArgs],
-        replacements = Replacements
+        replacements = Replacements,
+        ring_opts = RingOpts
     },
     try
         rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
@@ -89,7 +92,7 @@ handle_message(Error, Worker, State0) ->
     State = upgrade_state(State0),
     case dreyfus_fabric:handle_error_message(Error, Worker,
       State#state.counters, State#state.replacements,
-      group1, State#state.start_args) of
+      group1, State#state.start_args, State#state.ring_opts) of
         {ok, Counters} ->
             {ok, State#state{counters=Counters}};
         {new_refs, NewRefs, NewCounters, NewReplacements} ->
diff --git a/src/dreyfus/src/dreyfus_fabric_group2.erl b/src/dreyfus/src/dreyfus_fabric_group2.erl
index 1239f8b..8d864dd 100644
--- a/src/dreyfus/src/dreyfus_fabric_group2.erl
+++ b/src/dreyfus/src/dreyfus_fabric_group2.erl
@@ -29,7 +29,8 @@
     top_groups,
     counters,
     start_args,
-    replacements
+    replacements,
+    ring_opts
 }).
 
 go(DbName, GroupId, IndexName, QueryArgs) when is_binary(GroupId) ->
@@ -41,6 +42,7 @@ go(DbName, DDoc, IndexName, #index_query_args{}=QueryArgs) ->
     DesignName = dreyfus_util:get_design_docid(DDoc),
     dreyfus_util:maybe_deny_index(DbName, DesignName, IndexName),
     Shards = dreyfus_util:get_shards(DbName, QueryArgs),
+    RingOpts = dreyfus_util:get_ring_opts(QueryArgs, Shards),
     Workers = fabric_util:submit_jobs(Shards, dreyfus_rpc, group2,
                           [DDoc, IndexName, dreyfus_util:export(QueryArgs)]),
     Replacements = fabric_view:get_shard_replacements(DbName, Workers),
@@ -54,7 +56,8 @@ go(DbName, DDoc, IndexName, #index_query_args{}=QueryArgs) ->
         top_groups = [],
         counters = Counters,
         start_args = [DDoc, IndexName, QueryArgs],
-        replacements = Replacements
+        replacements = Replacements,
+        ring_opts = RingOpts
     },
     try
         rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
@@ -102,7 +105,7 @@ handle_message(Error, Worker, State0) ->
     State = upgrade_state(State0),
     case dreyfus_fabric:handle_error_message(Error, Worker,
       State#state.counters, State#state.replacements,
-      group2, State#state.start_args) of
+      group2, State#state.start_args, State#state.ring_opts) of
         {ok, Counters} ->
             {ok, State#state{counters=Counters}};
         {new_refs, NewRefs, NewCounters, NewReplacements} ->
diff --git a/src/dreyfus/src/dreyfus_fabric_info.erl b/src/dreyfus/src/dreyfus_fabric_info.erl
index 27eec80..e217bc0 100644
--- a/src/dreyfus/src/dreyfus_fabric_info.erl
+++ b/src/dreyfus/src/dreyfus_fabric_info.erl
@@ -49,7 +49,7 @@ handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Worker, {Counters, Acc}) ->
 
 handle_message({rexi_EXIT, Reason}, Worker, {Counters, Acc}) ->
     NewCounters = fabric_dict:erase(Worker, Counters),
-    case fabric_view:is_progress_possible(NewCounters) of
+    case fabric_ring:is_progress_possible(NewCounters) of
     true ->
         {ok, {NewCounters, Acc}};
     false ->
@@ -74,7 +74,7 @@ handle_message({ok, Info}, Worker, {Counters, Acc}) ->
 
 handle_message({error, Reason}, Worker, {Counters, Acc}) ->
     NewCounters = fabric_dict:erase(Worker, Counters),
-    case fabric_view:is_progress_possible(NewCounters) of
+    case fabric_ring:is_progress_possible(NewCounters) of
     true ->
         {ok, {NewCounters, Acc}};
     false ->
@@ -82,7 +82,7 @@ handle_message({error, Reason}, Worker, {Counters, Acc}) ->
     end;
 handle_message({'EXIT', _}, Worker, {Counters, Acc}) ->
     NewCounters = fabric_dict:erase(Worker, Counters),
-    case fabric_view:is_progress_possible(NewCounters) of
+    case fabric_ring:is_progress_possible(NewCounters) of
     true ->
         {ok, {NewCounters, Acc}};
     false ->
diff --git a/src/dreyfus/src/dreyfus_fabric_search.erl b/src/dreyfus/src/dreyfus_fabric_search.erl
index acf7a83..c0ebde1 100644
--- a/src/dreyfus/src/dreyfus_fabric_search.erl
+++ b/src/dreyfus/src/dreyfus_fabric_search.erl
@@ -27,7 +27,8 @@
     top_docs,
     counters,
     start_args,
-    replacements
+    replacements,
+    ring_opts
 }).
 
 go(DbName, GroupId, IndexName, QueryArgs) when is_binary(GroupId) ->
@@ -40,10 +41,11 @@ go(DbName, DDoc, IndexName, #index_query_args{bookmark=nil}=QueryArgs) ->
     DesignName = dreyfus_util:get_design_docid(DDoc),
     dreyfus_util:maybe_deny_index(DbName, DesignName, IndexName),
     Shards = dreyfus_util:get_shards(DbName, QueryArgs),
+    RingOpts = dreyfus_util:get_ring_opts(QueryArgs, Shards),
     Workers = fabric_util:submit_jobs(Shards, dreyfus_rpc, search,
                           [DDoc, IndexName, dreyfus_util:export(QueryArgs)]),
     Counters = fabric_dict:init(Workers, nil),
-    go(DbName, DDoc, IndexName, QueryArgs, Counters, Counters);
+    go(DbName, DDoc, IndexName, QueryArgs, Counters, Counters, RingOpts);
 
 go(DbName, DDoc, IndexName, #index_query_args{}=QueryArgs) ->
     Bookmark0 = try dreyfus_bookmark:unpack(DbName, QueryArgs)
@@ -73,14 +75,17 @@ go(DbName, DDoc, IndexName, #index_query_args{}=QueryArgs) ->
         end
     end, Bookmark1),
     Counters = fabric_dict:init(Counters0, nil),
+    % Ring options must come from the shards that actually received workers.
+    WorkerShards = fabric_dict:fetch_keys(Counters),
+    RingOpts = dreyfus_util:get_ring_opts(QueryArgs, WorkerShards),
     QueryArgs2 = QueryArgs#index_query_args{
         bookmark = Bookmark1
     },
-    go(DbName, DDoc, IndexName, QueryArgs2, Counters, Bookmark1);
+    go(DbName, DDoc, IndexName, QueryArgs2, Counters, Bookmark1, RingOpts);
 go(DbName, DDoc, IndexName, OldArgs) ->
     go(DbName, DDoc, IndexName, dreyfus_util:upgrade(OldArgs)).
 
-go(DbName, DDoc, IndexName, QueryArgs, Counters, Bookmark) ->
+go(DbName, DDoc, IndexName, QueryArgs, Counters, Bookmark, RingOpts) ->
     {Workers, _} = lists:unzip(Counters),
     #index_query_args{
         limit = Limit,
@@ -94,7 +99,8 @@ go(DbName, DDoc, IndexName, QueryArgs, Counters, Bookmark) ->
         top_docs = #top_docs{total_hits=0,hits=[]},
         counters = Counters,
         start_args = [DDoc, IndexName, QueryArgs],
-        replacements = Replacements
+        replacements = Replacements,
+        ring_opts = RingOpts
      },
     RexiMon = fabric_util:create_monitors(Workers),
     try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
@@ -154,7 +160,7 @@ handle_message(Error, Worker, State0) ->
     State = upgrade_state(State0),
     case dreyfus_fabric:handle_error_message(Error, Worker,
       State#state.counters, State#state.replacements,
-      search, State#state.start_args) of
+      search, State#state.start_args, State#state.ring_opts) of
         {ok, Counters} ->
             {ok, State#state{counters=Counters}};
         {new_refs, NewRefs, NewCounters, NewReplacements} ->
diff --git a/src/dreyfus/src/dreyfus_util.erl b/src/dreyfus/src/dreyfus_util.erl
index 0a83e87..05ecdb6 100644
--- a/src/dreyfus/src/dreyfus_util.erl
+++ b/src/dreyfus/src/dreyfus_util.erl
@@ -19,7 +19,7 @@
 -include_lib("mem3/include/mem3.hrl").
 -include_lib("couch/include/couch_db.hrl").
 
--export([get_shards/2, sort/2, upgrade/1, export/1, time/2]).
+-export([get_shards/2, get_ring_opts/2, sort/2, upgrade/1, export/1, time/2]).
 -export([in_black_list/1, in_black_list/3, maybe_deny_index/3]).
 -export([get_design_docid/1]).
 -export([
@@ -59,6 +59,15 @@ use_ushards(#index_query_args{stable=true}) ->
 use_ushards(#index_query_args{}) ->
     false.
 
+
+get_ring_opts(#index_query_args{partition = nil}, _Shards) ->
+    [];
+get_ring_opts(#index_query_args{}, Shards) ->
+    Shards1 = lists:map(fun(#shard{} = S) ->
+        S#shard{ref = undefined}
+    end, Shards),
+    [{any, Shards1}].
+
 -spec sort(Order :: relevance | [any()], [#sortable{}]) -> [#sortable{}].
 sort(Sort, List0) ->
     {List1, Stash} = stash_items(List0),
@@ -418,4 +427,15 @@ stash_test() ->
     Unstashed = hd(unstash_items(Stashed, Stash)),
     ?assertEqual(Unstashed#sortable.item, bar).
 
+
+ring_opts_test() ->
+    Shards = [#shard{name = foo, ref = make_ref()}],
+
+    QArgs1 = #index_query_args{partition = nil},
+    ?assertEqual([], get_ring_opts(QArgs1, Shards)),
+
+    QArgs2 = #index_query_args{partition = <<"x">>},
+    ?assertMatch([{any, [#shard{name = foo, ref = undefined}]}],
+        get_ring_opts(QArgs2, Shards)).
+
 -endif.
diff --git a/src/fabric/src/fabric_db_partition_info.erl b/src/fabric/src/fabric_db_partition_info.erl
index 2978832..954c52d 100644
--- a/src/fabric/src/fabric_db_partition_info.erl
+++ b/src/fabric/src/fabric_db_partition_info.erl
@@ -17,15 +17,27 @@
 -include_lib("fabric/include/fabric.hrl").
 -include_lib("mem3/include/mem3.hrl").
 
+
+-record(acc, {
+    counters,
+    replies,
+    ring_opts
+}).
+
+
 go(DbName, Partition) ->
-    Shards = mem3:shards(DbName, <<Partition/binary, ":foo">>),
+    Shards = mem3:shards(DbName, couch_partition:shard_key(Partition)),
     Workers = fabric_util:submit_jobs(Shards, get_partition_info, [Partition]),
     RexiMon = fabric_util:create_monitors(Shards),
     Fun = fun handle_message/3,
-    Acc0 = {fabric_dict:init(Workers, nil), []},
+    Acc0 = #acc{
+        counters = fabric_dict:init(Workers, nil),
+        replies = [],
+        ring_opts = [{any, Shards}]
+    },
     try
         case fabric_util:recv(Workers, #shard.ref, Fun, Acc0) of
-            {ok, Acc} -> {ok, Acc};
-            {timeout, {WorkersDict, _}} ->
+            {ok, Res} -> {ok, Res};
+            {timeout, #acc{counters = WorkersDict}} ->
                 DefunctWorkers = fabric_util:remove_done_workers(
                     WorkersDict,
@@ -42,36 +54,39 @@ go(DbName, Partition) ->
         rexi_monitor:stop(RexiMon)
     end.
 
-handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, {Counters, Acc}) ->
-    case fabric_util:remove_down_workers(Counters, NodeRef) of
+handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, #acc{} = Acc) ->
+    #acc{counters = Counters, ring_opts = RingOpts} = Acc,
+    case fabric_util:remove_down_workers(Counters, NodeRef, RingOpts) of
     {ok, NewCounters} ->
-        {ok, {NewCounters, Acc}};
+        {ok, Acc#acc{counters = NewCounters}};
     error ->
         {error, {nodedown, <<"progress not possible">>}}
     end;
 
-handle_message({rexi_EXIT, Reason}, Shard, {Counters, Acc}) ->
+handle_message({rexi_EXIT, Reason}, Shard, #acc{} = Acc) ->
+    #acc{counters = Counters, ring_opts = RingOpts} = Acc,
     NewCounters = fabric_dict:erase(Shard, Counters),
-    case fabric_ring:is_progress_possible(NewCounters) of
+    case fabric_ring:is_progress_possible(NewCounters, RingOpts) of
     true ->
-        {ok, {NewCounters, Acc}};
+        {ok, Acc#acc{counters = NewCounters}};
     false ->
         {error, Reason}
     end;
 
-handle_message({ok, Info}, #shard{dbname=Name} = Shard, {Counters, Acc}) ->
-    Acc2 = [Info | Acc],
+handle_message({ok, Info}, #shard{dbname=Name} = Shard, #acc{} = Acc) ->
+    #acc{counters = Counters, replies = Replies} = Acc,
+    Replies1 = [Info | Replies],
     Counters1 = fabric_dict:erase(Shard, Counters),
     case fabric_dict:size(Counters1) =:= 0 of
         true ->
-            [FirstInfo | RestInfos] = Acc2,
+            [FirstInfo | RestInfos] = Replies1,
             PartitionInfo = get_max_partition_size(FirstInfo, RestInfos),
             {stop, [{db_name, Name} | format_partition(PartitionInfo)]};
         false ->
-            {ok, {Counters1, Acc2}}
+            {ok, Acc#acc{counters = Counters1, replies = Replies1}}
     end;
 
-handle_message(_, _, Acc) ->
+handle_message(_, _, #acc{} = Acc) ->
     {ok, Acc}.
 
 
@@ -97,3 +112,44 @@ format_partition(PartitionInfo) ->
     {value, {sizes, Size}, PartitionInfo1} = lists:keytake(sizes, 1, PartitionInfo),
     [{sizes, {Size}} | PartitionInfo1].
 
+
+-ifdef(TEST).
+
+-include_lib("eunit/include/eunit.hrl").
+
+
+node_down_test() ->
+    [S1, S2] = [mk_shard("n1", [0, 4]),  mk_shard("n2", [0, 8])],
+    Acc1 = #acc{
+        counters = fabric_dict:init([S1, S2], nil),
+        ring_opts = [{any, [S1, S2]}]
+    },
+
+    N1 = S1#shard.node,
+    {ok, Acc2} = handle_message({rexi_DOWN, nil, {nil, N1}, nil}, nil, Acc1),
+    ?assertEqual([{S2, nil}], Acc2#acc.counters),
+
+    N2 = S2#shard.node,
+    ?assertEqual({error, {nodedown, <<"progress not possible">>}},
+        handle_message({rexi_DOWN, nil, {nil, N2}, nil}, nil, Acc2)).
+
+
+worker_exit_test() ->
+    [S1, S2] = [mk_shard("n1", [0, 4]),  mk_shard("n2", [0, 8])],
+    Acc1 = #acc{
+        counters = fabric_dict:init([S1, S2], nil),
+        ring_opts = [{any, [S1, S2]}]
+    },
+
+    {ok, Acc2} = handle_message({rexi_EXIT, boom}, S1, Acc1),
+    ?assertEqual([{S2, nil}], Acc2#acc.counters),
+
+    ?assertEqual({error, bam}, handle_message({rexi_EXIT, bam}, S2, Acc2)).
+
+
+mk_shard(Name, Range) ->
+    Node = list_to_atom(Name),
+    BName = list_to_binary(Name),
+    #shard{name = BName, node = Node, range = Range}.
+
+-endif.
diff --git a/src/fabric/src/fabric_util.erl b/src/fabric/src/fabric_util.erl
index aaf0623..8aa14e7 100644
--- a/src/fabric/src/fabric_util.erl
+++ b/src/fabric/src/fabric_util.erl
@@ -14,7 +14,7 @@
 
 -export([submit_jobs/3, submit_jobs/4, cleanup/1, recv/4, get_db/1, get_db/2, error_info/1,
         update_counter/3, remove_ancestors/2, create_monitors/1, kv/2,
-        remove_down_workers/2, doc_id_and_rev/1]).
+        remove_down_workers/2, remove_down_workers/3, doc_id_and_rev/1]).
 -export([request_timeout/0, attachments_timeout/0, all_docs_timeout/0, view_timeout/1]).
 -export([log_timeout/2, remove_done_workers/2]).
 -export([is_users_db/1, is_replicator_db/1]).
@@ -33,9 +33,12 @@
 -include_lib("eunit/include/eunit.hrl").
 
 remove_down_workers(Workers, BadNode) ->
+    remove_down_workers(Workers, BadNode, []).
+
+remove_down_workers(Workers, BadNode, RingOpts) ->
     Filter = fun(#shard{node = Node}, _) -> Node =/= BadNode end,
     NewWorkers = fabric_dict:filter(Filter, Workers),
-    case fabric_ring:is_progress_possible(NewWorkers) of
+    case fabric_ring:is_progress_possible(NewWorkers, RingOpts) of
     true ->
         {ok, NewWorkers};
     false ->
diff --git a/src/fabric/src/fabric_view.erl b/src/fabric/src/fabric_view.erl
index 55b44e6..425f864 100644
--- a/src/fabric/src/fabric_view.erl
+++ b/src/fabric/src/fabric_view.erl
@@ -12,7 +12,7 @@
 
 -module(fabric_view).
 
--export([is_progress_possible/1, remove_overlapping_shards/2, maybe_send_row/1,
+-export([remove_overlapping_shards/2, maybe_send_row/1,
     transform_row/1, keydict/1, extract_view/4, get_shards/2,
     check_down_shards/2, handle_worker_exit/3,
     get_shard_replacements/2, maybe_update_others/5]).
@@ -46,10 +46,6 @@ handle_worker_exit(Collector, _Worker, Reason) ->
     {ok, Resp} = Callback({error, fabric_util:error_info(Reason)}, Acc),
     {error, Resp}.
 
-%% @doc looks for a fully covered keyrange in the list of counters
--spec is_progress_possible([{#shard{}, term()}]) -> boolean().
-is_progress_possible(Counters) ->
-    fabric_ring:is_progress_possible(Counters).
 
 -spec remove_overlapping_shards(#shard{}, [{#shard{}, any()}]) ->
     [{#shard{}, any()}].
@@ -416,28 +412,6 @@ fix_skip_and_limit(#mrargs{} = Args) ->
 remove_finalizer(Args) ->
     couch_mrview_util:set_extra(Args, finalizer, null).
 
-% unit test
-is_progress_possible_test() ->
-    EndPoint = 2 bsl 31,
-    T1 = [[0, EndPoint-1]],
-    ?assertEqual(is_progress_possible(mk_cnts(T1)),true),
-    T2 = [[0,10],[11,20],[21,EndPoint-1]],
-    ?assertEqual(is_progress_possible(mk_cnts(T2)),true),
-    % gap
-    T3 = [[0,10],[12,EndPoint-1]],
-    ?assertEqual(is_progress_possible(mk_cnts(T3)),false),
-    % outside range
-    T4 = [[1,10],[11,20],[21,EndPoint-1]],
-    ?assertEqual(is_progress_possible(mk_cnts(T4)),false),
-    % outside range
-    T5 = [[0,10],[11,20],[21,EndPoint]],
-    ?assertEqual(is_progress_possible(mk_cnts(T5)),false),
-    T6 = [[0, 10], [11, 20], [0, 5], [6, 21], [21, EndPoint - 1]],
-    ?assertEqual(is_progress_possible(mk_cnts(T6)), true),
-    % not possible, overlap is not exact
-    T7 = [[0, 10], [13, 20], [21, EndPoint - 1], [9, 12]],
-    ?assertEqual(is_progress_possible(mk_cnts(T7)), false).
-
 
 remove_overlapping_shards_test() ->
     Cb = undefined,
@@ -482,10 +456,6 @@ get_shard_replacements_test() ->
     ?assertEqual(Expect, Res).
 
 
-mk_cnts(Ranges) ->
-    Shards = lists:map(fun mk_shard/1, Ranges),
-    orddict:from_list([{Shard,nil} || Shard <- Shards]).
-
 mk_cnts(Ranges, NoNodes) ->
     orddict:from_list([{Shard,nil}
                        || Shard <-
@@ -502,10 +472,6 @@ mk_shards(NoNodes,Range,Shards) ->
     mk_shards(NoNodes-1,Range, [mk_shard(Name, Range) | Shards]).
 
 
-mk_shard([B, E]) when is_integer(B), is_integer(E) ->
-    #shard{range = [B, E]}.
-
-
 mk_shard(Name, Range) ->
     Node = list_to_atom(Name),
     BName = list_to_binary(Name),