You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2008/07/16 22:55:14 UTC

svn commit: r677426 - /incubator/couchdb/trunk/src/couchdb/couch_rep.erl

Author: damien
Date: Wed Jul 16 13:55:14 2008
New Revision: 677426

URL: http://svn.apache.org/viewvc?rev=677426&view=rev
Log:
Fixed replication problems where read and write queues can get backed up. With this fix, throughput might be reduced.

Modified:
    incubator/couchdb/trunk/src/couchdb/couch_rep.erl

Modified: incubator/couchdb/trunk/src/couchdb/couch_rep.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_rep.erl?rev=677426&r1=677425&r2=677426&view=diff
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_rep.erl (original)
+++ incubator/couchdb/trunk/src/couchdb/couch_rep.erl Wed Jul 16 13:55:14 2008
@@ -113,13 +113,15 @@
     end.
 
 pull_rep(DbTarget, DbSource, SourceSeqNum) ->
-    Parent = self(),
     SaveDocsPid = spawn_link(fun() ->
-            save_docs_loop(Parent, DbTarget, 0) end),
+            save_docs_loop(DbTarget, 0) end),
     OpenDocsPid = spawn_link(fun() ->
-            open_doc_revs_loop(Parent, DbSource, SaveDocsPid, 0) end),
+            open_doc_revs_loop(DbSource, SaveDocsPid, 0) end),
+    OpenDocsPid ! got_it, % prime queue with got_it
     MissingRevsPid = spawn_link(fun() ->
-            get_missing_revs_loop(Parent, DbTarget, OpenDocsPid, 0, 0) end),
+            get_missing_revs_loop(DbTarget, OpenDocsPid, 0, 0) end),
+    MissingRevsPid ! got_it, % prime queue with got_it
+    self() ! got_it,
     {ok, NewSeq} = enum_docs_since(DbSource, SourceSeqNum,
         fun(SrcDocInfo, _, _) ->
             #doc_info{id=Id,
@@ -128,73 +130,73 @@
                 deleted_conflict_revs=DelConflicts,
                 update_seq=Seq} = SrcDocInfo,
             SrcRevs = [Rev | Conflicts] ++ DelConflicts,
-            MissingRevsPid !  {Id, SrcRevs}, % send to the missing revs process
+            receive got_it -> ok end,
+            MissingRevsPid !  {self(), Id, SrcRevs}, % send to the missing revs process
             {ok, Seq}
         end, SourceSeqNum),
-    MissingRevsPid ! shutdown,
+    
+    receive got_it -> ok end,
+    
+    MissingRevsPid ! {self(), shutdown},
     receive {done, MissingRevsPid, Stats1} -> ok end,
     
-    OpenDocsPid ! shutdown,
+    OpenDocsPid ! {self(), shutdown},
     receive {done, OpenDocsPid, Stats2} -> ok end,
     
-    SaveDocsPid ! shutdown,
+    SaveDocsPid ! {self(), shutdown},
     receive {done, SaveDocsPid, Stats3} -> ok end,
     
     {NewSeq, Stats1 ++ Stats2 ++ Stats3}.
 
 
-receive_id_revs() ->
-    receive
-    {Id, Revs} ->
-        [{Id, Revs} | receive_id_revs()]
-    after 1 ->
-        []
-    end.
-
-get_missing_revs_loop(Parent, DbTarget, OpenDocsPid, RevsChecked, MissingFound) ->
+get_missing_revs_loop(DbTarget, OpenDocsPid, RevsChecked, MissingFound) ->
+    receive got_it -> ok end,
     receive
-    {Id, Revs} ->
-        Changed = [{Id, Revs} | receive_id_revs()],
-        {ok, Missing} = get_missing_revs(DbTarget, Changed),
-        [OpenDocsPid ! {Id0, MissingRevs} || {Id0, MissingRevs} <- Missing],
-        get_missing_revs_loop(Parent, DbTarget, OpenDocsPid,
-                RevsChecked + length(Changed),
-                MissingFound + length(Missing));
-    shutdown ->
-        Parent ! {done, self(), [{"missing_checked", RevsChecked},
+    {Src, Id, Revs} ->
+        Src ! got_it,
+        MissingRevs =
+        case get_missing_revs(DbTarget, [{Id, Revs}]) of
+        {ok, [{Id, MissingRevs0}]} ->
+            OpenDocsPid ! {self(), Id, MissingRevs0},
+            MissingRevs0;
+        {ok, []} ->
+            % prime our message queue
+            self() ! got_it,
+            []
+        end,
+        get_missing_revs_loop(DbTarget, OpenDocsPid,
+                RevsChecked + length(Revs),
+                MissingFound + length(MissingRevs));
+    {Src, shutdown} ->
+        Src ! {done, self(), [{"missing_checked", RevsChecked},
                                  {"missing_found", MissingFound}]}
     end.
     
 
-open_doc_revs_loop(Parent, DbSource, SaveDocsPid, DocsRead) ->
+open_doc_revs_loop(DbSource, SaveDocsPid, DocsRead) ->
+    receive got_it -> ok end,
     receive
-    {Id, MissingRevs} ->
+    {Src, Id, MissingRevs} ->
+        Src ! got_it,
         {ok, DocResults} = open_doc_revs(DbSource, Id, MissingRevs, [latest]),
         % only save successful reads
         Docs = [RevDoc || {ok, RevDoc} <- DocResults],
-        SaveDocsPid ! Docs,
-        open_doc_revs_loop(Parent, DbSource, SaveDocsPid, DocsRead + length(Docs));
-    shutdown ->
-        Parent ! {done, self(), [{"docs_read", DocsRead}]}
+        SaveDocsPid ! {self(), docs, Docs},
+        open_doc_revs_loop(DbSource, SaveDocsPid, DocsRead + length(Docs));
+    {Src, shutdown} ->
+        Src ! {done, self(), [{"docs_read", DocsRead}]}
     end.
 
 
-receive_docs() ->
-    receive
-    Docs when is_list(Docs) ->
-        Docs ++ receive_docs()
-    after 1 ->
-        []
-    end.
         
-save_docs_loop(Parent, DbTarget, DocsWritten) ->
+save_docs_loop(DbTarget, DocsWritten) ->
     receive
-    Docs0 when is_list(Docs0)  ->
-        Docs = Docs0 ++ receive_docs(),
+    {Src, docs, Docs} ->
+        Src ! got_it,
         ok = save_docs(DbTarget, Docs, []),
-        save_docs_loop(Parent, DbTarget, DocsWritten + length(Docs));
-    shutdown ->
-        Parent ! {done, self(), [{"docs_written", DocsWritten}]}
+        save_docs_loop(DbTarget, DocsWritten + length(Docs));
+    {Src, shutdown} ->
+        Src ! {done, self(), [{"docs_written", DocsWritten}]}
     end.
 
 
@@ -239,7 +241,7 @@
 
 
 enum_docs_since(DbUrl, StartSeq, InFun, InAcc) when is_list(DbUrl) ->
-    Url = DbUrl ++ "_all_docs_by_seq?startkey=" ++ integer_to_list(StartSeq),
+    Url = DbUrl ++ "_all_docs_by_seq?count=4&startkey=" ++ integer_to_list(StartSeq),
     {obj, Results} = do_http_request(Url, get),
     DocInfoList=
     lists:map(fun({obj, RowInfoList}) ->
@@ -254,7 +256,14 @@
                 tuple_to_list(proplists:get_value("deleted_conflicts", RowValueProps, {})),
             deleted = proplists:get_value("deleted", RowValueProps, false)}
         end, tuple_to_list(proplists:get_value("rows", Results))),
-    {ok, enum_docs0(InFun, DocInfoList, InAcc)};
+    case DocInfoList of
+    [] ->
+        {ok, InAcc};
+    _ ->
+        Acc2 = enum_docs0(InFun, DocInfoList, InAcc),
+        #doc_info{update_seq=LastSeq} = lists:last(DocInfoList),
+        enum_docs_since(DbUrl, LastSeq, InFun, Acc2)
+    end;
 enum_docs_since(DbSource, StartSeq, Fun, Acc) ->
     couch_db:enum_docs_since(DbSource, StartSeq, Fun, Acc).