You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ko...@apache.org on 2013/10/02 18:16:18 UTC

[1/7] git commit: updated refs/heads/1901-atomic-multipart-retries to d9837e1

Updated Branches:
  refs/heads/1901-atomic-multipart-retries [created] d9837e1da


Use latest=true when retrieving revisions

This allows the replicator to keep up with revisions that may have
occurred after missing_revs was called.

BugzID: 14241


Project: http://git-wip-us.apache.org/repos/asf/couchdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb/commit/962ce087
Tree: http://git-wip-us.apache.org/repos/asf/couchdb/tree/962ce087
Diff: http://git-wip-us.apache.org/repos/asf/couchdb/diff/962ce087

Branch: refs/heads/1901-atomic-multipart-retries
Commit: 962ce087bfac80e2d721b679fee4fe1568a4d362
Parents: 82a103b
Author: Bob Dionne <bo...@cloudant.com>
Authored: Tue Aug 7 12:42:36 2012 -0400
Committer: Adam Kocoloski <ad...@cloudant.com>
Committed: Wed Oct 2 11:59:02 2013 -0400

----------------------------------------------------------------------
 src/couch_replicator/src/couch_replicator_api_wrap.erl | 2 ++
 src/couch_replicator/src/couch_replicator_worker.erl   | 4 ++--
 2 files changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb/blob/962ce087/src/couch_replicator/src/couch_replicator_api_wrap.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl b/src/couch_replicator/src/couch_replicator_api_wrap.erl
index 90cfa8e..cd69e59 100644
--- a/src/couch_replicator/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl
@@ -465,6 +465,8 @@ options_to_query_args([revs | Rest], Acc) ->
     options_to_query_args(Rest, [{"revs", "true"} | Acc]);
 options_to_query_args([{open_revs, all} | Rest], Acc) ->
     options_to_query_args(Rest, [{"open_revs", "all"} | Acc]);
+options_to_query_args([latest | Rest], Acc) ->
+    options_to_query_args(Rest, [{"latest", "true"} | Acc]);
 options_to_query_args([{open_revs, Revs} | Rest], Acc) ->
     JsonRevs = ?b2l(?JSON_ENCODE(couch_doc:revs_to_strs(Revs))),
     options_to_query_args(Rest, [{"open_revs", JsonRevs} | Acc]).

http://git-wip-us.apache.org/repos/asf/couchdb/blob/962ce087/src/couch_replicator/src/couch_replicator_worker.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replicator_worker.erl b/src/couch_replicator/src/couch_replicator_worker.erl
index e8a3570..d66d478 100644
--- a/src/couch_replicator/src/couch_replicator_worker.erl
+++ b/src/couch_replicator/src/couch_replicator_worker.erl
@@ -296,13 +296,13 @@ spawn_doc_reader(Source, Target, FetchParams) ->
 fetch_doc(Source, {Id, Revs, PAs}, DocHandler, Acc) ->
     try
         couch_replicator_api_wrap:open_doc_revs(
-            Source, Id, Revs, [{atts_since, PAs}], DocHandler, Acc)
+            Source, Id, Revs, [{atts_since, PAs}, latest], DocHandler, Acc)
     catch
     throw:{missing_stub, _} ->
         ?LOG_ERROR("Retrying fetch and update of document `~s` due to out of "
             "sync attachment stubs. Missing revisions are: ~s",
             [Id, couch_doc:revs_to_strs(Revs)]),
-        couch_replicator_api_wrap:open_doc_revs(Source, Id, Revs, [], DocHandler, Acc)
+        couch_replicator_api_wrap:open_doc_revs(Source, Id, Revs, [latest], DocHandler, Acc)
     end.
 
 


[4/7] git commit: updated refs/heads/1901-atomic-multipart-retries to d9837e1

Posted by ko...@apache.org.
Ensure the att reader learns of a Parser death


Project: http://git-wip-us.apache.org/repos/asf/couchdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb/commit/ba4e8382
Tree: http://git-wip-us.apache.org/repos/asf/couchdb/tree/ba4e8382
Diff: http://git-wip-us.apache.org/repos/asf/couchdb/diff/ba4e8382

Branch: refs/heads/1901-atomic-multipart-retries
Commit: ba4e8382a2318ea4aab9174f17a84eb0eb41d605
Parents: a3a2a5f
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Thu Sep 12 13:30:14 2013 -0400
Committer: Adam Kocoloski <ad...@cloudant.com>
Committed: Wed Oct 2 12:11:24 2013 -0400

----------------------------------------------------------------------
 src/couch_replicator/src/couch_replicator_api_wrap.erl | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb/blob/ba4e8382/src/couch_replicator/src/couch_replicator_api_wrap.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl b/src/couch_replicator/src/couch_replicator_api_wrap.erl
index 28f96c0..d8be12a 100644
--- a/src/couch_replicator/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl
@@ -713,6 +713,7 @@ doc_from_multi_part_stream(ContentType, DataFun, Ref) ->
     {doc_bytes, Ref, DocBytes} ->
         Doc = couch_doc:from_json_obj(?JSON_DECODE(DocBytes)),
         ReadAttachmentDataFun = fun() ->
+            link(Parser),
             Parser ! {get_bytes, Ref, self()},
             receive
             {started_open_doc_revs, NewRef} ->


[7/7] git commit: updated refs/heads/1901-atomic-multipart-retries to d9837e1

Posted by ko...@apache.org.
Handle open_revs retries at a higher level

This patch disables the httpc client retries for the request to stream
document revisions to the replicator.  The retry logic at that level
could end up jumbling together response body data from different
requests and thoroughly confusing the multipart parser.  Moving the
retry logic up a level allows us to start fresh each time.

BugzID: 21367


Project: http://git-wip-us.apache.org/repos/asf/couchdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb/commit/a3a2a5f6
Tree: http://git-wip-us.apache.org/repos/asf/couchdb/tree/a3a2a5f6
Diff: http://git-wip-us.apache.org/repos/asf/couchdb/diff/a3a2a5f6

Branch: refs/heads/1901-atomic-multipart-retries
Commit: a3a2a5f6082f2bda9185642a344fd28af9acb348
Parents: 8d6c59d
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Tue Sep 10 22:26:14 2013 -0400
Committer: Adam Kocoloski <ad...@cloudant.com>
Committed: Wed Oct 2 12:11:24 2013 -0400

----------------------------------------------------------------------
 .../src/couch_replicator_api_wrap.erl           | 98 +++++++++++++-------
 1 file changed, 67 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb/blob/a3a2a5f6/src/couch_replicator/src/couch_replicator_api_wrap.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl b/src/couch_replicator/src/couch_replicator_api_wrap.erl
index 9921560..28f96c0 100644
--- a/src/couch_replicator/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl
@@ -48,6 +48,7 @@
     get_value/3
     ]).
 
+-define(MAX_WAIT, 5 * 60 * 1000).
 
 db_uri(#httpdb{url = Url}) ->
     couch_util:url_strip_password(Url);
@@ -157,43 +158,78 @@ get_missing_revs(Db, IdRevs) ->
 
 
 
+open_doc_revs(#httpdb{retries = 0} = HttpDb, Id, Revs, Options, _Fun, _Acc) ->
+    Path = encode_doc_id(Id),
+    QS = options_to_query_args(HttpDb, Path, [revs, {open_revs, Revs} | Options]),
+    Url = couch_util:url_strip_password(
+        couch_replicator_httpc:full_url(HttpDb, [{path,Path}, {qs,QS}])
+    ),
+    ?LOG_ERROR("Replication crashing because GET ~s failed", [Url]),
+    exit(kaboom);
 open_doc_revs(#httpdb{} = HttpDb, Id, Revs, Options, Fun, Acc) ->
     Path = encode_doc_id(Id),
-    QArgs = options_to_query_args(
-        HttpDb, Path, [revs, {open_revs, Revs} | Options]),
-    Self = self(),
-    Streamer = spawn_link(fun() ->
-            send_req(
-                HttpDb,
-                [{path, Path}, {qs, QArgs},
-                    {ibrowse_options, [{stream_to, {self(), once}}]},
-                    {headers, [{"Accept", "multipart/mixed"}]}],
-                fun(200, Headers, StreamDataFun) ->
-                    remote_open_doc_revs_streamer_start(Self),
-                    {<<"--">>, _, _} = couch_httpd:parse_multipart_request(
-                        get_value("Content-Type", Headers),
-                        StreamDataFun,
-                        fun mp_parse_mixed/1)
-                end),
-            unlink(Self)
+    QS = options_to_query_args(HttpDb, Path, [revs, {open_revs, Revs} | Options]),
+    {Pid, Ref} = spawn_monitor(fun() ->
+        Self = self(),
+        Callback = fun(200, Headers, StreamDataFun) ->
+            remote_open_doc_revs_streamer_start(Self),
+            {<<"--">>, _, _} = couch_httpd:parse_multipart_request(
+                get_value("Content-Type", Headers),
+                StreamDataFun,
+                fun mp_parse_mixed/1
+            )
+        end,
+        Streamer = spawn_link(fun() ->
+            Params = [
+                {path, Path},
+                {qs, QS},
+                {ibrowse_options, [{stream_to, {self(), once}}]},
+                {headers, [{"Accept", "multipart/mixed"}]}
+            ],
+            % We're setting retries to 0 here to avoid the case where the
+            % Streamer retries the request and ends up jumbling together two
+            % different response bodies.  Retries are handled explicitly by
+            % open_doc_revs itself.
+            send_req(HttpDb#httpdb{retries = 0}, Params, Callback)
+        end),
+        % If this process dies normally we can leave
+        % the Streamer process hanging around keeping an
+        % HTTP connection open. This is a bit of a
+        % hammer approach to making sure it releases
+        % that connection back to the pool.
+        spawn(fun() ->
+            Ref = erlang:monitor(process, Self),
+            receive
+                {'DOWN', Ref, process, Self, normal} ->
+                    exit(Streamer, {streamer_parent_died, Self});
+                {'DOWN', Ref, process, Self, _} ->
+                    ok
+                end
         end),
-    % If this process dies normally we can leave
-    % the Streamer process hanging around keeping an
-    % HTTP connection open. This is a bit of a
-    % hammer approach to making sure it releases
-    % that connection back to the pool.
-    spawn(fun() ->
-        Ref = erlang:monitor(process, Self),
         receive
-            {'DOWN', Ref, process, Self, normal} ->
-                exit(Streamer, {streamer_parent_died, Self});
-            {'DOWN', Ref, process, Self, _} ->
-                ok
-            end
+        {started_open_doc_revs, Ref} ->
+            Ret = receive_docs_loop(Streamer, Fun, Id, Revs, Ref, Acc),
+            exit({exit_ok, Ret})
+        end
     end),
     receive
-    {started_open_doc_revs, Ref} ->
-        receive_docs_loop(Streamer, Fun, Id, Revs, Ref, Acc)
+        {'DOWN', Ref, process, Pid, {exit_ok, Ret}} ->
+            Ret;
+        {'DOWN', Ref, process, Pid, Else} ->
+            Url = couch_util:url_strip_password(
+                couch_replicator_httpc:full_url(HttpDb, [{path,Path}, {qs,QS}])
+            ),
+            #httpdb{retries = Retries, wait = Wait0} = HttpDb,
+            Wait = 2 * erlang:min(Wait0 * 2, ?MAX_WAIT),
+            twig:log(notice,"Retrying GET to ~s in ~p seconds due to error ~s",
+                [Url, Wait / 1000, Else]
+            ),
+            ok = timer:sleep(Wait),
+            RetryDb = HttpDb#httpdb{
+                retries = Retries - 1,
+                wait = Wait
+            },
+            open_doc_revs(RetryDb, Id, Revs, Options, Fun, Acc)
     end;
 open_doc_revs(Db, Id, Revs, Options, Fun, Acc) ->
     {ok, Results} = couch_db:open_doc_revs(Db, Id, Revs, Options),


[5/7] git commit: updated refs/heads/1901-atomic-multipart-retries to d9837e1

Posted by ko...@apache.org.
Disable automatic retries of streaming writes

An automatic retry makes no sense here, as we've already streamed part
of the response body from the GET.  Disabling the retries bubbles the
error up to run_user_fun inside open_doc_revs, which allows us to retry
the whole GET + PUT correctly.

BugzID: 20822


Project: http://git-wip-us.apache.org/repos/asf/couchdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb/commit/0b450507
Tree: http://git-wip-us.apache.org/repos/asf/couchdb/tree/0b450507
Diff: http://git-wip-us.apache.org/repos/asf/couchdb/diff/0b450507

Branch: refs/heads/1901-atomic-multipart-retries
Commit: 0b450507670a4649255c2ac4c41d6aa7ae81e45a
Parents: ba4e838
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Thu Sep 12 17:13:07 2013 -0400
Committer: Adam Kocoloski <ad...@cloudant.com>
Committed: Wed Oct 2 12:11:24 2013 -0400

----------------------------------------------------------------------
 src/couch_replicator/src/couch_replicator_api_wrap.erl | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb/blob/0b450507/src/couch_replicator/src/couch_replicator_api_wrap.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl b/src/couch_replicator/src/couch_replicator_api_wrap.erl
index d8be12a..fccb759 100644
--- a/src/couch_replicator/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl
@@ -221,7 +221,7 @@ open_doc_revs(#httpdb{} = HttpDb, Id, Revs, Options, Fun, Acc) ->
             ),
             #httpdb{retries = Retries, wait = Wait0} = HttpDb,
             Wait = 2 * erlang:min(Wait0 * 2, ?MAX_WAIT),
-            twig:log(notice,"Retrying GET to ~s in ~p seconds due to error ~s",
+            twig:log(notice,"Retrying GET to ~s in ~p seconds due to error ~p",
                 [Url, Wait / 1000, Else]
             ),
             ok = timer:sleep(Wait),
@@ -278,7 +278,11 @@ update_doc(#httpdb{} = HttpDb, #doc{id = DocId} = Doc, Options, Type) ->
     end ++ [{"Content-Type", ?b2l(ContentType)}, {"Content-Length", Len}],
     Body = {fun stream_doc/1, {JsonBytes, Doc#doc.atts, Boundary, Len}},
     send_req(
-        HttpDb,
+        % A crash here bubbles all the way back up to run_user_fun inside
+        % open_doc_revs, which will retry the whole thing.  That's the
+        % appropriate course of action, since we've already started streaming
+        % the response body from the GET request.
+        HttpDb#httpdb{retries = 0},
         [{method, put}, {path, encode_doc_id(DocId)},
             {qs, QArgs}, {headers, Headers}, {body, Body}],
         fun(Code, _, {Props}) when Code =:= 200 orelse Code =:= 201 ->


[2/7] git commit: updated refs/heads/1901-atomic-multipart-retries to d9837e1

Posted by ko...@apache.org.
Avoid deadlocking the httpc pool

The multipart mime parsing code was getting into a state where it would
orphan parsers that held onto pool connections. This just adds a monitor
to make sure that the parser process dies when the parent dies normally
without finishing reading the rest of the parser.

BugzID: 16751


Project: http://git-wip-us.apache.org/repos/asf/couchdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb/commit/fa982654
Tree: http://git-wip-us.apache.org/repos/asf/couchdb/tree/fa982654
Diff: http://git-wip-us.apache.org/repos/asf/couchdb/diff/fa982654

Branch: refs/heads/1901-atomic-multipart-retries
Commit: fa982654b10a8d79ea934c5d9b5cb91ff7998e45
Parents: 962ce08
Author: Paul J. Davis <pa...@gmail.com>
Authored: Fri Feb 22 13:44:20 2013 -0600
Committer: Adam Kocoloski <ad...@cloudant.com>
Committed: Wed Oct 2 11:59:20 2013 -0400

----------------------------------------------------------------------
 .../src/couch_replicator_api_wrap.erl                 | 14 ++++++++++++++
 1 file changed, 14 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb/blob/fa982654/src/couch_replicator/src/couch_replicator_api_wrap.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl b/src/couch_replicator/src/couch_replicator_api_wrap.erl
index cd69e59..4aabe15 100644
--- a/src/couch_replicator/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl
@@ -177,6 +177,20 @@ open_doc_revs(#httpdb{} = HttpDb, Id, Revs, Options, Fun, Acc) ->
                 end),
             unlink(Self)
         end),
+    % If this process dies normally we can leave
+    % the Streamer process hanging around keeping an
+    % HTTP connection open. This is a bit of a
+    % hammer approach to making sure it releases
+    % that connection back to the pool.
+    spawn(fun() ->
+        Ref = erlang:monitor(process, Self),
+        receive
+            {'DOWN', Ref, process, Self, normal} ->
+                exit(Streamer, {streamer_parent_died, Self});
+            {'DOWN', Ref, process, Self, _} ->
+                ok
+            end
+    end),
     receive
     {started_open_doc_revs, Ref} ->
         receive_docs_loop(Streamer, Fun, Id, Revs, Ref, Acc)


[6/7] git commit: updated refs/heads/1901-atomic-multipart-retries to d9837e1

Posted by ko...@apache.org.
Simplify doc_streamer initialization

The unlink at the end is a noop, so we can simplify this to use
spawn_link/3 and the fully-exported function call instead of
spawn_link/1 and a closure.

BugzID: 20822


Project: http://git-wip-us.apache.org/repos/asf/couchdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb/commit/d9837e1d
Tree: http://git-wip-us.apache.org/repos/asf/couchdb/tree/d9837e1d
Diff: http://git-wip-us.apache.org/repos/asf/couchdb/diff/d9837e1d

Branch: refs/heads/1901-atomic-multipart-retries
Commit: d9837e1dadb7b49a032084021b8aa01f299dda4e
Parents: 0b45050
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Thu Sep 12 12:56:50 2013 -0400
Committer: Adam Kocoloski <ad...@cloudant.com>
Committed: Wed Oct 2 12:11:24 2013 -0400

----------------------------------------------------------------------
 .../src/couch_replicator_api_wrap.erl           | 22 ++++++++++----------
 1 file changed, 11 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb/blob/d9837e1d/src/couch_replicator/src/couch_replicator_api_wrap.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl b/src/couch_replicator/src/couch_replicator_api_wrap.erl
index fccb759..2b8a636 100644
--- a/src/couch_replicator/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl
@@ -834,6 +834,12 @@ rev_to_str({_Pos, _Id} = Rev) ->
 rev_to_str(Rev) ->
     Rev.
 
+write_fun() ->
+    fun(Data) ->
+        receive {get_data, Ref, From} ->
+            From ! {data, Ref, Data}
+        end
+    end.
 
 stream_doc({JsonBytes, Atts, Boundary, Len}) ->
     case erlang:erase({doc_streamer, Boundary}) of
@@ -843,17 +849,11 @@ stream_doc({JsonBytes, Atts, Boundary, Len}) ->
     _ ->
         ok
     end,
-    Self = self(),
-    DocStreamer = spawn_link(fun() ->
-        couch_doc:doc_to_multi_part_stream(
-            Boundary, JsonBytes, Atts,
-            fun(Data) ->
-                receive {get_data, Ref, From} ->
-                    From ! {data, Ref, Data}
-                end
-            end, true),
-        unlink(Self)
-    end),
+    DocStreamer = spawn_link(
+        couch_doc,
+        doc_to_multi_part_stream,
+        [Boundary, JsonBytes, Atts, write_fun(), true]
+    ),
     erlang:put({doc_streamer, Boundary}, DocStreamer),
     {ok, <<>>, {Len, Boundary}};
 stream_doc({0, Id}) ->


[3/7] git commit: updated refs/heads/1901-atomic-multipart-retries to d9837e1

Posted by ko...@apache.org.
Fix replication deadlock after HTTP retries

An HTTP failure in open_doc_revs can lead to a replication deadlock
if the retry happens while parsing the multipart/mime response. The
UserFun callback can end up attempting to PUT a request using an
attachment reader from the failed request while the pid that invoked
open_doc_revs will never see the started_open_doc_revs message which
indicates a retry has happened.

This just spawns a middleman process so that the process that invoked
open_doc_revs can listen for the response from the user function as well
as the started_open_doc_revs message to handle retries appropriately.


Project: http://git-wip-us.apache.org/repos/asf/couchdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb/commit/8d6c59d5
Tree: http://git-wip-us.apache.org/repos/asf/couchdb/tree/8d6c59d5
Diff: http://git-wip-us.apache.org/repos/asf/couchdb/diff/8d6c59d5

Branch: refs/heads/1901-atomic-multipart-retries
Commit: 8d6c59d55b097d8db23adef5077df4871e25c34d
Parents: fa98265
Author: Paul J. Davis <pa...@gmail.com>
Authored: Tue Apr 2 18:05:30 2013 -0500
Committer: Adam Kocoloski <ad...@cloudant.com>
Committed: Wed Oct 2 11:59:31 2013 -0400

----------------------------------------------------------------------
 .../src/couch_replicator_api_wrap.erl           | 36 ++++++++++++++++++--
 1 file changed, 33 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb/blob/8d6c59d5/src/couch_replicator/src/couch_replicator_api_wrap.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl b/src/couch_replicator/src/couch_replicator_api_wrap.erl
index 4aabe15..9921560 100644
--- a/src/couch_replicator/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl
@@ -535,7 +535,7 @@ receive_docs(Streamer, UserFun, Ref, UserAcc) ->
                 fun() -> receive_doc_data(Streamer, Ref) end,
                 Ref) of
             {ok, Doc, Parser} ->
-                case UserFun({ok, Doc}, UserAcc) of
+                case run_user_fun(UserFun, {ok, Doc}, UserAcc, Ref) of
                 {ok, UserAcc2} ->
                     ok;
                 {skip, UserAcc2} ->
@@ -546,13 +546,13 @@ receive_docs(Streamer, UserFun, Ref, UserAcc) ->
         {"application/json", []} ->
             Doc = couch_doc:from_json_obj(
                     ?JSON_DECODE(receive_all(Streamer, Ref, []))),
-            {_, UserAcc2} = UserFun({ok, Doc}, UserAcc),
+            {_, UserAcc2} = run_user_fun(UserFun, {ok, Doc}, UserAcc, Ref),
             receive_docs(Streamer, UserFun, Ref, UserAcc2);
         {"application/json", [{"error","true"}]} ->
             {ErrorProps} = ?JSON_DECODE(receive_all(Streamer, Ref, [])),
             Rev = get_value(<<"missing">>, ErrorProps),
             Result = {{not_found, missing}, couch_doc:parse_rev(Rev)},
-            {_, UserAcc2} = UserFun(Result, UserAcc),
+            {_, UserAcc2} = run_user_fun(UserFun, Result, UserAcc, Ref),
             receive_docs(Streamer, UserFun, Ref, UserAcc2)
         end;
     {done, Ref} ->
@@ -560,6 +560,36 @@ receive_docs(Streamer, UserFun, Ref, UserAcc) ->
     end.
 
 
+run_user_fun(UserFun, Arg, UserAcc, OldRef) ->
+    {Pid, Ref} = spawn_monitor(fun() ->
+        try UserFun(Arg, UserAcc) of
+            Resp ->
+                exit({exit_ok, Resp})
+        catch
+            throw:Reason ->
+                exit({exit_throw, Reason});
+            error:Reason ->
+                exit({exit_error, Reason});
+            exit:Reason ->
+                exit({exit_exit, Reason})
+        end
+    end),
+    receive
+        {started_open_doc_revs, NewRef} ->
+            erlang:demonitor(Ref, [flush]),
+            exit(Pid, kill),
+            restart_remote_open_doc_revs(OldRef, NewRef);
+        {'DOWN', Ref, process, Pid, {exit_ok, Ret}} ->
+            Ret;
+        {'DOWN', Ref, process, Pid, {exit_throw, Reason}} ->
+            throw(Reason);
+        {'DOWN', Ref, process, Pid, {exit_error, Reason}} ->
+            erlang:error(Reason);
+        {'DOWN', Ref, process, Pid, {exit_exit, Reason}} ->
+            erlang:exit(Reason)
+    end.
+
+
 restart_remote_open_doc_revs(Ref, NewRef) ->
     receive
     {body_bytes, Ref, _} ->