You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by fd...@apache.org on 2010/10/15 20:15:36 UTC

svn commit: r1023054 - in /couchdb/branches/new_replicator/src/couchdb: couch_api_wrap.erl couch_api_wrap_httpc.erl

Author: fdmanana
Date: Fri Oct 15 18:15:35 2010
New Revision: 1023054

URL: http://svn.apache.org/viewvc?rev=1023054&view=rev
Log:
New replicator: small improvements for error handling.

Modified:
    couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl
    couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl

Modified: couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl?rev=1023054&r1=1023053&r2=1023054&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl Fri Oct 15 18:15:35 2010
@@ -164,11 +164,12 @@ open_doc_revs(#httpdb{} = HttpDb, Id, Re
     _ ->
         ?JSON_ENCODE(couch_doc:revs_to_strs(Revs))
     end,
-    Self = self(),
     QArgs = [
         {"revs", "true"}, {"open_revs", RevStr} |
         options_to_query_args(Options, [])
     ],
+    TrapExit = element(2, erlang:process_info(self(), trap_exit)),
+    process_flag(trap_exit, true),
     Streamer = spawn_link(fun() ->
             send_req(
                 HttpDb,
@@ -180,10 +181,13 @@ open_doc_revs(#httpdb{} = HttpDb, Id, Re
                         get_value("Content-Type", Headers),
                         StreamDataFun,
                         fun(Ev) -> mp_parse_mixed(Ev) end)
-                end),
-            unlink(Self)
+                end)
         end),
-    receive_docs(Streamer, Fun, Acc);
+    Result = receive_docs(Streamer, Fun, Acc),
+    unlink(Streamer),
+    receive {'EXIT', Streamer, _} -> ok after 0 -> ok end,
+    process_flag(trap_exit, TrapExit),
+    Result;
 open_doc_revs(Db, Id, Revs, Options, Fun, Acc) ->
     {ok, Results} = couch_db:open_doc_revs(Db, Id, Revs, Options),
     {ok, lists:foldl(Fun, Acc, Results)}.
@@ -234,29 +238,26 @@ update_doc(#httpdb{} = HttpDb, #doc{id =
     false ->
         []
     end ++ [{"Content-Type", ?b2l(ContentType)}, {"Content-Length", Len}],
-    Self = self(),
-    Ref = make_ref(),
+    TrapExit = element(2, erlang:process_info(self(), trap_exit)),
+    process_flag(trap_exit, true),
     DocStreamer = spawn_link(fun() ->
         couch_doc:doc_to_multi_part_stream(
             Boundary, JsonBytes, Doc#doc.atts,
             fun(Data) ->
-                receive {get_data, Ref, From} ->
-                    From ! {data, Ref, Data}
+                receive {get_data, From} ->
+                    From ! {data, Data}
                 end
-            end, false),
-        unlink(Self)
+            end, false)
     end),
     SendFun = fun(0) ->
             eof;
         (LenLeft) when LenLeft > 0 ->
-            DocStreamer ! {get_data, Ref, self()},
-            receive {data, Ref, Data} ->
+            DocStreamer ! {get_data, self()},
+            receive {data, Data} ->
                 {ok, Data, LenLeft - iolist_size(Data)}
-            after HttpDb#httpdb.timeout ->
-                http_request_failed
             end
     end,
-    send_req(
+    Result = send_req(
         HttpDb,
         [{method, put}, {path, encode_doc_id(DocId)},
             {qs, QArgs}, {headers, Headers}, {body, {SendFun, Len}}],
@@ -264,7 +265,11 @@ update_doc(#httpdb{} = HttpDb, #doc{id =
                 {ok, couch_doc:parse_rev(get_value(<<"rev">>, Props))};
             (_, _, {Props}) ->
                 {error, get_value(<<"error">>, Props)}
-        end);
+        end),
+    process_flag(trap_exit, TrapExit),
+    unlink(DocStreamer),
+    receive {'EXIT', DocStreamer, _} -> ok after 0 -> ok end,
+    Result;
 update_doc(Db, Doc, Options, Type) ->
     try
         couch_db:update_doc(Db, Doc, Options, Type)

Modified: couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl?rev=1023054&r1=1023053&r2=1023054&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl Fri Oct 15 18:15:35 2010
@@ -36,7 +36,7 @@ send_req(#httpdb{headers = BaseHeaders} 
     end,
     IbrowseOptions = [
         {response_format, binary}, {inactivity_timeout, HttpDb#httpdb.timeout},
-        {socket_options, [{reuseaddr, true}]}
+        {socket_options, [{reuseaddr, true}, {keepalive, true}]}
         | get_value(ibrowse_options, Params, []) ++ HttpDb#httpdb.proxy_options
     ],
     Headers2 = oauth_header(HttpDb, Params) ++ BaseHeaders ++ Headers,
@@ -64,12 +64,12 @@ process_response(Resp, Worker, HttpDb, P
             end,
             Callback(Ok, Headers, EJson);
         R when R =:= 301 ; R =:= 302 ->
-            do_redirect(Headers, HttpDb, Params, Callback);
+            do_redirect(Worker, Headers, HttpDb, Params, Callback);
         Error ->
             report_error(nil, HttpDb, Params, {code, Error})
         end;
-    {error, Reason} ->
-        report_error(nil, HttpDb, Params, {reason, Reason})
+    Error ->
+        report_error(nil, HttpDb, Params, {error, Error})
     end.
 
 
@@ -85,13 +85,12 @@ process_stream_response(ReqId, Worker, H
             stop_worker(Worker),
             Ret;
         R when R =:= 301 ; R =:= 302 ->
-            stop_worker(Worker),
-            do_redirect(Headers, HttpDb, Params, Callback);
+            do_redirect(Worker, Headers, HttpDb, Params, Callback);
         Error ->
             report_error(Worker, HttpDb, Params, {code, Error})
-        end
-    after HttpDb#httpdb.timeout ->
-        report_error(Worker, HttpDb, Params, timeout)
+        end;
+    {ibrowse_async_response, ReqId, {error, _} = Error} ->
+        report_error(Worker, HttpDb, Params, Error)
     end.
 
 
@@ -106,6 +105,9 @@ stop_worker(Worker) when is_pid(Worker) 
 report_error(Worker, #httpdb{timeout = Timeout} = HttpDb, Params, timeout) ->
     report_error(Worker, HttpDb, Params, {timeout, Timeout});
 
+report_error(Worker, #httpdb{timeout = T} = Db, Params, {error, req_timedout}) ->
+    report_error(Worker, Db, Params, {timeout, T});
+
 report_error(Worker, HttpDb, Params, Error) ->
     Method = string:to_upper(atom_to_list(get_value(method, Params, get))),
     Url = couch_util:url_strip_password(full_url(HttpDb, Params)),
@@ -114,9 +116,9 @@ report_error(Worker, HttpDb, Params, Err
     exit({http_request_failed, Method, Url, Error}).
 
 
-do_report_error(FullUrl, Method, {reason, Reason}) ->
+do_report_error(FullUrl, Method, {error, Error}) ->
     ?LOG_ERROR("Replicator, request ~s to ~p failed due to error ~p",
-        [Method, FullUrl, Reason]);
+        [Method, FullUrl, Error]);
 
 do_report_error(Url, Method, {code, Code}) ->
     ?LOG_ERROR("Replicator, request ~s to ~p failed. The received "
@@ -130,14 +132,14 @@ do_report_error(Url, Method, {timeout, T
 stream_data_self(HttpDb, Params, Worker, ReqId) ->
     ibrowse:stream_next(ReqId),
     receive
-    {ibrowse_async_response, ReqId, {error, Error}} ->
-        report_error(Worker, HttpDb, Params, {reason, Error});
+    {ibrowse_async_response, ReqId, {error, _} = Error} ->
+        report_error(Worker, HttpDb, Params, {error, Error});
     {ibrowse_async_response, ReqId, Data} ->
         {Data, fun() -> stream_data_self(HttpDb, Params, Worker, ReqId) end};
     {ibrowse_async_response_end, ReqId} ->
-        {<<>>, fun() -> stream_data_self(HttpDb, Params, Worker, ReqId) end}
-    after HttpDb#httpdb.timeout ->
-        report_error(Worker, HttpDb, Params, timeout)
+        {<<>>, fun() ->
+            report_error(Worker, HttpDb, Params, {error, more_data_expected})
+        end}
     end.
 
 
@@ -177,10 +179,11 @@ oauth_header(#httpdb{url = BaseUrl, oaut
         "OAuth " ++ oauth_uri:params_to_header_string(OAuthParams)}].
 
 
-do_redirect(RespHeaders, #httpdb{url = OrigUrl} = HttpDb, Params, Callback) ->
-    RedirectUrl = redirect_url(RespHeaders, OrigUrl),
+do_redirect(Worker, RespHeaders, #httpdb{url = Url} = HttpDb, Params, Cb) ->
+    stop_worker(Worker),
+    RedirectUrl = redirect_url(RespHeaders, Url),
     {HttpDb2, Params2} = after_redirect(RedirectUrl, HttpDb, Params),
-    send_req(HttpDb2, Params2, Callback).
+    send_req(HttpDb2, Params2, Cb).
 
 
 redirect_url(RespHeaders, OrigUrl) ->