You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2019/10/31 16:32:58 UTC

[couchdb] branch active-task-stats-in-scheduler updated: Return detailed replication stats for running and pending jobs

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch active-task-stats-in-scheduler
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/active-task-stats-in-scheduler by this push:
     new d1f7a95  Return detailed replication stats for running and pending jobs
d1f7a95 is described below

commit d1f7a957247194532f6dd7d215d17b822dafaba1
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Thu Oct 31 11:58:39 2019 -0400

    Return detailed replication stats for running and pending jobs
    
    Previously `_scheduler/docs` returned detailed replication statistics for
    completed jobs only. To get the same level of detail for running or pending
    jobs, users had to use `_active_tasks`, which is not optimal and required
    jumping between monitoring endpoints.
    
    The `info` field was originally meant to hold these statistics, but they
    were not implemented and it just returned `null` as a placeholder. With work
    on 3.0 being finalized, this might be a good time to add this improvement to
    avoid disturbing the API afterwards.
    
    Just updating `_scheduler/docs` was not quite enough, since replications
    started from the `_replicate` endpoint would not be visible there and users
    would still have to access `_active_tasks` to inspect them, so let's add
    the `info` field to `_scheduler/jobs` as well.
    
    After this update, all states and status details from `_active_tasks` and
    `_replicator` docs should be available under `_scheduler/jobs` and
    `_scheduler/docs` endpoints.
---
 .../src/couch_replicator_doc_processor.erl         | 11 +----
 .../src/couch_replicator_scheduler.erl             | 21 ++++++---
 .../src/couch_replicator_scheduler_job.erl         | 16 +++----
 .../src/couch_replicator_stats.erl                 | 50 ++++++++++------------
 .../src/couch_replicator_utils.erl                 | 16 ++++++-
 5 files changed, 60 insertions(+), 54 deletions(-)

diff --git a/src/couch_replicator/src/couch_replicator_doc_processor.erl b/src/couch_replicator/src/couch_replicator_doc_processor.erl
index 772037d..6d1c791 100644
--- a/src/couch_replicator/src/couch_replicator_doc_processor.erl
+++ b/src/couch_replicator/src/couch_replicator_doc_processor.erl
@@ -530,15 +530,6 @@ doc_lookup(Db, DocId, HealthThreshold) ->
     end.
 
 
--spec ejson_state_info(binary() | nil) -> binary() | null.
-ejson_state_info(nil) ->
-    null;
-ejson_state_info(Info) when is_binary(Info) ->
-    Info;
-ejson_state_info(Info) ->
-    couch_replicator_utils:rep_error_to_binary(Info).
-
-
 -spec ejson_rep_id(rep_id() | nil) -> binary() | null.
 ejson_rep_id(nil) ->
     null;
@@ -576,7 +567,7 @@ ejson_doc(#rdoc{state = RepState} = RDoc, _HealthThreshold) ->
         {database, DbName},
         {id, ejson_rep_id(RepId)},
         {state, RepState},
-        {info, ejson_state_info(StateInfo)},
+        {info, couch_replicator_utils:ejson_state_info(StateInfo)},
         {error_count, ErrorCount},
         {node, node()},
         {last_updated, couch_replicator_utils:iso8601(StateTime)},
diff --git a/src/couch_replicator/src/couch_replicator_scheduler.erl b/src/couch_replicator/src/couch_replicator_scheduler.erl
index c9da377..d534973 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler.erl
@@ -148,19 +148,19 @@ job_summary(JobId, HealthThreshold) ->
                         [{{crashed, Error}, _When} | _] ->
                             {crashing, crash_reason_json(Error)};
                         [_ | _] ->
-                            {pending, null}
+                            {pending, Rep#rep.stats}
                     end;
                 {undefined, ErrorCount} when ErrorCount > 0 ->
                      [{{crashed, Error}, _When} | _] = History,
                      {crashing, crash_reason_json(Error)};
                 {Pid, ErrorCount} when is_pid(Pid) ->
-                     {running, null}
+                     {running, Rep#rep.stats}
             end,
             [
                 {source, iolist_to_binary(ejson_url(Rep#rep.source))},
                 {target, iolist_to_binary(ejson_url(Rep#rep.target))},
                 {state, State},
-                {info, Info},
+                {info, couch_replicator_utils:ejson_state_info(Info)},
                 {error_count, ErrorCount},
                 {last_updated, last_updated(History)},
                 {start_time,
@@ -829,6 +829,7 @@ job_ejson(Job) ->
         {database, Rep#rep.db_name},
         {user, (Rep#rep.user_ctx)#user_ctx.name},
         {doc_id, Rep#rep.doc_id},
+        {info, couch_replicator_utils:ejson_state_info(Rep#rep.stats)},
         {history, History},
         {node, node()},
         {start_time, couch_replicator_utils:iso8601(Rep#rep.start_time)}
@@ -1431,7 +1432,12 @@ t_job_summary_running() ->
         Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
         ?assertEqual(running, proplists:get_value(state, Summary)),
         ?assertEqual(null, proplists:get_value(info, Summary)),
-        ?assertEqual(0, proplists:get_value(error_count, Summary))
+        ?assertEqual(0, proplists:get_value(error_count, Summary)),
+
+        Stats = [{source_seq, <<"1-abc">>}],
+        handle_cast({update_job_stats, job1, Stats}, mock_state(1)),
+        Summary1 = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
+        ?assertEqual({Stats}, proplists:get_value(info, Summary1))
     end).
 
 
@@ -1447,7 +1453,12 @@ t_job_summary_pending() ->
         Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
         ?assertEqual(pending, proplists:get_value(state, Summary)),
         ?assertEqual(null, proplists:get_value(info, Summary)),
-        ?assertEqual(0, proplists:get_value(error_count, Summary))
+        ?assertEqual(0, proplists:get_value(error_count, Summary)),
+
+        Stats = [{doc_write_failures, 1}],
+        handle_cast({update_job_stats, job1, Stats}, mock_state(1)),
+        Summary1 = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
+        ?assertEqual({Stats}, proplists:get_value(info, Summary1))
     end).
 
 
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index 565a2bd..d69febb 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -600,7 +600,7 @@ init_state(Rep) ->
                                         ?DEFAULT_CHECKPOINT_INTERVAL),
         type = Type,
         view = View,
-        stats = Stats
+        stats = couch_replicator_stats:new(Stats)
     },
     State#rep_state{timer = start_timer(State)}.
 
@@ -949,20 +949,16 @@ get_pending_count_int(#rep_state{source = Db}=St) ->
 
 update_task(State) ->
     #rep_state{
+        rep_details = #rep{id = JobId},
         current_through_seq = {_, ThroughSeq},
         highest_seq_done = {_, HighestSeq}
     } = State,
-    update_scheduler_job_stats(State),
-    couch_task_status:update(
-        rep_stats(State) ++ [
+    Status = rep_stats(State) ++ [
         {source_seq, HighestSeq},
         {through_seq, ThroughSeq}
-    ]).
-
-
-update_scheduler_job_stats(#rep_state{rep_details = Rep, stats = Stats}) ->
-    JobId = Rep#rep.id,
-    couch_replicator_scheduler:update_job_stats(JobId, Stats).
+    ],
+    couch_replicator_scheduler:update_job_stats(JobId, Status),
+    couch_task_status:update(Status).
 
 
 rep_stats(State) ->
diff --git a/src/couch_replicator/src/couch_replicator_stats.erl b/src/couch_replicator/src/couch_replicator_stats.erl
index af8ba4e..cd62949 100644
--- a/src/couch_replicator/src/couch_replicator_stats.erl
+++ b/src/couch_replicator/src/couch_replicator_stats.erl
@@ -12,14 +12,6 @@
 
 -module(couch_replicator_stats).
 
--record(rep_stats, {
-    missing_checked = 0,
-    missing_found = 0,
-    docs_read = 0,
-    docs_written = 0,
-    doc_write_failures = 0
-}).
-
 -export([
     new/0,
     new/1,
@@ -39,26 +31,27 @@
 new() ->
     orddict:new().
 
-new(Initializers) when is_list(Initializers) ->
-    orddict:from_list(Initializers).
+new(Initializers0) when is_list(Initializers0) ->
+    Initializers1 = lists:filtermap(fun fmap/1, Initializers0),
+    orddict:from_list(Initializers1).
 
 missing_checked(Stats) ->
-    get(missing_checked, upgrade(Stats)).
+    get(missing_checked, Stats).
 
 missing_found(Stats) ->
-    get(missing_found, upgrade(Stats)).
+    get(missing_found, Stats).
 
 docs_read(Stats) ->
-    get(docs_read, upgrade(Stats)).
+    get(docs_read, Stats).
 
 docs_written(Stats) ->
-    get(docs_written, upgrade(Stats)).
+    get(docs_written, Stats).
 
 doc_write_failures(Stats) ->
-    get(doc_write_failures, upgrade(Stats)).
+    get(doc_write_failures, Stats).
 
 get(Field, Stats) ->
-    case orddict:find(Field, upgrade(Stats)) of
+    case orddict:find(Field, Stats) of
         {ok, Value} ->
             Value;
         error ->
@@ -66,18 +59,19 @@ get(Field, Stats) ->
     end.
 
 increment(Field, Stats) ->
-    orddict:update_counter(Field, 1, upgrade(Stats)).
+    orddict:update_counter(Field, 1, Stats).
 
 sum_stats(S1, S2) ->
-    orddict:merge(fun(_, V1, V2) -> V1+V2 end, upgrade(S1), upgrade(S2)).
+    orddict:merge(fun(_, V1, V2) -> V1+V2 end, S1, S2).
+
 
-upgrade(#rep_stats{} = Stats) ->
-    orddict:from_list([
-        {missing_checked, Stats#rep_stats.missing_checked},
-        {missing_found, Stats#rep_stats.missing_found},
-        {docs_read, Stats#rep_stats.docs_read},
-        {docs_written, Stats#rep_stats.docs_written},
-        {doc_write_failures, Stats#rep_stats.doc_write_failures}
-    ]);
-upgrade(Stats) ->
-    Stats.
+% Handle initializing from a status object which uses same values but different
+% field names.
+fmap({revisions_checked, V})       -> {true, {missing_checked, V}};
+fmap({missing_revisions_found, V}) -> {true, {missing_found, V}};
+fmap({missing_checked, _})         -> true;
+fmap({missing_found, _})           -> true;
+fmap({docs_read, _})               -> true;
+fmap({docs_written, _})            -> true;
+fmap({doc_write_failures, _})      -> true;
+fmap({_, _})                       -> false.
diff --git a/src/couch_replicator/src/couch_replicator_utils.erl b/src/couch_replicator/src/couch_replicator_utils.erl
index ccf2413..856c1b5 100644
--- a/src/couch_replicator/src/couch_replicator_utils.erl
+++ b/src/couch_replicator/src/couch_replicator_utils.erl
@@ -24,7 +24,8 @@
    iso8601/1,
    filter_state/3,
    remove_basic_auth_from_headers/1,
-   normalize_rep/1
+   normalize_rep/1,
+   ejson_state_info/1
 ]).
 
 
@@ -176,6 +177,19 @@ normalize_rep(#rep{} = Rep)->
     }.
 
 
+-spec ejson_state_info(binary() | nil) -> binary() | null.
+ejson_state_info(nil) ->
+    null;
+ejson_state_info(Info) when is_binary(Info) ->
+    Info;
+ejson_state_info([]) ->
+    null;  % Status not set yet => null for compatibility reasons
+ejson_state_info([{_, _} | _] = Info) ->
+    {Info};
+ejson_state_info(Info) ->
+    couch_replicator_utils:rep_error_to_binary(Info).
+
+
 -ifdef(TEST).
 
 -include_lib("eunit/include/eunit.hrl").