You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by fd...@apache.org on 2010/09/22 01:14:41 UTC

svn commit: r999685 - in /couchdb/branches/new_replicator/src/couchdb: couch_api_wrap.erl couch_replicate.erl

Author: fdmanana
Date: Tue Sep 21 23:14:41 2010
New Revision: 999685

URL: http://svn.apache.org/viewvc?rev=999685&view=rev
Log:
New replicator: update documents in batches for better performance (still needs some more work).

Modified:
    couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl
    couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl

Modified: couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl?rev=999685&r1=999684&r2=999685&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl Tue Sep 21 23:14:41 2010
@@ -38,11 +38,13 @@
     db_close/1,
     get_db_info/1,
     update_doc/3,
+    update_doc/4,
+    update_docs/3,
+    update_docs/4,
     ensure_full_commit/1,
     get_missing_revs/2,
     open_doc/3,
     open_doc_revs/6,
-    update_doc/4,
     changes_since/5,
     db_uri/1
     ]).
@@ -122,10 +124,6 @@ get_db_info(Db) ->
     {ok, [{couch_util:to_binary(K), V} || {K, V} <- Info]}.
 
 
-update_doc(Db, Doc, Options) ->
-    update_doc(Db,Doc,Options,interactive_edit).
-
-
 ensure_full_commit(#httpdb{} = Db) ->
     send_req(
         Db,
@@ -216,6 +214,8 @@ maybe_reopen_db(#db{update_seq = UpSeq, 
         Db
     end.
 
+update_doc(Db, Doc, Options) ->  % 3-arity form: the caller supplies no update type
+    update_doc(Db, Doc, Options, interactive_edit).  % delegates to update_doc/4 with interactive_edit
 
 update_doc(#httpdb{} = HttpDb, #doc{id = DocId} = Doc, Options, Type) ->
     QArgs = case Type of
@@ -274,6 +274,35 @@ update_doc(Db, Doc, Options, Type) ->
         {error, <<"unauthorized">>}
     end.
 
+
+update_docs(Db, DocList, Options) ->  % 3-arity form: the caller supplies no update type
+    update_docs(Db, DocList, Options, interactive_edit).  % delegates to update_docs/4 with interactive_edit
+
+update_docs(#httpdb{} = HttpDb, DocList, Options, UpdateType) ->  % #httpdb{} target: bulk update sent with send_req
+    DocList1 = [couch_doc:to_json_obj(Doc, [revs]) || Doc <- DocList],  % JSON object per doc, built with the revs option
+    Body = case UpdateType of
+    replicated_changes ->
+        {[{new_edits, false}, {docs, DocList1}]};  % replicated changes are posted with new_edits = false
+    interactive_edit ->
+        {[{docs, DocList1}]}
+    end,
+    FullCommit = atom_to_list(not lists:member(delay_commit, Options)),  % "true" unless delay_commit is in Options
+    send_req(
+        HttpDb,
+        [{method, post}, {path, "_bulk_docs"}, {body, ?JSON_ENCODE(Body)},
+            {headers, [
+                {"X-Couch-Full-Commit", FullCommit},
+                {"Content-Type", "application/json"} ]}],
+        fun(201, _, Results) when is_list(Results) ->  % only 201 and 417 with a list body are handled
+                {ok, bulk_results_to_errors(DocList, Results, remote)};
+           (417, _, Results) when is_list(Results) ->
+                {ok, bulk_results_to_errors(DocList, Results, remote)}
+        end);
+update_docs(Db, DocList, Options, UpdateType) ->  % any other Db: update through couch_db:update_docs/4
+    Result = couch_db:update_docs(Db, DocList, Options, UpdateType),
+    {ok, bulk_results_to_errors(DocList, Result, UpdateType)}.  % returns {ok, ErrorList}
+
+
 changes_since(#httpdb{} = HttpDb, Style, StartSeq, UserFun, Options) ->
     QArgs = changes_q_args(
         [{"style", atom_to_list(Style)}, {"since", integer_to_list(StartSeq)}],
@@ -510,3 +539,38 @@ encode_doc_id(<<"_local/", RestId/binary
     "_local/" ++ url_encode(RestId);
 encode_doc_id(DocId) ->
     url_encode(DocId).
+
+
+bulk_results_to_errors(Docs, {ok, Results}, interactive_edit) ->  % Results are zipped with Docs, one per doc
+    lists:reverse(lists:foldl(
+        fun({_, {ok, _}}, Acc) ->  % successful update: nothing to report
+            Acc;
+        ({#doc{id = Id}, Err}, Acc) ->  % Err must stay distinct from Error bound on the next line
+            {_, Error, _Reason} = couch_httpd:error_info(Err),  % Error is the second element of error_info/1's result
+            [ {[{<<"id">>, Id}, {<<"error">>, Error}]} | Acc ]
+        end,
+        [], lists:zip(Docs, Results)));
+
+bulk_results_to_errors(Docs, {ok, Results}, replicated_changes) ->  % handled like an aborted interactive update
+    bulk_results_to_errors(Docs, {aborted, Results}, interactive_edit);
+
+bulk_results_to_errors(_Docs, {aborted, Results}, interactive_edit) ->  % every entry is {{Id, Rev}, Err}
+    lists:map(
+        fun({{Id, _Rev}, Err}) ->
+            {_, Error, _Reason} = couch_httpd:error_info(Err),
+            {[{<<"id">>, Id}, {<<"error">>, Error}]}
+        end,
+        Results);
+
+bulk_results_to_errors(_Docs, Results, remote) ->  % entries are {Props} objects from the HTTP response
+    lists:reverse(lists:foldl(
+        fun({Props}, Acc) ->
+            case get_value(<<"error">>, Props, get_value(error, Props)) of  % binary key first, atom key as fallback
+            undefined ->  % no error key: entry is dropped from the result
+                Acc;
+            Error ->
+                Id = get_value(<<"id">>, Props, get_value(id, Props)),
+                [ {[{<<"id">>, Id}, {<<"error">>, Error}]} | Acc ]
+            end
+        end,
+        [], Results)).

Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl?rev=999685&r1=999684&r2=999685&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl Tue Sep 21 23:14:41 2010
@@ -31,6 +31,7 @@
 % Can't be greater than the maximum number of child restarts specified
 % in couch_rep_sup.erl.
 -define(MAX_RESTARTS, 3).
+-define(DOC_BATCH_SIZE, 50).
 
 
 -record(stats, {
@@ -600,7 +601,7 @@ remove_missing(IdRevsSeqDict, [{MissingI
 
 
 spawn_doc_copiers(Cp, Source, Target, MissingRevsQueue) ->
-    Count = ?l2i(couch_config:get("replicator", "copy_processes", "8")),
+    Count = ?l2i(couch_config:get("replicator", "copy_processes", "10")),
     lists:map(
         fun(CopierId) ->
             Pid = spawn_link(fun() ->
@@ -612,44 +613,104 @@ spawn_doc_copiers(Cp, Source, Target, Mi
 
 
 doc_copy_loop(CopierId, Cp, Source, Target, MissingRevsQueue) ->
-    case couch_work_queue:dequeue(MissingRevsQueue,1) of
+    case couch_work_queue:dequeue(MissingRevsQueue, ?DOC_BATCH_SIZE) of
     closed ->
         ?LOG_DEBUG("Doc copier ~p got missing revs queue closed", [CopierId]),
         Cp ! {done, CopierId};
-    {ok, [{doc_id, Id}]} ->
-        ?LOG_DEBUG("Doc copier ~p got {doc_id, ~p}", [CopierId, Id]),
-        couch_api_wrap:open_doc_revs(
-            Source, Id, all, [],
-            fun(R, _) -> doc_handler(R, Target, Cp) end, []),
+    {ok, [{doc_id, _} | _] = DocIds} ->
+        {BulkList, []} = lists:foldl(
+            fun({doc_id, Id}, Acc) ->
+                ?LOG_DEBUG("Doc copier ~p got {doc_id, ~p}", [CopierId, Id]),
+                {ok, Acc2} = couch_api_wrap:open_doc_revs(
+                    Source, Id, all, [],
+                    fun(R, A) -> doc_handler(R, nil, Target, Cp, A) end, Acc),
+                Acc2
+            end,
+            {[], []}, DocIds),
+        bulk_write_docs(lists:reverse(BulkList), [], Target, Cp),
         doc_copy_loop(CopierId, Cp, Source, Target, MissingRevsQueue);
-    {ok, [{Id, Revs, PossibleAncestors, Seq}]} ->
-        ?LOG_DEBUG("Doc copier ~p got {~p, ~p, ~p, ~p}",
-            [CopierId, Id, Revs, PossibleAncestors, Seq]),
-        Source2 = couch_api_wrap:maybe_reopen_db(Source, Seq),
-        couch_api_wrap:open_doc_revs(
-            Source2, Id, Revs, [{atts_since, PossibleAncestors}],
-            fun(R, _) -> doc_handler(R, Target, Cp) end, []),
-        Cp ! {seq_changes_done, {Seq, length(Revs)}},
+    {ok, IdRevList} ->
+        {Source2, {BulkList, SeqList}} = lists:foldl(
+            fun({Id, Revs, PossibleAncestors, Seq} = IdRev, {SrcDb, BulkAcc}) ->
+                ?LOG_DEBUG("Doc copier ~p got ~p", [CopierId, IdRev]),
+                SrcDb2 = couch_api_wrap:maybe_reopen_db(SrcDb, Seq),
+                {ok, BulkAcc2} = couch_api_wrap:open_doc_revs(
+                    SrcDb2, Id, Revs, [{atts_since, PossibleAncestors}],
+                    fun(R, A) -> doc_handler(R, Seq, Target, Cp, A) end,
+                    BulkAcc),
+                {SrcDb2, BulkAcc2}
+            end,
+            {Source, {[], []}}, IdRevList),
+        bulk_write_docs(
+            lists:reverse(BulkList),
+            lists:reverse(SeqList),
+            Target,
+            Cp),
         doc_copy_loop(CopierId, Cp, Source2, Target, MissingRevsQueue)
     end.
 
-doc_handler({ok, Doc}, Target, Cp) ->
+
+doc_handler({ok, #doc{atts = []} = Doc}, Seq, _Target, Cp, Acc) ->  % doc with an empty atts list: batched
+    Cp ! {add_stat, {#stats.docs_read, 1}},
+    update_bulk_doc_acc(Acc, Seq, Doc);  % added to the accumulator instead of being written here
+
+doc_handler({ok, Doc}, Seq, Target, Cp, Acc) ->  % any other doc (atts not empty): written immediately
     Cp ! {add_stat, {#stats.docs_read, 1}},
-    case couch_api_wrap:update_doc(Target, Doc, [], replicated_changes) of
+    write_doc(Doc, Seq, Target, Cp),
+    Acc;  % accumulator is returned unchanged
+
+doc_handler(_, _, _, _, Acc) ->  % any result other than {ok, Doc}: accumulator is returned unchanged
+    Acc.
+
+
+update_bulk_doc_acc({DocAcc, SeqAcc}, nil, Doc) ->  % nil seq: doc is added, seq list is left untouched
+    {[Doc | DocAcc], SeqAcc};
+update_bulk_doc_acc({DocAcc, [{Seq, Count} | RestSeq]}, Seq, Doc) ->  % head entry already has this Seq
+    {[Doc | DocAcc], [{Seq, Count + 1} | RestSeq]};  % increment its count
+update_bulk_doc_acc({DocAcc, SeqAcc}, Seq, Doc) ->  % otherwise start a new {Seq, 1} entry at the head
+    {[Doc | DocAcc], [{Seq, 1} | SeqAcc]}.
+
+
+write_doc(Doc, Seq, Db, Cp) ->
+    case couch_api_wrap:update_doc(Db, Doc, [], replicated_changes) of
     {ok, _} ->
         Cp ! {add_stat, {#stats.docs_written, 1}};
-    Error ->
+    {error, <<"unauthorized">>} ->
         Cp ! {add_stat, {#stats.doc_write_failures, 1}},
-        case Error of
-        {error, <<"unauthorized">>} ->
-            ?LOG_ERROR("Replicator: unauthorized to write document ~s to ~s",
-                [?b2l(Doc#doc.id), couch_api_wrap:db_uri(Target)]);
-        _ ->
-            ok
-        end
-    end;
-doc_handler(_, _, _) ->
-    ok.
+        ?LOG_ERROR("Replicator: unauthorized to write document ~s to ~s",
+            [Doc#doc.id, couch_api_wrap:db_uri(Db)]);
+    _ ->
+        Cp ! {add_stat, {#stats.doc_write_failures, 1}}
+    end,
+    seqs_done([{Seq, 1}], Cp).
+
+
+bulk_write_docs(Docs, Seqs, Db, Cp) ->  % writes Docs to Db in one update_docs call and reports stats to Cp
+    case couch_api_wrap:update_docs(
+        Db, Docs, [delay_commit], replicated_changes) of
+    {ok, []} ->  % empty error list: every doc is counted as written
+        Cp ! {add_stat, {#stats.docs_written, length(Docs)}};
+    {ok, Errors} ->  % one failure is counted per error entry, the remainder as written
+        Cp ! {add_stat, {#stats.doc_write_failures, length(Errors)}},
+        Cp ! {add_stat, {#stats.docs_written, length(Docs) - length(Errors)}},
+        DbUri = couch_api_wrap:db_uri(Db),
+        lists:foreach(
+            fun({[ {<<"id">>, Id}, {<<"error">>, <<"unauthorized">>} ]}) ->  % only unauthorized errors are logged
+                    ?LOG_ERROR("Replicator: unauthorized to write document"
+                        " ~s to ~s", [Id, DbUri]);
+                (_) ->
+                    ok
+            end, Errors)
+    end,
+    seqs_done(Seqs, Cp).  % runs after either branch of the case above
+
+
+seqs_done(SeqCounts, Cp) ->  % sends one seq_changes_done message to Cp per entry of SeqCounts
+    lists:foreach(fun({nil, _}) ->  % entries whose seq is nil are skipped
+            ok;
+        (SeqCount) ->
+            Cp ! {seq_changes_done, SeqCount}
+        end, SeqCounts).
 
 
 checkpoint_interval(_State) ->