You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this conversion.
Posted to commits@couchdb.apache.org by fd...@apache.org on 2010/10/01 20:00:48 UTC
svn commit: r1003599 - in /couchdb/branches/new_replicator/src/couchdb: couch_replicator.erl couch_replicator_doc_copiers.erl couch_replicator_rev_finders.erl
Author: fdmanana
Date: Fri Oct 1 18:00:48 2010
New Revision: 1003599
URL: http://svn.apache.org/viewvc?rev=1003599&view=rev
Log:
New replicator: avoid having the replication gen_server receive one message for each processed source sequence. Completed sequences are now grouped into lists of up to ?DOC_BATCH_SIZE elements and sent in a single message.
Modified:
couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl
couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl
couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl
Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl?rev=1003599&r1=1003598&r2=1003599&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl Fri Oct 1 18:00:48 2010
@@ -278,42 +278,8 @@ handle_info({seq_start, {Seq, NumChanges
State#rep_state.seqs_in_progress),
{noreply, State#rep_state{seqs_in_progress = SeqsInProgress2}};
-handle_info({seq_changes_done, {Seq, NumChangesDone}}, State) ->
- #rep_state{
- seqs_in_progress = SeqsInProgress,
- next_through_seqs = DoneSeqs,
- is_successor_seq = IsSuccFun
- } = State,
- % Decrement the # changes for this seq by NumChangesDone.
- TotalChanges = gb_trees:get(Seq, SeqsInProgress),
- NewState = case TotalChanges - NumChangesDone of
- 0 ->
- % This seq is completely processed. Check to see if it was the
- % smallest seq in progess. If so, we've made progress that can
- % be checkpointed.
- State2 = case gb_trees:smallest(SeqsInProgress) of
- {Seq, _} ->
- {CheckpointSeq, DoneSeqs2} = next_seq_before_gap(
- Seq, DoneSeqs, IsSuccFun),
- State#rep_state{
- current_through_seq = CheckpointSeq,
- next_through_seqs = DoneSeqs2
- };
- _ ->
- DoneSeqs2 = ordsets:add_element(Seq, DoneSeqs),
- State#rep_state{next_through_seqs = DoneSeqs2}
- end,
- State2#rep_state{
- seqs_in_progress = gb_trees:delete(Seq, SeqsInProgress)
- };
- NewTotalChanges when NewTotalChanges > 0 ->
- % There are still some changes that need work done.
- % Put the new count back.
- State#rep_state{
- seqs_in_progress =
- gb_trees:update(Seq, NewTotalChanges, SeqsInProgress)
- }
- end,
+handle_info({seq_changes_done, Changes}, State) ->
+ NewState = lists:foldl(fun process_seq_changes_done/2, State, Changes),
{noreply, NewState};
handle_info({add_stat, {StatPos, Val}}, #rep_state{stats = Stats} = State) ->
@@ -710,3 +676,40 @@ has_session_id(SessionId, [{Props} | Res
has_session_id(SessionId, Rest)
end.
+
+process_seq_changes_done({Seq, NumChangesDone}, State) ->
+ #rep_state{
+ seqs_in_progress = SeqsInProgress,
+ next_through_seqs = DoneSeqs,
+ is_successor_seq = IsSuccFun
+ } = State,
+ % Decrement the # changes for this seq by NumChangesDone.
+ TotalChanges = gb_trees:get(Seq, SeqsInProgress),
+ case TotalChanges - NumChangesDone of
+ 0 ->
+ % This seq is completely processed. Check to see if it was the
+ % smallest seq in progess. If so, we've made progress that can
+ % be checkpointed.
+ State2 = case gb_trees:smallest(SeqsInProgress) of
+ {Seq, _} ->
+ {CheckpointSeq, DoneSeqs2} = next_seq_before_gap(
+ Seq, DoneSeqs, IsSuccFun),
+ State#rep_state{
+ current_through_seq = CheckpointSeq,
+ next_through_seqs = DoneSeqs2
+ };
+ _ ->
+ DoneSeqs2 = ordsets:add_element(Seq, DoneSeqs),
+ State#rep_state{next_through_seqs = DoneSeqs2}
+ end,
+ State2#rep_state{
+ seqs_in_progress = gb_trees:delete(Seq, SeqsInProgress)
+ };
+ NewTotalChanges when NewTotalChanges > 0 ->
+ % There are still some changes that need work done.
+ % Put the new count back.
+ State#rep_state{
+ seqs_in_progress =
+ gb_trees:update(Seq, NewTotalChanges, SeqsInProgress)
+ }
+ end.
Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl?rev=1003599&r1=1003598&r2=1003599&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl Fri Oct 1 18:00:48 2010
@@ -105,6 +105,8 @@ write_doc(Doc, Seq, Db, Cp) ->
seqs_done([{Seq, 1}], Cp).
+bulk_write_docs([], _, _, _) ->
+ ok;
bulk_write_docs(Docs, Seqs, Db, Cp) ->
case couch_api_wrap:update_docs(
Db, Docs, [delay_commit], replicated_changes) of
@@ -125,9 +127,9 @@ bulk_write_docs(Docs, Seqs, Db, Cp) ->
seqs_done(Seqs, Cp).
+seqs_done([], _) ->
+ ok;
+seqs_done([{nil, _} | _], _) ->
+ ok;
seqs_done(SeqCounts, Cp) ->
- lists:foreach(fun({nil, _}) ->
- ok;
- (SeqCount) ->
- Cp ! {seq_changes_done, SeqCount}
- end, SeqCounts).
+ Cp ! {seq_changes_done, SeqCounts}.
Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl?rev=1003599&r1=1003598&r2=1003599&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl Fri Oct 1 18:00:48 2010
@@ -69,9 +69,9 @@ missing_revs_finder_loop(FinderId, Cp, T
#doc_info{id=Id, revs=RevsInfo, high_seq=Seq} <- DocInfos]),
NonMissingIdRevsSeqDict = remove_missing(IdRevsSeqDict, Missing),
% signal the completion of these that aren't missing
- lists:foreach(fun({_Id, {Revs, Seq}}) ->
- Cp ! {seq_changes_done, {Seq, length(Revs)}}
- end, dict:to_list(NonMissingIdRevsSeqDict)),
+ Cp ! {seq_changes_done,
+ [{Seq, length(Revs)} ||
+ {_Id, {Revs, Seq}} <- dict:to_list(NonMissingIdRevsSeqDict)]},
% Expand out each docs and seq into it's own work item
lists:foreach(fun({Id, Revs, PAs}) ->