You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by fd...@apache.org on 2010/10/02 02:53:15 UTC

svn commit: r1003724 - in /couchdb/branches/new_replicator/src/couchdb: couch_replicator_doc_copiers.erl couch_replicator_rev_finders.erl

Author: fdmanana
Date: Sat Oct  2 00:53:15 2010
New Revision: 1003724

URL: http://svn.apache.org/viewvc?rev=1003724&view=rev
Log:
New replicator: accumulate stats where possible, to avoid flooding the replication gen_servers with a large number of messages.

Modified:
    couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl
    couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl

Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl?rev=1003724&r1=1003723&r2=1003724&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl Sat Oct  2 00:53:15 2010
@@ -32,99 +32,121 @@ spawn_doc_copiers(Cp, Source, Target, Mi
         lists:seq(1, CopiersCount)).
 
 
+-record(doc_acc, {
+    docs = [],
+    seqs = [],
+    read = 0,
+    written = 0,
+    wfail = 0,
+    cp
+}).
+
 doc_copy_loop(CopierId, Cp, Source, Target, MissingRevsQueue) ->
     case couch_work_queue:dequeue(MissingRevsQueue, ?DOC_BATCH_SIZE) of
     closed ->
         ?LOG_DEBUG("Doc copier ~p got missing revs queue closed", [CopierId]),
         Cp ! {done, CopierId};
+
     {ok, [{doc_id, _} | _] = DocIds} ->
-        {BulkList, []} = lists:foldl(
+        DocAcc = lists:foldl(
             fun({doc_id, Id}, Acc) ->
                 ?LOG_DEBUG("Doc copier ~p got {doc_id, ~p}", [CopierId, Id]),
                 {ok, Acc2} = couch_api_wrap:open_doc_revs(
                     Source, Id, all, [],
-                    fun(R, A) -> doc_handler(R, nil, Target, Cp, A) end, Acc),
+                    fun(R, A) -> doc_handler(R, nil, Target, A) end, Acc),
                 Acc2
             end,
-            {[], []}, DocIds),
-        bulk_write_docs(lists:reverse(BulkList), [], Target, Cp),
+            #doc_acc{cp = Cp}, DocIds),
+        maybe_send_stat(DocAcc#doc_acc.read, #rep_stats.docs_read, Cp),
+        #doc_acc{written = W, wfail = Wf} = bulk_write_docs(DocAcc, Target),
+        maybe_send_stat(W, #rep_stats.docs_written, Cp),
+        maybe_send_stat(Wf, #rep_stats.doc_write_failures, Cp),
         doc_copy_loop(CopierId, Cp, Source, Target, MissingRevsQueue);
+
     {ok, IdRevList} ->
-        {Source2, {BulkList, SeqList}} = lists:foldl(
+        {Source2, DocAcc} = lists:foldl(
             fun({Id, Revs, PossibleAncestors, Seq} = IdRev, {SrcDb, BulkAcc}) ->
                 ?LOG_DEBUG("Doc copier ~p got ~p", [CopierId, IdRev]),
                 SrcDb2 = couch_api_wrap:maybe_reopen_db(SrcDb, Seq),
                 {ok, BulkAcc2} = couch_api_wrap:open_doc_revs(
                     SrcDb2, Id, Revs, [{atts_since, PossibleAncestors}],
-                    fun(R, A) -> doc_handler(R, Seq, Target, Cp, A) end,
+                    fun(R, A) -> doc_handler(R, Seq, Target, A) end,
                     BulkAcc),
                 {SrcDb2, BulkAcc2}
             end,
-            {Source, {[], []}}, IdRevList),
-        bulk_write_docs(
-            lists:reverse(BulkList),
-            lists:reverse(SeqList),
-            Target,
-            Cp),
+            {Source, #doc_acc{cp = Cp}}, IdRevList),
+        maybe_send_stat(DocAcc#doc_acc.read, #rep_stats.docs_read, Cp),
+        #doc_acc{written = W, wfail = Wf} = bulk_write_docs(DocAcc, Target),
+        maybe_send_stat(W, #rep_stats.docs_written, Cp),
+        maybe_send_stat(Wf, #rep_stats.doc_write_failures, Cp),
         doc_copy_loop(CopierId, Cp, Source2, Target, MissingRevsQueue)
     end.
 
 
-doc_handler({ok, #doc{atts = []} = Doc}, Seq, _Target, Cp, Acc) ->
-    Cp ! {add_stat, {#rep_stats.docs_read, 1}},
+doc_handler({ok, #doc{atts = []} = Doc}, Seq, _Target, Acc) ->
     update_bulk_doc_acc(Acc, Seq, Doc);
 
-doc_handler({ok, Doc}, Seq, Target, Cp, Acc) ->
-    Cp ! {add_stat, {#rep_stats.docs_read, 1}},
-    write_doc(Doc, Seq, Target, Cp),
-    Acc;
+doc_handler({ok, Doc}, Seq, Target, Acc) ->
+    write_doc(Doc, Seq, Target, Acc);
 
-doc_handler(_, _, _, _, Acc) ->
+doc_handler(_, _, _, Acc) ->
     Acc.
 
 
-update_bulk_doc_acc({DocAcc, SeqAcc}, nil, Doc) ->
-    {[Doc | DocAcc], SeqAcc};
-update_bulk_doc_acc({DocAcc, [{Seq, Count} | RestSeq]}, Seq, Doc) ->
-    {[Doc | DocAcc], [{Seq, Count + 1} | RestSeq]};
-update_bulk_doc_acc({DocAcc, SeqAcc}, Seq, Doc) ->
-    {[Doc | DocAcc], [{Seq, 1} | SeqAcc]}.
+update_bulk_doc_acc(#doc_acc{docs = Docs, read = Read} = Acc, nil, Doc) ->
+    Acc#doc_acc{
+        docs = [Doc | Docs],
+        read = Read + 1
+    };
+
+update_bulk_doc_acc(#doc_acc{seqs = [{Seq, Count} | Rest]} = Acc, Seq, Doc) ->
+    Acc#doc_acc{
+        docs = [Doc | Acc#doc_acc.docs],
+        seqs = [{Seq, Count + 1} | Rest],
+        read = Acc#doc_acc.read + 1
+    };
+
+update_bulk_doc_acc(#doc_acc{seqs = Seqs, read = Read} = Acc, Seq, Doc) ->
+    Acc#doc_acc{
+        docs = [Doc | Acc#doc_acc.docs],
+        seqs = [{Seq, 1} | Seqs],
+        read = Read + 1
+    }.
 
 
-write_doc(Doc, Seq, Db, Cp) ->
-    case couch_api_wrap:update_doc(Db, Doc, [], replicated_changes) of
+write_doc(Doc, Seq, Db, #doc_acc{written = W, wfail = F, read = R} = Acc) ->
+    Acc2 = case couch_api_wrap:update_doc(Db, Doc, [], replicated_changes) of
     {ok, _} ->
-        Cp ! {add_stat, {#rep_stats.docs_written, 1}};
+        Acc#doc_acc{written = W + 1, read = R + 1};
     {error, <<"unauthorized">>} ->
-        Cp ! {add_stat, {#rep_stats.doc_write_failures, 1}},
         ?LOG_ERROR("Replicator: unauthorized to write document ~s to ~s",
-            [Doc#doc.id, couch_api_wrap:db_uri(Db)]);
+            [Doc#doc.id, couch_api_wrap:db_uri(Db)]),
+        Acc#doc_acc{wfail = F + 1, read = R + 1};
     _ ->
-        Cp ! {add_stat, {#rep_stats.doc_write_failures, 1}}
+        Acc#doc_acc{wfail = F + 1, read = R + 1}
     end,
-    seqs_done([{Seq, 1}], Cp).
+    seqs_done([{Seq, 1}], Acc#doc_acc.cp),
+    Acc2.
 
 
-bulk_write_docs([], _, _, _) ->
-    ok;
-bulk_write_docs(Docs, Seqs, Db, Cp) ->
-    case couch_api_wrap:update_docs(
-        Db, Docs, [delay_commit], replicated_changes) of
-    {ok, []} ->
-        Cp ! {add_stat, {#rep_stats.docs_written, length(Docs)}};
-    {ok, Errors} ->
-        Cp ! {add_stat, {#rep_stats.doc_write_failures, length(Errors)}},
-        Cp ! {add_stat, {#rep_stats.docs_written, length(Docs) - length(Errors)}},
-        DbUri = couch_api_wrap:db_uri(Db),
-        lists:foreach(
-            fun({[ {<<"id">>, Id}, {<<"error">>, <<"unauthorized">>} ]}) ->
-                    ?LOG_ERROR("Replicator: unauthorized to write document"
-                        " ~s to ~s", [Id, DbUri]);
-                (_) ->
-                    ok
-            end, Errors)
-    end,
-    seqs_done(Seqs, Cp).
+bulk_write_docs(#doc_acc{docs = []} = Acc, _) ->
+    Acc;
+bulk_write_docs(#doc_acc{docs = Docs, seqs = Seqs, cp = Cp} = Acc, Db) ->
+    {ok, Errors} = couch_api_wrap:update_docs(
+        Db, Docs, [delay_commit], replicated_changes),
+    DbUri = couch_api_wrap:db_uri(Db),
+    lists:foreach(
+        fun({[ {<<"id">>, Id}, {<<"error">>, <<"unauthorized">>} ]}) ->
+                ?LOG_ERROR("Replicator: unauthorized to write document"
+                    " ~s to ~s", [Id, DbUri]);
+            (_) ->
+                ok
+        end, Errors),
+    seqs_done(Seqs, Cp),
+    Acc#doc_acc{
+        wfail = Acc#doc_acc.wfail + length(Errors),
+        written = Acc#doc_acc.written + length(Docs) - length(Errors)
+    }.
 
 
 seqs_done([], _) ->
@@ -133,3 +155,10 @@ seqs_done([{nil, _} | _], _) ->
     ok;
 seqs_done(SeqCounts, Cp) ->
     Cp ! {seq_changes_done, SeqCounts}.
+
+
+maybe_send_stat(0, _StatPos, _Cp) ->
+    ok;
+maybe_send_stat(Value, StatPos, Cp) ->
+    Cp ! {add_stat, {StatPos, Value}}.
+

Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl?rev=1003724&r1=1003723&r2=1003724&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl Sat Oct  2 00:53:15 2010
@@ -74,12 +74,14 @@ missing_revs_finder_loop(FinderId, Cp, T
                 {_Id, {Revs, Seq}} <- dict:to_list(NonMissingIdRevsSeqDict)]},
 
         % Expand out each docs and seq into it's own work item
-        lists:foreach(fun({Id, Revs, PAs}) ->
-            % PA means "possible ancestor"
-            Cp ! {add_stat, {#rep_stats.missing_found, length(Revs)}},
-            {_, Seq} = dict:fetch(Id, IdRevsSeqDict),
-            ok = couch_work_queue:queue(RevsQueue, {Id, Revs, PAs, Seq})
-            end, Missing),
+        MissingCount = lists:foldl(
+            fun({Id, Revs, PAs}, Count) ->
+                % PA means "possible ancestor"
+                {_, Seq} = dict:fetch(Id, IdRevsSeqDict),
+                ok = couch_work_queue:queue(RevsQueue, {Id, Revs, PAs, Seq}),
+                Count + length(Revs)
+            end, 0, Missing),
+        Cp ! {add_stat, {#rep_stats.missing_found, MissingCount}},
         missing_revs_finder_loop(FinderId, Cp, Target, ChangesQueue, RevsQueue)
     end.