You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by fd...@apache.org on 2010/10/02 21:03:50 UTC
svn commit: r1003865 -
/couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl
Author: fdmanana
Date: Sat Oct 2 19:03:50 2010
New Revision: 1003865
URL: http://svn.apache.org/viewvc?rev=1003865&view=rev
Log:
New replicator: more efficient way of determining the next "current through" sequence number (current_through_seq) of the source DB.
Modified:
couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl
Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl?rev=1003865&r1=1003864&r2=1003865&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl Sat Oct 2 19:03:50 2010
@@ -43,8 +43,7 @@
checkpoint_history,
start_seq,
current_through_seq,
- next_through_seqs = ordsets:new(),
- is_successor_seq,
+ next_through_seqs = [],
committed_seq,
source_log,
target_log,
@@ -58,7 +57,7 @@
missing_rev_finders,
doc_copiers,
finished_doc_copiers = 0,
- seqs_in_progress = gb_trees:from_orddict([]),
+ seqs_in_progress = gb_trees:empty(),
stats = #rep_stats{}
}).
@@ -266,9 +265,7 @@ do_init(#rep{options = Options} = Rep) -
changes_queue = ChangesQueue,
changes_reader = ChangesReader,
missing_rev_finders = MissingRevFinders,
- doc_copiers = DocCopiers,
- is_successor_seq = get_value(is_successor_seq, Options,
- fun(Seq, NextSeq) -> (Seq + 1) =:= NextSeq end)
+ doc_copiers = DocCopiers
}
}.
@@ -279,8 +276,7 @@ handle_info({seq_start, {Seq, NumChanges
{noreply, State#rep_state{seqs_in_progress = SeqsInProgress2}};
handle_info({seq_changes_done, Changes}, State) ->
- NewState = lists:foldl(fun process_seq_changes_done/2, State, Changes),
- {noreply, NewState};
+ {noreply, process_seq_changes_done(Changes, State)};
handle_info({add_stat, {StatPos, Val}}, #rep_state{stats = Stats} = State) ->
Stat = element(StatPos, Stats),
@@ -306,7 +302,9 @@ handle_info({done, _CopierId}, State) ->
_ ->
lists:max([Seq, lists:last(DoneSeqs)])
end,
- State2 = do_checkpoint(State1#rep_state{current_through_seq = LastSeq}),
+ State2 = do_checkpoint(State1#rep_state{
+ current_through_seq = LastSeq,
+ next_through_seqs = []}),
cancel_timer(State2),
{stop, normal, State2};
_ ->
@@ -583,18 +581,6 @@ do_checkpoint(State) ->
end.
-next_seq_before_gap(Seq, [], _IsSuccFun) ->
- {Seq, []};
-
-next_seq_before_gap(Seq, [Next | NextSeqs] = AllSeqs , IsSuccFun) ->
- case IsSuccFun(Seq, Next) of
- false ->
- {Seq, AllSeqs};
- true ->
- next_seq_before_gap(Next, NextSeqs, IsSuccFun)
- end.
-
-
commit_to_both(Source, Target) ->
% commit the src async
ParentPid = self(),
@@ -677,39 +663,72 @@ has_session_id(SessionId, [{Props} | Res
end.
-process_seq_changes_done({Seq, NumChangesDone}, State) ->
+process_seq_changes_done(Changes, State) ->
#rep_state{
seqs_in_progress = SeqsInProgress,
next_through_seqs = DoneSeqs,
- is_successor_seq = IsSuccFun
+ current_through_seq = ThroughSeq
} = State,
- % Decrement the # changes for this seq by NumChangesDone.
- TotalChanges = gb_trees:get(Seq, SeqsInProgress),
- case TotalChanges - NumChangesDone of
- 0 ->
- % This seq is completely processed. Check to see if it was the
- % smallest seq in progess. If so, we've made progress that can
- % be checkpointed.
- State2 = case gb_trees:smallest(SeqsInProgress) of
- {Seq, _} ->
- {CheckpointSeq, DoneSeqs2} = next_seq_before_gap(
- Seq, DoneSeqs, IsSuccFun),
- State#rep_state{
- current_through_seq = CheckpointSeq,
- next_through_seqs = DoneSeqs2
- };
- _ ->
- DoneSeqs2 = ordsets:add_element(Seq, DoneSeqs),
- State#rep_state{next_through_seqs = DoneSeqs2}
+
+ {SeqsInProgress2, LastSeq} = lists:foldl(
+ fun({Seq, ChangesDone}, {SeqTree, MaxSeqDone}) ->
+ Total = gb_trees:get(Seq, SeqTree),
+ case Total - ChangesDone of
+ 0 ->
+ NewMaxSeqDone = case MaxSeqDone of
+ nil ->
+ Seq;
+ _ ->
+ lists:max([Seq, MaxSeqDone])
+ end,
+ {gb_trees:delete(Seq, SeqTree), NewMaxSeqDone};
+ NewTotal when NewTotal > 0 ->
+ {gb_trees:update(Seq, NewTotal, SeqTree), MaxSeqDone}
+ end
end,
- State2#rep_state{
- seqs_in_progress = gb_trees:delete(Seq, SeqsInProgress)
- };
- NewTotalChanges when NewTotalChanges > 0 ->
- % There are still some changes that need work done.
- % Put the new count back.
- State#rep_state{
- seqs_in_progress =
- gb_trees:update(Seq, NewTotalChanges, SeqsInProgress)
- }
+ {SeqsInProgress, nil}, Changes),
+
+ DoneSeqs2 = add_done_seq(LastSeq, DoneSeqs),
+ {NewThroughSeq, DoneSeqs3} =
+ get_next_through_seq(ThroughSeq, SeqsInProgress2, DoneSeqs2),
+
+ State#rep_state{
+ seqs_in_progress = SeqsInProgress2,
+ next_through_seqs = DoneSeqs3,
+ current_through_seq = NewThroughSeq
+ }.
+
+
+add_done_seq(nil, DoneSeqs) ->
+ DoneSeqs;
+add_done_seq(Seq, DoneSeqs) ->
+ ordsets:add_element(Seq, DoneSeqs).
+
+
+get_next_through_seq(Current, InProgress, Done) ->
+ case gb_trees:is_empty(InProgress) of
+ true ->
+ case Done of
+ [] ->
+ {Current, Done};
+ _ ->
+ {lists:last(Done), []}
+ end;
+ false ->
+ {SmallestInProgress, _} = gb_trees:smallest(InProgress),
+ get_next_through_seq(Current, SmallestInProgress, Done, [])
end.
+
+
+get_next_through_seq(Current, _SmallestInProgress, [], NewDone) ->
+ {Current, NewDone};
+get_next_through_seq(Current, SmallestInProgress, Done, NewDone) ->
+ LargestDone = lists:last(Done),
+ case LargestDone =< SmallestInProgress of
+ true ->
+ {LargestDone, NewDone};
+ false ->
+ get_next_through_seq(Current, SmallestInProgress,
+ ordsets:del_element(LargestDone, NewDone), [LargestDone | NewDone])
+ end.
+