You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink on "here" was lost in this plain-text copy).
Posted to commits@couchdb.apache.org by fd...@apache.org on 2010/08/30 12:15:46 UTC
svn commit: r990757 - in /couchdb/branches/new_replicator:
etc/couchdb/default.ini.tpl.in src/couchdb/couch_replicate.erl
Author: fdmanana
Date: Mon Aug 30 10:15:46 2010
New Revision: 990757
URL: http://svn.apache.org/viewvc?rev=990757&view=rev
Log:
New replicator: first version using multiple workers (doc copier processes).
Modified:
couchdb/branches/new_replicator/etc/couchdb/default.ini.tpl.in
couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl
Modified: couchdb/branches/new_replicator/etc/couchdb/default.ini.tpl.in
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/etc/couchdb/default.ini.tpl.in?rev=990757&r1=990756&r2=990757&view=diff
==============================================================================
--- couchdb/branches/new_replicator/etc/couchdb/default.ini.tpl.in (original)
+++ couchdb/branches/new_replicator/etc/couchdb/default.ini.tpl.in Mon Aug 30 10:15:46 2010
@@ -116,5 +116,6 @@ compression_level = 8 ; from 1 (lowest,
compressible_types = text/*, application/javascript, application/json, application/xml
[replicator]
+copy_processes = 8
max_http_sessions = 10
max_http_pipeline_size = 10
\ No newline at end of file
Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl?rev=990757&r1=990756&r2=990757&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl Mon Aug 30 10:15:46 2010
@@ -47,6 +47,8 @@
checkpoint_history,
start_seq,
current_through_seq,
+ next_through_seqs = ordsets:new(),
+ is_successor_seq,
committed_seq,
source_log,
target_log,
@@ -58,7 +60,8 @@
changes_queue,
changes_reader,
missing_revs_finder,
- doc_copier,
+ doc_copiers,
+ finished_doc_copiers = 0,
seqs_in_progress = gb_trees:from_orddict([]),
stats = #stats{}
}).
@@ -178,7 +181,7 @@ do_init([RepId, Src, Tgt, Options, UserC
} = State = init_state(RepId, Src, Tgt, Options, UserCtx),
{ok, MissingRevsQueue} = couch_work_queue:new(
- [{max_size, 100000}, {max_items, 500}]),
+ [{max_size, 100000}, {max_items, 500}, {multi_workers, true}]),
case couch_util:get_value(doc_ids, Options) of
undefined ->
@@ -209,16 +212,18 @@ do_init([RepId, Src, Tgt, Options, UserC
end
end,
- % This starts the doc copy process. It fetches documents from the
- % MissingRevsQueue and copies them from the source to the target database.
- DocCopier = spawn_doc_copy(self(), Source, Target, MissingRevsQueue),
+ % This starts the doc copy processes. They fetch documents from the
+ % MissingRevsQueue and copy them from the source to the target database.
+ DocCopiers = spawn_doc_copiers(self(), Source, Target, MissingRevsQueue),
{ok, State#rep_state{
missing_revs_queue = MissingRevsQueue,
changes_queue = ChangesQueue,
changes_reader = ChangesReader,
missing_revs_finder = MissingRevsFinder,
- doc_copier = DocCopier
+ doc_copiers = DocCopiers,
+ is_successor_seq = couch_util:get_value(is_successor_seq, Options,
+ fun(Seq, NextSeq) -> (Seq + 1) =:= NextSeq end)
}
}.
@@ -229,7 +234,11 @@ handle_info({seq_start, {Seq, NumChanges
{noreply, State#rep_state{seqs_in_progress = SeqsInProgress2}};
handle_info({seq_changes_done, {Seq, NumChangesDone}}, State) ->
- #rep_state{seqs_in_progress = SeqsInProgress} = State,
+ #rep_state{
+ seqs_in_progress = SeqsInProgress,
+ next_through_seqs = DoneSeqs,
+ is_successor_seq = IsSuccFun
+ } = State,
% Decrement the # changes for this seq by NumChangesDone.
TotalChanges = gb_trees:get(Seq, SeqsInProgress),
NewState = case TotalChanges - NumChangesDone of
@@ -239,9 +248,15 @@ handle_info({seq_changes_done, {Seq, Num
% be checkpointed.
State2 = case gb_trees:smallest(SeqsInProgress) of
{Seq, _} ->
- State#rep_state{current_through_seq = Seq};
+ {CheckpointSeq, DoneSeqs2} = next_seq_before_gap(
+ Seq, DoneSeqs, IsSuccFun),
+ State#rep_state{
+ current_through_seq = CheckpointSeq,
+ next_through_seqs = DoneSeqs2
+ };
_ ->
- State
+ DoneSeqs2 = ordsets:add_element(Seq, DoneSeqs),
+ State#rep_state{next_through_seqs = DoneSeqs2}
end,
State2#rep_state{
seqs_in_progress = gb_trees:delete(Seq, SeqsInProgress)
@@ -261,13 +276,31 @@ handle_info({add_stat, {StatPos, Val}},
NewStats = setelement(StatPos, Stats, Stat + Val),
{noreply, State#rep_state{stats = NewStats}};
-handle_info(done, #rep_state{seqs_in_progress = SeqsInProgress} = State) ->
- % This means all the worker processes have completed their work.
- % Assert that all the seqs have been processed
- 0 = gb_trees:size(SeqsInProgress),
- NewState = do_checkpoint(State),
- cancel_timer(NewState),
- {stop, normal, NewState};
+handle_info({done, _CopierId}, State) ->
+ #rep_state{
+ finished_doc_copiers = Finished,
+ doc_copiers = DocCopiers,
+ next_through_seqs = DoneSeqs,
+ current_through_seq = Seq
+ } = State,
+ State1 = State#rep_state{finished_doc_copiers = Finished + 1},
+ case length(DocCopiers) - 1 of
+ Finished ->
+ % This means all the worker processes have completed their work.
+ % Assert that all the seqs have been processed.
+ 0 = gb_trees:size(State#rep_state.seqs_in_progress),
+ LastSeq = case DoneSeqs of
+ [] ->
+ Seq;
+ _ ->
+ lists:max([Seq, lists:last(DoneSeqs)])
+ end,
+ State2 = do_checkpoint(State1#rep_state{current_through_seq = LastSeq}),
+ cancel_timer(State2),
+ {stop, normal, State2};
+ _ ->
+ {noreply, State1}
+ end;
handle_info({'EXIT', Pid, normal}, #rep_state{changes_reader=Pid} = State) ->
{noreply, State};
@@ -285,14 +318,6 @@ handle_info({'EXIT', Pid, Reason}, #rep_
?LOG_ERROR("MissingRevsFinder process died with reason: ~p", [Reason]),
{stop, missing_revs_finder_died, St};
-handle_info({'EXIT', Pid, normal}, #rep_state{doc_copier=Pid} = State) ->
- {noreply, State};
-
-handle_info({'EXIT', Pid, Reason}, #rep_state{doc_copier=Pid} = State) ->
- cancel_timer(State),
- ?LOG_ERROR("DocCopier process died with reason: ~p", [Reason]),
- {stop, doc_copier_died, State};
-
handle_info({'EXIT', Pid, normal}, #rep_state{missing_revs_queue=Pid} = St) ->
{noreply, St};
@@ -307,7 +332,25 @@ handle_info({'EXIT', Pid, normal}, #rep_
handle_info({'EXIT', Pid, Reason}, #rep_state{changes_queue=Pid} = State) ->
cancel_timer(State),
?LOG_ERROR("ChangesQueue process died with reason: ~p", [Reason]),
- {stop, changes_queue_died, State}.
+ {stop, changes_queue_died, State};
+
+handle_info({'EXIT', Pid, Reason}, State) ->
+ #rep_state{doc_copiers = DocCopiers} = State,
+ case couch_util:get_value(Pid, DocCopiers) of
+ undefined ->
+ cancel_timer(State),
+ {stop, {unknown_process_died, Pid, Reason}, State};
+ CopierId ->
+ case Reason of
+ normal ->
+ {noreply, State};
+ _ ->
+ cancel_timer(State),
+ ?LOG_ERROR("DocCopier process ~p died with reason: ~p",
+ [CopierId, Reason]),
+ {stop, doc_copier_died, State}
+ end
+ end.
handle_call(Msg, _From, State) ->
@@ -505,29 +548,36 @@ remove_missing(IdRevsSeqDict, [{MissingI
end.
-spawn_doc_copy(Cp, Source, Target, MissingRevsQueue) ->
- % Note, we could spawn many doc copy process here. Before that's possible
- % the work_queue code needs to be modified to work with multiple
- % dequeueing processes
- spawn_link(fun() ->
- doc_copy_loop(Cp, Source, Target, MissingRevsQueue)
- end).
+spawn_doc_copiers(Cp, Source, Target, MissingRevsQueue) ->
+ Count = ?l2i(couch_config:get("replicator", "copy_processes", "8")),
+ lists:map(
+ fun(CopierId) ->
+ Pid = spawn_link(fun() ->
+ doc_copy_loop(CopierId, Cp, Source, Target, MissingRevsQueue)
+ end),
+ {Pid, CopierId}
+ end,
+ lists:seq(1, Count)).
-doc_copy_loop(Cp, Source, Target, MissingRevsQueue) ->
+doc_copy_loop(CopierId, Cp, Source, Target, MissingRevsQueue) ->
case couch_work_queue:dequeue(MissingRevsQueue,1) of
closed ->
- Cp ! done;
+ ?LOG_DEBUG("Doc copier ~p got missing revs queue closed", [CopierId]),
+ Cp ! {done, CopierId};
{ok, [{doc_id, Id}]} ->
+ ?LOG_DEBUG("Doc copier ~p got {doc_id, ~p}", [CopierId, Id]),
couch_api_wrap:open_doc(
Source, Id, [], fun(R) -> doc_handler(R, Target, Cp) end),
- doc_copy_loop(Cp, Source, Target, MissingRevsQueue);
+ doc_copy_loop(CopierId, Cp, Source, Target, MissingRevsQueue);
{ok, [{Id, Revs, PossibleAncestors, Seq}]} ->
+ ?LOG_DEBUG("Doc copier ~p got {~p, ~p, ~p, ~p}",
+ [CopierId, Id, Revs, PossibleAncestors, Seq]),
couch_api_wrap:open_doc_revs(
Source, Id, Revs, [{atts_since, PossibleAncestors}],
fun(R, _) -> doc_handler(R, Target, Cp) end, []),
Cp ! {seq_changes_done, {Seq, length(Revs)}},
- doc_copy_loop(Cp, Source, Target, MissingRevsQueue)
+ doc_copy_loop(CopierId, Cp, Source, Target, MissingRevsQueue)
end.
doc_handler({ok, Doc}, Target, Cp) ->
@@ -620,6 +670,18 @@ do_checkpoint(State) ->
end.
+next_seq_before_gap(Seq, [], _IsSuccFun) ->
+ {Seq, []};
+
+next_seq_before_gap(Seq, [Next | NextSeqs] = AllSeqs , IsSuccFun) ->
+ case IsSuccFun(Seq, Next) of
+ false ->
+ {Seq, AllSeqs};
+ true ->
+ next_seq_before_gap(Next, NextSeqs, IsSuccFun)
+ end.
+
+
commit_to_both(Source, Target) ->
% commit the src async
ParentPid = self(),