You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2014/08/28 14:15:43 UTC

[12/37] couch-replicator commit: updated refs/heads/master to aafb5f9

Provide an abstract interface to replicator stats

Switch to an orddict (ordered dictionary) under the hood.

BugzID: 24298


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/65a1a106
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/65a1a106
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/65a1a106

Branch: refs/heads/master
Commit: 65a1a106a4ec3e4e3adbe5b26566c5d0eea3c940
Parents: 6f2dc4c
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Fri Oct 18 15:53:23 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Tue Jul 29 15:02:53 2014 +0100

----------------------------------------------------------------------
 src/couch_replicator.erl                | 28 +++++-----
 src/couch_replicator.hrl                |  8 ---
 src/couch_replicator_changes_reader.erl |  2 +-
 src/couch_replicator_stats.erl          | 83 ++++++++++++++++++++++++++++
 src/couch_replicator_utils.erl          | 14 +----
 src/couch_replicator_worker.erl         | 53 +++++++++---------
 6 files changed, 127 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/65a1a106/src/couch_replicator.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator.erl b/src/couch_replicator.erl
index 5891084..47bbd3a 100644
--- a/src/couch_replicator.erl
+++ b/src/couch_replicator.erl
@@ -64,7 +64,7 @@
     changes_manager,
     changes_reader,
     workers,
-    stats = #rep_stats{},
+    stats = couch_replicator_stats:new(),
     session_id,
     source_db_compaction_notifier = nil,
     target_db_compaction_notifier = nil,
@@ -694,11 +694,11 @@ do_checkpoint(State) ->
             {<<"start_last_seq">>, StartSeq},
             {<<"end_last_seq">>, NewSeq},
             {<<"recorded_seq">>, NewSeq},
-            {<<"missing_checked">>, Stats#rep_stats.missing_checked},
-            {<<"missing_found">>, Stats#rep_stats.missing_found},
-            {<<"docs_read">>, Stats#rep_stats.docs_read},
-            {<<"docs_written">>, Stats#rep_stats.docs_written},
-            {<<"doc_write_failures">>, Stats#rep_stats.doc_write_failures}
+            {<<"missing_checked">>, couch_replicator_stats:missing_checked(Stats)},
+            {<<"missing_found">>, couch_replicator_stats:missing_found(Stats)},
+            {<<"docs_read">>, couch_replicator_stats:docs_read(Stats)},
+            {<<"docs_written">>, couch_replicator_stats:docs_written(Stats)},
+            {<<"doc_write_failures">>, couch_replicator_stats:doc_write_failures(Stats)}
         ]},
         BaseHistory = [
             {<<"session_id">>, SessionId},
@@ -714,9 +714,9 @@ do_checkpoint(State) ->
             [
                 {<<"start_time">>, StartTime},
                 {<<"end_time">>, EndTime},
-                {<<"docs_read">>, Stats#rep_stats.docs_read},
-                {<<"docs_written">>, Stats#rep_stats.docs_written},
-                {<<"doc_write_failures">>, Stats#rep_stats.doc_write_failures}
+                {<<"docs_read">>, couch_replicator_stats:docs_read(Stats)},
+                {<<"docs_written">>, couch_replicator_stats:docs_written(Stats)},
+                {<<"doc_write_failures">>, couch_replicator_stats:doc_write_failures(Stats)}
             ]
         end,
         % limit history to 50 entries
@@ -921,12 +921,12 @@ rep_stats(State) ->
         source_seq = SourceCurSeq
     } = State,
     [
-        {revisions_checked, Stats#rep_stats.missing_checked},
-        {missing_revisions_found, Stats#rep_stats.missing_found},
-        {docs_read, Stats#rep_stats.docs_read},
-        {docs_written, Stats#rep_stats.docs_written},
+        {revisions_checked, couch_replicator_stats:missing_checked(Stats)},
+        {missing_revisions_found, couch_replicator_stats:missing_found(Stats)},
+        {docs_read, couch_replicator_stats:docs_read(Stats)},
+        {docs_written, couch_replicator_stats:docs_written(Stats)},
         {changes_pending, pending(SourceCurSeq, CommittedSeq)},
-        {doc_write_failures, Stats#rep_stats.doc_write_failures},
+        {doc_write_failures, couch_replicator_stats:doc_write_failures(Stats)},
         {checkpointed_source_seq, CommittedSeq}
     ].
 

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/65a1a106/src/couch_replicator.hrl
----------------------------------------------------------------------
diff --git a/src/couch_replicator.hrl b/src/couch_replicator.hrl
index 018aa4b..893d9e4 100644
--- a/src/couch_replicator.hrl
+++ b/src/couch_replicator.hrl
@@ -20,11 +20,3 @@
     user_ctx,
     doc_id
 }).
-
--record(rep_stats, {
-    missing_checked = 0,
-    missing_found = 0,
-    docs_read = 0,
-    docs_written = 0,
-    doc_write_failures = 0
-}).

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/65a1a106/src/couch_replicator_changes_reader.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_changes_reader.erl b/src/couch_replicator_changes_reader.erl
index 045e074..f28dd51 100644
--- a/src/couch_replicator_changes_reader.erl
+++ b/src/couch_replicator_changes_reader.erl
@@ -93,7 +93,7 @@ process_change({last_seq, LS}, {Parent, _, _, true = _Continuous, Ts}) ->
     Seq = case LS of undefined -> get(last_seq); _ -> LS end,
     OldSeq = get(last_seq),
     if Seq == OldSeq -> ok; true ->
-        Msg = {report_seq_done, {Ts, Seq}, #rep_stats{}},
+        Msg = {report_seq_done, {Ts, Seq}, couch_replicator_stats:new()},
         ok = gen_server:call(Parent, Msg, infinity)
     end,
     put(last_seq, Seq),

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/65a1a106/src/couch_replicator_stats.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_stats.erl b/src/couch_replicator_stats.erl
new file mode 100644
index 0000000..af8ba4e
--- /dev/null
+++ b/src/couch_replicator_stats.erl
@@ -0,0 +1,83 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_stats).
+
+-record(rep_stats, {
+    missing_checked = 0,
+    missing_found = 0,
+    docs_read = 0,
+    docs_written = 0,
+    doc_write_failures = 0
+}).
+
+-export([
+    new/0,
+    new/1,
+    get/2,
+    increment/2,
+    sum_stats/2
+]).
+
+-export([
+    missing_checked/1,
+    missing_found/1,
+    docs_read/1,
+    docs_written/1,
+    doc_write_failures/1
+]).
+
+new() ->
+    orddict:new().
+
+new(Initializers) when is_list(Initializers) ->
+    orddict:from_list(Initializers).
+
+missing_checked(Stats) ->
+    get(missing_checked, upgrade(Stats)).
+
+missing_found(Stats) ->
+    get(missing_found, upgrade(Stats)).
+
+docs_read(Stats) ->
+    get(docs_read, upgrade(Stats)).
+
+docs_written(Stats) ->
+    get(docs_written, upgrade(Stats)).
+
+doc_write_failures(Stats) ->
+    get(doc_write_failures, upgrade(Stats)).
+
+get(Field, Stats) ->
+    case orddict:find(Field, upgrade(Stats)) of
+        {ok, Value} ->
+            Value;
+        error ->
+            0
+    end.
+
+increment(Field, Stats) ->
+    orddict:update_counter(Field, 1, upgrade(Stats)).
+
+sum_stats(S1, S2) ->
+    orddict:merge(fun(_, V1, V2) -> V1+V2 end, upgrade(S1), upgrade(S2)).
+
+upgrade(#rep_stats{} = Stats) ->
+    orddict:from_list([
+        {missing_checked, Stats#rep_stats.missing_checked},
+        {missing_found, Stats#rep_stats.missing_found},
+        {docs_read, Stats#rep_stats.docs_read},
+        {docs_written, Stats#rep_stats.docs_written},
+        {doc_write_failures, Stats#rep_stats.doc_write_failures}
+    ]);
+upgrade(Stats) ->
+    Stats.

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/65a1a106/src/couch_replicator_utils.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_utils.erl b/src/couch_replicator_utils.erl
index f489ff1..9d716c9 100644
--- a/src/couch_replicator_utils.erl
+++ b/src/couch_replicator_utils.erl
@@ -392,17 +392,9 @@ handle_db_event(DbName, compacted, Server) ->
 handle_db_event(_DbName, _Event, Server) ->
     {ok, Server}.
 
-
-sum_stats(#rep_stats{} = S1, #rep_stats{} = S2) ->
-    #rep_stats{
-        missing_checked =
-            S1#rep_stats.missing_checked + S2#rep_stats.missing_checked,
-        missing_found = S1#rep_stats.missing_found + S2#rep_stats.missing_found,
-        docs_read = S1#rep_stats.docs_read + S2#rep_stats.docs_read,
-        docs_written = S1#rep_stats.docs_written + S2#rep_stats.docs_written,
-        doc_write_failures =
-            S1#rep_stats.doc_write_failures + S2#rep_stats.doc_write_failures
-    }.
+% Obsolete - remove in next release
+sum_stats(S1, S2) ->
+    couch_replicator_stats:sum_stats(S1, S2).
 
 mp_parse_doc({headers, H}, []) ->
     case couch_util:get_value("content-type", H) of

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/65a1a106/src/couch_replicator_worker.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_worker.erl b/src/couch_replicator_worker.erl
index fce445b..a0638c6 100644
--- a/src/couch_replicator_worker.erl
+++ b/src/couch_replicator_worker.erl
@@ -31,9 +31,6 @@
 -define(MAX_BULK_ATTS_PER_DOC, 8).
 -define(STATS_DELAY, 10000000).              % 10 seconds (in microseconds)
 
--define(inc_stat(StatPos, Stats, Inc),
-    setelement(StatPos, Stats, element(StatPos, Stats) + Inc)).
-
 -import(couch_replicator_utils, [
     open_db/1,
     close_db/1,
@@ -61,7 +58,7 @@
     writer = nil,
     pending_fetch = nil,
     flush_waiter = nil,
-    stats = #rep_stats{},
+    stats = couch_replicator_stats:new(),
     source_db_compaction_notifier = nil,
     target_db_compaction_notifier = nil,
     batch = #batch{}
@@ -306,7 +303,7 @@ fetch_doc(Source, {Id, Revs, PAs}, DocHandler, Acc) ->
 
 
 local_doc_handler({ok, Doc}, {Target, DocList, Stats, Cp}) ->
-    Stats2 = ?inc_stat(#rep_stats.docs_read, Stats, 1),
+    Stats2 = couch_replicator_stats:increment(docs_read, Stats),
     case batch_doc(Doc) of
     true ->
         {ok, {Target, [Doc | DocList], Stats2, Cp}};
@@ -317,9 +314,9 @@ local_doc_handler({ok, Doc}, {Target, DocList, Stats, Cp}) ->
         close_db(Target2),
         Stats3 = case Success of
         true ->
-            ?inc_stat(#rep_stats.docs_written, Stats2, 1);
+            couch_replicator_stats:increment(docs_written, Stats2);
         false ->
-            ?inc_stat(#rep_stats.doc_write_failures, Stats2, 1)
+            couch_replicator_stats:increment(doc_write_failures, Stats2)
         end,
         Stats4 = maybe_report_stats(Cp, Stats3),
         {ok, {Target, DocList, Stats4, Cp}}
@@ -336,16 +333,16 @@ remote_doc_handler({ok, Doc}, {Parent, Target} = Acc) ->
     % source. The data property of each attachment is a function that starts
     % streaming the attachment data from the remote source, therefore it's
     % convenient to call it ASAP to avoid ibrowse inactivity timeouts.
-    Stats = #rep_stats{docs_read = 1},
+    Stats = couch_replicator_stats:new([{docs_read, 1}]),
     couch_log:debug("Worker flushing doc with attachments", []),
     Target2 = open_db(Target),
     Success = (flush_doc(Target2, Doc) =:= ok),
     close_db(Target2),
     {Result, Stats2} = case Success of
     true ->
-        {{ok, Acc}, ?inc_stat(#rep_stats.docs_written, Stats, 1)};
+        {{ok, Acc}, couch_replicator_stats:increment(docs_written, Stats)};
     false ->
-        {{skip, Acc}, ?inc_stat(#rep_stats.doc_write_failures, Stats, 1)}
+        {{skip, Acc}, couch_replicator_stats:increment(doc_write_failures, Stats)}
     end,
     ok = gen_server:call(Parent, {add_stats, Stats2}, infinity),
     Result;
@@ -376,7 +373,7 @@ after_full_flush(#state{stats = Stats, flush_waiter = Waiter} = State) ->
     gen_server:reply(Waiter, {ok, Stats}),
     erlang:put(last_stats_report, now()),
     State#state{
-        stats = #rep_stats{},
+        stats = couch_replicator_stats:new(),
         flush_waiter = nil,
         writer = nil,
         batch = #batch{}
@@ -389,8 +386,8 @@ maybe_flush_docs(Doc,State) ->
         stats = Stats, cp = Cp
     } = State,
     {Batch2, WStats} = maybe_flush_docs(Target, Batch, Doc),
-    Stats2 = couch_replicator_utils:sum_stats(Stats, WStats),
-    Stats3 = ?inc_stat(#rep_stats.docs_read, Stats2, 1),
+    Stats2 = couch_replicator_stats:sum_stats(Stats, WStats),
+    Stats3 = couch_replicator_stats:increment(docs_read, Stats2),
     Stats4 = maybe_report_stats(Cp, Stats3),
     State#state{stats = Stats4, batch = Batch2}.
 
@@ -402,9 +399,9 @@ maybe_flush_docs(#httpdb{} = Target, Batch, Doc) ->
         couch_log:debug("Worker flushing doc with attachments", []),
         case flush_doc(Target, Doc) of
         ok ->
-            {Batch, #rep_stats{docs_written = 1}};
+            {Batch, couch_replicator_stats:new([{docs_written, 1}])};
         _ ->
-            {Batch, #rep_stats{doc_write_failures = 1}}
+            {Batch, couch_replicator_stats:new([{doc_write_failures, 1}])}
         end;
     true ->
         JsonDoc = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, [revs, attachments])),
@@ -414,7 +411,8 @@ maybe_flush_docs(#httpdb{} = Target, Batch, Doc) ->
             Stats = flush_docs(Target, [JsonDoc | DocAcc]),
             {#batch{}, Stats};
         SizeAcc2 ->
-            {#batch{docs = [JsonDoc | DocAcc], size = SizeAcc2}, #rep_stats{}}
+            Stats = couch_replicator_stats:new(),
+            {#batch{docs = [JsonDoc | DocAcc], size = SizeAcc2}, Stats}
         end
     end;
 
@@ -425,7 +423,8 @@ maybe_flush_docs(#db{} = Target, #batch{docs = DocAcc, size = SizeAcc}, Doc) ->
         Stats = flush_docs(Target, [Doc | DocAcc]),
         {#batch{}, Stats};
     SizeAcc2 ->
-        {#batch{docs = [Doc | DocAcc], size = SizeAcc2}, #rep_stats{}}
+        Stats = couch_replicator_stats:new(),
+        {#batch{docs = [Doc | DocAcc], size = SizeAcc2}, Stats}
     end.
 
 
@@ -440,7 +439,7 @@ batch_doc(#doc{atts = Atts}) ->
 
 
 flush_docs(_Target, []) ->
-    #rep_stats{};
+    couch_replicator_stats:new();
 
 flush_docs(Target, DocList) ->
     {ok, Errors} = couch_replicator_api_wrap:update_docs(
@@ -453,10 +452,10 @@ flush_docs(Target, DocList) ->
                 [get_value(id, Props, ""), get_value(rev, Props, ""), DbUri,
                     get_value(error, Props, ""), get_value(reason, Props, "")])
         end, Errors),
-    #rep_stats{
-        docs_written = length(DocList) - length(Errors),
-        doc_write_failures = length(Errors)
-    }.
+    couch_replicator_stats:new([
+        {docs_written, length(DocList) - length(Errors)},
+        {doc_write_failures, length(Errors)}
+    ]).
 
 flush_doc(Target, #doc{id = Id, revs = {Pos, [RevId | _]}} = Doc) ->
     try couch_replicator_api_wrap:update_doc(Target, Doc, [], replicated_changes) of
@@ -495,10 +494,10 @@ find_missing(DocInfos, Target) ->
     MissingRevsCount = lists:foldl(
         fun({_Id, MissingRevs, _PAs}, Acc) -> Acc + length(MissingRevs) end,
         0, Missing),
-    Stats = #rep_stats{
-        missing_checked = AllRevsCount,
-        missing_found = MissingRevsCount
-    },
+    Stats = couch_replicator_stats:new([
+        {missing_checked, AllRevsCount},
+        {missing_found, MissingRevsCount}
+    ]),
     {Missing, Stats}.
 
 
@@ -508,7 +507,7 @@ maybe_report_stats(Cp, Stats) ->
     true ->
         ok = gen_server:call(Cp, {add_stats, Stats}, infinity),
         erlang:put(last_stats_report, Now),
-        #rep_stats{};
+        couch_replicator_stats:new();
     false ->
         Stats
     end.