You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@couchdb.apache.org by fd...@apache.org on 2010/10/02 21:03:50 UTC

svn commit: r1003865 - /couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl

Author: fdmanana
Date: Sat Oct  2 19:03:50 2010
New Revision: 1003865

URL: http://svn.apache.org/viewvc?rev=1003865&view=rev
Log:
New replicator: determine the source DB's next "current through" sequence number more efficiently.

Modified:
    couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl

Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl?rev=1003865&r1=1003864&r2=1003865&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl Sat Oct  2 19:03:50 2010
@@ -43,8 +43,7 @@
     checkpoint_history,
     start_seq,
     current_through_seq,
-    next_through_seqs = ordsets:new(),
-    is_successor_seq,
+    next_through_seqs = [],
     committed_seq,
     source_log,
     target_log,
@@ -58,7 +57,7 @@
     missing_rev_finders,
     doc_copiers,
     finished_doc_copiers = 0,
-    seqs_in_progress = gb_trees:from_orddict([]),
+    seqs_in_progress = gb_trees:empty(),
     stats = #rep_stats{}
     }).
 
@@ -266,9 +265,7 @@ do_init(#rep{options = Options} = Rep) -
             changes_queue = ChangesQueue,
             changes_reader = ChangesReader,
             missing_rev_finders = MissingRevFinders,
-            doc_copiers = DocCopiers,
-            is_successor_seq = get_value(is_successor_seq, Options,
-                fun(Seq, NextSeq) -> (Seq + 1) =:= NextSeq end)
+            doc_copiers = DocCopiers
         }
     }.
 
@@ -279,8 +276,7 @@ handle_info({seq_start, {Seq, NumChanges
     {noreply, State#rep_state{seqs_in_progress = SeqsInProgress2}};
 
 handle_info({seq_changes_done, Changes}, State) ->
-    NewState = lists:foldl(fun process_seq_changes_done/2, State, Changes),
-    {noreply, NewState};
+    {noreply, process_seq_changes_done(Changes, State)};
 
 handle_info({add_stat, {StatPos, Val}}, #rep_state{stats = Stats} = State) ->
     Stat = element(StatPos, Stats),
@@ -306,7 +302,9 @@ handle_info({done, _CopierId}, State) ->
         _ ->
             lists:max([Seq, lists:last(DoneSeqs)])
         end,
-        State2 = do_checkpoint(State1#rep_state{current_through_seq = LastSeq}),
+        State2 = do_checkpoint(State1#rep_state{
+            current_through_seq = LastSeq,
+            next_through_seqs = []}),
         cancel_timer(State2),
         {stop, normal, State2};
     _ ->
@@ -583,18 +581,6 @@ do_checkpoint(State) ->
     end.
 
 
-next_seq_before_gap(Seq, [], _IsSuccFun) ->
-    {Seq, []};
-
-next_seq_before_gap(Seq, [Next | NextSeqs] = AllSeqs , IsSuccFun) ->
-    case IsSuccFun(Seq, Next) of
-    false ->
-        {Seq, AllSeqs};
-    true ->
-        next_seq_before_gap(Next, NextSeqs, IsSuccFun)
-    end.
-
-
 commit_to_both(Source, Target) ->
     % commit the src async
     ParentPid = self(),
@@ -677,39 +663,88 @@ has_session_id(SessionId, [{Props} | Res
     end.
 
 
-process_seq_changes_done({Seq, NumChangesDone}, State) ->
+% Applies a batch of {Seq, NumChangesDone} progress reports: decrements the
+% pending change count of each seq, drops fully processed seqs from
+% seqs_in_progress and advances current_through_seq as far as is safe.
+process_seq_changes_done(Changes, State) ->
     #rep_state{
         seqs_in_progress = SeqsInProgress,
         next_through_seqs = DoneSeqs,
-        is_successor_seq = IsSuccFun
+        current_through_seq = ThroughSeq
     } = State,
-    % Decrement the # changes for this seq by NumChangesDone.
-    TotalChanges = gb_trees:get(Seq, SeqsInProgress),
-    case TotalChanges - NumChangesDone of
-    0 ->
-        % This seq is completely processed. Check to see if it was the
-        % smallest seq in progess. If so, we've made progress that can
-        % be checkpointed.
-        State2 = case gb_trees:smallest(SeqsInProgress) of
-        {Seq, _} ->
-            {CheckpointSeq, DoneSeqs2} = next_seq_before_gap(
-                Seq, DoneSeqs, IsSuccFun),
-            State#rep_state{
-                current_through_seq = CheckpointSeq,
-                next_through_seqs = DoneSeqs2
-            };
-        _ ->
-            DoneSeqs2 = ordsets:add_element(Seq, DoneSeqs),
-            State#rep_state{next_through_seqs = DoneSeqs2}
+
+    % LastSeq is the largest seq fully processed in this batch, or nil if
+    % none was. Only LastSeq is recorded as done; smaller seqs completed in
+    % the same batch are covered once the through seq reaches LastSeq.
+    {SeqsInProgress2, LastSeq} = lists:foldl(
+        fun({Seq, ChangesDone}, {SeqTree, MaxSeqDone}) ->
+            Total = gb_trees:get(Seq, SeqTree),
+            case Total - ChangesDone of
+            0 ->
+                NewMaxSeqDone = case MaxSeqDone of
+                nil ->
+                    Seq;
+                _ ->
+                    lists:max([Seq, MaxSeqDone])
+                end,
+                {gb_trees:delete(Seq, SeqTree), NewMaxSeqDone};
+            NewTotal when NewTotal > 0 ->
+                {gb_trees:update(Seq, NewTotal, SeqTree), MaxSeqDone}
+            end
         end,
-        State2#rep_state{
-            seqs_in_progress = gb_trees:delete(Seq, SeqsInProgress)
-        };
-    NewTotalChanges when NewTotalChanges > 0 ->
-        % There are still some changes that need work done.
-        % Put the new count back.
-        State#rep_state{
-            seqs_in_progress =
-                gb_trees:update(Seq, NewTotalChanges, SeqsInProgress)
-        }
+        {SeqsInProgress, nil}, Changes),
+
+    DoneSeqs2 = add_done_seq(LastSeq, DoneSeqs),
+    {NewThroughSeq, DoneSeqs3} =
+        get_next_through_seq(ThroughSeq, SeqsInProgress2, DoneSeqs2),
+
+    State#rep_state{
+        seqs_in_progress = SeqsInProgress2,
+        next_through_seqs = DoneSeqs3,
+        current_through_seq = NewThroughSeq
+    }.
+
+
+% Adds Seq to the ordset of completed seqs; nil (no seq completed) is a no-op.
+add_done_seq(nil, DoneSeqs) ->
+    DoneSeqs;
+add_done_seq(Seq, DoneSeqs) ->
+    ordsets:add_element(Seq, DoneSeqs).
+
+
+% Returns {NewThroughSeq, RemainingDoneSeqs}. Current is the current
+% through seq, InProgress the gb_tree of seqs still being processed and
+% Done the ordset of completed seqs not yet covered by Current. A done seq
+% can become the through seq only if no smaller seq is still in progress.
+get_next_through_seq(Current, InProgress, Done) ->
+    case gb_trees:is_empty(InProgress) of
+    true ->
+        case Done of
+        [] ->
+            {Current, Done};
+        _ ->
+            {lists:last(Done), []}
+        end;
+    false ->
+        {SmallestInProgress, _} = gb_trees:smallest(InProgress),
+        % Scan the done seqs from largest to smallest.
+        get_next_through_seq(
+            Current, SmallestInProgress, lists:reverse(Done), [])
     end.
+
+
+% The third argument holds the done seqs in descending order. Seqs above
+% SmallestInProgress are moved to NewDone (kept ascending, since they are
+% prepended largest-first); the first one not above it becomes the new
+% through seq and the smaller ones still left are dropped. Each seq is
+% visited once, so this is linear in the number of done seqs.
+get_next_through_seq(Current, _SmallestInProgress, [], NewDone) ->
+    {Current, NewDone};
+get_next_through_seq(_Current, SmallestInProgress, [LargestDone | _], NewDone)
+        when LargestDone =< SmallestInProgress ->
+    {LargestDone, NewDone};
+get_next_through_seq(Current, SmallestInProgress, [LargestDone | Rest],
+        NewDone) ->
+    get_next_through_seq(Current, SmallestInProgress, Rest,
+        [LargestDone | NewDone]).
