You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by to...@apache.org on 2016/10/05 17:31:48 UTC

[2/5] couch-replicator commit: updated refs/heads/master to cb41bac

Retry when connection_closed is received during a streamed response

The changes_reader uses a streamed response. While the stream is in
progress, it's possible to receive a connection_closed error due to
timeouts or network issues. In that case we simply retry the request.
This is safe because, for streamed responses, a connection must be
established first in order for the stream to begin. So if the network
is truly down, the initial request will fail before any stream starts,
and the code path will go through the normal retry clause, which
decrements the number of retries. This way we won't be stuck forever
if it's an actual network issue.

BugzId: 70400
COUCHDB-3010


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/2db0d7f2
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/2db0d7f2
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/2db0d7f2

Branch: refs/heads/master
Commit: 2db0d7f2a697c7e4aae9ca0ccb1658591a7579c5
Parents: 80e9578
Author: Tony Sun <to...@cloudant.com>
Authored: Wed Jul 20 21:45:14 2016 -0700
Committer: Tony Sun <to...@cloudant.com>
Committed: Wed Sep 21 11:39:39 2016 -0700

----------------------------------------------------------------------
 src/couch_replicator_api_wrap.erl       | 63 ++++++++++++++++------------
 src/couch_replicator_changes_reader.erl |  5 ++-
 src/couch_replicator_httpc.erl          |  3 ++
 3 files changed, 43 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/2db0d7f2/src/couch_replicator_api_wrap.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_api_wrap.erl b/src/couch_replicator_api_wrap.erl
index ff6b00c..f22cac8 100644
--- a/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator_api_wrap.erl
@@ -483,33 +483,42 @@ changes_since(#httpdb{headers = Headers1, timeout = InactiveTimeout} = HttpDb,
         JsonDocIds = ?JSON_ENCODE({[{<<"doc_ids">>, DocIds}]}),
         {[{"filter", "_doc_ids"} | BaseQArgs], post, JsonDocIds, Headers2}
     end,
-    send_req(
-        HttpDb,
-        [{method, Method}, {path, "_changes"}, {qs, QArgs},
-            {headers, Headers}, {body, Body},
-            {ibrowse_options, [{stream_to, {self(), once}}]}],
-        fun(200, _, DataStreamFun) ->
-                parse_changes_feed(Options, UserFun, DataStreamFun);
-            (405, _, _) when is_list(DocIds) ->
-                % CouchDB versions < 1.1.0 don't have the builtin _changes feed
-                % filter "_doc_ids" neither support POST
-                send_req(HttpDb, [{method, get}, {path, "_changes"},
-                    {qs, BaseQArgs}, {headers, Headers1},
-                    {ibrowse_options, [{stream_to, {self(), once}}]}],
-                    fun(200, _, DataStreamFun2) ->
-                        UserFun2 = fun(#doc_info{id = Id} = DocInfo) ->
-                            case lists:member(Id, DocIds) of
-                            true ->
-                                UserFun(DocInfo);
-                            false ->
-                                ok
-                            end;
-                        (LastSeq) ->
-                            UserFun(LastSeq)
-                        end,
-                        parse_changes_feed(Options, UserFun2, DataStreamFun2)
-                    end)
-        end);
+    try
+        send_req(
+            HttpDb,
+            [{method, Method}, {path, "_changes"}, {qs, QArgs},
+                {headers, Headers}, {body, Body},
+                {ibrowse_options, [{stream_to, {self(), once}}]}],
+            fun(200, _, DataStreamFun) ->
+                    parse_changes_feed(Options, UserFun, DataStreamFun);
+                (405, _, _) when is_list(DocIds) ->
+                    % CouchDB versions < 1.1.0 don't have the builtin
+                    % _changes feed filter "_doc_ids" neither support POST
+                    send_req(HttpDb, [{method, get}, {path, "_changes"},
+                        {qs, BaseQArgs}, {headers, Headers1},
+                        {ibrowse_options, [{stream_to, {self(), once}}]}],
+                        fun(200, _, DataStreamFun2) ->
+                            UserFun2 = fun(#doc_info{id = Id} = DocInfo) ->
+                                case lists:member(Id, DocIds) of
+                                true ->
+                                    UserFun(DocInfo);
+                                false ->
+                                    ok
+                                end;
+                            (LastSeq) ->
+                                UserFun(LastSeq)
+                            end,
+                            parse_changes_feed(Options, UserFun2,
+                                DataStreamFun2)
+                        end)
+            end)
+    catch
+        exit:{http_request_failed, _, _, {error, {connection_closed,
+                mid_stream}}} ->
+            throw(retry_no_limit);
+        exit:{http_request_failed, _, _, _} = Error ->
+            throw({retry_limit, Error})
+    end;
 changes_since(Db, Style, StartSeq, UserFun, Options) ->
     DocIds = get_value(doc_ids, Options),
     Selector = get_value(selector, Options),

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/2db0d7f2/src/couch_replicator_changes_reader.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_changes_reader.erl b/src/couch_replicator_changes_reader.erl
index b7d18e0..bed318a 100644
--- a/src/couch_replicator_changes_reader.erl
+++ b/src/couch_replicator_changes_reader.erl
@@ -52,7 +52,10 @@ read_changes(Parent, StartSeq, Db, ChangesQueue, Options, Ts) ->
         throw:recurse ->
             LS = get(last_seq),
             read_changes(Parent, LS, Db, ChangesQueue, Options, Ts+1);
-        exit:{http_request_failed, _, _, _} = Error ->
+        throw:retry_no_limit ->
+            LS = get(last_seq),
+            read_changes(Parent, LS, Db, ChangesQueue, Options, Ts);
+        throw:{retry_limit, Error} ->
         couch_stats:increment_counter(
             [couch_replicator, changes_read_failures]
         ),

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/2db0d7f2/src/couch_replicator_httpc.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_httpc.erl b/src/couch_replicator_httpc.erl
index 9a4cca4..366c325 100644
--- a/src/couch_replicator_httpc.erl
+++ b/src/couch_replicator_httpc.erl
@@ -180,6 +180,9 @@ process_stream_response(ReqId, Worker, HttpDb, Params, Callback) ->
                 Ret = Callback(Ok, Headers, StreamDataFun),
                 Ret
             catch
+                throw:{maybe_retry_req, connection_closed} ->
+                    maybe_retry({connection_closed, mid_stream},
+                        Worker, HttpDb, Params);
                 throw:{maybe_retry_req, Err} ->
                     maybe_retry(Err, Worker, HttpDb, Params)
             end;