You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by fd...@apache.org on 2010/10/15 20:15:36 UTC
svn commit: r1023054 - in /couchdb/branches/new_replicator/src/couchdb: couch_api_wrap.erl couch_api_wrap_httpc.erl
Author: fdmanana
Date: Fri Oct 15 18:15:35 2010
New Revision: 1023054
URL: http://svn.apache.org/viewvc?rev=1023054&view=rev
Log:
New replicator: small improvements for error handling.
Modified:
couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl
couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl
Modified: couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl?rev=1023054&r1=1023053&r2=1023054&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl Fri Oct 15 18:15:35 2010
@@ -164,11 +164,12 @@ open_doc_revs(#httpdb{} = HttpDb, Id, Re
_ ->
?JSON_ENCODE(couch_doc:revs_to_strs(Revs))
end,
- Self = self(),
QArgs = [
{"revs", "true"}, {"open_revs", RevStr} |
options_to_query_args(Options, [])
],
+ TrapExit = element(2, erlang:process_info(self(), trap_exit)),
+ process_flag(trap_exit, true),
Streamer = spawn_link(fun() ->
send_req(
HttpDb,
@@ -180,10 +181,13 @@ open_doc_revs(#httpdb{} = HttpDb, Id, Re
get_value("Content-Type", Headers),
StreamDataFun,
fun(Ev) -> mp_parse_mixed(Ev) end)
- end),
- unlink(Self)
+ end)
end),
- receive_docs(Streamer, Fun, Acc);
+ Result = receive_docs(Streamer, Fun, Acc),
+ unlink(Streamer),
+ receive {'EXIT', Streamer, _} -> ok after 0 -> ok end,
+ process_flag(trap_exit, TrapExit),
+ Result;
open_doc_revs(Db, Id, Revs, Options, Fun, Acc) ->
{ok, Results} = couch_db:open_doc_revs(Db, Id, Revs, Options),
{ok, lists:foldl(Fun, Acc, Results)}.
@@ -234,29 +238,26 @@ update_doc(#httpdb{} = HttpDb, #doc{id =
false ->
[]
end ++ [{"Content-Type", ?b2l(ContentType)}, {"Content-Length", Len}],
- Self = self(),
- Ref = make_ref(),
+ TrapExit = element(2, erlang:process_info(self(), trap_exit)),
+ process_flag(trap_exit, true),
DocStreamer = spawn_link(fun() ->
couch_doc:doc_to_multi_part_stream(
Boundary, JsonBytes, Doc#doc.atts,
fun(Data) ->
- receive {get_data, Ref, From} ->
- From ! {data, Ref, Data}
+ receive {get_data, From} ->
+ From ! {data, Data}
end
- end, false),
- unlink(Self)
+ end, false)
end),
SendFun = fun(0) ->
eof;
(LenLeft) when LenLeft > 0 ->
- DocStreamer ! {get_data, Ref, self()},
- receive {data, Ref, Data} ->
+ DocStreamer ! {get_data, self()},
+ receive {data, Data} ->
{ok, Data, LenLeft - iolist_size(Data)}
- after HttpDb#httpdb.timeout ->
- http_request_failed
end
end,
- send_req(
+ Result = send_req(
HttpDb,
[{method, put}, {path, encode_doc_id(DocId)},
{qs, QArgs}, {headers, Headers}, {body, {SendFun, Len}}],
@@ -264,7 +265,11 @@ update_doc(#httpdb{} = HttpDb, #doc{id =
{ok, couch_doc:parse_rev(get_value(<<"rev">>, Props))};
(_, _, {Props}) ->
{error, get_value(<<"error">>, Props)}
- end);
+ end),
+ process_flag(trap_exit, TrapExit),
+ unlink(DocStreamer),
+ receive {'EXIT', DocStreamer, _} -> ok after 0 -> ok end,
+ Result;
update_doc(Db, Doc, Options, Type) ->
try
couch_db:update_doc(Db, Doc, Options, Type)
Modified: couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl?rev=1023054&r1=1023053&r2=1023054&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl Fri Oct 15 18:15:35 2010
@@ -36,7 +36,7 @@ send_req(#httpdb{headers = BaseHeaders}
end,
IbrowseOptions = [
{response_format, binary}, {inactivity_timeout, HttpDb#httpdb.timeout},
- {socket_options, [{reuseaddr, true}]}
+ {socket_options, [{reuseaddr, true}, {keepalive, true}]}
| get_value(ibrowse_options, Params, []) ++ HttpDb#httpdb.proxy_options
],
Headers2 = oauth_header(HttpDb, Params) ++ BaseHeaders ++ Headers,
@@ -64,12 +64,12 @@ process_response(Resp, Worker, HttpDb, P
end,
Callback(Ok, Headers, EJson);
R when R =:= 301 ; R =:= 302 ->
- do_redirect(Headers, HttpDb, Params, Callback);
+ do_redirect(Worker, Headers, HttpDb, Params, Callback);
Error ->
report_error(nil, HttpDb, Params, {code, Error})
end;
- {error, Reason} ->
- report_error(nil, HttpDb, Params, {reason, Reason})
+ Error ->
+ report_error(nil, HttpDb, Params, {error, Error})
end.
@@ -85,13 +85,12 @@ process_stream_response(ReqId, Worker, H
stop_worker(Worker),
Ret;
R when R =:= 301 ; R =:= 302 ->
- stop_worker(Worker),
- do_redirect(Headers, HttpDb, Params, Callback);
+ do_redirect(Worker, Headers, HttpDb, Params, Callback);
Error ->
report_error(Worker, HttpDb, Params, {code, Error})
- end
- after HttpDb#httpdb.timeout ->
- report_error(Worker, HttpDb, Params, timeout)
+ end;
+ {ibrowse_async_response, ReqId, {error, _} = Error} ->
+ report_error(Worker, HttpDb, Params, Error)
end.
@@ -106,6 +105,9 @@ stop_worker(Worker) when is_pid(Worker)
report_error(Worker, #httpdb{timeout = Timeout} = HttpDb, Params, timeout) ->
report_error(Worker, HttpDb, Params, {timeout, Timeout});
+report_error(Worker, #httpdb{timeout = T} = Db, Params, {error, req_timedout}) ->
+ report_error(Worker, Db, Params, {timeout, T});
+
report_error(Worker, HttpDb, Params, Error) ->
Method = string:to_upper(atom_to_list(get_value(method, Params, get))),
Url = couch_util:url_strip_password(full_url(HttpDb, Params)),
@@ -114,9 +116,9 @@ report_error(Worker, HttpDb, Params, Err
exit({http_request_failed, Method, Url, Error}).
-do_report_error(FullUrl, Method, {reason, Reason}) ->
+do_report_error(FullUrl, Method, {error, Error}) ->
?LOG_ERROR("Replicator, request ~s to ~p failed due to error ~p",
- [Method, FullUrl, Reason]);
+ [Method, FullUrl, Error]);
do_report_error(Url, Method, {code, Code}) ->
?LOG_ERROR("Replicator, request ~s to ~p failed. The received "
@@ -130,14 +132,14 @@ do_report_error(Url, Method, {timeout, T
stream_data_self(HttpDb, Params, Worker, ReqId) ->
ibrowse:stream_next(ReqId),
receive
- {ibrowse_async_response, ReqId, {error, Error}} ->
- report_error(Worker, HttpDb, Params, {reason, Error});
+ {ibrowse_async_response, ReqId, {error, _} = Error} ->
+ report_error(Worker, HttpDb, Params, {error, Error});
{ibrowse_async_response, ReqId, Data} ->
{Data, fun() -> stream_data_self(HttpDb, Params, Worker, ReqId) end};
{ibrowse_async_response_end, ReqId} ->
- {<<>>, fun() -> stream_data_self(HttpDb, Params, Worker, ReqId) end}
- after HttpDb#httpdb.timeout ->
- report_error(Worker, HttpDb, Params, timeout)
+ {<<>>, fun() ->
+ report_error(Worker, HttpDb, Params, {error, more_data_expected})
+ end}
end.
@@ -177,10 +179,11 @@ oauth_header(#httpdb{url = BaseUrl, oaut
"OAuth " ++ oauth_uri:params_to_header_string(OAuthParams)}].
-do_redirect(RespHeaders, #httpdb{url = OrigUrl} = HttpDb, Params, Callback) ->
- RedirectUrl = redirect_url(RespHeaders, OrigUrl),
+do_redirect(Worker, RespHeaders, #httpdb{url = Url} = HttpDb, Params, Cb) ->
+ stop_worker(Worker),
+ RedirectUrl = redirect_url(RespHeaders, Url),
{HttpDb2, Params2} = after_redirect(RedirectUrl, HttpDb, Params),
- send_req(HttpDb2, Params2, Callback).
+ send_req(HttpDb2, Params2, Cb).
redirect_url(RespHeaders, OrigUrl) ->