You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2009/03/04 03:53:48 UTC

svn commit: r749886 - in /couchdb/branches/rep_security: share/www/script/couch_tests.js src/couchdb/couch_db.erl src/couchdb/couch_rep.erl

Author: damien
Date: Wed Mar  4 02:53:48 2009
New Revision: 749886

URL: http://svn.apache.org/viewvc?rev=749886&view=rev
Log:
changes to how replication stats are tracked.

Modified:
    couchdb/branches/rep_security/share/www/script/couch_tests.js
    couchdb/branches/rep_security/src/couchdb/couch_db.erl
    couchdb/branches/rep_security/src/couchdb/couch_rep.erl

Modified: couchdb/branches/rep_security/share/www/script/couch_tests.js
URL: http://svn.apache.org/viewvc/couchdb/branches/rep_security/share/www/script/couch_tests.js?rev=749886&r1=749885&r2=749886&view=diff
==============================================================================
--- couchdb/branches/rep_security/share/www/script/couch_tests.js [utf-8] (original)
+++ couchdb/branches/rep_security/share/www/script/couch_tests.js [utf-8] Wed Mar  4 02:53:48 2009
@@ -3111,6 +3111,9 @@
     
     // Now delete document
     T(user2Db.deleteDoc(doc).ok);
+    
+    
+    // Now test replication
     var AuthHeaders = {"WWW-Authenticate": "X-Couch-Test-Auth Christopher Lenz:dog food"};
     var host = CouchDB.host;
     var dbPairs = [

Modified: couchdb/branches/rep_security/src/couchdb/couch_db.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/rep_security/src/couchdb/couch_db.erl?rev=749886&r1=749885&r2=749886&view=diff
==============================================================================
--- couchdb/branches/rep_security/src/couchdb/couch_db.erl (original)
+++ couchdb/branches/rep_security/src/couchdb/couch_db.erl Wed Mar  4 02:53:48 2009
@@ -483,7 +483,7 @@
         close(Db2),
         case gen_server:call(UpdatePid, {update_docs, DocBuckets3, Options}, infinity) of
         {ok, Conflicts} -> {ok, Conflicts};
-        Else -> throw(Else)
+        retry -> throw({update_error, compaction_retry})
         end
     end.
 

Modified: couchdb/branches/rep_security/src/couchdb/couch_rep.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/rep_security/src/couchdb/couch_rep.erl?rev=749886&r1=749885&r2=749886&view=diff
==============================================================================
--- couchdb/branches/rep_security/src/couchdb/couch_rep.erl (original)
+++ couchdb/branches/rep_security/src/couchdb/couch_rep.erl Wed Mar  4 02:53:48 2009
@@ -19,8 +19,17 @@
     headers
 }).
 
+-record(rep_stats, {
+    docs_checked=0,
+    docs_missing=0,
+    docs_read=0,
+    docs_written=0,
+    doc_write_failures=0
+}).
+    
 -export([replicate/2, replicate/3]).
 
+
 url_encode(Bin) when is_binary(Bin) ->
     url_encode(binary_to_list(Bin));
 url_encode([H|T]) ->
@@ -110,7 +119,8 @@
     false ->
         ?LOG_INFO("Replication records differ. "
                 "Performing full replication instead of incremental.", []),
-        ?LOG_DEBUG("Record on source:~p~nRecord on target:~p~n", [OldRepHistoryProps, OldRepHistoryPropsTrg]),
+        ?LOG_DEBUG("Record on source:~p~nRecord on target:~p~n",
+                [OldRepHistoryProps, OldRepHistoryPropsTrg]),
         0
     end,
 
@@ -138,13 +148,16 @@
                 "replication is redone and documents reexamined.", []),
             SeqNum
         end,
-        
+        [rep_stats | StatsList] = tuple_to_list(Stats),
+        StatFieldNames =
+                [?l2b(tuple_to_list(T)) || T <- record_info(fields, rep_stats)],
+        StatProps = lists:zip(StatFieldNames, StatsList),
         HistEntries =[
             {
                 [{<<"start_time">>, list_to_binary(ReplicationStartTime)},
                 {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())},
                 {<<"start_last_seq">>, SeqNum},
-                {<<"end_last_seq">>, NewSeqNum} | Stats]}
+                {<<"end_last_seq">>, NewSeqNum} | StatProps]}
             | proplists:get_value("history", OldRepHistoryProps, [])],
         % something changed, record results
         NewRepHistory =
@@ -160,7 +173,7 @@
 
 pull_rep(DbTarget, DbSource, SourceSeqNum) ->
     {ok, {NewSeq, Stats}} = 
-        enum_docs_since(DbSource, DbTarget, SourceSeqNum, {SourceSeqNum, []}),
+        enum_docs_since(DbSource, DbTarget, SourceSeqNum, {SourceSeqNum, #rep_stats{}}),
     {NewSeq, Stats}.
 
 do_http_request(Url, Action, Headers) ->
@@ -209,34 +222,49 @@
         do_http_request(Url, Action, Headers, JsonBody, Retries - 1)
     end.
 
-save_docs_buffer(DbTarget, DocsBuffer, []) ->
+save_docs_buffer(DbTarget, DocsBuffer, [], Stats) ->
     receive
     {Src, shutdown} ->
-        {ok, _UpdateErrors} = update_docs(DbTarget, lists:reverse(DocsBuffer), [], replicated_changes),
-        Src ! {done, self(), [{<<"docs_written">>, length(DocsBuffer)}]}
+        Stats2 = save_docs_with_stats(DbTarget, DocsBuffer, Stats),
+        Src ! {done, self(), Stats2}
     end;
-save_docs_buffer(DbTarget, DocsBuffer, UpdateSequences) ->
+save_docs_buffer(DbTarget, DocsBuffer, UpdateSequences, Stats) ->
     [NextSeq|Rest] = UpdateSequences,
     receive
     {Src, skip, NextSeq} ->
         Src ! got_it,
-        save_docs_buffer(DbTarget, DocsBuffer, Rest);
+        save_docs_buffer(DbTarget, DocsBuffer, Rest, Stats);
     {Src, docs, {NextSeq, Docs}} ->
         Src ! got_it,
         case couch_util:should_flush() of
             true ->
-                {ok, _UpdateErrors} = update_docs(DbTarget, lists:reverse(Docs++DocsBuffer), [], 
-                    replicated_changes),
-                save_docs_buffer(DbTarget, [], Rest);
+                Stats2 =
+                    save_docs_with_stats(DbTarget, Docs++DocsBuffer, Stats),
+                save_docs_buffer(DbTarget, [], Rest, Stats2);
             false ->
-                save_docs_buffer(DbTarget, Docs++DocsBuffer, Rest)
+                save_docs_buffer(DbTarget, Docs++DocsBuffer, Rest, Stats)
         end;
-        {Src, shutdown} ->
+    {Src, shutdown} ->
         ?LOG_ERROR("received shutdown while waiting for more update_seqs", []),
-        {ok, _Errors} = update_docs(DbTarget, lists:reverse(DocsBuffer), [], replicated_changes),
-        Src ! {done, self(), [{<<"docs_written">>, length(DocsBuffer)}]}
+        Stats2 = save_docs_with_stats(DbTarget,DocsBuffer,Stats),
+        Src ! {done, self(), Stats2}
     end.
 
+save_docs_with_stats(Db, Docs, Stats) ->
+   {ok, Errors} = update_docs(Db, Docs, [], replicated_changes),
+    dump_update_errors(Errors),
+    Stats#rep_stats{
+        docs_written=Stats#rep_stats.docs_written+length(Docs)-length(Errors),
+        doc_write_failures=Stats#rep_stats.doc_write_failures+length(Errors)}.
+
+
+dump_update_errors([]) -> ok;
+dump_update_errors([{{Id, Rev}, Error}|Rest]) ->
+    ?LOG_INFO("error replicating document \"~s\" rev \"~s\":~p",
+        [Id, couch_doc:rev_to_str(Rev), Error]),
+    dump_update_errors(Rest).
+
+
 pmap(F,List) ->
     [wait_result(Worker) || Worker <- [spawn_worker(self(),F,E) || E <- List]].
 
@@ -251,14 +279,15 @@
 
 enum_docs_parallel(DbS, DbT, InfoList) ->
     UpdateSeqs = [Seq || {_, Seq, _, _} <- InfoList],
-    SaveDocsPid = spawn_link(fun() -> save_docs_buffer(DbT,[],UpdateSeqs) end),
+    SaveDocsPid = spawn_link(fun() ->
+            save_docs_buffer(DbT,[],UpdateSeqs, #rep_stats{}) end),
     
-    Stats = pmap(fun({Id, Seq, SrcRevs, MissingRevs}) ->
+    ReadStatsList = pmap(fun({Id, Seq, SrcRevs, MissingRevs}) ->
         case MissingRevs of
         [] ->
             SaveDocsPid ! {self(), skip, Seq},
             receive got_it -> ok end,
-            [{missing_checked, length(SrcRevs)}];
+            #rep_stats{docs_checked=length(SrcRevs)};
         _ ->
             {ok, DocResults} = open_doc_revs(DbS, Id, MissingRevs, [latest]),
             
@@ -268,29 +297,33 @@
             % include update_seq so we save docs in order
             SaveDocsPid ! {self(), docs, {Seq, Docs}},
             receive got_it -> ok end,
-            [{missing_checked, length(SrcRevs)},
-             {missing_found, length(MissingRevs)},
-             {docs_read, length(Docs)}]
+            #rep_stats{docs_checked=length(SrcRevs),
+                docs_missing=length(MissingRevs),
+                docs_read=length(Docs)}
         end
     end, InfoList),
     
     SaveDocsPid ! {self(), shutdown},
     
-    {MissingChecked, MissingFound, DocsRead} = lists:foldl(fun(S, {C, F, R}) ->
-        C1 = C + proplists:get_value(missing_checked, S, 0),
-        F1 = F + proplists:get_value(missing_found, S, 0),
-        R1 = R + proplists:get_value(docs_read, S, 0),
-        {C1, F1, R1}
-    end, {0, 0, 0}, Stats),
-    
     receive
-        {done, SaveDocsPid, [{<<"docs_written">>, DocsWritten}]} -> ok
+        {done, SaveDocsPid, WriteStats} -> ok
     end,
     
-    [ {<<"missing_checked">>, MissingChecked},
-      {<<"missing_found">>, MissingFound}, 
-      {<<"docs_read">>, DocsRead},
-      {<<"docs_written">>, DocsWritten} ].
+    lists:foldl(
+        fun(StatIn, AccStat) ->
+            sum_rep_stats(StatIn, AccStat)
+        end, #rep_stats{}, [WriteStats | ReadStatsList]).
+
+
+sum_rep_stats(StatsA, StatsB) ->
+    % Quick and dirty sum matching members of the records
+    % convert to lists
+    [rep_stats | MembersA] = tuple_to_list(StatsA),
+    [rep_stats | MembersB] = tuple_to_list(StatsB),
+    % pairwise add the members and convert back to the record
+    list_to_tuple([rep_stats |
+            lists:zipwith(fun(A,B) -> A + B end, MembersA, MembersB)]).
+
 
 fix_url(UrlBin) ->
     Url = binary_to_list(UrlBin),
@@ -358,11 +391,11 @@
     end, {0, []}),
     lists:reverse(DocInfoList).
 
-enum_docs_since(DbSource, DbTarget, StartSeq, InAcc) ->
+enum_docs_since(DbSource, DbTarget, StartSeq, {AccLastSeq, AccStats}) ->
     DocInfoList = get_doc_info_list(DbSource, StartSeq),
     case DocInfoList of
     [] ->
-        {ok, InAcc};
+        {ok, {AccLastSeq, AccStats}};
     _ ->
         UpdateSeqs = [D#doc_info.update_seq || D <- DocInfoList],
         SrcRevsList = lists:map(fun(SrcDocInfo) ->
@@ -380,21 +413,7 @@
             {Id, Seq, SrcRevs, MissingRevs}
         end, lists:zip(SrcRevsList, UpdateSeqs)),
         Stats = enum_docs_parallel(DbSource, DbTarget, InfoList),
-        OldStats = element(2, InAcc),
-        TotalStats = [
-            {<<"missing_checked">>, 
-                proplists:get_value(<<"missing_checked">>, OldStats, 0) +
-                proplists:get_value(<<"missing_checked">>, Stats, 0)},
-            {<<"missing_found">>, 
-                proplists:get_value(<<"missing_found">>, OldStats, 0) +
-                proplists:get_value(<<"missing_found">>, Stats, 0)},
-            {<<"docs_read">>, 
-                proplists:get_value(<<"docs_read">>, OldStats, 0) +
-                proplists:get_value(<<"docs_read">>, Stats, 0)},
-            {<<"docs_written">>, 
-                proplists:get_value(<<"docs_written">>, OldStats, 0) +
-                proplists:get_value(<<"docs_written">>, Stats, 0)}
-        ],
+        TotalStats = sum_rep_stats(Stats, AccStats),
         
         #doc_info{update_seq=LastSeq} = lists:last(DocInfoList),
         enum_docs_since(DbSource, DbTarget, LastSeq, {LastSeq, TotalStats})