You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by mi...@apache.org on 2015/06/02 19:29:16 UTC

[09/13] couch-replicator commit: updated refs/heads/2707-merge-couch_replicator-fixes-from-cloudant-fork to 367562b

Ensure that ibrowse streams are ended properly

I found a situation where we had livelock on a running application due
to an ibrowse request that hadn't been properly terminated. This
manifested as a cessation of updates to the _active_tasks information.
Debugging this led me to see that the main couch_replicator pid was
stuck on a call to get_pending_changes. This call was stuck because the
ibrowse_http_client process being used was stuck waiting for a changes
request to complete.

This changes request, as it turns out, had been abandoned by the
couch_replicator_changes_reader. The changes reader was then stuck
trying to do a gen_server:call/2 back to the main couch_replicator
process with the report_seq_done message.

Given all this, it became apparent that the changes feed improperly
ending its ibrowse streams was the underlying culprit. Issuing a call to
ibrowse:stream_next/1 with the abandoned ibrowse stream id resulted in
the replication resuming.

This bug was introduced in this commit:
bfa020b43be20c54ab166c51f5c6e55c34d844c2

BugzId: 47306

This is a cherry-pick of:

https://github.com/cloudant/couch_replicator/commit/f9db37a9b293f5f078681e7539fd35a92eb3adec


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/89d316e5
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/89d316e5
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/89d316e5

Branch: refs/heads/2707-merge-couch_replicator-fixes-from-cloudant-fork
Commit: 89d316e51d49c74b6afe5fc5c7de64c77d299df2
Parents: 44e755e
Author: Paul J. Davis <pa...@gmail.com>
Authored: Wed May 20 17:04:31 2015 -0500
Committer: Mike Wallace <mi...@apache.org>
Committed: Tue Jun 2 18:20:06 2015 +0100

----------------------------------------------------------------------
 src/couch_replicator_httpc.erl | 29 ++++++++++++++++++++++-------
 1 file changed, 22 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/89d316e5/src/couch_replicator_httpc.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_httpc.erl b/src/couch_replicator_httpc.erl
index 9a6213a..e61fba5 100644
--- a/src/couch_replicator_httpc.erl
+++ b/src/couch_replicator_httpc.erl
@@ -27,6 +27,7 @@
 
 -define(replace(L, K, V), lists:keystore(K, 1, L, {K, V})).
 -define(MAX_WAIT, 5 * 60 * 1000).
+-define(STREAM_STATUS, ibrowse_stream_status).
 
 
 setup(#httpdb{httpc_pool = nil, url = Url, http_connections = MaxConns} = Db) ->
@@ -35,6 +36,7 @@ setup(#httpdb{httpc_pool = nil, url = Url, http_connections = MaxConns} = Db) ->
 
 
 send_req(HttpDb, Params1, Callback) ->
+    put(STREAM_STATUS, init),
     couch_stats:increment_counter([couch_replicator, requests]),
     Params2 = ?replace(Params1, qs,
         [{K, ?b2l(iolist_to_binary(V))} || {K, V} <- get_value(qs, Params1, [])]),
@@ -132,6 +134,7 @@ process_stream_response(ReqId, Worker, HttpDb, Params, Callback) ->
             StreamDataFun = fun() ->
                 stream_data_self(HttpDb, Params, Worker, ReqId, Callback)
             end,
+            put(STREAM_STATUS, streaming),
             ibrowse:stream_next(ReqId),
             try
                 Ret = Callback(Ok, Headers, StreamDataFun),
@@ -167,13 +170,24 @@ process_stream_response(ReqId, Worker, HttpDb, Params, Callback) ->
 % messages for the given ReqId on the floor since we're
 % no longer in the HTTP request.
 clean_mailbox({ibrowse_req_id, ReqId}) ->
-    receive
-    {ibrowse_async_response, ReqId, _} ->
-        clean_mailbox({ibrowse_req_id, ReqId});
-    {ibrowse_async_response_end, ReqId} ->
-        clean_mailbox({ibrowse_req_id, ReqId})
-    after 0 ->
-        ok
+    case get(STREAM_STATUS) of
+        streaming ->
+            ibrowse:stream_next(ReqId),
+            receive
+                {ibrowse_async_response, ReqId, _} ->
+                    clean_mailbox({ibrowse_req_id, ReqId});
+                {ibrowse_async_response_end, ReqId} ->
+                    put(STREAM_STATUS, ended),
+                    ok
+            end;
+        _ ->
+            receive
+                {ibrowse_async_response, ReqId, _} ->
+                    clean_mailbox({ibrowse_req_id, ReqId});
+                {ibrowse_async_response_end, ReqId} ->
+                    put(STREAM_STATUS, ended),
+                    ok
+            end
     end;
 clean_mailbox(_) ->
     ok.
@@ -222,6 +236,7 @@ stream_data_self(#httpdb{timeout = T} = HttpDb, Params, Worker, ReqId, Cb) ->
         ibrowse:stream_next(ReqId),
         {Data, fun() -> stream_data_self(HttpDb, Params, Worker, ReqId, Cb) end};
     {Data, ibrowse_async_response_end} ->
+        put(STREAM_STATUS, ended),
         {Data, fun() -> throw({maybe_retry_req, more_data_expected}) end}
     end.