You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ko...@apache.org on 2010/06/16 19:24:27 UTC

svn commit: r955312 - /couchdb/branches/0.11.x/src/couchdb/couch_rep_writer.erl

Author: kocolosk
Date: Wed Jun 16 17:24:27 2010
New Revision: 955312

URL: http://svn.apache.org/viewvc?rev=955312&view=rev
Log:
Backport fix #2 from COUCHDB-793. Thanks Filipe and Paul Bonser.

Modified:
    couchdb/branches/0.11.x/src/couchdb/couch_rep_writer.erl

Modified: couchdb/branches/0.11.x/src/couchdb/couch_rep_writer.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/0.11.x/src/couchdb/couch_rep_writer.erl?rev=955312&r1=955311&r2=955312&view=diff
==============================================================================
--- couchdb/branches/0.11.x/src/couchdb/couch_rep_writer.erl (original)
+++ couchdb/branches/0.11.x/src/couchdb/couch_rep_writer.erl Wed Jun 16 17:24:27 2010
@@ -94,49 +94,72 @@ write_multi_part_doc(#http_db{headers=He
         )
     ),
     Boundary = couch_uuids:random(),
-    {_ContentType, Len} = couch_doc:len_doc_to_multi_part_stream(
+    {ContentType, Len} = couch_doc:len_doc_to_multi_part_stream(
         Boundary, JsonBytes, Atts, true
     ),
-    {ok, DataQueue} = couch_work_queue:new(1024*1024, 1000),
-    _StreamerPid = spawn_link(
-        fun() ->
-            couch_doc:doc_to_multi_part_stream(
-                Boundary,
-                JsonBytes,
-                Atts,
-                fun(Data) -> couch_work_queue:queue(DataQueue, Data) end,
-                true
-            ),
-            couch_work_queue:close(DataQueue)
-        end
+    StreamerPid = spawn_link(
+        fun() -> streamer_fun(Boundary, JsonBytes, Atts) end
     ),
     BodyFun = fun(Acc) ->
+        DataQueue = case Acc of
+        nil ->
+            StreamerPid ! {start, self()},
+            receive
+            {queue, Q} ->
+                Q
+            end;
+        Queue ->
+            Queue
+        end,
         case couch_work_queue:dequeue(DataQueue) of
         closed ->
             eof;
         {ok, Data} ->
-            {ok, iolist_to_binary(Data), Acc}
+            {ok, iolist_to_binary(Data), DataQueue}
         end
     end,
     Request = Db#http_db{
         resource = couch_util:url_encode(Doc#doc.id),
         method = put,
         qs = [{new_edits, false}],
-        body = {BodyFun, ok},
+        body = {BodyFun, nil},
         headers = [
             {"x-couch-full-commit", "false"},
-            {"Content-Type",
-                "multipart/related; boundary=\"" ++ ?b2l(Boundary) ++ "\""},
+            {"Content-Type", ?b2l(ContentType)},
             {"Content-Length", Len} | Headers
         ]
     },
-    case couch_rep_httpc:request(Request) of
+    Result = case couch_rep_httpc:request(Request) of
     {[{<<"error">>, Error}, {<<"reason">>, Reason}]} ->
         {Pos, [RevId | _]} = Doc#doc.revs,
         ErrId = couch_util:to_existing_atom(Error),
         [{Doc#doc.id, couch_doc:rev_to_str({Pos, RevId})}, {ErrId, Reason}];
     _ ->
         []
+    end,
+    StreamerPid ! stop,
+    Result.
+
+streamer_fun(Boundary, JsonBytes, Atts) ->
+    receive
+    stop ->
+        ok;
+    {start, From} ->
+        % better use a brand new queue, to ensure there's no garbage from
+        % a previous (failed) iteration
+        {ok, DataQueue} = couch_work_queue:new(1024 * 1024, 1000),
+        From ! {queue, DataQueue},
+        couch_doc:doc_to_multi_part_stream(
+            Boundary,
+            JsonBytes,
+            Atts,
+            fun(Data) ->
+                couch_work_queue:queue(DataQueue, Data)
+            end,
+            true
+        ),
+        couch_work_queue:close(DataQueue),
+        streamer_fun(Boundary, JsonBytes, Atts)
     end.
 
 write_docs_1({Props}) ->