You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2008/07/16 22:55:14 UTC
svn commit: r677426 - /incubator/couchdb/trunk/src/couchdb/couch_rep.erl
Author: damien
Date: Wed Jul 16 13:55:14 2008
New Revision: 677426
URL: http://svn.apache.org/viewvc?rev=677426&view=rev
Log:
Fixed replication problems where read and write queues can get backed up. With this fix, throughput might be reduced.
Modified:
incubator/couchdb/trunk/src/couchdb/couch_rep.erl
Modified: incubator/couchdb/trunk/src/couchdb/couch_rep.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_rep.erl?rev=677426&r1=677425&r2=677426&view=diff
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_rep.erl (original)
+++ incubator/couchdb/trunk/src/couchdb/couch_rep.erl Wed Jul 16 13:55:14 2008
@@ -113,13 +113,15 @@
end.
pull_rep(DbTarget, DbSource, SourceSeqNum) ->
- Parent = self(),
SaveDocsPid = spawn_link(fun() ->
- save_docs_loop(Parent, DbTarget, 0) end),
+ save_docs_loop(DbTarget, 0) end),
OpenDocsPid = spawn_link(fun() ->
- open_doc_revs_loop(Parent, DbSource, SaveDocsPid, 0) end),
+ open_doc_revs_loop(DbSource, SaveDocsPid, 0) end),
+ OpenDocsPid ! got_it, % prime queue with got_it
MissingRevsPid = spawn_link(fun() ->
- get_missing_revs_loop(Parent, DbTarget, OpenDocsPid, 0, 0) end),
+ get_missing_revs_loop(DbTarget, OpenDocsPid, 0, 0) end),
+ MissingRevsPid ! got_it, % prime queue with got_it
+ self() ! got_it,
{ok, NewSeq} = enum_docs_since(DbSource, SourceSeqNum,
fun(SrcDocInfo, _, _) ->
#doc_info{id=Id,
@@ -128,73 +130,73 @@
deleted_conflict_revs=DelConflicts,
update_seq=Seq} = SrcDocInfo,
SrcRevs = [Rev | Conflicts] ++ DelConflicts,
- MissingRevsPid ! {Id, SrcRevs}, % send to the missing revs process
+ receive got_it -> ok end,
+ MissingRevsPid ! {self(), Id, SrcRevs}, % send to the missing revs process
{ok, Seq}
end, SourceSeqNum),
- MissingRevsPid ! shutdown,
+
+ receive got_it -> ok end,
+
+ MissingRevsPid ! {self(), shutdown},
receive {done, MissingRevsPid, Stats1} -> ok end,
- OpenDocsPid ! shutdown,
+ OpenDocsPid ! {self(), shutdown},
receive {done, OpenDocsPid, Stats2} -> ok end,
- SaveDocsPid ! shutdown,
+ SaveDocsPid ! {self(), shutdown},
receive {done, SaveDocsPid, Stats3} -> ok end,
{NewSeq, Stats1 ++ Stats2 ++ Stats3}.
-receive_id_revs() ->
- receive
- {Id, Revs} ->
- [{Id, Revs} | receive_id_revs()]
- after 1 ->
- []
- end.
-
-get_missing_revs_loop(Parent, DbTarget, OpenDocsPid, RevsChecked, MissingFound) ->
+get_missing_revs_loop(DbTarget, OpenDocsPid, RevsChecked, MissingFound) ->
+ receive got_it -> ok end,
receive
- {Id, Revs} ->
- Changed = [{Id, Revs} | receive_id_revs()],
- {ok, Missing} = get_missing_revs(DbTarget, Changed),
- [OpenDocsPid ! {Id0, MissingRevs} || {Id0, MissingRevs} <- Missing],
- get_missing_revs_loop(Parent, DbTarget, OpenDocsPid,
- RevsChecked + length(Changed),
- MissingFound + length(Missing));
- shutdown ->
- Parent ! {done, self(), [{"missing_checked", RevsChecked},
+ {Src, Id, Revs} ->
+ Src ! got_it,
+ MissingRevs =
+ case get_missing_revs(DbTarget, [{Id, Revs}]) of
+ {ok, [{Id, MissingRevs0}]} ->
+ OpenDocsPid ! {self(), Id, MissingRevs0},
+ MissingRevs0;
+ {ok, []} ->
+ % prime our message queue
+ self() ! got_it,
+ []
+ end,
+ get_missing_revs_loop(DbTarget, OpenDocsPid,
+ RevsChecked + length(Revs),
+ MissingFound + length(MissingRevs));
+ {Src, shutdown} ->
+ Src ! {done, self(), [{"missing_checked", RevsChecked},
{"missing_found", MissingFound}]}
end.
-open_doc_revs_loop(Parent, DbSource, SaveDocsPid, DocsRead) ->
+open_doc_revs_loop(DbSource, SaveDocsPid, DocsRead) ->
+ receive got_it -> ok end,
receive
- {Id, MissingRevs} ->
+ {Src, Id, MissingRevs} ->
+ Src ! got_it,
{ok, DocResults} = open_doc_revs(DbSource, Id, MissingRevs, [latest]),
% only save successful reads
Docs = [RevDoc || {ok, RevDoc} <- DocResults],
- SaveDocsPid ! Docs,
- open_doc_revs_loop(Parent, DbSource, SaveDocsPid, DocsRead + length(Docs));
- shutdown ->
- Parent ! {done, self(), [{"docs_read", DocsRead}]}
+ SaveDocsPid ! {self(), docs, Docs},
+ open_doc_revs_loop(DbSource, SaveDocsPid, DocsRead + length(Docs));
+ {Src, shutdown} ->
+ Src ! {done, self(), [{"docs_read", DocsRead}]}
end.
-receive_docs() ->
- receive
- Docs when is_list(Docs) ->
- Docs ++ receive_docs()
- after 1 ->
- []
- end.
-save_docs_loop(Parent, DbTarget, DocsWritten) ->
+save_docs_loop(DbTarget, DocsWritten) ->
receive
- Docs0 when is_list(Docs0) ->
- Docs = Docs0 ++ receive_docs(),
+ {Src, docs, Docs} ->
+ Src ! got_it,
ok = save_docs(DbTarget, Docs, []),
- save_docs_loop(Parent, DbTarget, DocsWritten + length(Docs));
- shutdown ->
- Parent ! {done, self(), [{"docs_written", DocsWritten}]}
+ save_docs_loop(DbTarget, DocsWritten + length(Docs));
+ {Src, shutdown} ->
+ Src ! {done, self(), [{"docs_written", DocsWritten}]}
end.
@@ -239,7 +241,7 @@
enum_docs_since(DbUrl, StartSeq, InFun, InAcc) when is_list(DbUrl) ->
- Url = DbUrl ++ "_all_docs_by_seq?startkey=" ++ integer_to_list(StartSeq),
+ Url = DbUrl ++ "_all_docs_by_seq?count=4&startkey=" ++ integer_to_list(StartSeq),
{obj, Results} = do_http_request(Url, get),
DocInfoList=
lists:map(fun({obj, RowInfoList}) ->
@@ -254,7 +256,14 @@
tuple_to_list(proplists:get_value("deleted_conflicts", RowValueProps, {})),
deleted = proplists:get_value("deleted", RowValueProps, false)}
end, tuple_to_list(proplists:get_value("rows", Results))),
- {ok, enum_docs0(InFun, DocInfoList, InAcc)};
+ case DocInfoList of
+ [] ->
+ {ok, InAcc};
+ _ ->
+ Acc2 = enum_docs0(InFun, DocInfoList, InAcc),
+ #doc_info{update_seq=LastSeq} = lists:last(DocInfoList),
+ enum_docs_since(DbUrl, LastSeq, InFun, Acc2)
+ end;
enum_docs_since(DbSource, StartSeq, Fun, Acc) ->
couch_db:enum_docs_since(DbSource, StartSeq, Fun, Acc).