You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by fd...@apache.org on 2010/09/22 01:14:41 UTC
svn commit: r999685 - in /couchdb/branches/new_replicator/src/couchdb:
couch_api_wrap.erl couch_replicate.erl
Author: fdmanana
Date: Tue Sep 21 23:14:41 2010
New Revision: 999685
URL: http://svn.apache.org/viewvc?rev=999685&view=rev
Log:
New replicator: update documents in batches for better performance (still needs some more work).
Modified:
couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl
couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl
Modified: couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl?rev=999685&r1=999684&r2=999685&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl Tue Sep 21 23:14:41 2010
@@ -38,11 +38,13 @@
db_close/1,
get_db_info/1,
update_doc/3,
+ update_doc/4,
+ update_docs/3,
+ update_docs/4,
ensure_full_commit/1,
get_missing_revs/2,
open_doc/3,
open_doc_revs/6,
- update_doc/4,
changes_since/5,
db_uri/1
]).
@@ -122,10 +124,6 @@ get_db_info(Db) ->
{ok, [{couch_util:to_binary(K), V} || {K, V} <- Info]}.
-update_doc(Db, Doc, Options) ->
- update_doc(Db,Doc,Options,interactive_edit).
-
-
ensure_full_commit(#httpdb{} = Db) ->
send_req(
Db,
@@ -216,6 +214,8 @@ maybe_reopen_db(#db{update_seq = UpSeq,
Db
end.
+% Shorthand for update_doc/4 with interactive_edit semantics.
+update_doc(Db, Doc, Options) -> update_doc(Db, Doc, Options, interactive_edit).
update_doc(#httpdb{} = HttpDb, #doc{id = DocId} = Doc, Options, Type) ->
QArgs = case Type of
@@ -274,6 +274,35 @@ update_doc(Db, Doc, Options, Type) ->
{error, <<"unauthorized">>}
end.
+
+% Shorthand for update_docs/4 with interactive_edit semantics.
+update_docs(Db, DocList, Options) -> update_docs(Db, DocList, Options, interactive_edit).
+
+% Writes a list of documents to Db in one bulk operation.
+%
+% UpdateType is interactive_edit or replicated_changes; for a remote target
+% the latter adds new_edits=false to the _bulk_docs body. For a remote
+% target, the delay_commit option sets the X-Couch-Full-Commit header to
+% "false". For a local database, Options are passed unchanged to
+% couch_db:update_docs/4.
+%
+% The local clause returns {ok, Errors}, and the remote clause's response
+% callback produces the same shape. Errors holds one entry per document
+% that failed, as built by bulk_results_to_errors/3.
+update_docs(#httpdb{} = HttpDb, DocList, Options, UpdateType) ->
+    JsonDocs = [couch_doc:to_json_obj(D, [revs]) || D <- DocList],
+    BodyProps = case UpdateType of
+    replicated_changes ->
+        [{new_edits, false}, {docs, JsonDocs}];
+    interactive_edit ->
+        [{docs, JsonDocs}]
+    end,
+    DelayCommit = lists:member(delay_commit, Options),
+    Headers = [
+        {"X-Couch-Full-Commit", atom_to_list(not DelayCommit)},
+        {"Content-Type", "application/json"}
+    ],
+    send_req(
+        HttpDb,
+        [{method, post}, {path, "_bulk_docs"},
+            {body, ?JSON_ENCODE({BodyProps})}, {headers, Headers}],
+        % Both 201 and 417 responses carry the per-document result list.
+        fun(Code, _, Results) when (Code =:= 201 orelse Code =:= 417),
+                is_list(Results) ->
+            {ok, bulk_results_to_errors(DocList, Results, remote)}
+        end);
+update_docs(Db, DocList, Options, UpdateType) ->
+    {ok, bulk_results_to_errors(
+        DocList, couch_db:update_docs(Db, DocList, Options, UpdateType),
+        UpdateType)}.
+
+
changes_since(#httpdb{} = HttpDb, Style, StartSeq, UserFun, Options) ->
QArgs = changes_q_args(
[{"style", atom_to_list(Style)}, {"since", integer_to_list(StartSeq)}],
@@ -510,3 +539,38 @@ encode_doc_id(<<"_local/", RestId/binary
"_local/" ++ url_encode(RestId);
encode_doc_id(DocId) ->
url_encode(DocId).
+
+
+% Converts the outcome of a bulk update into a list of per-document error
+% objects of the form {[{<<"id">>, DocId}, {<<"error">>, ErrorName}]}.
+% Documents that were written successfully contribute no entry.
+% The third argument says where the result came from:
+%   interactive_edit / replicated_changes - result of couch_db:update_docs/4
+%   remote                                - decoded _bulk_docs response body
+bulk_results_to_errors(Docs, {ok, Results}, interactive_edit) ->
+    lists:reverse(lists:foldl(
+        fun({_, {ok, _}}, Acc) ->
+            Acc;
+        ({#doc{id = Id}, Err}, Acc) ->
+            % Bind the error name to a fresh variable. Re-using the variable
+            % bound in the clause head would turn this into an equality
+            % assertion against error_info/1's input and crash with a
+            % badmatch whenever the error name differs from the raw reason.
+            {_, Error, _Reason} = couch_httpd:error_info(Err),
+            [ {[{<<"id">>, Id}, {<<"error">>, Error}]} | Acc ]
+        end,
+        [], lists:zip(Docs, Results)));
+
+bulk_results_to_errors(Docs, {ok, Results}, replicated_changes) ->
+    % Results is handled as a list of {{Id, Rev}, Reason} failures, the same
+    % shape the {aborted, _} clause below expects.
+    bulk_results_to_errors(Docs, {aborted, Results}, interactive_edit);
+
+bulk_results_to_errors(_Docs, {aborted, Results}, interactive_edit) ->
+    lists:map(
+        fun({{Id, _Rev}, Err}) ->
+            {_, Error, _Reason} = couch_httpd:error_info(Err),
+            {[{<<"id">>, Id}, {<<"error">>, Error}]}
+        end,
+        Results);
+
+bulk_results_to_errors(_Docs, Results, remote) ->
+    % Results is the list of per-document JSON objects from _bulk_docs.
+    % Keys are looked up as binaries first, falling back to atoms.
+    lists:reverse(lists:foldl(
+        fun({Props}, Acc) ->
+            case get_value(<<"error">>, Props, get_value(error, Props)) of
+            undefined ->
+                Acc;
+            Error ->
+                Id = get_value(<<"id">>, Props, get_value(id, Props)),
+                [ {[{<<"id">>, Id}, {<<"error">>, Error}]} | Acc ]
+            end
+        end,
+        [], Results)).
Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl?rev=999685&r1=999684&r2=999685&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl Tue Sep 21 23:14:41 2010
@@ -31,6 +31,7 @@
% Can't be greater than the maximum number of child restarts specified
% in couch_rep_sup.erl.
-define(MAX_RESTARTS, 3).
+% Number of items each doc copier asks couch_work_queue:dequeue/2 for on
+% every iteration of doc_copy_loop/5.
+-define(DOC_BATCH_SIZE, 50).
-record(stats, {
@@ -600,7 +601,7 @@ remove_missing(IdRevsSeqDict, [{MissingI
spawn_doc_copiers(Cp, Source, Target, MissingRevsQueue) ->
- Count = ?l2i(couch_config:get("replicator", "copy_processes", "8")),
+ Count = ?l2i(couch_config:get("replicator", "copy_processes", "10")),
lists:map(
fun(CopierId) ->
Pid = spawn_link(fun() ->
@@ -612,44 +613,104 @@ spawn_doc_copiers(Cp, Source, Target, Mi
+% Main loop of one doc copier process. Each iteration requests
+% ?DOC_BATCH_SIZE items from MissingRevsQueue, fetches the listed revisions
+% from Source, and hands them to doc_handler/5.
+% doc_handler/5 collects attachment-free documents for a single
+% bulk_write_docs/4 call and writes documents with attachments one by one.
+% Sends {done, CopierId} to Cp once the queue is closed.
doc_copy_loop(CopierId, Cp, Source, Target, MissingRevsQueue) ->
- case couch_work_queue:dequeue(MissingRevsQueue,1) of
+ case couch_work_queue:dequeue(MissingRevsQueue, ?DOC_BATCH_SIZE) of
closed ->
?LOG_DEBUG("Doc copier ~p got missing revs queue closed", [CopierId]),
Cp ! {done, CopierId};
- {ok, [{doc_id, Id}]} ->
- ?LOG_DEBUG("Doc copier ~p got {doc_id, ~p}", [CopierId, Id]),
- couch_api_wrap:open_doc_revs(
- Source, Id, all, [],
- fun(R, _) -> doc_handler(R, Target, Cp) end, []),
+ % Batch of {doc_id, Id} items: fetch all revisions of each document.
+ % Seq is passed as nil, so no sequence counts are accumulated and the
+ % second accumulator element must still be [] afterwards.
+ {ok, [{doc_id, _} | _] = DocIds} ->
+ {BulkList, []} = lists:foldl(
+ fun({doc_id, Id}, Acc) ->
+ ?LOG_DEBUG("Doc copier ~p got {doc_id, ~p}", [CopierId, Id]),
+ {ok, Acc2} = couch_api_wrap:open_doc_revs(
+ Source, Id, all, [],
+ fun(R, A) -> doc_handler(R, nil, Target, Cp, A) end, Acc),
+ Acc2
+ end,
+ {[], []}, DocIds),
+ bulk_write_docs(lists:reverse(BulkList), [], Target, Cp),
doc_copy_loop(CopierId, Cp, Source, Target, MissingRevsQueue);
- {ok, [{Id, Revs, PossibleAncestors, Seq}]} ->
- ?LOG_DEBUG("Doc copier ~p got {~p, ~p, ~p, ~p}",
- [CopierId, Id, Revs, PossibleAncestors, Seq]),
- Source2 = couch_api_wrap:maybe_reopen_db(Source, Seq),
- couch_api_wrap:open_doc_revs(
- Source2, Id, Revs, [{atts_since, PossibleAncestors}],
- fun(R, _) -> doc_handler(R, Target, Cp) end, []),
- Cp ! {seq_changes_done, {Seq, length(Revs)}},
+ % Batch of {Id, Revs, PossibleAncestors, Seq} items. The source handle
+ % returned by maybe_reopen_db/2 is threaded through the fold and into
+ % the next loop iteration. Both accumulators are built by prepending,
+ % hence the lists:reverse/1 calls before writing.
+ % NOTE(review): the previous code reported {Seq, length(Revs)} for every
+ % item. Now a seq is only counted for revisions delivered as {ok, Doc}
+ % (doc_handler/5 ignores anything else), so revisions that cannot be
+ % opened are presumably never reported done. Confirm that the
+ % checkpointer copes with this.
+ {ok, IdRevList} ->
+ {Source2, {BulkList, SeqList}} = lists:foldl(
+ fun({Id, Revs, PossibleAncestors, Seq} = IdRev, {SrcDb, BulkAcc}) ->
+ ?LOG_DEBUG("Doc copier ~p got ~p", [CopierId, IdRev]),
+ SrcDb2 = couch_api_wrap:maybe_reopen_db(SrcDb, Seq),
+ {ok, BulkAcc2} = couch_api_wrap:open_doc_revs(
+ SrcDb2, Id, Revs, [{atts_since, PossibleAncestors}],
+ fun(R, A) -> doc_handler(R, Seq, Target, Cp, A) end,
+ BulkAcc),
+ {SrcDb2, BulkAcc2}
+ end,
+ {Source, {[], []}}, IdRevList),
+ bulk_write_docs(
+ lists:reverse(BulkList),
+ lists:reverse(SeqList),
+ Target,
+ Cp),
doc_copy_loop(CopierId, Cp, Source2, Target, MissingRevsQueue)
end.
-doc_handler({ok, Doc}, Target, Cp) ->
+
+% Callback passed to couch_api_wrap:open_doc_revs/6 by doc_copy_loop/5.
+% Acc is the bulk accumulator {Docs, SeqCounts} (see update_bulk_doc_acc/3).
+% Every {ok, Doc} result increments the docs_read stat. Attachment-free
+% documents are added to Acc.
+doc_handler({ok, #doc{atts = []} = Doc}, Seq, _Target, Cp, Acc) ->
+ Cp ! {add_stat, {#stats.docs_read, 1}},
+ update_bulk_doc_acc(Acc, Seq, Doc);
+
+% Documents with attachments are written to Target immediately via
+% write_doc/4, outside the bulk batch; Acc is returned unchanged.
+doc_handler({ok, Doc}, Seq, Target, Cp, Acc) ->
Cp ! {add_stat, {#stats.docs_read, 1}},
- case couch_api_wrap:update_doc(Target, Doc, [], replicated_changes) of
+ write_doc(Doc, Seq, Target, Cp),
+ Acc;
+
+% Any other open_doc_revs result is ignored.
+doc_handler(_, _, _, _, Acc) ->
+ Acc.
+
+
+% Prepends Doc to the bulk accumulator {Docs, SeqCounts}.
+% SeqCounts is a list of {Seq, NumDocs} pairs, newest first. A document
+% from the same Seq as the head pair bumps that pair's count; a new Seq
+% starts a fresh {Seq, 1} pair. A nil Seq leaves SeqCounts unchanged.
+update_bulk_doc_acc({DocAcc, SeqAcc}, Seq, Doc) ->
+    SeqAcc2 = case {Seq, SeqAcc} of
+    {nil, _} ->
+        SeqAcc;
+    {_, [{Seq, Count} | RestSeq]} ->
+        [{Seq, Count + 1} | RestSeq];
+    _ ->
+        [{Seq, 1} | SeqAcc]
+    end,
+    {[Doc | DocAcc], SeqAcc2}.
+
+
+% Writes a single document to Db with replicated_changes semantics and
+% updates the docs_written / doc_write_failures stats. doc_handler/5 uses it
+% for documents that have attachments. Unauthorized writes are logged.
+% Finally, reports {Seq, 1} as done.
+write_doc(Doc, Seq, Db, Cp) ->
+ case couch_api_wrap:update_doc(Db, Doc, [], replicated_changes) of
{ok, _} ->
Cp ! {add_stat, {#stats.docs_written, 1}};
- Error ->
+ {error, <<"unauthorized">>} ->
Cp ! {add_stat, {#stats.doc_write_failures, 1}},
- case Error of
- {error, <<"unauthorized">>} ->
- ?LOG_ERROR("Replicator: unauthorized to write document ~s to ~s",
- [?b2l(Doc#doc.id), couch_api_wrap:db_uri(Target)]);
- _ ->
- ok
- end
- end;
-doc_handler(_, _, _) ->
- ok.
+ ?LOG_ERROR("Replicator: unauthorized to write document ~s to ~s",
+ [Doc#doc.id, couch_api_wrap:db_uri(Db)]);
+ % Any other result counts as a failure but is not logged.
+ _ ->
+ Cp ! {add_stat, {#stats.doc_write_failures, 1}}
+ end,
+ % seqs_done/2 skips the pair when Seq is nil.
+ seqs_done([{Seq, 1}], Cp).
+
+
+% Writes Docs to Db in one bulk request with replicated_changes semantics
+% and delayed commit. Updates the docs_written / doc_write_failures stats,
+% logs documents rejected as unauthorized, and then reports every
+% {Seq, NumDocs} pair in Seqs as done.
+% An empty batch skips the update request entirely. This happens, for
+% example, when every fetched document had attachments and was already
+% written individually by write_doc/4.
+bulk_write_docs([], Seqs, _Db, Cp) ->
+    seqs_done(Seqs, Cp);
+bulk_write_docs(Docs, Seqs, Db, Cp) ->
+    case couch_api_wrap:update_docs(
+        Db, Docs, [delay_commit], replicated_changes) of
+    {ok, []} ->
+        Cp ! {add_stat, {#stats.docs_written, length(Docs)}};
+    {ok, Errors} ->
+        NumErrors = length(Errors),
+        Cp ! {add_stat, {#stats.doc_write_failures, NumErrors}},
+        Cp ! {add_stat, {#stats.docs_written, length(Docs) - NumErrors}},
+        DbUri = couch_api_wrap:db_uri(Db),
+        lists:foreach(
+            fun({[{<<"id">>, Id}, {<<"error">>, <<"unauthorized">>}]}) ->
+                ?LOG_ERROR("Replicator: unauthorized to write document"
+                    " ~s to ~s", [Id, DbUri]);
+            (_) ->
+                ok
+            end, Errors)
+    end,
+    seqs_done(Seqs, Cp).
+
+
+% Sends {seq_changes_done, SeqCount} to Cp for each element of SeqCounts,
+% in order, skipping {nil, _} pairs. Returns ok.
+seqs_done([], _Cp) ->
+    ok;
+seqs_done([{nil, _} | Rest], Cp) ->
+    seqs_done(Rest, Cp);
+seqs_done([SeqCount | Rest], Cp) ->
+    Cp ! {seq_changes_done, SeqCount},
+    seqs_done(Rest, Cp).
checkpoint_interval(_State) ->