You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ko...@apache.org on 2009/05/17 18:02:40 UTC
svn commit: r775685 - /couchdb/trunk/src/couchdb/couch_rep.erl
Author: kocolosk
Date: Sun May 17 16:02:39 2009
New Revision: 775685
URL: http://svn.apache.org/viewvc?rev=775685&view=rev
Log:
Replicator should never hang when the attachment receiver dies.
Modified:
couchdb/trunk/src/couchdb/couch_rep.erl
Modified: couchdb/trunk/src/couchdb/couch_rep.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep.erl?rev=775685&r1=775684&r2=775685&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep.erl (original)
+++ couchdb/trunk/src/couchdb/couch_rep.erl Sun May 17 16:02:39 2009
@@ -211,13 +211,19 @@
case should_flush(lists:flatlength([Docs|Buffer])) of
true ->
Docs2 = lists:flatten([Docs|Buffer]),
- {ok, Errors} = update_docs(Target, Docs2, [], replicated_changes),
- dump_update_errors(Errors),
- ets:update_counter(Stats, doc_write_failures, length(Errors)),
- ets:update_counter(Stats, docs_written, length(Docs2) -
- length(Errors)),
- {ok, _, Ctxt} = do_checkpoint(Source, Target, Context, Seq, Stats),
- {[], Ctxt};
+ try update_docs(Target, Docs2, [], replicated_changes) of
+ {ok, Errors} ->
+ dump_update_errors(Errors),
+ ets:update_counter(Stats, doc_write_failures, length(Errors)),
+ ets:update_counter(Stats, docs_written, length(Docs2) -
+ length(Errors)),
+ {ok, _, Ctxt} = do_checkpoint(Source, Target, Context, Seq, Stats),
+ {[], Ctxt}
+ catch
+ throw:attachment_write_failed ->
+ ?LOG_ERROR("attachment request failed during write to disk", []),
+ exit({internal_server_error, replication_link_failure})
+ end;
false ->
{[Docs | Buffer], Context}
end,
@@ -272,11 +278,17 @@
stats = Stats
} = State,
- {ok, Errors} = update_docs(Target, lists:flatten(Buffer), [], replicated_changes),
- dump_update_errors(Errors),
- ets:update_counter(Stats, doc_write_failures, length(Errors)),
- ets:update_counter(Stats, docs_written, lists:flatlength(Buffer) -
- length(Errors)),
+ try update_docs(Target, lists:flatten(Buffer), [], replicated_changes) of
+ {ok, Errors} ->
+ dump_update_errors(Errors),
+ ets:update_counter(Stats, doc_write_failures, length(Errors)),
+ ets:update_counter(Stats, docs_written, lists:flatlength(Buffer) -
+ length(Errors))
+ catch
+ throw:attachment_write_failed ->
+ ?LOG_ERROR("attachment request failed during final write", []),
+ exit({internal_server_error, replication_link_failure})
+ end,
couch_task_status:update("Finishing"),
@@ -421,7 +433,10 @@
%% wait for headers to ensure that we have a 200 status code
%% this is where we follow redirects etc
Pid ! {self(), gimme_status},
- receive {Pid, {status, StreamStatus, StreamHeaders}} ->
+ receive
+ {'EXIT', Pid, attachment_request_failed} ->
+ make_attachment_stub_receiver(Url, Headers, Name, Type, Length, Retries-1);
+ {Pid, {status, StreamStatus, StreamHeaders}} ->
?LOG_DEBUG("streaming attachment Status ~p Headers ~p",
[StreamStatus, StreamHeaders]),
@@ -435,7 +450,12 @@
%% be the one to actually receive the ibrowse data.
{ok, fun() ->
Pid ! {self(), gimme_data},
- receive {Pid, Data} -> Data end
+ receive
+ {Pid, Data} ->
+ Data;
+ {'EXIT', Pid, attachment_request_failed} ->
+ throw(attachment_write_failed)
+ end
end};
ResponseCode >= 300, ResponseCode < 400 ->
% follow the redirect