You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by mi...@apache.org on 2015/06/02 19:29:16 UTC
[09/13] couch-replicator commit: updated
refs/heads/2707-merge-couch_replicator-fixes-from-cloudant-fork to 367562b
Ensure that ibrowse streams are ended properly
I found a situation where we had live lock on a running application due
to an ibrowse request that hadn't been properly terminated. This
manifested as a cesation of updates to the _active_tasks information.
Debugging this lead me to see that the main couch_replicator pid was
stuck on a call to get_pending_changes. This call was stuck because the
ibrowse_http_client process being used was stuck waiting for a changes
request to complete.
This changes request as it turns out had been abandoned by the
couch_replicator_changes_reader. The changes reader was then stuck
trying to do a gen_server:call/2 back to the main couch_replicator
process with the report_seq_done message.
Given all this, it became apparent that the changes feed improperly
ending its ibrowse streams was the underlying culprit. Issuing a call to
ibrowse:stream_next/1 with the abandoned ibrowse stream id resulted in
the replication resuming.
This bug was introduced in this commit:
bfa020b43be20c54ab166c51f5c6e55c34d844c2
BugzId: 47306
This is a cherry-pick of:
https://github.com/cloudant/couch_replicator/commit/f9db37a9b293f5f078681e7539fd35a92eb3adec
Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/89d316e5
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/89d316e5
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/89d316e5
Branch: refs/heads/2707-merge-couch_replicator-fixes-from-cloudant-fork
Commit: 89d316e51d49c74b6afe5fc5c7de64c77d299df2
Parents: 44e755e
Author: Paul J. Davis <pa...@gmail.com>
Authored: Wed May 20 17:04:31 2015 -0500
Committer: Mike Wallace <mi...@apache.org>
Committed: Tue Jun 2 18:20:06 2015 +0100
----------------------------------------------------------------------
src/couch_replicator_httpc.erl | 29 ++++++++++++++++++++++-------
1 file changed, 22 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/89d316e5/src/couch_replicator_httpc.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_httpc.erl b/src/couch_replicator_httpc.erl
index 9a6213a..e61fba5 100644
--- a/src/couch_replicator_httpc.erl
+++ b/src/couch_replicator_httpc.erl
@@ -27,6 +27,7 @@
-define(replace(L, K, V), lists:keystore(K, 1, L, {K, V})).
-define(MAX_WAIT, 5 * 60 * 1000).
+-define(STREAM_STATUS, ibrowse_stream_status).
setup(#httpdb{httpc_pool = nil, url = Url, http_connections = MaxConns} = Db) ->
@@ -35,6 +36,7 @@ setup(#httpdb{httpc_pool = nil, url = Url, http_connections = MaxConns} = Db) ->
send_req(HttpDb, Params1, Callback) ->
+ put(STREAM_STATUS, init),
couch_stats:increment_counter([couch_replicator, requests]),
Params2 = ?replace(Params1, qs,
[{K, ?b2l(iolist_to_binary(V))} || {K, V} <- get_value(qs, Params1, [])]),
@@ -132,6 +134,7 @@ process_stream_response(ReqId, Worker, HttpDb, Params, Callback) ->
StreamDataFun = fun() ->
stream_data_self(HttpDb, Params, Worker, ReqId, Callback)
end,
+ put(STREAM_STATUS, streaming),
ibrowse:stream_next(ReqId),
try
Ret = Callback(Ok, Headers, StreamDataFun),
@@ -167,13 +170,24 @@ process_stream_response(ReqId, Worker, HttpDb, Params, Callback) ->
% messages for the given ReqId on the floor since we're
% no longer in the HTTP request.
clean_mailbox({ibrowse_req_id, ReqId}) ->
- receive
- {ibrowse_async_response, ReqId, _} ->
- clean_mailbox({ibrowse_req_id, ReqId});
- {ibrowse_async_response_end, ReqId} ->
- clean_mailbox({ibrowse_req_id, ReqId})
- after 0 ->
- ok
+ case get(STREAM_STATUS) of
+ streaming ->
+ ibrowse:stream_next(ReqId),
+ receive
+ {ibrowse_async_response, ReqId, _} ->
+ clean_mailbox({ibrowse_req_id, ReqId});
+ {ibrowse_async_response_end, ReqId} ->
+ put(STREAM_STATUS, ended),
+ ok
+ end;
+ _ ->
+ receive
+ {ibrowse_async_response, ReqId, _} ->
+ clean_mailbox({ibrowse_req_id, ReqId});
+ {ibrowse_async_response_end, ReqId} ->
+ put(STREAM_STATUS, ended),
+ ok
+ end
end;
clean_mailbox(_) ->
ok.
@@ -222,6 +236,7 @@ stream_data_self(#httpdb{timeout = T} = HttpDb, Params, Worker, ReqId, Cb) ->
ibrowse:stream_next(ReqId),
{Data, fun() -> stream_data_self(HttpDb, Params, Worker, ReqId, Cb) end};
{Data, ibrowse_async_response_end} ->
+ put(STREAM_STATUS, ended),
{Data, fun() -> throw({maybe_retry_req, more_data_expected}) end}
end.