You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2009/03/04 03:53:48 UTC
svn commit: r749886 - in /couchdb/branches/rep_security:
share/www/script/couch_tests.js src/couchdb/couch_db.erl
src/couchdb/couch_rep.erl
Author: damien
Date: Wed Mar 4 02:53:48 2009
New Revision: 749886
URL: http://svn.apache.org/viewvc?rev=749886&view=rev
Log:
changes to how replication stats are tracked.
Modified:
couchdb/branches/rep_security/share/www/script/couch_tests.js
couchdb/branches/rep_security/src/couchdb/couch_db.erl
couchdb/branches/rep_security/src/couchdb/couch_rep.erl
Modified: couchdb/branches/rep_security/share/www/script/couch_tests.js
URL: http://svn.apache.org/viewvc/couchdb/branches/rep_security/share/www/script/couch_tests.js?rev=749886&r1=749885&r2=749886&view=diff
==============================================================================
--- couchdb/branches/rep_security/share/www/script/couch_tests.js [utf-8] (original)
+++ couchdb/branches/rep_security/share/www/script/couch_tests.js [utf-8] Wed Mar 4 02:53:48 2009
@@ -3111,6 +3111,9 @@
// Now delete document
T(user2Db.deleteDoc(doc).ok);
+
+
+ // Now test replication
var AuthHeaders = {"WWW-Authenticate": "X-Couch-Test-Auth Christopher Lenz:dog food"};
var host = CouchDB.host;
var dbPairs = [
Modified: couchdb/branches/rep_security/src/couchdb/couch_db.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/rep_security/src/couchdb/couch_db.erl?rev=749886&r1=749885&r2=749886&view=diff
==============================================================================
--- couchdb/branches/rep_security/src/couchdb/couch_db.erl (original)
+++ couchdb/branches/rep_security/src/couchdb/couch_db.erl Wed Mar 4 02:53:48 2009
@@ -483,7 +483,7 @@
close(Db2),
case gen_server:call(UpdatePid, {update_docs, DocBuckets3, Options}, infinity) of
{ok, Conflicts} -> {ok, Conflicts};
- Else -> throw(Else)
+ retry -> throw({update_error, compaction_retry})
end
end.
Modified: couchdb/branches/rep_security/src/couchdb/couch_rep.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/rep_security/src/couchdb/couch_rep.erl?rev=749886&r1=749885&r2=749886&view=diff
==============================================================================
--- couchdb/branches/rep_security/src/couchdb/couch_rep.erl (original)
+++ couchdb/branches/rep_security/src/couchdb/couch_rep.erl Wed Mar 4 02:53:48 2009
@@ -19,8 +19,17 @@
headers
}).
+-record(rep_stats, {
+ docs_checked=0,
+ docs_missing=0,
+ docs_read=0,
+ docs_written=0,
+ doc_write_failures=0
+}).
+
-export([replicate/2, replicate/3]).
+
url_encode(Bin) when is_binary(Bin) ->
url_encode(binary_to_list(Bin));
url_encode([H|T]) ->
@@ -110,7 +119,8 @@
false ->
?LOG_INFO("Replication records differ. "
"Performing full replication instead of incremental.", []),
- ?LOG_DEBUG("Record on source:~p~nRecord on target:~p~n", [OldRepHistoryProps, OldRepHistoryPropsTrg]),
+ ?LOG_DEBUG("Record on source:~p~nRecord on target:~p~n",
+ [OldRepHistoryProps, OldRepHistoryPropsTrg]),
0
end,
@@ -138,13 +148,16 @@
"replication is redone and documents reexamined.", []),
SeqNum
end,
-
+ [rep_stats | StatsList] = tuple_to_list(Stats),
+ StatFieldNames =
+ [?l2b(tuple_to_list(T)) || T <- record_info(fields, rep_stats)],
+ StatProps = lists:zip(StatFieldNames, StatsList),
HistEntries =[
{
[{<<"start_time">>, list_to_binary(ReplicationStartTime)},
{<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())},
{<<"start_last_seq">>, SeqNum},
- {<<"end_last_seq">>, NewSeqNum} | Stats]}
+ {<<"end_last_seq">>, NewSeqNum} | StatProps]}
| proplists:get_value("history", OldRepHistoryProps, [])],
% something changed, record results
NewRepHistory =
@@ -160,7 +173,7 @@
pull_rep(DbTarget, DbSource, SourceSeqNum) ->
{ok, {NewSeq, Stats}} =
- enum_docs_since(DbSource, DbTarget, SourceSeqNum, {SourceSeqNum, []}),
+ enum_docs_since(DbSource, DbTarget, SourceSeqNum, {SourceSeqNum, #rep_stats{}}),
{NewSeq, Stats}.
do_http_request(Url, Action, Headers) ->
@@ -209,34 +222,49 @@
do_http_request(Url, Action, Headers, JsonBody, Retries - 1)
end.
-save_docs_buffer(DbTarget, DocsBuffer, []) ->
+save_docs_buffer(DbTarget, DocsBuffer, [], Stats) ->
receive
{Src, shutdown} ->
- {ok, _UpdateErrors} = update_docs(DbTarget, lists:reverse(DocsBuffer), [], replicated_changes),
- Src ! {done, self(), [{<<"docs_written">>, length(DocsBuffer)}]}
+ Stats2 = save_docs_with_stats(DbTarget, DocsBuffer, Stats),
+ Src ! {done, self(), Stats2}
end;
-save_docs_buffer(DbTarget, DocsBuffer, UpdateSequences) ->
+save_docs_buffer(DbTarget, DocsBuffer, UpdateSequences, Stats) ->
[NextSeq|Rest] = UpdateSequences,
receive
{Src, skip, NextSeq} ->
Src ! got_it,
- save_docs_buffer(DbTarget, DocsBuffer, Rest);
+ save_docs_buffer(DbTarget, DocsBuffer, Rest, Stats);
{Src, docs, {NextSeq, Docs}} ->
Src ! got_it,
case couch_util:should_flush() of
true ->
- {ok, _UpdateErrors} = update_docs(DbTarget, lists:reverse(Docs++DocsBuffer), [],
- replicated_changes),
- save_docs_buffer(DbTarget, [], Rest);
+ Stats2 =
+ save_docs_with_stats(DbTarget, Docs++DocsBuffer, Stats),
+ save_docs_buffer(DbTarget, [], Rest, Stats2);
false ->
- save_docs_buffer(DbTarget, Docs++DocsBuffer, Rest)
+ save_docs_buffer(DbTarget, Docs++DocsBuffer, Rest, Stats)
end;
- {Src, shutdown} ->
+ {Src, shutdown} ->
?LOG_ERROR("received shutdown while waiting for more update_seqs", []),
- {ok, _Errors} = update_docs(DbTarget, lists:reverse(DocsBuffer), [], replicated_changes),
- Src ! {done, self(), [{<<"docs_written">>, length(DocsBuffer)}]}
+ Stats2 = save_docs_with_stats(DbTarget,DocsBuffer,Stats),
+ Src ! {done, self(), Stats2}
end.
+save_docs_with_stats(Db, Docs, Stats) ->
+ {ok, Errors} = update_docs(Db, Docs, [], replicated_changes),
+ dump_update_errors(Errors),
+ Stats#rep_stats{
+ docs_written=Stats#rep_stats.docs_written+length(Docs)-length(Errors),
+ doc_write_failures=Stats#rep_stats.doc_write_failures+length(Errors)}.
+
+
+dump_update_errors([]) -> ok;
+dump_update_errors([{{Id, Rev}, Error}|Rest]) ->
+ ?LOG_INFO("error replicating document \"~s\" rev \"~s\":~p",
+ [Id, couch_doc:rev_to_str(Rev), Error]),
+ dump_update_errors(Rest).
+
+
pmap(F,List) ->
[wait_result(Worker) || Worker <- [spawn_worker(self(),F,E) || E <- List]].
@@ -251,14 +279,15 @@
enum_docs_parallel(DbS, DbT, InfoList) ->
UpdateSeqs = [Seq || {_, Seq, _, _} <- InfoList],
- SaveDocsPid = spawn_link(fun() -> save_docs_buffer(DbT,[],UpdateSeqs) end),
+ SaveDocsPid = spawn_link(fun() ->
+ save_docs_buffer(DbT,[],UpdateSeqs, #rep_stats{}) end),
- Stats = pmap(fun({Id, Seq, SrcRevs, MissingRevs}) ->
+ ReadStatsList = pmap(fun({Id, Seq, SrcRevs, MissingRevs}) ->
case MissingRevs of
[] ->
SaveDocsPid ! {self(), skip, Seq},
receive got_it -> ok end,
- [{missing_checked, length(SrcRevs)}];
+ #rep_stats{docs_checked=length(SrcRevs)};
_ ->
{ok, DocResults} = open_doc_revs(DbS, Id, MissingRevs, [latest]),
@@ -268,29 +297,33 @@
% include update_seq so we save docs in order
SaveDocsPid ! {self(), docs, {Seq, Docs}},
receive got_it -> ok end,
- [{missing_checked, length(SrcRevs)},
- {missing_found, length(MissingRevs)},
- {docs_read, length(Docs)}]
+ #rep_stats{docs_checked=length(SrcRevs),
+ docs_missing=length(MissingRevs),
+ docs_read=length(Docs)}
end
end, InfoList),
SaveDocsPid ! {self(), shutdown},
- {MissingChecked, MissingFound, DocsRead} = lists:foldl(fun(S, {C, F, R}) ->
- C1 = C + proplists:get_value(missing_checked, S, 0),
- F1 = F + proplists:get_value(missing_found, S, 0),
- R1 = R + proplists:get_value(docs_read, S, 0),
- {C1, F1, R1}
- end, {0, 0, 0}, Stats),
-
receive
- {done, SaveDocsPid, [{<<"docs_written">>, DocsWritten}]} -> ok
+ {done, SaveDocsPid, WriteStats} -> ok
end,
- [ {<<"missing_checked">>, MissingChecked},
- {<<"missing_found">>, MissingFound},
- {<<"docs_read">>, DocsRead},
- {<<"docs_written">>, DocsWritten} ].
+ lists:foldl(
+ fun(StatIn, AccStat) ->
+ sum_rep_stats(StatIn, AccStat)
+ end, #rep_stats{}, [WriteStats | ReadStatsList]).
+
+
+sum_rep_stats(StatsA, StatsB) ->
+ % Quick and dirty sum matching members of the records
+ % convert to lists
+ [rep_stats | MembersA] = tuple_to_list(StatsA),
+ [rep_stats | MembersB] = tuple_to_list(StatsB),
+ % pairwise add the members and convert back to the record
+ list_to_tuple([rep_stats |
+ lists:zipwith(fun(A,B) -> A + B end, MembersA, MembersB)]).
+
fix_url(UrlBin) ->
Url = binary_to_list(UrlBin),
@@ -358,11 +391,11 @@
end, {0, []}),
lists:reverse(DocInfoList).
-enum_docs_since(DbSource, DbTarget, StartSeq, InAcc) ->
+enum_docs_since(DbSource, DbTarget, StartSeq, {AccLastSeq, AccStats}) ->
DocInfoList = get_doc_info_list(DbSource, StartSeq),
case DocInfoList of
[] ->
- {ok, InAcc};
+ {ok, {AccLastSeq, AccStats}};
_ ->
UpdateSeqs = [D#doc_info.update_seq || D <- DocInfoList],
SrcRevsList = lists:map(fun(SrcDocInfo) ->
@@ -380,21 +413,7 @@
{Id, Seq, SrcRevs, MissingRevs}
end, lists:zip(SrcRevsList, UpdateSeqs)),
Stats = enum_docs_parallel(DbSource, DbTarget, InfoList),
- OldStats = element(2, InAcc),
- TotalStats = [
- {<<"missing_checked">>,
- proplists:get_value(<<"missing_checked">>, OldStats, 0) +
- proplists:get_value(<<"missing_checked">>, Stats, 0)},
- {<<"missing_found">>,
- proplists:get_value(<<"missing_found">>, OldStats, 0) +
- proplists:get_value(<<"missing_found">>, Stats, 0)},
- {<<"docs_read">>,
- proplists:get_value(<<"docs_read">>, OldStats, 0) +
- proplists:get_value(<<"docs_read">>, Stats, 0)},
- {<<"docs_written">>,
- proplists:get_value(<<"docs_written">>, OldStats, 0) +
- proplists:get_value(<<"docs_written">>, Stats, 0)}
- ],
+ TotalStats = sum_rep_stats(Stats, AccStats),
#doc_info{update_seq=LastSeq} = lists:last(DocInfoList),
enum_docs_since(DbSource, DbTarget, LastSeq, {LastSeq, TotalStats})