You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by fd...@apache.org on 2010/10/17 18:38:46 UTC

svn commit: r1023523 - in /couchdb/branches/new_replicator/src/couchdb: couch_replicator.erl couch_replicator_doc_copiers.erl couch_replicator_rev_finders.erl

Author: fdmanana
Date: Sun Oct 17 16:38:46 2010
New Revision: 1023523

URL: http://svn.apache.org/viewvc?rev=1023523&view=rev
Log:
New replicator: simplified code and reduced contention on the main gen_server.

Modified:
    couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl
    couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl
    couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl

Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl?rev=1023523&r1=1023522&r2=1023523&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl Sun Oct 17 16:38:46 2010
@@ -239,8 +239,8 @@ do_init(#rep{options = Options} = Rep) -
 
         % This starts the _changes reader process. It adds the changes from
         % the source db to the ChangesQueue.
-        ChangesReader = spawn_changes_reader(self(), StartSeq, Source,
-            ChangesQueue, Options),
+        ChangesReader = spawn_changes_reader(
+            StartSeq, Source, ChangesQueue, Options),
 
         % This starts the missing rev finders. They check the target for changes
         % in the ChangesQueue to see if they exist on the target or not. If not,
@@ -351,13 +351,13 @@ handle_cast(checkpoint, State) ->
     State2 = do_checkpoint(State),
     {noreply, State2#rep_state{timer = start_timer(State)}};
 
-handle_cast({seq_start, {Seq, NumChanges}}, State) ->
+handle_cast({seq_start, {LargestSeq, NumChanges}}, State) ->
     #rep_state{
-        seqs_in_progress = SeqsInProgress,
+        seqs_in_progress = SeqsTree,
         stats = #rep_stats{missing_checked = Mc} = Stats
     } = State,
     NewState = State#rep_state{
-        seqs_in_progress = gb_trees:insert(Seq, NumChanges, SeqsInProgress),
+        seqs_in_progress = gb_trees:insert(LargestSeq, NumChanges, SeqsTree),
         stats = Stats#rep_stats{missing_checked = Mc + NumChanges}
     },
     {noreply, NewState};
@@ -496,12 +496,11 @@ init_state(Rep) ->
     State#rep_state{timer = start_timer(State)}.
 
 
-spawn_changes_reader(Cp, StartSeq, Source, ChangesQueue, Options) ->
+spawn_changes_reader(StartSeq, Source, ChangesQueue, Options) ->
     spawn_link(
         fun()->
             couch_api_wrap:changes_since(Source, all_docs, StartSeq,
-                fun(#doc_info{high_seq=Seq, revs=Revs} = DocInfo) ->
-                    ok = gen_server:cast(Cp, {seq_start, {Seq, length(Revs)}}),
+                fun(DocInfo) ->
                     ok = couch_work_queue:queue(ChangesQueue, DocInfo)
                 end, Options),
             couch_work_queue:close(ChangesQueue)
@@ -659,32 +658,22 @@ has_session_id(SessionId, [{Props} | Res
     end.
 
 
-process_seq_changes_done(Changes, State) ->
+process_seq_changes_done({Seq, NumChangesDone}, State) ->
     #rep_state{
         seqs_in_progress = SeqsInProgress,
         next_through_seqs = DoneSeqs,
         current_through_seq = ThroughSeq
     } = State,
 
-    {SeqsInProgress2, LastSeq} = lists:foldl(
-        fun({Seq, ChangesDone}, {SeqTree, MaxSeqDone}) ->
-            Total = gb_trees:get(Seq, SeqTree),
-            case Total - ChangesDone of
-            0 ->
-                NewMaxSeqDone = case MaxSeqDone of
-                nil ->
-                    Seq;
-                _ ->
-                    lists:max([Seq, MaxSeqDone])
-                end,
-                {gb_trees:delete(Seq, SeqTree), NewMaxSeqDone};
-            NewTotal when NewTotal > 0 ->
-                {gb_trees:update(Seq, NewTotal, SeqTree), MaxSeqDone}
-            end
-        end,
-        {SeqsInProgress, nil}, Changes),
+    Total = gb_trees:get(Seq, SeqsInProgress),
+    SeqsInProgress2 = case Total - NumChangesDone of
+    0 ->
+        gb_trees:delete(Seq, SeqsInProgress);
+    NewTotal when NewTotal > 0 ->
+        gb_trees:update(Seq, NewTotal, SeqsInProgress)
+    end,
 
-    DoneSeqs2 = add_done_seq(LastSeq, DoneSeqs),
+    DoneSeqs2 = ordsets:add_element(Seq, DoneSeqs),
     {NewThroughSeq, DoneSeqs3} =
         get_next_through_seq(ThroughSeq, SeqsInProgress2, DoneSeqs2),
 
@@ -695,12 +684,6 @@ process_seq_changes_done(Changes, State)
     }.
 
 
-add_done_seq(nil, DoneSeqs) ->
-    DoneSeqs;
-add_done_seq(Seq, DoneSeqs) ->
-    ordsets:add_element(Seq, DoneSeqs).
-
-
 get_next_through_seq(Current, InProgress, Done) ->
     case gb_trees:is_empty(InProgress) of
     true ->

Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl?rev=1023523&r1=1023522&r2=1023523&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl Sun Oct 17 16:38:46 2010
@@ -33,7 +33,6 @@ spawn_doc_copiers(Cp, Source, Target, Mi
 
 -record(doc_acc, {
     docs = [],
-    seqs = [],
     read = 0,
     written = 0,
     wfail = 0
@@ -51,29 +50,40 @@ doc_copy_loop(Cp, Source, Target, Missin
                 ?LOG_DEBUG("Doc copier ~p got {doc_id, ~p}", [self(), Id]),
                 {ok, Acc2} = couch_api_wrap:open_doc_revs(
                     Source, Id, all, [],
-                    fun(R, A) -> doc_handler(R, nil, Target, A) end, Acc),
+                    fun(R, A) -> doc_handler(R, Target, A) end, Acc),
                 Acc2
             end,
             #doc_acc{}, DocIds),
-        {Source, Acc};
+        {Source, Acc, {nil, 0}};
 
-    {ok, IdRevList} ->
-        lists:foldl(
+    {ok, [{_, FirstRevs, _, FirstSeq} | RestIdRevList] = IdRevList} ->
+        {LargestSeq, TotalChanges} = lists:foldl(
+            fun({_, Revs, _, Seq}, {Largest, Total}) when Seq > Largest ->
+                {Seq, Total + length(Revs)};
+            ({_, Revs, _, _}, {Largest, Total}) ->
+                {Largest, Total + length(Revs)}
+            end,
+            {FirstSeq, length(FirstRevs)}, RestIdRevList),
+        LastSeqDone = {LargestSeq, TotalChanges},
+        ok = gen_server:cast(Cp, {seq_start, LastSeqDone}),
+        {NewSource, Acc} = lists:foldl(
             fun({Id, Revs, PossibleAncestors, Seq} = IdRev, {SrcDb, BulkAcc}) ->
                 ?LOG_DEBUG("Doc copier ~p got ~p", [self(), IdRev]),
                 SrcDb2 = couch_api_wrap:maybe_reopen_db(SrcDb, Seq),
                 {ok, BulkAcc2} = couch_api_wrap:open_doc_revs(
                     SrcDb2, Id, Revs, [{atts_since, PossibleAncestors}],
-                    fun(R, A) -> doc_handler(R, Seq, Target, A) end,
+                    fun(R, A) -> doc_handler(R, Target, A) end,
                     BulkAcc),
                 {SrcDb2, BulkAcc2}
             end,
-            {Source, #doc_acc{}}, IdRevList)
+            {Source, #doc_acc{}}, IdRevList),
+        {NewSource, Acc, LastSeqDone}
     end,
+
     case Result of
-    {Source2, DocAcc} ->
-        #doc_acc{seqs = SeqsDone} = DocAcc2 = bulk_write_docs(DocAcc, Target),
-        seqs_done(SeqsDone, Cp),
+    {Source2, DocAcc, LargestSeqDone} ->
+        DocAcc2 = bulk_write_docs(DocAcc, Target),
+        seq_done(LargestSeqDone, Cp),
         send_stats(DocAcc2, Cp),
         doc_copy_loop(Cp, Source2, Target, MissingRevsQueue);
     stop ->
@@ -81,53 +91,30 @@ doc_copy_loop(Cp, Source, Target, Missin
     end.
 
 
-doc_handler({ok, #doc{atts = []} = Doc}, Seq, _Target, Acc) ->
-    update_bulk_doc_acc(Acc, Seq, Doc);
+doc_handler({ok, #doc{atts = []} = Doc}, _Target, Acc) ->
+    update_bulk_doc_acc(Acc, Doc);
 
-doc_handler({ok, Doc}, Seq, Target, Acc) ->
-    write_doc(Doc, Seq, Target, Acc);
+doc_handler({ok, Doc}, Target, Acc) ->
+    write_doc(Doc, Target, Acc);
 
-doc_handler(_, _, _, Acc) ->
+doc_handler(_, _, Acc) ->
     Acc.
 
 
-update_bulk_doc_acc(#doc_acc{docs = Docs, read = Read} = Acc, nil, Doc) ->
-    Acc#doc_acc{
-        docs = [Doc | Docs],
-        read = Read + 1
-    };
-
-update_bulk_doc_acc(#doc_acc{seqs = [{Seq, Count} | Rest]} = Acc, Seq, Doc) ->
-    Acc#doc_acc{
-        docs = [Doc | Acc#doc_acc.docs],
-        seqs = [{Seq, Count + 1} | Rest],
-        read = Acc#doc_acc.read + 1
-    };
-
-update_bulk_doc_acc(#doc_acc{seqs = Seqs, read = Read} = Acc, Seq, Doc) ->
-    Acc#doc_acc{
-        docs = [Doc | Acc#doc_acc.docs],
-        seqs = [{Seq, 1} | Seqs],
-        read = Read + 1
-    }.
+update_bulk_doc_acc(#doc_acc{docs = DocAcc, read = Read} = Acc, Doc) ->
+    Acc#doc_acc{docs = [Doc | DocAcc], read = Read + 1}.
 
 
-write_doc(Doc, Seq, Db, #doc_acc{written = W, wfail = F, read = R} = Acc) ->
-    SeqsDone = case Acc#doc_acc.seqs of
-    [{Seq, Count} | RestSeqs] ->
-        [{Seq, Count + 1} | RestSeqs];
-    RestSeqs ->
-        [{Seq, 1} | RestSeqs]
-    end,
+write_doc(Doc, Db, #doc_acc{written = W, wfail = F, read = R} = Acc) ->
     case couch_api_wrap:update_doc(Db, Doc, [], replicated_changes) of
     {ok, _} ->
-        Acc#doc_acc{written = W + 1, read = R + 1, seqs = SeqsDone};
+        Acc#doc_acc{written = W + 1, read = R + 1};
     {error, <<"unauthorized">>} ->
         ?LOG_ERROR("Replicator: unauthorized to write document ~s to ~s",
             [Doc#doc.id, couch_api_wrap:db_uri(Db)]),
-        Acc#doc_acc{wfail = F + 1, read = R + 1, seqs = SeqsDone};
+        Acc#doc_acc{wfail = F + 1, read = R + 1};
     _ ->
-        Acc#doc_acc{wfail = F + 1, read = R + 1, seqs = SeqsDone}
+        Acc#doc_acc{wfail = F + 1, read = R + 1}
     end.
 
 
@@ -150,12 +137,10 @@ bulk_write_docs(#doc_acc{docs = Docs, wr
     }.
 
 
-seqs_done([], _) ->
-    ok;
-seqs_done([{nil, _} | _], _) ->
+seq_done({nil, _}, _Cp) ->
     ok;
-seqs_done(SeqCounts, Cp) ->
-    ok = gen_server:cast(Cp, {seq_changes_done, SeqCounts}).
+seq_done(SeqDone, Cp) ->
+    ok = gen_server:cast(Cp, {seq_changes_done, SeqDone}).
 
 
 send_stats(#doc_acc{read = R, written = W, wfail = Wf}, Cp) ->

Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl?rev=1023523&r1=1023522&r2=1023523&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl Sun Oct 17 16:38:46 2010
@@ -61,15 +61,9 @@ missing_revs_finder_loop(Cp, Target, Cha
         % incremental attachment replication, so the source only needs to send
         % attachments modified since the common ancestor on target.
 
-        % Signal to the checkpointer any that are already on the target are
-        % now complete.
         IdRevsSeqDict = dict:from_list(
             [{Id, {[Rev || #rev_info{rev=Rev} <- RevsInfo], Seq}} ||
                     #doc_info{id=Id, revs=RevsInfo, high_seq=Seq} <- DocInfos]),
-        NonMissingIdRevsSeqDict = remove_missing(IdRevsSeqDict, Missing),
-        % signal the completion of these that aren't missing
-        report_non_missing(NonMissingIdRevsSeqDict, Cp),
-
         % Expand out each docs and seq into it's own work item
         MissingCount = lists:foldl(
             fun({Id, Revs, PAs}, Count) ->
@@ -83,33 +77,7 @@ missing_revs_finder_loop(Cp, Target, Cha
     end.
 
 
-report_non_missing(RevsDict, Cp) ->
-    case dict:size(RevsDict) of
-    0 ->
-        ok;
-    N when N > 0 ->
-        SeqsDone = [{Seq, length(Revs)} ||
-            {_Id, {Revs, Seq}} <- dict:to_list(RevsDict)],
-        ok = gen_server:cast(Cp, {seq_changes_done, SeqsDone})
-    end.
-
-
 send_missing_found(0, _Cp) ->
     ok;
 send_missing_found(Value, Cp) ->
     ok = gen_server:cast(Cp, {add_stats, #rep_stats{missing_found = Value}}).
-
-
-remove_missing(IdRevsSeqDict, []) ->
-    IdRevsSeqDict;
-
-remove_missing(IdRevsSeqDict, [{MissingId, MissingRevs, _} | Rest]) ->
-    {AllChangedRevs, Seq} = dict:fetch(MissingId, IdRevsSeqDict),
-    case AllChangedRevs -- MissingRevs of
-    [] ->
-        remove_missing(dict:erase(MissingId, IdRevsSeqDict), Rest);
-    NotMissingRevs ->
-        IdRevsSeqDict2 =
-                dict:store(MissingId, {NotMissingRevs, Seq}, IdRevsSeqDict),
-        remove_missing(IdRevsSeqDict2, Rest)
-    end.