You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ch...@apache.org on 2014/08/28 19:30:14 UTC

[14/50] [abbrv] couch-replicator commit: updated refs/heads/1963-eunit-bigcouch to 3cf0b13

Fix messages leaking from streaming HTTP responses

When we cancel a streaming HTTP response (i.e., Transfer-Encoding:
chunked), it's possible to leave ibrowse messages in the mailbox. For
processes like the changes reader, which continually retry HTTP
requests, these messages can build up and slow the process to a
crawl, which in turn causes replications to appear stuck.

This patch wraps the HTTP request functions entirely in a
try/catch/after expression. The after clause guarantees that any
pending ibrowse messages for the request are removed from the mailbox
before control returns to the calling function. The same after clause
also releases the worker, telling release_worker whether it is a
changes feed worker. This matters because those workers are not managed
by a pool, so they must be unlinked and stopped directly rather than
returned to one.

BugzId: 25341


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/1746e93d
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/1746e93d
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/1746e93d

Branch: refs/heads/1963-eunit-bigcouch
Commit: 1746e93de32ea273dc3cc211a41e0cf94e761dcc
Parents: 756f500
Author: Benjamin Anderson <b...@banjiewen.net>
Authored: Wed Nov 13 18:26:50 2013 -0800
Committer: Robert Newson <rn...@apache.org>
Committed: Tue Jul 29 15:05:23 2014 +0100

----------------------------------------------------------------------
 src/couch_replicator_httpc.erl | 98 +++++++++++++++++++++++--------------
 1 file changed, 62 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/1746e93d/src/couch_replicator_httpc.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_httpc.erl b/src/couch_replicator_httpc.erl
index 6aeff93..0242885 100644
--- a/src/couch_replicator_httpc.erl
+++ b/src/couch_replicator_httpc.erl
@@ -39,8 +39,25 @@ send_req(HttpDb, Params1, Callback) ->
         [{K, ?b2l(iolist_to_binary(V))} || {K, V} <- get_value(qs, Params1, [])]),
     Params = ?replace(Params2, ibrowse_options,
         lists:keysort(1, get_value(ibrowse_options, Params2, []))),
-    {Worker, Response} = send_ibrowse_req(HttpDb, Params),
-    process_response(Response, Worker, HttpDb, Params, Callback).
+    {Worker, Response, IsChanges} = send_ibrowse_req(HttpDb, Params),
+    Ret = try
+        process_response(Response, Worker, HttpDb, Params, Callback)
+    catch
+        throw:{retry, NewHttpDb0, NewParams0} ->
+            {retry, NewHttpDb0, NewParams0}
+    after
+        release_worker(Worker, HttpDb, IsChanges),
+        clean_mailbox(Response)
+    end,
+    % This is necessary to keep this tail-recursive. Calling
+    % send_req in the catch clause would turn it into a body
+    % recursive call accidentally.
+    case Ret of
+        {retry, #httpdb{}=NewHttpDb, NewParams} ->
+            send_req(NewHttpDb, NewParams, Callback);
+        _ ->
+            Ret
+    end.
 
 
 send_ibrowse_req(#httpdb{headers = BaseHeaders} = HttpDb, Params) ->
@@ -50,10 +67,10 @@ send_ibrowse_req(#httpdb{headers = BaseHeaders} = HttpDb, Params) ->
     Headers2 = oauth_header(HttpDb, Params) ++ Headers1,
     Url = full_url(HttpDb, Params),
     Body = get_value(body, Params, []),
-    case get_value(path, Params) of
-    "_changes" ->
+    IsChanges = get_value(path, Params) == "_changes",
+    if IsChanges ->
         {ok, Worker} = ibrowse:spawn_link_worker_process(Url);
-    _ ->
+    true ->
         {ok, Worker} = couch_replicator_httpc_pool:get_worker(HttpDb#httpdb.httpc_pool)
     end,
     IbrowseOptions = [
@@ -67,22 +84,21 @@ send_ibrowse_req(#httpdb{headers = BaseHeaders} = HttpDb, Params) ->
     end,
     Response = ibrowse:send_req_direct(
         Worker, Url, Headers2, Method, Body, IbrowseOptions, Timeout),
-    {Worker, Response}.
+    {Worker, Response, IsChanges}.
 
 
-process_response({error, sel_conn_closed}, _Worker, HttpDb, Params, Callback) ->
-    send_req(HttpDb, Params, Callback);
+process_response({error, sel_conn_closed}, _Worker, HttpDb, Params, _Cb) ->
+    throw({retry, HttpDb, Params});
 
-process_response({error, {'EXIT', {normal, _}}}, _Worker, HttpDb, Params, Cb) ->
+process_response({error, {'EXIT',{normal,_}}}, _Worker, HttpDb, Params, _Cb) ->
     % ibrowse worker terminated because remote peer closed the socket
     % -> not an error
-    send_req(HttpDb, Params, Cb);
+    throw({retry, HttpDb, Params});
 
 process_response({ibrowse_req_id, ReqId}, Worker, HttpDb, Params, Callback) ->
     process_stream_response(ReqId, Worker, HttpDb, Params, Callback);
 
 process_response({ok, Code, Headers, Body}, Worker, HttpDb, Params, Callback) ->
-    release_worker(Worker, HttpDb),
     case list_to_integer(Code) of
     Ok when (Ok >= 200 andalso Ok < 300) ; (Ok >= 400 andalso Ok < 500) ->
         EJson = case Body of
@@ -95,11 +111,11 @@ process_response({ok, Code, Headers, Body}, Worker, HttpDb, Params, Callback) ->
     R when R =:= 301 ; R =:= 302 ; R =:= 303 ->
         do_redirect(Worker, R, Headers, HttpDb, Params, Callback);
     Error ->
-        maybe_retry({code, Error}, Worker, HttpDb, Params, Callback)
+        maybe_retry({code, Error}, Worker, HttpDb, Params)
     end;
 
-process_response(Error, Worker, HttpDb, Params, Callback) ->
-    maybe_retry(Error, Worker, HttpDb, Params, Callback).
+process_response(Error, Worker, HttpDb, Params, _Callback) ->
+    maybe_retry(Error, Worker, HttpDb, Params).
 
 
 process_stream_response(ReqId, Worker, HttpDb, Params, Callback) ->
@@ -113,12 +129,10 @@ process_stream_response(ReqId, Worker, HttpDb, Params, Callback) ->
             ibrowse:stream_next(ReqId),
             try
                 Ret = Callback(Ok, Headers, StreamDataFun),
-                release_worker(Worker, HttpDb),
-                clean_mailbox_req(ReqId),
                 Ret
-            catch throw:{maybe_retry_req, Err} ->
-                clean_mailbox_req(ReqId),
-                maybe_retry(Err, Worker, HttpDb, Params, Callback)
+            catch
+                throw:{maybe_retry_req, Err} ->
+                    maybe_retry(Err, Worker, HttpDb, Params)
             end;
         R when R =:= 301 ; R =:= 302 ; R =:= 303 ->
             do_redirect(Worker, R, Headers, HttpDb, Params, Callback);
@@ -126,50 +140,63 @@ process_stream_response(ReqId, Worker, HttpDb, Params, Callback) ->
             report_error(Worker, HttpDb, Params, {code, Error})
         end;
     {ibrowse_async_response, ReqId, {error, _} = Error} ->
-        maybe_retry(Error, Worker, HttpDb, Params, Callback)
+        maybe_retry(Error, Worker, HttpDb, Params)
     after HttpDb#httpdb.timeout + 500 ->
         % Note: ibrowse should always reply with timeouts, but this doesn't
         % seem to be always true when there's a very high rate of requests
         % and many open connections.
-        maybe_retry(timeout, Worker, HttpDb, Params, Callback)
+        maybe_retry(timeout, Worker, HttpDb, Params)
     end.
 
 
-clean_mailbox_req(ReqId) ->
+% Only streaming HTTP requests send messages back from
+% the ibrowse worker process. We can detect that based
+% on the ibrowse_req_id format. This just drops all
+% messages for the given ReqId on the floor since we're
+% no longer in the HTTP request.
+clean_mailbox({ibrowse_req_id, ReqId}) ->
     receive
     {ibrowse_async_response, ReqId, _} ->
-        clean_mailbox_req(ReqId);
+        clean_mailbox({ibrowse_req_id, ReqId});
     {ibrowse_async_response_end, ReqId} ->
-        clean_mailbox_req(ReqId)
+        clean_mailbox({ibrowse_req_id, ReqId})
     after 0 ->
         ok
-    end.
+    end;
+clean_mailbox(_) ->
+    ok.
 
 
-release_worker(Worker, #httpdb{httpc_pool = Pool}) ->
+release_worker(Worker, _, true) ->
+    true = unlink(Worker),
+    ibrowse_http_client:stop(Worker),
+    receive
+        {'EXIT', Worker, _} -> ok
+        after 0 -> ok
+    end;
+release_worker(Worker, #httpdb{httpc_pool = Pool}, false) ->
     ok = couch_replicator_httpc_pool:release_worker(Pool, Worker).
 
 
-maybe_retry(Error, Worker, #httpdb{retries = 0} = HttpDb, Params, _Cb) ->
+maybe_retry(Error, Worker, #httpdb{retries = 0} = HttpDb, Params) ->
     report_error(Worker, HttpDb, Params, {error, Error});
 
-maybe_retry(Error, Worker, #httpdb{retries = Retries, wait = Wait} = HttpDb,
-    Params, Cb) ->
-    release_worker(Worker, HttpDb),
+maybe_retry(Error, _Worker, #httpdb{retries = Retries, wait = Wait} = HttpDb,
+    Params) ->
     Method = string:to_upper(atom_to_list(get_value(method, Params, get))),
     Url = couch_util:url_strip_password(full_url(HttpDb, Params)),
     couch_log:notice("Retrying ~s request to ~s in ~p seconds due to error ~s",
         [Method, Url, Wait / 1000, error_cause(Error)]),
     ok = timer:sleep(Wait),
     Wait2 = erlang:min(Wait * 2, ?MAX_WAIT),
-    send_req(HttpDb#httpdb{retries = Retries - 1, wait = Wait2}, Params, Cb).
+    NewHttpDb = HttpDb#httpdb{retries = Retries - 1, wait = Wait2},
+    throw({retry, NewHttpDb, Params}).
 
 
-report_error(Worker, HttpDb, Params, Error) ->
+report_error(_Worker, HttpDb, Params, Error) ->
     Method = string:to_upper(atom_to_list(get_value(method, Params, get))),
     Url = couch_util:url_strip_password(full_url(HttpDb, Params)),
     do_report_error(Url, Method, Error),
-    release_worker(Worker, HttpDb),
     exit({http_request_failed, Method, Url, Error}).
 
 
@@ -255,11 +282,10 @@ oauth_header(#httpdb{url = BaseUrl, oauth = OAuth}, ConnParams) ->
         "OAuth " ++ oauth:header_params_encode(OAuthParams)}].
 
 
-do_redirect(Worker, Code, Headers, #httpdb{url = Url} = HttpDb, Params, Cb) ->
-    release_worker(Worker, HttpDb),
+do_redirect(_Worker, Code, Headers, #httpdb{url = Url} = HttpDb, Params, _Cb) ->
     RedirectUrl = redirect_url(Headers, Url),
     {HttpDb2, Params2} = after_redirect(RedirectUrl, Code, HttpDb, Params),
-    send_req(HttpDb2, Params2, Cb).
+    throw({retry, HttpDb2, Params2}).
 
 
 redirect_url(RespHeaders, OrigUrl) ->