You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by fd...@apache.org on 2011/05/31 22:19:41 UTC
svn commit: r1129900 - in /couchdb/trunk/src/couchdb: couch_api_wrap.erl
couch_replicator_doc_copier.erl
Author: fdmanana
Date: Tue May 31 20:19:40 2011
New Revision: 1129900
URL: http://svn.apache.org/viewvc?rev=1129900&view=rev
Log:
Skip multipart attachments if doc is rejected by the target
If the target rejects the document (via validate_doc_update functions),
the connection is held for a long time because the attachments are not
streamed.
Modified:
couchdb/trunk/src/couchdb/couch_api_wrap.erl
couchdb/trunk/src/couchdb/couch_replicator_doc_copier.erl
Modified: couchdb/trunk/src/couchdb/couch_api_wrap.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_api_wrap.erl?rev=1129900&r1=1129899&r2=1129900&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_api_wrap.erl (original)
+++ couchdb/trunk/src/couchdb/couch_api_wrap.erl Tue May 31 20:19:40 2011
@@ -179,7 +179,7 @@ open_doc_revs(#httpdb{} = HttpDb, Id, Re
end;
open_doc_revs(Db, Id, Revs, Options, Fun, Acc) ->
{ok, Results} = couch_db:open_doc_revs(Db, Id, Revs, Options),
- {ok, lists:foldl(Fun, Acc, Results)}.
+ {ok, lists:foldl(fun(R, A) -> {_, A2} = Fun(R, A), A2 end, Acc, Results)}.
open_doc(#httpdb{} = Db, Id, Options) ->
@@ -475,20 +475,25 @@ receive_docs(Streamer, UserFun, UserAcc)
{"multipart/related", _} = ContentType ->
case doc_from_multi_part_stream(
ContentType, fun() -> receive_doc_data(Streamer) end) of
- {ok, Doc} ->
- UserAcc2 = UserFun({ok, Doc}, UserAcc),
+ {ok, Doc, Parser} ->
+ case UserFun({ok, Doc}, UserAcc) of
+ {ok, UserAcc2} ->
+ ok;
+ {skip, UserAcc2} ->
+ skip_multipart_atts(Parser)
+ end,
receive_docs(Streamer, UserFun, UserAcc2)
end;
{"application/json", []} ->
Doc = couch_doc:from_json_obj(
?JSON_DECODE(receive_all(Streamer, []))),
- UserAcc2 = UserFun({ok, Doc}, UserAcc),
+ {_, UserAcc2} = UserFun({ok, Doc}, UserAcc),
receive_docs(Streamer, UserFun, UserAcc2);
{"application/json", [{"error","true"}]} ->
{ErrorProps} = ?JSON_DECODE(receive_all(Streamer, [])),
Rev = get_value(<<"missing">>, ErrorProps),
Result = {{not_found, missing}, couch_doc:parse_rev(Rev)},
- UserAcc2 = UserFun(Result, UserAcc),
+ {_, UserAcc2} = UserFun(Result, UserAcc),
receive_docs(Streamer, UserFun, UserAcc2)
end;
done ->
@@ -496,6 +501,24 @@ receive_docs(Streamer, UserFun, UserAcc)
end.
+%% Discards the remaining multipart data of a document that was skipped.
+%% Monitors Parser, then repeatedly requests the next chunk from it with
+%% {get_bytes, self()} and throws every {bytes, _} reply away, until
+%% Parser is no longer alive.
+skip_multipart_atts(Parser) ->
+    skip_multipart_atts(Parser, erlang:monitor(process, Parser)).
+
+skip_multipart_atts(Parser, MonRef) ->
+    case is_process_alive(Parser) of
+    true ->
+        Parser ! {get_bytes, self()},
+        receive
+        % The payload is intentionally dropped; it is bound to _Bytes so
+        % the compiler does not warn about an unused variable.
+        {bytes, _Bytes} ->
+            skip_multipart_atts(Parser, MonRef);
+        % Parser exited instead of replying; stop draining.
+        {'DOWN', MonRef, _, _, _} ->
+            ok
+        end;
+    false ->
+        % Parser is already gone: drop the monitor and flush any 'DOWN'
+        % message it may have left in the mailbox.
+        erlang:demonitor(MonRef, [flush])
+    end.
+
+
restart_remote_open_doc_revs() ->
receive
{body_bytes, _} ->
@@ -601,7 +624,7 @@ doc_from_multi_part_stream(ContentType,
(A) ->
A
end, Doc#doc.atts),
- {ok, Doc#doc{atts = Atts2}}
+ {ok, Doc#doc{atts = Atts2}, Parser}
end.
Modified: couchdb/trunk/src/couchdb/couch_replicator_doc_copier.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replicator_doc_copier.erl?rev=1129900&r1=1129899&r2=1129900&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_replicator_doc_copier.erl (original)
+++ couchdb/trunk/src/couchdb/couch_replicator_doc_copier.erl Tue May 31 20:19:40 2011
@@ -350,7 +350,7 @@ fetch_doc(Source, {Id, Revs, PAs, _Seq},
local_doc_handler({ok, Doc}, {Target, DocList, W, F}) ->
case batch_doc(Doc) of
true ->
- {Target, [Doc | DocList], W, F};
+ {ok, {Target, [Doc | DocList], W, F}};
false ->
?LOG_DEBUG("Worker flushing doc with attachments", []),
Target2 = open_db(Target),
@@ -358,18 +358,18 @@ local_doc_handler({ok, Doc}, {Target, Do
close_db(Target2),
case Success of
true ->
- {Target, DocList, W + 1, F};
+ {ok, {Target, DocList, W + 1, F}};
false ->
- {Target, DocList, W, F + 1}
+ {ok, {Target, DocList, W, F + 1}}
end
end;
local_doc_handler(_, Acc) ->
- Acc.
+ {ok, Acc}.
remote_doc_handler({ok, #doc{atts = []} = Doc}, {Parent, _} = Acc) ->
ok = gen_server:call(Parent, {batch_doc, Doc}, infinity),
- Acc;
+ {ok, Acc};
remote_doc_handler({ok, Doc}, {Parent, Target} = Acc) ->
% Immediately flush documents with attachments received from a remote
% source. The data property of each attachment is a function that starts
@@ -380,9 +380,14 @@ remote_doc_handler({ok, Doc}, {Parent, T
Success = (flush_doc(Target2, Doc) =:= ok),
ok = gen_server:call(Parent, {doc_flushed, Success}, infinity),
close_db(Target2),
- Acc;
+ case Success of
+ true ->
+ {ok, Acc};
+ false ->
+ {skip, Acc}
+ end;
remote_doc_handler(_, Acc) ->
- Acc.
+ {ok, Acc}.
spawn_writer(Target, #batch{docs = DocList, size = Size}) ->