You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ko...@apache.org on 2010/06/12 18:35:38 UTC

svn commit: r954027 - in /couchdb/trunk/src/couchdb: couch_rep_att.erl couch_rep_reader.erl couch_rep_writer.erl

Author: kocolosk
Date: Sat Jun 12 16:35:37 2010
New Revision: 954027

URL: http://svn.apache.org/viewvc?rev=954027&view=rev
Log:
Fix hanging replication. COUCHDB-793. Thanks Filipe and Paul Bonser.

Modified:
    couchdb/trunk/src/couchdb/couch_rep_att.erl
    couchdb/trunk/src/couchdb/couch_rep_reader.erl
    couchdb/trunk/src/couchdb/couch_rep_writer.erl

Modified: couchdb/trunk/src/couchdb/couch_rep_att.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep_att.erl?rev=954027&r1=954026&r2=954027&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep_att.erl (original)
+++ couchdb/trunk/src/couchdb/couch_rep_att.erl Sat Jun 12 16:35:37 2010
@@ -54,7 +54,7 @@ attachment_receiver(Ref, Request) ->
         receive_data(Ref, ReqId, ContentEncoding)
     end
     catch
-    throw:{attachment_request_failed, timeout} ->
+    throw:{attachment_request_failed, _} ->
         case {Request#http_db.retries, Request#http_db.pause} of
         {0, _} ->
              ?LOG_INFO("request for ~p failed", [Request#http_db.resource]),

Modified: couchdb/trunk/src/couchdb/couch_rep_reader.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep_reader.erl?rev=954027&r1=954026&r2=954027&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep_reader.erl (original)
+++ couchdb/trunk/src/couchdb/couch_rep_reader.erl Sat Jun 12 16:35:37 2010
@@ -108,6 +108,8 @@ code_change(_OldVsn, State, _Extra) ->
 
 %internal funs
 
+handle_add_docs(_Seq, [], _From, State) ->
+    {reply, ok, State};
 handle_add_docs(Seq, DocsToAdd, From, #state{reply_to=nil} = State) ->
     State1 = update_sequence_lists(Seq, State),
     NewState = State1#state{
@@ -151,9 +153,13 @@ handle_open_remote_doc(Id, Seq, Revs, _,
     {_, _Ref} = spawn_document_request(Source, Id, Seq, Revs),
     {reply, ok, State#state{monitor_count = Count+1}}.
 
-handle_monitor_down(normal, #state{pending_doc_request=nil,
+handle_monitor_down(normal, #state{pending_doc_request=nil, reply_to=nil,
         monitor_count=1, complete=waiting_on_monitors} = State) ->
     {noreply, State#state{complete=true, monitor_count=0}};
+handle_monitor_down(normal, #state{pending_doc_request=nil, reply_to=From,
+        monitor_count=1, complete=waiting_on_monitors} = State) ->
+    gen_server:reply(From, {complete, calculate_new_high_seq(State)}),
+    {stop, normal, State#state{complete=true, monitor_count=0}};
 handle_monitor_down(normal, #state{pending_doc_request=nil} = State) ->
     #state{monitor_count = Count} = State,
     {noreply, State#state{monitor_count = Count-1}};

Modified: couchdb/trunk/src/couchdb/couch_rep_writer.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep_writer.erl?rev=954027&r1=954026&r2=954027&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep_writer.erl (original)
+++ couchdb/trunk/src/couchdb/couch_rep_writer.erl Sat Jun 12 16:35:37 2010
@@ -94,49 +94,72 @@ write_multi_part_doc(#http_db{headers=He
         )
     ),
     Boundary = couch_uuids:random(),
-    {_ContentType, Len} = couch_doc:len_doc_to_multi_part_stream(
+    {ContentType, Len} = couch_doc:len_doc_to_multi_part_stream(
         Boundary, JsonBytes, Atts, true
     ),
-    {ok, DataQueue} = couch_work_queue:new(1024*1024, 1000),
-    _StreamerPid = spawn_link(
-        fun() ->
-            couch_doc:doc_to_multi_part_stream(
-                Boundary,
-                JsonBytes,
-                Atts,
-                fun(Data) -> couch_work_queue:queue(DataQueue, Data) end,
-                true
-            ),
-            couch_work_queue:close(DataQueue)
-        end
+    StreamerPid = spawn_link(
+        fun() -> streamer_fun(Boundary, JsonBytes, Atts) end
     ),
     BodyFun = fun(Acc) ->
+        DataQueue = case Acc of
+        nil ->
+            StreamerPid ! {start, self()},
+            receive
+            {queue, Q} ->
+                Q
+            end;
+        Queue ->
+            Queue
+        end,
         case couch_work_queue:dequeue(DataQueue) of
         closed ->
             eof;
         {ok, Data} ->
-            {ok, iolist_to_binary(Data), Acc}
+            {ok, iolist_to_binary(Data), DataQueue}
         end
     end,
     Request = Db#http_db{
         resource = couch_util:url_encode(Doc#doc.id),
         method = put,
         qs = [{new_edits, false}],
-        body = {BodyFun, ok},
+        body = {BodyFun, nil},
         headers = [
             {"x-couch-full-commit", "false"},
-            {"Content-Type",
-                "multipart/related; boundary=\"" ++ ?b2l(Boundary) ++ "\""},
+            {"Content-Type", ?b2l(ContentType)},
             {"Content-Length", Len} | Headers
         ]
     },
-    case couch_rep_httpc:request(Request) of
+    Result = case couch_rep_httpc:request(Request) of
     {[{<<"error">>, Error}, {<<"reason">>, Reason}]} ->
         {Pos, [RevId | _]} = Doc#doc.revs,
         ErrId = couch_util:to_existing_atom(Error),
         [{Doc#doc.id, couch_doc:rev_to_str({Pos, RevId})}, {ErrId, Reason}];
     _ ->
         []
+    end,
+    StreamerPid ! stop,
+    Result.
+
+streamer_fun(Boundary, JsonBytes, Atts) ->
+    receive
+    stop ->
+        ok;
+    {start, From} ->
+        % better use a brand new queue, to ensure there's no garbage from
+        % a previous (failed) iteration
+        {ok, DataQueue} = couch_work_queue:new(1024 * 1024, 1000),
+        From ! {queue, DataQueue},
+        couch_doc:doc_to_multi_part_stream(
+            Boundary,
+            JsonBytes,
+            Atts,
+            fun(Data) ->
+                couch_work_queue:queue(DataQueue, Data)
+            end,
+            true
+        ),
+        couch_work_queue:close(DataQueue),
+        streamer_fun(Boundary, JsonBytes, Atts)
     end.
 
 write_docs_1({Props}) ->