You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ko...@apache.org on 2010/06/16 19:24:27 UTC
svn commit: r955312 -
/couchdb/branches/0.11.x/src/couchdb/couch_rep_writer.erl
Author: kocolosk
Date: Wed Jun 16 17:24:27 2010
New Revision: 955312
URL: http://svn.apache.org/viewvc?rev=955312&view=rev
Log:
Backport fix #2 from COUCHDB-793. Thanks Filipe and Paul Bonser.
Modified:
couchdb/branches/0.11.x/src/couchdb/couch_rep_writer.erl
Modified: couchdb/branches/0.11.x/src/couchdb/couch_rep_writer.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/0.11.x/src/couchdb/couch_rep_writer.erl?rev=955312&r1=955311&r2=955312&view=diff
==============================================================================
--- couchdb/branches/0.11.x/src/couchdb/couch_rep_writer.erl (original)
+++ couchdb/branches/0.11.x/src/couchdb/couch_rep_writer.erl Wed Jun 16 17:24:27 2010
@@ -94,49 +94,72 @@ write_multi_part_doc(#http_db{headers=He
)
),
Boundary = couch_uuids:random(),
- {_ContentType, Len} = couch_doc:len_doc_to_multi_part_stream(
+ {ContentType, Len} = couch_doc:len_doc_to_multi_part_stream(
Boundary, JsonBytes, Atts, true
),
- {ok, DataQueue} = couch_work_queue:new(1024*1024, 1000),
- _StreamerPid = spawn_link(
- fun() ->
- couch_doc:doc_to_multi_part_stream(
- Boundary,
- JsonBytes,
- Atts,
- fun(Data) -> couch_work_queue:queue(DataQueue, Data) end,
- true
- ),
- couch_work_queue:close(DataQueue)
- end
+ StreamerPid = spawn_link(
+ fun() -> streamer_fun(Boundary, JsonBytes, Atts) end
),
BodyFun = fun(Acc) ->
+ DataQueue = case Acc of
+ nil ->
+ StreamerPid ! {start, self()},
+ receive
+ {queue, Q} ->
+ Q
+ end;
+ Queue ->
+ Queue
+ end,
case couch_work_queue:dequeue(DataQueue) of
closed ->
eof;
{ok, Data} ->
- {ok, iolist_to_binary(Data), Acc}
+ {ok, iolist_to_binary(Data), DataQueue}
end
end,
Request = Db#http_db{
resource = couch_util:url_encode(Doc#doc.id),
method = put,
qs = [{new_edits, false}],
- body = {BodyFun, ok},
+ body = {BodyFun, nil},
headers = [
{"x-couch-full-commit", "false"},
- {"Content-Type",
- "multipart/related; boundary=\"" ++ ?b2l(Boundary) ++ "\""},
+ {"Content-Type", ?b2l(ContentType)},
{"Content-Length", Len} | Headers
]
},
- case couch_rep_httpc:request(Request) of
+ Result = case couch_rep_httpc:request(Request) of
{[{<<"error">>, Error}, {<<"reason">>, Reason}]} ->
{Pos, [RevId | _]} = Doc#doc.revs,
ErrId = couch_util:to_existing_atom(Error),
[{Doc#doc.id, couch_doc:rev_to_str({Pos, RevId})}, {ErrId, Reason}];
_ ->
[]
+ end,
+ StreamerPid ! stop,
+ Result.
+
+streamer_fun(Boundary, JsonBytes, Atts) -> % message loop of the streamer process; serves `{start, From}` requests until told to `stop`
+ receive % blocks until a `stop` or `{start, From}` message arrives
+ stop ->
+ ok; % no recursive call here, so the loop (and this function) ends
+ {start, From} ->
+ % create a brand new queue for every start request, to ensure there's
+ % no garbage left over from a previous (failed) iteration
+ {ok, DataQueue} = couch_work_queue:new(1024 * 1024, 1000),
+ From ! {queue, DataQueue}, % hand the new queue to the process that sent `start`
+ couch_doc:doc_to_multi_part_stream(
+ Boundary,
+ JsonBytes,
+ Atts,
+ fun(Data) ->
+ couch_work_queue:queue(DataQueue, Data) % every chunk passed to this callback is put on the queue
+ end,
+ true
+ ),
+ couch_work_queue:close(DataQueue), % BodyFun in write_multi_part_doc maps a `closed` dequeue result to eof
+ streamer_fun(Boundary, JsonBytes, Atts) % wait for the next message: another `start` (presumably a retried request) or `stop`
+ end.
write_docs_1({Props}) ->