You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by fd...@apache.org on 2010/10/02 02:53:15 UTC
svn commit: r1003724 - in /couchdb/branches/new_replicator/src/couchdb:
couch_replicator_doc_copiers.erl couch_replicator_rev_finders.erl
Author: fdmanana
Date: Sat Oct 2 00:53:15 2010
New Revision: 1003724
URL: http://svn.apache.org/viewvc?rev=1003724&view=rev
Log:
New replicator: accumulate stats where possible to avoid flooding replication gen_servers with tons of messages.
Modified:
couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl
couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl
Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl?rev=1003724&r1=1003723&r2=1003724&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl Sat Oct 2 00:53:15 2010
@@ -32,99 +32,121 @@ spawn_doc_copiers(Cp, Source, Target, Mi
lists:seq(1, CopiersCount)).
+-record(doc_acc, {
+ docs = [],
+ seqs = [],
+ read = 0,
+ written = 0,
+ wfail = 0,
+ cp
+}).
+
doc_copy_loop(CopierId, Cp, Source, Target, MissingRevsQueue) ->
case couch_work_queue:dequeue(MissingRevsQueue, ?DOC_BATCH_SIZE) of
closed ->
?LOG_DEBUG("Doc copier ~p got missing revs queue closed", [CopierId]),
Cp ! {done, CopierId};
+
{ok, [{doc_id, _} | _] = DocIds} ->
- {BulkList, []} = lists:foldl(
+ DocAcc = lists:foldl(
fun({doc_id, Id}, Acc) ->
?LOG_DEBUG("Doc copier ~p got {doc_id, ~p}", [CopierId, Id]),
{ok, Acc2} = couch_api_wrap:open_doc_revs(
Source, Id, all, [],
- fun(R, A) -> doc_handler(R, nil, Target, Cp, A) end, Acc),
+ fun(R, A) -> doc_handler(R, nil, Target, A) end, Acc),
Acc2
end,
- {[], []}, DocIds),
- bulk_write_docs(lists:reverse(BulkList), [], Target, Cp),
+ #doc_acc{cp = Cp}, DocIds),
+ maybe_send_stat(DocAcc#doc_acc.read, #rep_stats.docs_read, Cp),
+ #doc_acc{written = W, wfail = Wf} = bulk_write_docs(DocAcc, Target),
+ maybe_send_stat(W, #rep_stats.docs_written, Cp),
+ maybe_send_stat(Wf, #rep_stats.doc_write_failures, Cp),
doc_copy_loop(CopierId, Cp, Source, Target, MissingRevsQueue);
+
{ok, IdRevList} ->
- {Source2, {BulkList, SeqList}} = lists:foldl(
+ {Source2, DocAcc} = lists:foldl(
fun({Id, Revs, PossibleAncestors, Seq} = IdRev, {SrcDb, BulkAcc}) ->
?LOG_DEBUG("Doc copier ~p got ~p", [CopierId, IdRev]),
SrcDb2 = couch_api_wrap:maybe_reopen_db(SrcDb, Seq),
{ok, BulkAcc2} = couch_api_wrap:open_doc_revs(
SrcDb2, Id, Revs, [{atts_since, PossibleAncestors}],
- fun(R, A) -> doc_handler(R, Seq, Target, Cp, A) end,
+ fun(R, A) -> doc_handler(R, Seq, Target, A) end,
BulkAcc),
{SrcDb2, BulkAcc2}
end,
- {Source, {[], []}}, IdRevList),
- bulk_write_docs(
- lists:reverse(BulkList),
- lists:reverse(SeqList),
- Target,
- Cp),
+ {Source, #doc_acc{cp = Cp}}, IdRevList),
+ maybe_send_stat(DocAcc#doc_acc.read, #rep_stats.docs_read, Cp),
+ #doc_acc{written = W, wfail = Wf} = bulk_write_docs(DocAcc, Target),
+ maybe_send_stat(W, #rep_stats.docs_written, Cp),
+ maybe_send_stat(Wf, #rep_stats.doc_write_failures, Cp),
doc_copy_loop(CopierId, Cp, Source2, Target, MissingRevsQueue)
end.
-doc_handler({ok, #doc{atts = []} = Doc}, Seq, _Target, Cp, Acc) ->
- Cp ! {add_stat, {#rep_stats.docs_read, 1}},
+doc_handler({ok, #doc{atts = []} = Doc}, Seq, _Target, Acc) ->
update_bulk_doc_acc(Acc, Seq, Doc);
-doc_handler({ok, Doc}, Seq, Target, Cp, Acc) ->
- Cp ! {add_stat, {#rep_stats.docs_read, 1}},
- write_doc(Doc, Seq, Target, Cp),
- Acc;
+doc_handler({ok, Doc}, Seq, Target, Acc) ->
+ write_doc(Doc, Seq, Target, Acc);
-doc_handler(_, _, _, _, Acc) ->
+doc_handler(_, _, _, Acc) ->
Acc.
-update_bulk_doc_acc({DocAcc, SeqAcc}, nil, Doc) ->
- {[Doc | DocAcc], SeqAcc};
-update_bulk_doc_acc({DocAcc, [{Seq, Count} | RestSeq]}, Seq, Doc) ->
- {[Doc | DocAcc], [{Seq, Count + 1} | RestSeq]};
-update_bulk_doc_acc({DocAcc, SeqAcc}, Seq, Doc) ->
- {[Doc | DocAcc], [{Seq, 1} | SeqAcc]}.
+update_bulk_doc_acc(#doc_acc{docs = Docs, read = Read} = Acc, nil, Doc) ->
+ Acc#doc_acc{
+ docs = [Doc | Docs],
+ read = Read + 1
+ };
+
+update_bulk_doc_acc(#doc_acc{seqs = [{Seq, Count} | Rest]} = Acc, Seq, Doc) ->
+ Acc#doc_acc{
+ docs = [Doc | Acc#doc_acc.docs],
+ seqs = [{Seq, Count + 1} | Rest],
+ read = Acc#doc_acc.read + 1
+ };
+
+update_bulk_doc_acc(#doc_acc{seqs = Seqs, read = Read} = Acc, Seq, Doc) ->
+ Acc#doc_acc{
+ docs = [Doc | Acc#doc_acc.docs],
+ seqs = [{Seq, 1} | Seqs],
+ read = Read + 1
+ }.
-write_doc(Doc, Seq, Db, Cp) ->
- case couch_api_wrap:update_doc(Db, Doc, [], replicated_changes) of
+write_doc(Doc, Seq, Db, #doc_acc{written = W, wfail = F, read = R} = Acc) ->
+ Acc2 = case couch_api_wrap:update_doc(Db, Doc, [], replicated_changes) of
{ok, _} ->
- Cp ! {add_stat, {#rep_stats.docs_written, 1}};
+ Acc#doc_acc{written = W + 1, read = R + 1};
{error, <<"unauthorized">>} ->
- Cp ! {add_stat, {#rep_stats.doc_write_failures, 1}},
?LOG_ERROR("Replicator: unauthorized to write document ~s to ~s",
- [Doc#doc.id, couch_api_wrap:db_uri(Db)]);
+ [Doc#doc.id, couch_api_wrap:db_uri(Db)]),
+ Acc#doc_acc{wfail = F + 1, read = R + 1};
_ ->
- Cp ! {add_stat, {#rep_stats.doc_write_failures, 1}}
+ Acc#doc_acc{wfail = F + 1, read = R + 1}
end,
- seqs_done([{Seq, 1}], Cp).
+ seqs_done([{Seq, 1}], Acc#doc_acc.cp),
+ Acc2.
-bulk_write_docs([], _, _, _) ->
- ok;
-bulk_write_docs(Docs, Seqs, Db, Cp) ->
- case couch_api_wrap:update_docs(
- Db, Docs, [delay_commit], replicated_changes) of
- {ok, []} ->
- Cp ! {add_stat, {#rep_stats.docs_written, length(Docs)}};
- {ok, Errors} ->
- Cp ! {add_stat, {#rep_stats.doc_write_failures, length(Errors)}},
- Cp ! {add_stat, {#rep_stats.docs_written, length(Docs) - length(Errors)}},
- DbUri = couch_api_wrap:db_uri(Db),
- lists:foreach(
- fun({[ {<<"id">>, Id}, {<<"error">>, <<"unauthorized">>} ]}) ->
- ?LOG_ERROR("Replicator: unauthorized to write document"
- " ~s to ~s", [Id, DbUri]);
- (_) ->
- ok
- end, Errors)
- end,
- seqs_done(Seqs, Cp).
+bulk_write_docs(#doc_acc{docs = []} = Acc, _) ->
+ Acc;
+bulk_write_docs(#doc_acc{docs = Docs, seqs = Seqs, cp = Cp} = Acc, Db) ->
+ {ok, Errors} = couch_api_wrap:update_docs(
+ Db, Docs, [delay_commit], replicated_changes),
+ DbUri = couch_api_wrap:db_uri(Db),
+ lists:foreach(
+ fun({[ {<<"id">>, Id}, {<<"error">>, <<"unauthorized">>} ]}) ->
+ ?LOG_ERROR("Replicator: unauthorized to write document"
+ " ~s to ~s", [Id, DbUri]);
+ (_) ->
+ ok
+ end, Errors),
+ seqs_done(Seqs, Cp),
+ Acc#doc_acc{
+ wfail = Acc#doc_acc.wfail + length(Errors),
+ written = Acc#doc_acc.written + length(Docs) - length(Errors)
+ }.
seqs_done([], _) ->
@@ -133,3 +155,10 @@ seqs_done([{nil, _} | _], _) ->
ok;
seqs_done(SeqCounts, Cp) ->
Cp ! {seq_changes_done, SeqCounts}.
+
+
+maybe_send_stat(0, _StatPos, _Cp) ->
+ ok;
+maybe_send_stat(Value, StatPos, Cp) ->
+ Cp ! {add_stat, {StatPos, Value}}.
+
Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl?rev=1003724&r1=1003723&r2=1003724&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl Sat Oct 2 00:53:15 2010
@@ -74,12 +74,14 @@ missing_revs_finder_loop(FinderId, Cp, T
{_Id, {Revs, Seq}} <- dict:to_list(NonMissingIdRevsSeqDict)]},
% Expand out each doc and seq into its own work item
- lists:foreach(fun({Id, Revs, PAs}) ->
- % PA means "possible ancestor"
- Cp ! {add_stat, {#rep_stats.missing_found, length(Revs)}},
- {_, Seq} = dict:fetch(Id, IdRevsSeqDict),
- ok = couch_work_queue:queue(RevsQueue, {Id, Revs, PAs, Seq})
- end, Missing),
+ MissingCount = lists:foldl(
+ fun({Id, Revs, PAs}, Count) ->
+ % PA means "possible ancestor"
+ {_, Seq} = dict:fetch(Id, IdRevsSeqDict),
+ ok = couch_work_queue:queue(RevsQueue, {Id, Revs, PAs, Seq}),
+ Count + length(Revs)
+ end, 0, Missing),
+ Cp ! {add_stat, {#rep_stats.missing_found, MissingCount}},
missing_revs_finder_loop(FinderId, Cp, Target, ChangesQueue, RevsQueue)
end.