You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2017/07/31 17:30:50 UTC

[couchdb] branch master updated: Stop couch_index processes on ddoc update

This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e7ca45  Stop couch_index processes on ddoc update
2e7ca45 is described below

commit 2e7ca45b364467f164d2ecbe8846878234d36a47
Author: Mayya Sharipova <ma...@ca.ibm.com>
AuthorDate: Mon Jun 12 09:23:23 2017 -0400

    Stop couch_index processes on ddoc update
    
    Currently, when a ddoc is updated, the couch_index and couch_index_updater
    processes corresponding to the previous version of the ddoc still exist
    until all indexing work initiated by them is done.
    When the ddoc of a big database is rapidly modified, this puts a lot
    of unnecessary strain on database resources.
    
    With this change, when ddoc is updated:
    * all couch_index processes for the previous version of ddoc will be shut down
    * all couch_index_updater processes linked to them will die as well
    * all processes waiting for indexing activity to be finished (waiters
        for couch_index:get_status) will receive an immediate reply:
        ddoc_updated. Interactive user requests (view queries) will get response:
        {404, <<"not_found">>, <<"Design document was updated or deleted.">>}
    
    Check if there are ddocs that use the same couch_index process
    before closing it on ddoc_updated
    
    1. When opening an index, always add a record {DbName, {DDocId, Sig}} to ?BY_DB.
    2. On ddoc_updated, check if there are other ddocs in ?BY_DB with the same Sig.
        If there are none, stop the couch_index processes.
        If there are others, only remove the {DbName, {DDocId, Sig}}
        record from ?BY_DB for this ddoc.
---
 src/chttpd/src/chttpd.erl                          |   2 +
 src/couch_index/src/couch_index.erl                |  29 ++---
 src/couch_index/src/couch_index_server.erl         |  56 +++++++--
 src/couch_mrview/src/couch_mrview.erl              |  18 ++-
 src/couch_mrview/src/couch_mrview_index.erl        |  14 ++-
 src/couch_mrview/src/couch_mrview_util.erl         |  26 ++--
 .../test/couch_mrview_ddoc_updated_tests.erl       | 139 +++++++++++++++++++++
 src/fabric/src/fabric_rpc.erl                      |   8 +-
 src/fabric/src/fabric_util.erl                     |   3 +
 src/fabric/src/fabric_view_map.erl                 |   7 +-
 src/fabric/src/fabric_view_reduce.erl              |   7 +-
 11 files changed, 261 insertions(+), 48 deletions(-)

diff --git a/src/chttpd/src/chttpd.erl b/src/chttpd/src/chttpd.erl
index 9423fa9..cfefb78 100644
--- a/src/chttpd/src/chttpd.erl
+++ b/src/chttpd/src/chttpd.erl
@@ -846,6 +846,8 @@ error_info({not_found, Reason}) ->
     {404, <<"not_found">>, Reason};
 error_info({filter_fetch_error, Reason}) ->
     {404, <<"not_found">>, Reason};
+error_info(ddoc_updated) ->
+    {404, <<"not_found">>, <<"Design document was updated or deleted.">>};
 error_info({not_acceptable, Reason}) ->
     {406, <<"not_acceptable">>, Reason};
 error_info(conflict) ->
diff --git a/src/couch_index/src/couch_index.erl b/src/couch_index/src/couch_index.erl
index 604d503..b3a800f 100644
--- a/src/couch_index/src/couch_index.erl
+++ b/src/couch_index/src/couch_index.erl
@@ -112,9 +112,16 @@ init({Mod, IdxState}) ->
     end.
 
 
-terminate(Reason, State) ->
+terminate(Reason0, State) ->
     #st{mod=Mod, idx_state=IdxState}=State,
-    Mod:close(IdxState),
+    case Reason0 of
+        {shutdown, ddoc_updated} ->
+            Mod:shutdown(IdxState),
+            Reason = ddoc_updated;
+        _ ->
+            Mod:close(IdxState),
+            Reason = Reason0
+    end,
     send_all(State#st.waiters, Reason),
     couch_util:shutdown_sync(State#st.updater),
     couch_util:shutdown_sync(State#st.compactor),
@@ -271,7 +278,7 @@ handle_cast(delete, State) ->
     ok = Mod:delete(IdxState),
     {stop, normal, State};
 handle_cast({ddoc_updated, DDocResult}, State) ->
-    #st{mod = Mod, idx_state = IdxState, waiters = Waiters} = State,
+    #st{mod = Mod, idx_state = IdxState} = State,
     Shutdown = case DDocResult of
         {not_found, deleted} ->
             true;
@@ -284,17 +291,12 @@ handle_cast({ddoc_updated, DDocResult}, State) ->
     end,
     case Shutdown of
         true ->
-            case Waiters of
-                [] ->
-                    {stop, normal, State};
-                _ ->
-                    {noreply, State#st{shutdown = true}}
-            end;
+            {stop, {shutdown, ddoc_updated}, State#st{shutdown = true}};
         false ->
             {noreply, State#st{shutdown = false}}
     end;
 handle_cast(ddoc_updated, State) ->
-    #st{mod = Mod, idx_state = IdxState, waiters = Waiters} = State,
+    #st{mod = Mod, idx_state = IdxState} = State,
     DbName = Mod:get(db_name, IdxState),
     DDocId = Mod:get(idx_name, IdxState),
     Shutdown = couch_util:with_db(DbName, fun(Db) ->
@@ -308,12 +310,7 @@ handle_cast(ddoc_updated, State) ->
     end),
     case Shutdown of
         true ->
-            case Waiters of
-                [] ->
-                    {stop, normal, State};
-                _ ->
-                    {noreply, State#st{shutdown = true}}
-            end;
+            {stop, {shutdown, ddoc_updated}, State#st{shutdown = true}};
         false ->
             {noreply, State#st{shutdown = false}}
     end;
diff --git a/src/couch_index/src/couch_index_server.erl b/src/couch_index/src/couch_index_server.erl
index 92b8c8e..8225a90 100644
--- a/src/couch_index/src/couch_index_server.erl
+++ b/src/couch_index/src/couch_index_server.erl
@@ -106,6 +106,13 @@ get_index(Module, IdxState) ->
     Sig = Module:get(signature, IdxState),
     case ets:lookup(?BY_SIG, {DbName, Sig}) of
         [{_, Pid}] when is_pid(Pid) ->
+            DDocId = Module:get(idx_name, IdxState),
+            case ets:match_object(?BY_DB, {DbName, {DDocId, Sig}}) of
+                [] ->
+                    Args = [Pid, DbName, DDocId, Sig],
+                    gen_server:cast(?MODULE, {add_to_ets, Args});
+                _ -> ok
+            end,
             {ok, Pid};
         _ ->
             Args = {Module, IdxState, DbName, Sig},
@@ -161,14 +168,25 @@ handle_call({reset_indexes, DbName}, _From, State) ->
 
 handle_cast({reset_indexes, DbName}, State) ->
     reset_indexes(DbName, State#st.root_dir),
+    {noreply, State};
+handle_cast({add_to_ets, [Pid, DbName, DDocId, Sig]}, State) ->
+    % check if Pid still exists
+    case ets:lookup(?BY_PID, Pid) of
+        [{Pid, {DbName, Sig}}] when is_pid(Pid) ->
+            ets:insert(?BY_DB, {DbName, {DDocId, Sig}});
+        _ -> ok
+    end,
+    {noreply, State};
+handle_cast({rem_from_ets, [DbName, DDocId, Sig]}, State) ->
+    ets:delete_object(?BY_DB, {DbName, {DDocId, Sig}}),
     {noreply, State}.
 
 handle_info({'EXIT', Pid, Reason}, Server) ->
     case ets:lookup(?BY_PID, Pid) of
         [{Pid, {DbName, Sig}}] ->
-            [{DbName, {DDocId, Sig}}] =
-                ets:match_object(?BY_DB, {DbName, {'$1', Sig}}),
-            rem_from_ets(DbName, Sig, DDocId, Pid);
+            DDocIds = [DDocId || {_, {DDocId, _}}
+                <- ets:match_object(?BY_DB, {DbName, {'$1', Sig}})],
+            rem_from_ets(DbName, Sig, DDocIds, Pid);
         [] when Reason /= normal ->
             exit(Reason);
         _Else ->
@@ -221,14 +239,17 @@ new_index({Mod, IdxState, DbName, Sig}) ->
 
 reset_indexes(DbName, Root) ->
     % shutdown all the updaters and clear the files, the db got changed
-    Fun = fun({_, {DDocId, Sig}}) ->
+    SigDDocIds = lists:foldl(fun({_, {DDocId, Sig}}, DDict) ->
+        dict:append(Sig, DDocId, DDict)
+    end, dict:new(), ets:lookup(?BY_DB, DbName)),
+    Fun = fun({Sig, DDocIds}) ->
         [{_, Pid}] = ets:lookup(?BY_SIG, {DbName, Sig}),
         MRef = erlang:monitor(process, Pid),
         gen_server:cast(Pid, delete),
         receive {'DOWN', MRef, _, _, _} -> ok end,
-        rem_from_ets(DbName, Sig, DDocId, Pid)
+        rem_from_ets(DbName, Sig, DDocIds, Pid)
     end,
-    lists:foreach(Fun, ets:lookup(?BY_DB, DbName)),
+    lists:foreach(Fun, dict:to_list(SigDDocIds)),
     Path = couch_index_util:index_dir("", DbName),
     couch_file:nuke_dir(Root, Path).
 
@@ -239,10 +260,12 @@ add_to_ets(DbName, Sig, DDocId, Pid) ->
     ets:insert(?BY_DB, {DbName, {DDocId, Sig}}).
 
 
-rem_from_ets(DbName, Sig, DDocId, Pid) ->
+rem_from_ets(DbName, Sig, DDocIds, Pid) ->
     ets:delete(?BY_SIG, {DbName, Sig}),
     ets:delete(?BY_PID, Pid),
-    ets:delete_object(?BY_DB, {DbName, {DDocId, Sig}}).
+    lists:foreach(fun(DDocId) ->
+        ets:delete_object(?BY_DB, {DbName, {DDocId, Sig}})
+    end, DDocIds).
 
 
 handle_db_event(DbName, created, St) ->
@@ -259,10 +282,19 @@ handle_db_event(<<"shards/", _/binary>> = DbName, {ddoc_updated,
     DbShards = [mem3:name(Sh) || Sh <- mem3:local_shards(mem3:dbname(DbName))],
     lists:foreach(fun(DbShard) ->
         lists:foreach(fun({_DbShard, {_DDocId, Sig}}) ->
-            case ets:lookup(?BY_SIG, {DbShard, Sig}) of
-                [{_, IndexPid}] -> (catch
-                    gen_server:cast(IndexPid, {ddoc_updated, DDocResult}));
-                [] -> []
+            % check if there are other ddocs with the same Sig for the same db
+            SigDDocs = ets:match_object(?BY_DB, {DbShard, {'$1', Sig}}),
+            if length(SigDDocs) > 1 ->
+                % remove records from ?BY_DB for this DDoc
+                Args = [DbShard, DDocId, Sig],
+                gen_server:cast(?MODULE, {rem_from_ets, Args});
+            true ->
+                % single DDoc with this Sig - close couch_index processes
+                case ets:lookup(?BY_SIG, {DbShard, Sig}) of
+                    [{_, IndexPid}] -> (catch
+                        gen_server:cast(IndexPid, {ddoc_updated, DDocResult}));
+                    [] -> []
+                end
             end
         end, ets:match_object(?BY_DB, {DbShard, {DDocId, '$1'}}))
     end, DbShards),
diff --git a/src/couch_mrview/src/couch_mrview.erl b/src/couch_mrview/src/couch_mrview.erl
index 45dd83d..c44dd91 100644
--- a/src/couch_mrview/src/couch_mrview.erl
+++ b/src/couch_mrview/src/couch_mrview.erl
@@ -241,12 +241,16 @@ query_view(Db, DDoc, VName, Args) ->
 query_view(Db, DDoc, VName, Args, Callback, Acc) when is_list(Args) ->
     query_view(Db, DDoc, VName, to_mrargs(Args), Callback, Acc);
 query_view(Db, DDoc, VName, Args0, Callback, Acc0) ->
-    {ok, VInfo, Sig, Args} = couch_mrview_util:get_view(Db, DDoc, VName, Args0),
-    {ok, Acc1} = case Args#mrargs.preflight_fun of
-        PFFun when is_function(PFFun, 2) -> PFFun(Sig, Acc0);
-        _ -> {ok, Acc0}
-    end,
-    query_view(Db, VInfo, Args, Callback, Acc1).
+    case couch_mrview_util:get_view(Db, DDoc, VName, Args0) of
+        {ok, VInfo, Sig, Args} ->
+            {ok, Acc1} = case Args#mrargs.preflight_fun of
+                PFFun when is_function(PFFun, 2) -> PFFun(Sig, Acc0);
+                 _ -> {ok, Acc0}
+            end,
+            query_view(Db, VInfo, Args, Callback, Acc1);
+        ddoc_updated ->
+            Callback(ok, ddoc_updated)
+    end.
 
 
 get_view_index_pid(Db, DDoc, ViewName, Args0) ->
@@ -689,6 +693,8 @@ default_cb({final, Info}, []) ->
     {ok, [Info]};
 default_cb({final, _}, Acc) ->
     {ok, Acc};
+default_cb(ok, ddoc_updated) ->
+    {ok, ddoc_updated};
 default_cb(Row, Acc) ->
     {ok, [Row | Acc]}.
 
diff --git a/src/couch_mrview/src/couch_mrview_index.erl b/src/couch_mrview/src/couch_mrview_index.erl
index eaec5cc..aa1ee27 100644
--- a/src/couch_mrview/src/couch_mrview_index.erl
+++ b/src/couch_mrview/src/couch_mrview_index.erl
@@ -14,7 +14,7 @@
 
 
 -export([get/2]).
--export([init/2, open/2, close/1, reset/1, delete/1]).
+-export([init/2, open/2, close/1, reset/1, delete/1, shutdown/1]).
 -export([start_update/3, purge/4, process_doc/3, finish_update/1, commit/1]).
 -export([compact/3, swap_compacted/2, remove_compacted/1]).
 -export([index_file_exists/1]).
@@ -143,6 +143,18 @@ close(State) ->
     couch_file:close(State#mrst.fd).
 
 
+% This is called after a ddoc_updated event occurs, and
+% before we shut down the couch_index process.
+% We unlink couch_index from corresponding couch_file and demonitor it.
+% This allows all outstanding queries that are currently streaming
+% data from couch_file to finish successfully.
+% couch_file will be closed automatically after all
+% outstanding queries are done.
+shutdown(State) ->
+    erlang:demonitor(State#mrst.fd_monitor, [flush]),
+    unlink(State#mrst.fd).
+
+
 delete(#mrst{db_name=DbName, sig=Sig}=State) ->
     couch_file:close(State#mrst.fd),
     catch couch_mrview_util:delete_files(DbName, Sig).
diff --git a/src/couch_mrview/src/couch_mrview_util.erl b/src/couch_mrview/src/couch_mrview_util.erl
index 5c95f2e..0d58e4f 100644
--- a/src/couch_mrview/src/couch_mrview_util.erl
+++ b/src/couch_mrview/src/couch_mrview_util.erl
@@ -42,13 +42,17 @@
 
 
 get_view(Db, DDoc, ViewName, Args0) ->
-    {ok, State, Args2} = get_view_index_state(Db, DDoc, ViewName, Args0),
-    Ref = erlang:monitor(process, State#mrst.fd),
-    #mrst{language=Lang, views=Views} = State,
-    {Type, View, Args3} = extract_view(Lang, Args2, ViewName, Views),
-    check_range(Args3, view_cmp(View)),
-    Sig = view_sig(Db, State, View, Args3),
-    {ok, {Type, View, Ref}, Sig, Args3}.
+    case get_view_index_state(Db, DDoc, ViewName, Args0) of
+        {ok, State, Args2} ->
+            Ref = erlang:monitor(process, State#mrst.fd),
+            #mrst{language=Lang, views=Views} = State,
+            {Type, View, Args3} = extract_view(Lang, Args2, ViewName, Views),
+            check_range(Args3, view_cmp(View)),
+            Sig = view_sig(Db, State, View, Args3),
+            {ok, {Type, View, Ref}, Sig, Args3};
+        ddoc_updated ->
+            ddoc_updated
+    end.
 
 
 get_view_index_pid(Db, DDoc, ViewName, Args0) ->
@@ -71,7 +75,7 @@ get_view_index_state(Db, DDoc, ViewName, Args0, RetryCount) ->
         UpdateSeq = couch_util:with_db(Db, fun(WDb) ->
             couch_db:get_update_seq(WDb)
         end),
-        {ok, State} = case Args#mrargs.update of
+        State = case Args#mrargs.update of
             lazy ->
                 spawn(fun() ->
                     catch couch_index:get_state(Pid, UpdateSeq)
@@ -82,7 +86,11 @@ get_view_index_state(Db, DDoc, ViewName, Args0, RetryCount) ->
             _ ->
                 couch_index:get_state(Pid, UpdateSeq)
         end,
-        {ok, State, Args}
+        case State of
+            {ok, State0} -> {ok, State0, Args};
+            ddoc_updated -> ddoc_updated;
+            Else -> throw(Else)
+        end
     catch
         exit:{Reason, _} when Reason == noproc; Reason == normal ->
             timer:sleep(?GET_VIEW_RETRY_DELAY),
diff --git a/src/couch_mrview/test/couch_mrview_ddoc_updated_tests.erl b/src/couch_mrview/test/couch_mrview_ddoc_updated_tests.erl
new file mode 100644
index 0000000..d0ba6b4
--- /dev/null
+++ b/src/couch_mrview/test/couch_mrview_ddoc_updated_tests.erl
@@ -0,0 +1,139 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_mrview_ddoc_updated_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-define(TIMEOUT, 1000).
+
+
+setup() ->
+    Name = ?tempdb(),
+    couch_server:delete(Name, [?ADMIN_CTX]),
+    {ok, Db} = couch_db:create(Name, [?ADMIN_CTX]),
+    DDoc = couch_doc:from_json_obj({[
+        {<<"_id">>, <<"_design/bar">>},
+        {<<"views">>, {[
+            {<<"baz">>, {[
+                {<<"map">>, <<
+                    "function(doc) {\n"
+                    "    var i = 0; while(i<1000){ i++ };\n"
+                    "    emit(doc.val, doc.val);\n"
+                    "}"
+                >>}
+            ]}}
+        ]}}
+    ]}),
+    [Doc1 | Docs999] = couch_mrview_test_util:make_docs(map, 100),
+    {ok, _} = couch_db:update_docs(Db, [DDoc, Doc1], []),
+    {ok, Db2} = couch_db:reopen(Db),
+
+    % run a query with 1 doc to initialize couch_index process
+    CB = fun
+        ({row, _}, Count) -> {ok, Count+1};
+        (_, Count) -> {ok, Count}
+    end,
+    {ok, _} =
+        couch_mrview:query_view(Db2, <<"_design/bar">>, <<"baz">>, [], CB, 0),
+
+    % add more docs
+    {ok, _} = couch_db:update_docs(Db2, Docs999, []),
+    {ok, Db3} = couch_db:reopen(Db2),
+    Db3.
+
+teardown(Db) ->
+    couch_db:close(Db),
+    couch_server:delete(Db#db.name, [?ADMIN_CTX]),
+    ok.
+
+
+ddoc_update_test_() ->
+    {
+        "Check ddoc update actions",
+        {
+            setup,
+            fun test_util:start_couch/0, fun test_util:stop_couch/1,
+            {
+                foreach,
+                fun setup/0, fun teardown/1,
+                [
+                    fun check_indexing_stops_on_ddoc_change/1
+                ]
+            }
+        }
+    }.
+
+
+check_indexing_stops_on_ddoc_change(Db) ->
+    ?_test(begin
+        DDocID = <<"_design/bar">>,
+
+        IndexesBefore = get_indexes_by_ddoc(DDocID, 1),
+        ?assertEqual(1, length(IndexesBefore)),
+        AliveBefore = lists:filter(fun erlang:is_process_alive/1, IndexesBefore),
+        ?assertEqual(1, length(AliveBefore)),
+
+        {ok, DDoc} = couch_db:open_doc(Db, DDocID, [ejson_body, ?ADMIN_CTX]),
+        DDocJson2 = couch_doc:from_json_obj({[
+            {<<"_id">>, DDocID},
+            {<<"_deleted">>, true},
+            {<<"_rev">>, couch_doc:rev_to_str(DDoc#doc.revs)}
+        ]}),
+
+        % spawn a process for query
+        Self = self(),
+        QPid = spawn(fun() ->
+            {ok, Result} = couch_mrview:query_view(
+                Db, <<"_design/bar">>, <<"baz">>, []),
+            Self ! {self(), Result}
+        end),
+
+        % while indexing for the query is in progress, delete DDoc
+        {ok, _} = couch_db:update_doc(Db, DDocJson2, []),
+        receive
+            {QPid, Msg} ->
+                ?assertEqual(Msg, ddoc_updated)
+        after ?TIMEOUT ->
+            erlang:error(
+                {assertion_failed, [{module, ?MODULE}, {line, ?LINE},
+                {reason, "test failed"}]})
+        end,
+
+        %% assert that previously running indexes are gone
+        IndexesAfter = get_indexes_by_ddoc(DDocID, 0),
+        ?assertEqual(0, length(IndexesAfter)),
+        AliveAfter = lists:filter(fun erlang:is_process_alive/1, IndexesBefore),
+        ?assertEqual(0, length(AliveAfter))
+    end).
+
+
+get_indexes_by_ddoc(DDocID, N) ->
+    Indexes = test_util:wait(fun() ->
+        Indxs = ets:match_object(
+            couchdb_indexes_by_db, {'$1', {DDocID, '$2'}}),
+        case length(Indxs) == N of
+            true ->
+                Indxs;
+            false ->
+                wait
+        end
+    end),
+    lists:foldl(fun({DbName, {_DDocID, Sig}}, Acc) ->
+        case ets:lookup(couchdb_indexes_by_sig, {DbName, Sig}) of
+            [{_, Pid}] -> [Pid|Acc];
+            _ -> Acc
+        end
+    end, [], Indexes).
+
+
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index 80b110a..93d7d15 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -314,7 +314,9 @@ view_cb({row, Row}, Acc) ->
 view_cb(complete, Acc) ->
     % Finish view output
     ok = rexi:stream_last(complete),
-    {ok, Acc}.
+    {ok, Acc};
+view_cb(ok, ddoc_updated) ->
+    rexi:reply({ok, ddoc_updated}).
 
 
 reduce_cb({meta, Meta}, Acc) ->
@@ -331,7 +333,9 @@ reduce_cb({row, Row}, Acc) ->
 reduce_cb(complete, Acc) ->
     % Finish view output
     ok = rexi:stream_last(complete),
-    {ok, Acc}.
+    {ok, Acc};
+reduce_cb(ok, ddoc_updated) ->
+    rexi:reply({ok, ddoc_updated}).
 
 
 changes_enumerator(#doc_info{id= <<"_local/", _/binary>>, high_seq=Seq}, Acc) ->
diff --git a/src/fabric/src/fabric_util.erl b/src/fabric/src/fabric_util.erl
index 7e3f23e..7655613 100644
--- a/src/fabric/src/fabric_util.erl
+++ b/src/fabric/src/fabric_util.erl
@@ -132,6 +132,9 @@ handle_stream_start(rexi_STREAM_INIT, {Worker, From}, St) ->
                 {stop, St#stream_acc{workers=Workers1}}
         end
     end;
+handle_stream_start({ok, ddoc_updated}, _, St) ->
+    cleanup(St#stream_acc.workers),
+    {stop, ddoc_updated};
 handle_stream_start(Else, _, _) ->
     exit({invalid_stream_start, Else}).
 
diff --git a/src/fabric/src/fabric_view_map.erl b/src/fabric/src/fabric_view_map.erl
index b6cedb7..b6a3d6f 100644
--- a/src/fabric/src/fabric_view_map.erl
+++ b/src/fabric/src/fabric_view_map.erl
@@ -37,6 +37,8 @@ go(DbName, Options, DDoc, View, Args, Callback, Acc, VInfo) ->
     RexiMon = fabric_util:create_monitors(Workers0),
     try
         case fabric_util:stream_start(Workers0, #shard.ref, StartFun, Repls) of
+            {ok, ddoc_updated} ->
+                Callback({error, ddoc_updated}, Acc);
             {ok, Workers} ->
                 try
                     go(DbName, Workers, VInfo, Args, Callback, Acc)
@@ -173,7 +175,10 @@ handle_message(#view_row{} = Row, {Worker, From}, State) ->
 
 handle_message(complete, Worker, State) ->
     Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters),
-    fabric_view:maybe_send_row(State#collector{counters = Counters}).
+    fabric_view:maybe_send_row(State#collector{counters = Counters});
+
+handle_message(ddoc_updated, _Worker, State) ->
+    {stop, State}.
 
 merge_row(Dir, Collation, undefined, Row, Rows0) ->
     Rows1 = lists:merge(
diff --git a/src/fabric/src/fabric_view_reduce.erl b/src/fabric/src/fabric_view_reduce.erl
index e6146b0..a74be10 100644
--- a/src/fabric/src/fabric_view_reduce.erl
+++ b/src/fabric/src/fabric_view_reduce.erl
@@ -36,6 +36,8 @@ go(DbName, DDoc, VName, Args, Callback, Acc, VInfo) ->
     RexiMon = fabric_util:create_monitors(Workers0),
     try
         case fabric_util:stream_start(Workers0, #shard.ref, StartFun, Repls) of
+            {ok, ddoc_updated} ->
+                Callback({error, ddoc_updated}, Acc);
             {ok, Workers} ->
                 try
                     go2(DbName, Workers, VInfo, Args, Callback, Acc)
@@ -150,7 +152,10 @@ handle_message(#view_row{key=Key} = Row, {Worker, From}, State) ->
 handle_message(complete, Worker, #collector{counters = Counters0} = State) ->
     true = fabric_dict:is_key(Worker, Counters0),
     C1 = fabric_dict:update_counter(Worker, 1, Counters0),
-    fabric_view:maybe_send_row(State#collector{counters = C1}).
+    fabric_view:maybe_send_row(State#collector{counters = C1});
+
+handle_message(ddoc_updated, _Worker, State) ->
+    {stop, State}.
 
 os_proc_needed(<<"_", _/binary>>) -> false;
 os_proc_needed(_) -> true.

-- 
To stop receiving notification emails like this one, please contact
['"commits@couchdb.apache.org" <co...@couchdb.apache.org>'].