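You are viewing a plain-text version of this content. The canonical version is the original message in the commits@couchdb.apache.org mailing-list archive (the hyperlink to it was not preserved in this plain-text copy).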
Posted to commits@couchdb.apache.org by fd...@apache.org on 2010/08/30 12:15:46 UTC

svn commit: r990757 - in /couchdb/branches/new_replicator: etc/couchdb/default.ini.tpl.in src/couchdb/couch_replicate.erl

Author: fdmanana
Date: Mon Aug 30 10:15:46 2010
New Revision: 990757

URL: http://svn.apache.org/viewvc?rev=990757&view=rev
Log:
New replicator: first version using multiple workers (doc copier processes).

Modified:
    couchdb/branches/new_replicator/etc/couchdb/default.ini.tpl.in
    couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl

Modified: couchdb/branches/new_replicator/etc/couchdb/default.ini.tpl.in
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/etc/couchdb/default.ini.tpl.in?rev=990757&r1=990756&r2=990757&view=diff
==============================================================================
--- couchdb/branches/new_replicator/etc/couchdb/default.ini.tpl.in (original)
+++ couchdb/branches/new_replicator/etc/couchdb/default.ini.tpl.in Mon Aug 30 10:15:46 2010
@@ -116,5 +116,6 @@ compression_level = 8 ; from 1 (lowest, 
 compressible_types = text/*, application/javascript, application/json,  application/xml
 
 [replicator]
+copy_processes = 8
 max_http_sessions = 10
 max_http_pipeline_size = 10
\ No newline at end of file

Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl?rev=990757&r1=990756&r2=990757&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl Mon Aug 30 10:15:46 2010
@@ -47,6 +47,8 @@
     checkpoint_history,
     start_seq,
     current_through_seq,
+    next_through_seqs = ordsets:new(),
+    is_successor_seq,
     committed_seq,
     source_log,
     target_log,
@@ -58,7 +60,8 @@
     changes_queue,
     changes_reader,
     missing_revs_finder,
-    doc_copier,
+    doc_copiers,
+    finished_doc_copiers = 0,
     seqs_in_progress = gb_trees:from_orddict([]),
     stats = #stats{}
     }).
@@ -178,7 +181,7 @@ do_init([RepId, Src, Tgt, Options, UserC
     } = State = init_state(RepId, Src, Tgt, Options, UserCtx),
 
     {ok, MissingRevsQueue} = couch_work_queue:new(
-        [{max_size, 100000}, {max_items, 500}]),
+        [{max_size, 100000}, {max_items, 500}, {multi_workers, true}]),
 
     case couch_util:get_value(doc_ids, Options) of
     undefined ->
@@ -209,16 +212,18 @@ do_init([RepId, Src, Tgt, Options, UserC
         end
     end,
 
-    % This starts the doc copy process. It fetches documents from the
-    % MissingRevsQueue and copies them from the source to the target database.
-    DocCopier = spawn_doc_copy(self(), Source, Target, MissingRevsQueue),
+    % This starts the doc copy processes. They fetch documents from the
+    % MissingRevsQueue and copy them from the source to the target database.
+    DocCopiers = spawn_doc_copiers(self(), Source, Target, MissingRevsQueue),
 
     {ok, State#rep_state{
             missing_revs_queue = MissingRevsQueue,
             changes_queue = ChangesQueue,
             changes_reader = ChangesReader,
             missing_revs_finder = MissingRevsFinder,
-            doc_copier = DocCopier
+            doc_copiers = DocCopiers,
+            is_successor_seq = couch_util:get_value(is_successor_seq, Options,
+                fun(Seq, NextSeq) -> (Seq + 1) =:= NextSeq end)
         }
     }.
 
@@ -229,7 +234,11 @@ handle_info({seq_start, {Seq, NumChanges
     {noreply, State#rep_state{seqs_in_progress = SeqsInProgress2}};
 
 handle_info({seq_changes_done, {Seq, NumChangesDone}}, State) ->
-    #rep_state{seqs_in_progress = SeqsInProgress} = State,
+    #rep_state{
+        seqs_in_progress = SeqsInProgress,
+        next_through_seqs = DoneSeqs,
+        is_successor_seq = IsSuccFun
+    } = State,
     % Decrement the # changes for this seq by NumChangesDone.
     TotalChanges = gb_trees:get(Seq, SeqsInProgress),
     NewState = case TotalChanges - NumChangesDone of
@@ -239,9 +248,15 @@ handle_info({seq_changes_done, {Seq, Num
         % be checkpointed.
         State2 = case gb_trees:smallest(SeqsInProgress) of
         {Seq, _} ->
-            State#rep_state{current_through_seq = Seq};
+            {CheckpointSeq, DoneSeqs2} = next_seq_before_gap(
+                Seq, DoneSeqs, IsSuccFun),
+            State#rep_state{
+                current_through_seq = CheckpointSeq,
+                next_through_seqs = DoneSeqs2
+            };
         _ ->
-            State
+            DoneSeqs2 = ordsets:add_element(Seq, DoneSeqs),
+            State#rep_state{next_through_seqs = DoneSeqs2}
         end,
         State2#rep_state{
             seqs_in_progress = gb_trees:delete(Seq, SeqsInProgress)
@@ -261,13 +276,31 @@ handle_info({add_stat, {StatPos, Val}}, 
     NewStats = setelement(StatPos, Stats, Stat + Val),
     {noreply, State#rep_state{stats = NewStats}};
 
-handle_info(done, #rep_state{seqs_in_progress = SeqsInProgress} = State) ->
-    % This means all the worker processes have completed their work.
-    % Assert that all the seqs have been processed
-    0 = gb_trees:size(SeqsInProgress),
-    NewState = do_checkpoint(State),
-    cancel_timer(NewState),
-    {stop, normal, NewState};
+handle_info({done, _CopierId}, State) ->
+    #rep_state{
+        finished_doc_copiers = Finished,
+        doc_copiers = DocCopiers,
+        next_through_seqs = DoneSeqs,
+        current_through_seq = Seq
+    } = State,
+    State1 = State#rep_state{finished_doc_copiers = Finished + 1},
+    case length(DocCopiers) - 1 of
+    Finished ->
+        % This means all the worker processes have completed their work.
+        % Assert that all the seqs have been processed.
+        0 = gb_trees:size(State#rep_state.seqs_in_progress),
+        LastSeq = case DoneSeqs of
+        [] ->
+            Seq;
+        _ ->
+            lists:max([Seq, lists:last(DoneSeqs)])
+        end,
+        State2 = do_checkpoint(State1#rep_state{current_through_seq = LastSeq}),
+        cancel_timer(State2),
+        {stop, normal, State2};
+    _ ->
+        {noreply, State1}
+    end;
 
 handle_info({'EXIT', Pid, normal}, #rep_state{changes_reader=Pid} = State) ->
     {noreply, State};
@@ -285,14 +318,6 @@ handle_info({'EXIT', Pid, Reason}, #rep_
     ?LOG_ERROR("MissingRevsFinder process died with reason: ~p", [Reason]),
     {stop, missing_revs_finder_died, St};
 
-handle_info({'EXIT', Pid, normal}, #rep_state{doc_copier=Pid} = State) ->
-    {noreply, State};
-
-handle_info({'EXIT', Pid, Reason}, #rep_state{doc_copier=Pid} = State) ->
-    cancel_timer(State),
-    ?LOG_ERROR("DocCopier process died with reason: ~p", [Reason]),
-    {stop, doc_copier_died, State};
-
 handle_info({'EXIT', Pid, normal}, #rep_state{missing_revs_queue=Pid} = St) ->
     {noreply, St};
 
@@ -307,7 +332,25 @@ handle_info({'EXIT', Pid, normal}, #rep_
 handle_info({'EXIT', Pid, Reason}, #rep_state{changes_queue=Pid} = State) ->
     cancel_timer(State),
     ?LOG_ERROR("ChangesQueue process died with reason: ~p", [Reason]),
-    {stop, changes_queue_died, State}.
+    {stop, changes_queue_died, State};
+
+handle_info({'EXIT', Pid, Reason}, State) ->
+    #rep_state{doc_copiers = DocCopiers} = State,
+    case couch_util:get_value(Pid, DocCopiers) of
+    undefined ->
+        cancel_timer(State),
+        {stop, {unknown_process_died, Pid, Reason}, State};
+    CopierId ->
+        case Reason of
+        normal ->
+            {noreply, State};
+        _ ->
+            cancel_timer(State),
+            ?LOG_ERROR("DocCopier process ~p died with reason: ~p",
+                [CopierId, Reason]),
+            {stop, doc_copier_died, State}
+        end
+    end.
 
 
 handle_call(Msg, _From, State) ->
@@ -505,29 +548,36 @@ remove_missing(IdRevsSeqDict, [{MissingI
     end.
 
 
-spawn_doc_copy(Cp, Source, Target, MissingRevsQueue) ->
-    % Note, we could spawn many doc copy process here. Before that's possible
-    % the work_queue code needs to be modified to work with multiple
-    % dequeueing processes
-    spawn_link(fun() ->
-        doc_copy_loop(Cp, Source, Target, MissingRevsQueue)
-    end).
+spawn_doc_copiers(Cp, Source, Target, MissingRevsQueue) ->
+    Count = ?l2i(couch_config:get("replicator", "copy_processes", "8")),
+    lists:map(
+        fun(CopierId) ->
+            Pid = spawn_link(fun() ->
+                doc_copy_loop(CopierId, Cp, Source, Target, MissingRevsQueue)
+            end),
+            {Pid, CopierId}
+        end,
+        lists:seq(1, Count)).
 
 
-doc_copy_loop(Cp, Source, Target, MissingRevsQueue) ->
+doc_copy_loop(CopierId, Cp, Source, Target, MissingRevsQueue) ->
     case couch_work_queue:dequeue(MissingRevsQueue,1) of
     closed ->
-        Cp ! done;
+        ?LOG_DEBUG("Doc copier ~p got missing revs queue closed", [CopierId]),
+        Cp ! {done, CopierId};
     {ok, [{doc_id, Id}]} ->
+        ?LOG_DEBUG("Doc copier ~p got {doc_id, ~p}", [CopierId, Id]),
         couch_api_wrap:open_doc(
             Source, Id, [], fun(R) -> doc_handler(R, Target, Cp) end),
-        doc_copy_loop(Cp, Source, Target, MissingRevsQueue);
+        doc_copy_loop(CopierId, Cp, Source, Target, MissingRevsQueue);
     {ok, [{Id, Revs, PossibleAncestors, Seq}]} ->
+        ?LOG_DEBUG("Doc copier ~p got {~p, ~p, ~p, ~p}",
+            [CopierId, Id, Revs, PossibleAncestors, Seq]),
         couch_api_wrap:open_doc_revs(
             Source, Id, Revs, [{atts_since, PossibleAncestors}],
             fun(R, _) -> doc_handler(R, Target, Cp) end, []),
         Cp ! {seq_changes_done, {Seq, length(Revs)}},
-        doc_copy_loop(Cp, Source, Target, MissingRevsQueue)
+        doc_copy_loop(CopierId, Cp, Source, Target, MissingRevsQueue)
     end.
 
 doc_handler({ok, Doc}, Target, Cp) ->
@@ -620,6 +670,18 @@ do_checkpoint(State) ->
     end.
 
 
+next_seq_before_gap(Seq, [], _IsSuccFun) ->
+    {Seq, []};
+
+next_seq_before_gap(Seq, [Next | NextSeqs] = AllSeqs , IsSuccFun) ->
+    case IsSuccFun(Seq, Next) of
+    false ->
+        {Seq, AllSeqs};
+    true ->
+        next_seq_before_gap(Next, NextSeqs, IsSuccFun)
+    end.
+
+
 commit_to_both(Source, Target) ->
     % commit the src async
     ParentPid = self(),