You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by fd...@apache.org on 2010/10/17 18:38:46 UTC
svn commit: r1023523 - in /couchdb/branches/new_replicator/src/couchdb:
couch_replicator.erl couch_replicator_doc_copiers.erl
couch_replicator_rev_finders.erl
Author: fdmanana
Date: Sun Oct 17 16:38:46 2010
New Revision: 1023523
URL: http://svn.apache.org/viewvc?rev=1023523&view=rev
Log:
New replicator: simplified code and reduced contention on the main gen_server.
Modified:
couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl
couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl
couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl
Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl?rev=1023523&r1=1023522&r2=1023523&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl Sun Oct 17 16:38:46 2010
@@ -239,8 +239,8 @@ do_init(#rep{options = Options} = Rep) -
% This starts the _changes reader process. It adds the changes from
% the source db to the ChangesQueue.
- ChangesReader = spawn_changes_reader(self(), StartSeq, Source,
- ChangesQueue, Options),
+ ChangesReader = spawn_changes_reader(
+ StartSeq, Source, ChangesQueue, Options),
% This starts the missing rev finders. They check the target for changes
% in the ChangesQueue to see if they exist on the target or not. If not,
@@ -351,13 +351,13 @@ handle_cast(checkpoint, State) ->
State2 = do_checkpoint(State),
{noreply, State2#rep_state{timer = start_timer(State)}};
-handle_cast({seq_start, {Seq, NumChanges}}, State) ->
+handle_cast({seq_start, {LargestSeq, NumChanges}}, State) ->
#rep_state{
- seqs_in_progress = SeqsInProgress,
+ seqs_in_progress = SeqsTree,
stats = #rep_stats{missing_checked = Mc} = Stats
} = State,
NewState = State#rep_state{
- seqs_in_progress = gb_trees:insert(Seq, NumChanges, SeqsInProgress),
+ seqs_in_progress = gb_trees:insert(LargestSeq, NumChanges, SeqsTree),
stats = Stats#rep_stats{missing_checked = Mc + NumChanges}
},
{noreply, NewState};
@@ -496,12 +496,11 @@ init_state(Rep) ->
State#rep_state{timer = start_timer(State)}.
-spawn_changes_reader(Cp, StartSeq, Source, ChangesQueue, Options) ->
+spawn_changes_reader(StartSeq, Source, ChangesQueue, Options) ->
spawn_link(
fun()->
couch_api_wrap:changes_since(Source, all_docs, StartSeq,
- fun(#doc_info{high_seq=Seq, revs=Revs} = DocInfo) ->
- ok = gen_server:cast(Cp, {seq_start, {Seq, length(Revs)}}),
+ fun(DocInfo) ->
ok = couch_work_queue:queue(ChangesQueue, DocInfo)
end, Options),
couch_work_queue:close(ChangesQueue)
@@ -659,32 +658,22 @@ has_session_id(SessionId, [{Props} | Res
end.
-process_seq_changes_done(Changes, State) ->
+process_seq_changes_done({Seq, NumChangesDone}, State) ->
#rep_state{
seqs_in_progress = SeqsInProgress,
next_through_seqs = DoneSeqs,
current_through_seq = ThroughSeq
} = State,
- {SeqsInProgress2, LastSeq} = lists:foldl(
- fun({Seq, ChangesDone}, {SeqTree, MaxSeqDone}) ->
- Total = gb_trees:get(Seq, SeqTree),
- case Total - ChangesDone of
- 0 ->
- NewMaxSeqDone = case MaxSeqDone of
- nil ->
- Seq;
- _ ->
- lists:max([Seq, MaxSeqDone])
- end,
- {gb_trees:delete(Seq, SeqTree), NewMaxSeqDone};
- NewTotal when NewTotal > 0 ->
- {gb_trees:update(Seq, NewTotal, SeqTree), MaxSeqDone}
- end
- end,
- {SeqsInProgress, nil}, Changes),
+ Total = gb_trees:get(Seq, SeqsInProgress),
+ SeqsInProgress2 = case Total - NumChangesDone of
+ 0 ->
+ gb_trees:delete(Seq, SeqsInProgress);
+ NewTotal when NewTotal > 0 ->
+ gb_trees:update(Seq, NewTotal, SeqsInProgress)
+ end,
- DoneSeqs2 = add_done_seq(LastSeq, DoneSeqs),
+ DoneSeqs2 = ordsets:add_element(Seq, DoneSeqs),
{NewThroughSeq, DoneSeqs3} =
get_next_through_seq(ThroughSeq, SeqsInProgress2, DoneSeqs2),
@@ -695,12 +684,6 @@ process_seq_changes_done(Changes, State)
}.
-add_done_seq(nil, DoneSeqs) ->
- DoneSeqs;
-add_done_seq(Seq, DoneSeqs) ->
- ordsets:add_element(Seq, DoneSeqs).
-
-
get_next_through_seq(Current, InProgress, Done) ->
case gb_trees:is_empty(InProgress) of
true ->
Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl?rev=1023523&r1=1023522&r2=1023523&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl Sun Oct 17 16:38:46 2010
@@ -33,7 +33,6 @@ spawn_doc_copiers(Cp, Source, Target, Mi
-record(doc_acc, {
docs = [],
- seqs = [],
read = 0,
written = 0,
wfail = 0
@@ -51,29 +50,40 @@ doc_copy_loop(Cp, Source, Target, Missin
?LOG_DEBUG("Doc copier ~p got {doc_id, ~p}", [self(), Id]),
{ok, Acc2} = couch_api_wrap:open_doc_revs(
Source, Id, all, [],
- fun(R, A) -> doc_handler(R, nil, Target, A) end, Acc),
+ fun(R, A) -> doc_handler(R, Target, A) end, Acc),
Acc2
end,
#doc_acc{}, DocIds),
- {Source, Acc};
+ {Source, Acc, {nil, 0}};
- {ok, IdRevList} ->
- lists:foldl(
+ {ok, [{_, FirstRevs, _, FirstSeq} | RestIdRevList] = IdRevList} ->
+ {LargestSeq, TotalChanges} = lists:foldl(
+ fun({_, Revs, _, Seq}, {Largest, Total}) when Seq > Largest ->
+ {Seq, Total + length(Revs)};
+ ({_, Revs, _, _}, {Largest, Total}) ->
+ {Largest, Total + length(Revs)}
+ end,
+ {FirstSeq, length(FirstRevs)}, RestIdRevList),
+ LastSeqDone = {LargestSeq, TotalChanges},
+ ok = gen_server:cast(Cp, {seq_start, LastSeqDone}),
+ {NewSource, Acc} = lists:foldl(
fun({Id, Revs, PossibleAncestors, Seq} = IdRev, {SrcDb, BulkAcc}) ->
?LOG_DEBUG("Doc copier ~p got ~p", [self(), IdRev]),
SrcDb2 = couch_api_wrap:maybe_reopen_db(SrcDb, Seq),
{ok, BulkAcc2} = couch_api_wrap:open_doc_revs(
SrcDb2, Id, Revs, [{atts_since, PossibleAncestors}],
- fun(R, A) -> doc_handler(R, Seq, Target, A) end,
+ fun(R, A) -> doc_handler(R, Target, A) end,
BulkAcc),
{SrcDb2, BulkAcc2}
end,
- {Source, #doc_acc{}}, IdRevList)
+ {Source, #doc_acc{}}, IdRevList),
+ {NewSource, Acc, LastSeqDone}
end,
+
case Result of
- {Source2, DocAcc} ->
- #doc_acc{seqs = SeqsDone} = DocAcc2 = bulk_write_docs(DocAcc, Target),
- seqs_done(SeqsDone, Cp),
+ {Source2, DocAcc, LargestSeqDone} ->
+ DocAcc2 = bulk_write_docs(DocAcc, Target),
+ seq_done(LargestSeqDone, Cp),
send_stats(DocAcc2, Cp),
doc_copy_loop(Cp, Source2, Target, MissingRevsQueue);
stop ->
@@ -81,53 +91,30 @@ doc_copy_loop(Cp, Source, Target, Missin
end.
-doc_handler({ok, #doc{atts = []} = Doc}, Seq, _Target, Acc) ->
- update_bulk_doc_acc(Acc, Seq, Doc);
+doc_handler({ok, #doc{atts = []} = Doc}, _Target, Acc) ->
+ update_bulk_doc_acc(Acc, Doc);
-doc_handler({ok, Doc}, Seq, Target, Acc) ->
- write_doc(Doc, Seq, Target, Acc);
+doc_handler({ok, Doc}, Target, Acc) ->
+ write_doc(Doc, Target, Acc);
-doc_handler(_, _, _, Acc) ->
+doc_handler(_, _, Acc) ->
Acc.
-update_bulk_doc_acc(#doc_acc{docs = Docs, read = Read} = Acc, nil, Doc) ->
- Acc#doc_acc{
- docs = [Doc | Docs],
- read = Read + 1
- };
-
-update_bulk_doc_acc(#doc_acc{seqs = [{Seq, Count} | Rest]} = Acc, Seq, Doc) ->
- Acc#doc_acc{
- docs = [Doc | Acc#doc_acc.docs],
- seqs = [{Seq, Count + 1} | Rest],
- read = Acc#doc_acc.read + 1
- };
-
-update_bulk_doc_acc(#doc_acc{seqs = Seqs, read = Read} = Acc, Seq, Doc) ->
- Acc#doc_acc{
- docs = [Doc | Acc#doc_acc.docs],
- seqs = [{Seq, 1} | Seqs],
- read = Read + 1
- }.
+update_bulk_doc_acc(#doc_acc{docs = DocAcc, read = Read} = Acc, Doc) ->
+ Acc#doc_acc{docs = [Doc | DocAcc], read = Read + 1}.
-write_doc(Doc, Seq, Db, #doc_acc{written = W, wfail = F, read = R} = Acc) ->
- SeqsDone = case Acc#doc_acc.seqs of
- [{Seq, Count} | RestSeqs] ->
- [{Seq, Count + 1} | RestSeqs];
- RestSeqs ->
- [{Seq, 1} | RestSeqs]
- end,
+write_doc(Doc, Db, #doc_acc{written = W, wfail = F, read = R} = Acc) ->
case couch_api_wrap:update_doc(Db, Doc, [], replicated_changes) of
{ok, _} ->
- Acc#doc_acc{written = W + 1, read = R + 1, seqs = SeqsDone};
+ Acc#doc_acc{written = W + 1, read = R + 1};
{error, <<"unauthorized">>} ->
?LOG_ERROR("Replicator: unauthorized to write document ~s to ~s",
[Doc#doc.id, couch_api_wrap:db_uri(Db)]),
- Acc#doc_acc{wfail = F + 1, read = R + 1, seqs = SeqsDone};
+ Acc#doc_acc{wfail = F + 1, read = R + 1};
_ ->
- Acc#doc_acc{wfail = F + 1, read = R + 1, seqs = SeqsDone}
+ Acc#doc_acc{wfail = F + 1, read = R + 1}
end.
@@ -150,12 +137,10 @@ bulk_write_docs(#doc_acc{docs = Docs, wr
}.
-seqs_done([], _) ->
- ok;
-seqs_done([{nil, _} | _], _) ->
+seq_done({nil, _}, _Cp) ->
ok;
-seqs_done(SeqCounts, Cp) ->
- ok = gen_server:cast(Cp, {seq_changes_done, SeqCounts}).
+seq_done(SeqDone, Cp) ->
+ ok = gen_server:cast(Cp, {seq_changes_done, SeqDone}).
send_stats(#doc_acc{read = R, written = W, wfail = Wf}, Cp) ->
Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl?rev=1023523&r1=1023522&r2=1023523&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl Sun Oct 17 16:38:46 2010
@@ -61,15 +61,9 @@ missing_revs_finder_loop(Cp, Target, Cha
% incremental attachment replication, so the source only needs to send
% attachments modified since the common ancestor on target.
- % Signal to the checkpointer any that are already on the target are
- % now complete.
IdRevsSeqDict = dict:from_list(
[{Id, {[Rev || #rev_info{rev=Rev} <- RevsInfo], Seq}} ||
#doc_info{id=Id, revs=RevsInfo, high_seq=Seq} <- DocInfos]),
- NonMissingIdRevsSeqDict = remove_missing(IdRevsSeqDict, Missing),
- % signal the completion of these that aren't missing
- report_non_missing(NonMissingIdRevsSeqDict, Cp),
-
% Expand out each docs and seq into it's own work item
MissingCount = lists:foldl(
fun({Id, Revs, PAs}, Count) ->
@@ -83,33 +77,7 @@ missing_revs_finder_loop(Cp, Target, Cha
end.
-report_non_missing(RevsDict, Cp) ->
- case dict:size(RevsDict) of
- 0 ->
- ok;
- N when N > 0 ->
- SeqsDone = [{Seq, length(Revs)} ||
- {_Id, {Revs, Seq}} <- dict:to_list(RevsDict)],
- ok = gen_server:cast(Cp, {seq_changes_done, SeqsDone})
- end.
-
-
send_missing_found(0, _Cp) ->
ok;
send_missing_found(Value, Cp) ->
ok = gen_server:cast(Cp, {add_stats, #rep_stats{missing_found = Value}}).
-
-
-remove_missing(IdRevsSeqDict, []) ->
- IdRevsSeqDict;
-
-remove_missing(IdRevsSeqDict, [{MissingId, MissingRevs, _} | Rest]) ->
- {AllChangedRevs, Seq} = dict:fetch(MissingId, IdRevsSeqDict),
- case AllChangedRevs -- MissingRevs of
- [] ->
- remove_missing(dict:erase(MissingId, IdRevsSeqDict), Rest);
- NotMissingRevs ->
- IdRevsSeqDict2 =
- dict:store(MissingId, {NotMissingRevs, Seq}, IdRevsSeqDict),
- remove_missing(IdRevsSeqDict2, Rest)
- end.