You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by fd...@apache.org on 2010/10/01 20:00:48 UTC

svn commit: r1003599 - in /couchdb/branches/new_replicator/src/couchdb: couch_replicator.erl couch_replicator_doc_copiers.erl couch_replicator_rev_finders.erl

Author: fdmanana
Date: Fri Oct  1 18:00:48 2010
New Revision: 1003599

URL: http://svn.apache.org/viewvc?rev=1003599&view=rev
Log:
New replicator: avoid having the replication gen_server receive one message for each processed source sequence. Completed sequences are now grouped into lists of up to ?DOC_BATCH_SIZE entries and sent in a single message.

Modified:
    couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl
    couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl
    couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl

Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl?rev=1003599&r1=1003598&r2=1003599&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl Fri Oct  1 18:00:48 2010
@@ -278,42 +278,8 @@ handle_info({seq_start, {Seq, NumChanges
         State#rep_state.seqs_in_progress),
     {noreply, State#rep_state{seqs_in_progress = SeqsInProgress2}};
 
-handle_info({seq_changes_done, {Seq, NumChangesDone}}, State) ->
-    #rep_state{
-        seqs_in_progress = SeqsInProgress,
-        next_through_seqs = DoneSeqs,
-        is_successor_seq = IsSuccFun
-    } = State,
-    % Decrement the # changes for this seq by NumChangesDone.
-    TotalChanges = gb_trees:get(Seq, SeqsInProgress),
-    NewState = case TotalChanges - NumChangesDone of
-    0 ->
-        % This seq is completely processed. Check to see if it was the
-        % smallest seq in progess. If so, we've made progress that can
-        % be checkpointed.
-        State2 = case gb_trees:smallest(SeqsInProgress) of
-        {Seq, _} ->
-            {CheckpointSeq, DoneSeqs2} = next_seq_before_gap(
-                Seq, DoneSeqs, IsSuccFun),
-            State#rep_state{
-                current_through_seq = CheckpointSeq,
-                next_through_seqs = DoneSeqs2
-            };
-        _ ->
-            DoneSeqs2 = ordsets:add_element(Seq, DoneSeqs),
-            State#rep_state{next_through_seqs = DoneSeqs2}
-        end,
-        State2#rep_state{
-            seqs_in_progress = gb_trees:delete(Seq, SeqsInProgress)
-        };
-    NewTotalChanges when NewTotalChanges > 0 ->
-        % There are still some changes that need work done.
-        % Put the new count back.
-        State#rep_state{
-            seqs_in_progress =
-                gb_trees:update(Seq, NewTotalChanges, SeqsInProgress)
-        }
-    end,
+handle_info({seq_changes_done, Changes}, State) ->
+    NewState = lists:foldl(fun process_seq_changes_done/2, State, Changes),
     {noreply, NewState};
 
 handle_info({add_stat, {StatPos, Val}}, #rep_state{stats = Stats} = State) ->
@@ -710,3 +676,40 @@ has_session_id(SessionId, [{Props} | Res
         has_session_id(SessionId, Rest)
     end.
 
+
+process_seq_changes_done({Seq, NumChangesDone}, State) ->
+    #rep_state{
+        seqs_in_progress = SeqsInProgress,
+        next_through_seqs = DoneSeqs,
+        is_successor_seq = IsSuccFun
+    } = State,
+    % Decrement the # changes for this seq by NumChangesDone.
+    TotalChanges = gb_trees:get(Seq, SeqsInProgress),
+    case TotalChanges - NumChangesDone of
+    0 ->
+        % This seq is completely processed. Check to see if it was the
+        % smallest seq in progress. If so, we've made progress that can
+        % be checkpointed.
+        State2 = case gb_trees:smallest(SeqsInProgress) of
+        {Seq, _} ->
+            {CheckpointSeq, DoneSeqs2} = next_seq_before_gap(
+                Seq, DoneSeqs, IsSuccFun),
+            State#rep_state{
+                current_through_seq = CheckpointSeq,
+                next_through_seqs = DoneSeqs2
+            };
+        _ ->
+            DoneSeqs2 = ordsets:add_element(Seq, DoneSeqs),
+            State#rep_state{next_through_seqs = DoneSeqs2}
+        end,
+        State2#rep_state{
+            seqs_in_progress = gb_trees:delete(Seq, SeqsInProgress)
+        };
+    NewTotalChanges when NewTotalChanges > 0 ->
+        % There are still some changes that need work done.
+        % Put the new count back.
+        State#rep_state{
+            seqs_in_progress =
+                gb_trees:update(Seq, NewTotalChanges, SeqsInProgress)
+        }
+    end.

Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl?rev=1003599&r1=1003598&r2=1003599&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl Fri Oct  1 18:00:48 2010
@@ -105,6 +105,8 @@ write_doc(Doc, Seq, Db, Cp) ->
     seqs_done([{Seq, 1}], Cp).
 
 
+bulk_write_docs([], _, _, _) ->
+    ok;
 bulk_write_docs(Docs, Seqs, Db, Cp) ->
     case couch_api_wrap:update_docs(
         Db, Docs, [delay_commit], replicated_changes) of
@@ -125,9 +127,9 @@ bulk_write_docs(Docs, Seqs, Db, Cp) ->
     seqs_done(Seqs, Cp).
 
 
+seqs_done([], _) ->
+    ok;
+seqs_done([{nil, _} | _], _) ->
+    ok;
 seqs_done(SeqCounts, Cp) ->
-    lists:foreach(fun({nil, _}) ->
-            ok;
-        (SeqCount) ->
-            Cp ! {seq_changes_done, SeqCount}
-        end, SeqCounts).
+    Cp ! {seq_changes_done, SeqCounts}.

Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl?rev=1003599&r1=1003598&r2=1003599&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl Fri Oct  1 18:00:48 2010
@@ -69,9 +69,9 @@ missing_revs_finder_loop(FinderId, Cp, T
                     #doc_info{id=Id, revs=RevsInfo, high_seq=Seq} <- DocInfos]),
         NonMissingIdRevsSeqDict = remove_missing(IdRevsSeqDict, Missing),
         % signal the completion of these that aren't missing
-        lists:foreach(fun({_Id, {Revs, Seq}}) ->
-                Cp ! {seq_changes_done, {Seq, length(Revs)}}
-            end, dict:to_list(NonMissingIdRevsSeqDict)),
+        Cp ! {seq_changes_done,
+            [{Seq, length(Revs)} ||
+                {_Id, {Revs, Seq}} <- dict:to_list(NonMissingIdRevsSeqDict)]},
 
         % Expand out each docs and seq into it's own work item
         lists:foreach(fun({Id, Revs, PAs}) ->