You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2014/08/28 14:15:43 UTC
[12/37] couch-replicator commit: updated refs/heads/master to aafb5f9
Provide an abstract interface to replicator stats
Switch to an ordered_dict() under the hood.
BugzID: 24298
Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/65a1a106
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/65a1a106
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/65a1a106
Branch: refs/heads/master
Commit: 65a1a106a4ec3e4e3adbe5b26566c5d0eea3c940
Parents: 6f2dc4c
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Fri Oct 18 15:53:23 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Tue Jul 29 15:02:53 2014 +0100
----------------------------------------------------------------------
src/couch_replicator.erl | 28 +++++-----
src/couch_replicator.hrl | 8 ---
src/couch_replicator_changes_reader.erl | 2 +-
src/couch_replicator_stats.erl | 83 ++++++++++++++++++++++++++++
src/couch_replicator_utils.erl | 14 +----
src/couch_replicator_worker.erl | 53 +++++++++---------
6 files changed, 127 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/65a1a106/src/couch_replicator.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator.erl b/src/couch_replicator.erl
index 5891084..47bbd3a 100644
--- a/src/couch_replicator.erl
+++ b/src/couch_replicator.erl
@@ -64,7 +64,7 @@
changes_manager,
changes_reader,
workers,
- stats = #rep_stats{},
+ stats = couch_replicator_stats:new(),
session_id,
source_db_compaction_notifier = nil,
target_db_compaction_notifier = nil,
@@ -694,11 +694,11 @@ do_checkpoint(State) ->
{<<"start_last_seq">>, StartSeq},
{<<"end_last_seq">>, NewSeq},
{<<"recorded_seq">>, NewSeq},
- {<<"missing_checked">>, Stats#rep_stats.missing_checked},
- {<<"missing_found">>, Stats#rep_stats.missing_found},
- {<<"docs_read">>, Stats#rep_stats.docs_read},
- {<<"docs_written">>, Stats#rep_stats.docs_written},
- {<<"doc_write_failures">>, Stats#rep_stats.doc_write_failures}
+ {<<"missing_checked">>, couch_replicator_stats:missing_checked(Stats)},
+ {<<"missing_found">>, couch_replicator_stats:missing_found(Stats)},
+ {<<"docs_read">>, couch_replicator_stats:docs_read(Stats)},
+ {<<"docs_written">>, couch_replicator_stats:docs_written(Stats)},
+ {<<"doc_write_failures">>, couch_replicator_stats:doc_write_failures(Stats)}
]},
BaseHistory = [
{<<"session_id">>, SessionId},
@@ -714,9 +714,9 @@ do_checkpoint(State) ->
[
{<<"start_time">>, StartTime},
{<<"end_time">>, EndTime},
- {<<"docs_read">>, Stats#rep_stats.docs_read},
- {<<"docs_written">>, Stats#rep_stats.docs_written},
- {<<"doc_write_failures">>, Stats#rep_stats.doc_write_failures}
+ {<<"docs_read">>, couch_replicator_stats:docs_read(Stats)},
+ {<<"docs_written">>, couch_replicator_stats:docs_written(Stats)},
+ {<<"doc_write_failures">>, couch_replicator_stats:doc_write_failures(Stats)}
]
end,
% limit history to 50 entries
@@ -921,12 +921,12 @@ rep_stats(State) ->
source_seq = SourceCurSeq
} = State,
[
- {revisions_checked, Stats#rep_stats.missing_checked},
- {missing_revisions_found, Stats#rep_stats.missing_found},
- {docs_read, Stats#rep_stats.docs_read},
- {docs_written, Stats#rep_stats.docs_written},
+ {revisions_checked, couch_replicator_stats:missing_checked(Stats)},
+ {missing_revisions_found, couch_replicator_stats:missing_found(Stats)},
+ {docs_read, couch_replicator_stats:docs_read(Stats)},
+ {docs_written, couch_replicator_stats:docs_written(Stats)},
{changes_pending, pending(SourceCurSeq, CommittedSeq)},
- {doc_write_failures, Stats#rep_stats.doc_write_failures},
+ {doc_write_failures, couch_replicator_stats:doc_write_failures(Stats)},
{checkpointed_source_seq, CommittedSeq}
].
http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/65a1a106/src/couch_replicator.hrl
----------------------------------------------------------------------
diff --git a/src/couch_replicator.hrl b/src/couch_replicator.hrl
index 018aa4b..893d9e4 100644
--- a/src/couch_replicator.hrl
+++ b/src/couch_replicator.hrl
@@ -20,11 +20,3 @@
user_ctx,
doc_id
}).
-
--record(rep_stats, {
- missing_checked = 0,
- missing_found = 0,
- docs_read = 0,
- docs_written = 0,
- doc_write_failures = 0
-}).
http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/65a1a106/src/couch_replicator_changes_reader.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_changes_reader.erl b/src/couch_replicator_changes_reader.erl
index 045e074..f28dd51 100644
--- a/src/couch_replicator_changes_reader.erl
+++ b/src/couch_replicator_changes_reader.erl
@@ -93,7 +93,7 @@ process_change({last_seq, LS}, {Parent, _, _, true = _Continuous, Ts}) ->
Seq = case LS of undefined -> get(last_seq); _ -> LS end,
OldSeq = get(last_seq),
if Seq == OldSeq -> ok; true ->
- Msg = {report_seq_done, {Ts, Seq}, #rep_stats{}},
+ Msg = {report_seq_done, {Ts, Seq}, couch_replicator_stats:new()},
ok = gen_server:call(Parent, Msg, infinity)
end,
put(last_seq, Seq),
http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/65a1a106/src/couch_replicator_stats.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_stats.erl b/src/couch_replicator_stats.erl
new file mode 100644
index 0000000..af8ba4e
--- /dev/null
+++ b/src/couch_replicator_stats.erl
@@ -0,0 +1,83 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_stats).
+
+-record(rep_stats, {
+ missing_checked = 0,
+ missing_found = 0,
+ docs_read = 0,
+ docs_written = 0,
+ doc_write_failures = 0
+}).
+
+-export([
+ new/0,
+ new/1,
+ get/2,
+ increment/2,
+ sum_stats/2
+]).
+
+-export([
+ missing_checked/1,
+ missing_found/1,
+ docs_read/1,
+ docs_written/1,
+ doc_write_failures/1
+]).
+
+new() ->
+ orddict:new().
+
+new(Initializers) when is_list(Initializers) ->
+ orddict:from_list(Initializers).
+
+missing_checked(Stats) ->
+ get(missing_checked, upgrade(Stats)).
+
+missing_found(Stats) ->
+ get(missing_found, upgrade(Stats)).
+
+docs_read(Stats) ->
+ get(docs_read, upgrade(Stats)).
+
+docs_written(Stats) ->
+ get(docs_written, upgrade(Stats)).
+
+doc_write_failures(Stats) ->
+ get(doc_write_failures, upgrade(Stats)).
+
+get(Field, Stats) ->
+ case orddict:find(Field, upgrade(Stats)) of
+ {ok, Value} ->
+ Value;
+ error ->
+ 0
+ end.
+
+increment(Field, Stats) ->
+ orddict:update_counter(Field, 1, upgrade(Stats)).
+
+sum_stats(S1, S2) ->
+ orddict:merge(fun(_, V1, V2) -> V1+V2 end, upgrade(S1), upgrade(S2)).
+
+upgrade(#rep_stats{} = Stats) ->
+ orddict:from_list([
+ {missing_checked, Stats#rep_stats.missing_checked},
+ {missing_found, Stats#rep_stats.missing_found},
+ {docs_read, Stats#rep_stats.docs_read},
+ {docs_written, Stats#rep_stats.docs_written},
+ {doc_write_failures, Stats#rep_stats.doc_write_failures}
+ ]);
+upgrade(Stats) ->
+ Stats.
http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/65a1a106/src/couch_replicator_utils.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_utils.erl b/src/couch_replicator_utils.erl
index f489ff1..9d716c9 100644
--- a/src/couch_replicator_utils.erl
+++ b/src/couch_replicator_utils.erl
@@ -392,17 +392,9 @@ handle_db_event(DbName, compacted, Server) ->
handle_db_event(_DbName, _Event, Server) ->
{ok, Server}.
-
-sum_stats(#rep_stats{} = S1, #rep_stats{} = S2) ->
- #rep_stats{
- missing_checked =
- S1#rep_stats.missing_checked + S2#rep_stats.missing_checked,
- missing_found = S1#rep_stats.missing_found + S2#rep_stats.missing_found,
- docs_read = S1#rep_stats.docs_read + S2#rep_stats.docs_read,
- docs_written = S1#rep_stats.docs_written + S2#rep_stats.docs_written,
- doc_write_failures =
- S1#rep_stats.doc_write_failures + S2#rep_stats.doc_write_failures
- }.
+% Obsolete - remove in next release
+sum_stats(S1, S2) ->
+ couch_replicator_stats:sum_stats(S1, S2).
mp_parse_doc({headers, H}, []) ->
case couch_util:get_value("content-type", H) of
http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/65a1a106/src/couch_replicator_worker.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_worker.erl b/src/couch_replicator_worker.erl
index fce445b..a0638c6 100644
--- a/src/couch_replicator_worker.erl
+++ b/src/couch_replicator_worker.erl
@@ -31,9 +31,6 @@
-define(MAX_BULK_ATTS_PER_DOC, 8).
-define(STATS_DELAY, 10000000). % 10 seconds (in microseconds)
--define(inc_stat(StatPos, Stats, Inc),
- setelement(StatPos, Stats, element(StatPos, Stats) + Inc)).
-
-import(couch_replicator_utils, [
open_db/1,
close_db/1,
@@ -61,7 +58,7 @@
writer = nil,
pending_fetch = nil,
flush_waiter = nil,
- stats = #rep_stats{},
+ stats = couch_replicator_stats:new(),
source_db_compaction_notifier = nil,
target_db_compaction_notifier = nil,
batch = #batch{}
@@ -306,7 +303,7 @@ fetch_doc(Source, {Id, Revs, PAs}, DocHandler, Acc) ->
local_doc_handler({ok, Doc}, {Target, DocList, Stats, Cp}) ->
- Stats2 = ?inc_stat(#rep_stats.docs_read, Stats, 1),
+ Stats2 = couch_replicator_stats:increment(docs_read, Stats),
case batch_doc(Doc) of
true ->
{ok, {Target, [Doc | DocList], Stats2, Cp}};
@@ -317,9 +314,9 @@ local_doc_handler({ok, Doc}, {Target, DocList, Stats, Cp}) ->
close_db(Target2),
Stats3 = case Success of
true ->
- ?inc_stat(#rep_stats.docs_written, Stats2, 1);
+ couch_replicator_stats:increment(docs_written, Stats2);
false ->
- ?inc_stat(#rep_stats.doc_write_failures, Stats2, 1)
+ couch_replicator_stats:increment(doc_write_failures, Stats2)
end,
Stats4 = maybe_report_stats(Cp, Stats3),
{ok, {Target, DocList, Stats4, Cp}}
@@ -336,16 +333,16 @@ remote_doc_handler({ok, Doc}, {Parent, Target} = Acc) ->
% source. The data property of each attachment is a function that starts
% streaming the attachment data from the remote source, therefore it's
% convenient to call it ASAP to avoid ibrowse inactivity timeouts.
- Stats = #rep_stats{docs_read = 1},
+ Stats = couch_replicator_stats:new([{docs_read, 1}]),
couch_log:debug("Worker flushing doc with attachments", []),
Target2 = open_db(Target),
Success = (flush_doc(Target2, Doc) =:= ok),
close_db(Target2),
{Result, Stats2} = case Success of
true ->
- {{ok, Acc}, ?inc_stat(#rep_stats.docs_written, Stats, 1)};
+ {{ok, Acc}, couch_replicator_stats:increment(docs_written, Stats)};
false ->
- {{skip, Acc}, ?inc_stat(#rep_stats.doc_write_failures, Stats, 1)}
+ {{skip, Acc}, couch_replicator_stats:increment(doc_write_failures, Stats)}
end,
ok = gen_server:call(Parent, {add_stats, Stats2}, infinity),
Result;
@@ -376,7 +373,7 @@ after_full_flush(#state{stats = Stats, flush_waiter = Waiter} = State) ->
gen_server:reply(Waiter, {ok, Stats}),
erlang:put(last_stats_report, now()),
State#state{
- stats = #rep_stats{},
+ stats = couch_replicator_stats:new(),
flush_waiter = nil,
writer = nil,
batch = #batch{}
@@ -389,8 +386,8 @@ maybe_flush_docs(Doc,State) ->
stats = Stats, cp = Cp
} = State,
{Batch2, WStats} = maybe_flush_docs(Target, Batch, Doc),
- Stats2 = couch_replicator_utils:sum_stats(Stats, WStats),
- Stats3 = ?inc_stat(#rep_stats.docs_read, Stats2, 1),
+ Stats2 = couch_replicator_stats:sum_stats(Stats, WStats),
+ Stats3 = couch_replicator_stats:increment(docs_read, Stats2),
Stats4 = maybe_report_stats(Cp, Stats3),
State#state{stats = Stats4, batch = Batch2}.
@@ -402,9 +399,9 @@ maybe_flush_docs(#httpdb{} = Target, Batch, Doc) ->
couch_log:debug("Worker flushing doc with attachments", []),
case flush_doc(Target, Doc) of
ok ->
- {Batch, #rep_stats{docs_written = 1}};
+ {Batch, couch_replicator_stats:new([{docs_written, 1}])};
_ ->
- {Batch, #rep_stats{doc_write_failures = 1}}
+ {Batch, couch_replicator_stats:new([{doc_write_failures, 1}])}
end;
true ->
JsonDoc = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, [revs, attachments])),
@@ -414,7 +411,8 @@ maybe_flush_docs(#httpdb{} = Target, Batch, Doc) ->
Stats = flush_docs(Target, [JsonDoc | DocAcc]),
{#batch{}, Stats};
SizeAcc2 ->
- {#batch{docs = [JsonDoc | DocAcc], size = SizeAcc2}, #rep_stats{}}
+ Stats = couch_replicator_stats:new(),
+ {#batch{docs = [JsonDoc | DocAcc], size = SizeAcc2}, Stats}
end
end;
@@ -425,7 +423,8 @@ maybe_flush_docs(#db{} = Target, #batch{docs = DocAcc, size = SizeAcc}, Doc) ->
Stats = flush_docs(Target, [Doc | DocAcc]),
{#batch{}, Stats};
SizeAcc2 ->
- {#batch{docs = [Doc | DocAcc], size = SizeAcc2}, #rep_stats{}}
+ Stats = couch_replicator_stats:new(),
+ {#batch{docs = [Doc | DocAcc], size = SizeAcc2}, Stats}
end.
@@ -440,7 +439,7 @@ batch_doc(#doc{atts = Atts}) ->
flush_docs(_Target, []) ->
- #rep_stats{};
+ couch_replicator_stats:new();
flush_docs(Target, DocList) ->
{ok, Errors} = couch_replicator_api_wrap:update_docs(
@@ -453,10 +452,10 @@ flush_docs(Target, DocList) ->
[get_value(id, Props, ""), get_value(rev, Props, ""), DbUri,
get_value(error, Props, ""), get_value(reason, Props, "")])
end, Errors),
- #rep_stats{
- docs_written = length(DocList) - length(Errors),
- doc_write_failures = length(Errors)
- }.
+ couch_replicator_stats:new([
+ {docs_written, length(DocList) - length(Errors)},
+ {doc_write_failures, length(Errors)}
+ ]).
flush_doc(Target, #doc{id = Id, revs = {Pos, [RevId | _]}} = Doc) ->
try couch_replicator_api_wrap:update_doc(Target, Doc, [], replicated_changes) of
@@ -495,10 +494,10 @@ find_missing(DocInfos, Target) ->
MissingRevsCount = lists:foldl(
fun({_Id, MissingRevs, _PAs}, Acc) -> Acc + length(MissingRevs) end,
0, Missing),
- Stats = #rep_stats{
- missing_checked = AllRevsCount,
- missing_found = MissingRevsCount
- },
+ Stats = couch_replicator_stats:new([
+ {missing_checked, AllRevsCount},
+ {missing_found, MissingRevsCount}
+ ]),
{Missing, Stats}.
@@ -508,7 +507,7 @@ maybe_report_stats(Cp, Stats) ->
true ->
ok = gen_server:call(Cp, {add_stats, Stats}, infinity),
erlang:put(last_stats_report, Now),
- #rep_stats{};
+ couch_replicator_stats:new();
false ->
Stats
end.