You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2020/07/20 22:34:42 UTC

[couchdb] 09/10: [wip] move stats updates to individual jobs and use couch_jobs for it

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch prototype/fdb-replicator
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit b75ad64f7ab1fd0b027430d47dad3dd700309069
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Mon Jul 20 18:33:42 2020 -0400

    [wip] move stats updates to individual jobs and use couch_jobs for it
---
 .../src/couch_replicator_scheduler.erl             | 20 +----
 .../src/couch_replicator_scheduler_job.erl         | 88 ++++++++++++----------
 2 files changed, 49 insertions(+), 59 deletions(-)

diff --git a/src/couch_replicator/src/couch_replicator_scheduler.erl b/src/couch_replicator/src/couch_replicator_scheduler.erl
index 00a352b..f0edf93 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler.erl
@@ -40,8 +40,7 @@
    health_threshold/0,
    jobs/0,
    job/1,
-   restart_job/1,
-   update_job_stats/2
+   restart_job/1
 ]).
 
 %% config_listener callbacks
@@ -217,11 +216,6 @@ restart_job(JobId) ->
     end.
 
 
--spec update_job_stats(job_id(), term()) -> ok.
-update_job_stats(JobId, Stats) ->
-    gen_server:cast(?MODULE, {update_job_stats, JobId, Stats}).
-
-
 %% gen_server functions
 
 init(_) ->
@@ -291,16 +285,6 @@ handle_cast({set_interval, Interval}, State) when is_integer(Interval),
     couch_log:notice("~p: interval set to ~B", [?MODULE, Interval]),
     {noreply, State#state{interval = Interval}};
 
-handle_cast({update_job_stats, JobId, Stats}, State) ->
-    case rep_state(JobId) of
-        nil ->
-            ok;
-        #rep{} = Rep ->
-            NewRep = Rep#rep{stats = Stats},
-            true = ets:update_element(?MODULE, JobId, {#job.rep, NewRep})
-    end,
-    {noreply, State};
-
 handle_cast(UnexpectedMsg, State) ->
     couch_log:error("~p: received un-expected cast ~p", [?MODULE, UnexpectedMsg]),
     {noreply, State}.
@@ -1445,7 +1429,6 @@ t_job_summary_running() ->
         ?assertEqual(0, proplists:get_value(error_count, Summary)),
 
         Stats = [{source_seq, <<"1-abc">>}],
-        handle_cast({update_job_stats, job1, Stats}, mock_state(1)),
         Summary1 = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
         ?assertEqual({Stats}, proplists:get_value(info, Summary1))
     end).
@@ -1466,7 +1449,6 @@ t_job_summary_pending() ->
         ?assertEqual(0, proplists:get_value(error_count, Summary)),
 
         Stats = [{doc_write_failures, 1}],
-        handle_cast({update_job_stats, job1, Stats}, mock_state(1)),
         Summary1 = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
         ?assertEqual({Stats}, proplists:get_value(info, Summary1))
     end).
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index fef96dd..b5b10ab 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -75,20 +75,18 @@
     source_seq = nil,
     use_checkpoints = true,
     checkpoint_interval = ?DEFAULT_CHECKPOINT_INTERVAL,
-    type = db,
-    view = nil,
     user = null,
     options = #{}
 }).
 
 
-start_link(#{] = Job, #{} = JobData) ->
+start_link(#{} = Job, #{} = JobData) ->
     case gen_server:start_link(?MODULE, {Job, JobData}, []) of
         {ok, Pid} ->
             {ok, Pid};
         {error, Reason} ->
             #{?REP := Rep} = JobData,
-            {<<"id">> := Id, ?SOURCE := Src, ?TARGET := Ttg} = Rep,
+            #{?REP_ID := Id, ?SOURCE := Src, ?TARGET := Tgt} = Rep,
             Source = couch_replicator_api_wrap:db_uri(Src),
             Target = couch_replicator_api_wrap:db_uri(Tgt),
             ErrMsg = "failed to start replication `~s` (`~s` -> `~s`)",
@@ -211,8 +209,7 @@ handle_call({report_seq_done, Seq, StatsInc}, From,
         seqs_in_progress = NewSeqsInProgress,
         highest_seq_done = NewHighestDone
     },
-    update_task(NewState),
-    {noreply, NewState}.
+    {noreply, update_task(NewState)}.
 
 
 handle_cast(checkpoint, State) ->
@@ -342,14 +339,13 @@ terminate(shutdown, #rep_state{id = RepId} = State) ->
             LogMsg = "~p : Failed last checkpoint. Job: ~p Error: ~p",
             couch_log:error(LogMsg, [?MODULE, RepId, Error]),
             State
-    end,
-    finish_couch_job(State1, <<"stopped">>, null),
+    end,    finish_couch_job(State1, <<"stopped">>, null),
     terminate_cleanup(State1);
 
 terminate({shutdown, max_backoff}, {error, {#{} = Job, #{} = JobData}}) ->
     % Here we handle the case when replication fails during initialization.
     % That is before the #rep_state{} is even built.
-    #{?REP := #{<<"id">> := RepId}} = JobData,
+    #{?REP := #{?REP_ID := RepId}} = JobData,
     couch_stats:increment_counter([couch_replicator, failed_starts]),
     couch_log:warning("Replication `~s` reached max backoff ", [RepId]),
     finish_couch_job(Job, JobData, <<"error">>, max_backoff);
@@ -358,11 +354,11 @@ terminate({shutdown, {error, Error}}, {error, Class, Stack, {Job, JobData}}) ->
     % Here we handle the case when replication fails during initialization.
     #{?REP := Rep} = JobData,
     #{
-       <<"id">> := Id,
+       ?REP_ID := Id,
        ?SOURCE := Source0,
        ?TARGET := Target0,
-       <<"doc_id">> := DocId,
-       <<"db_name">> := DbName
+       ?DOC_ID := DocId,
+       ?DB_NAME := DbName
     } = Rep,
     Source = couch_replicator_api_wrap:db_uri(Source0),
     Target = couch_replicator_api_wrap:db_uri(Target0),
@@ -542,8 +538,8 @@ finish_couch_job(#rep_state{} = State, FinishedState, Result) ->
 
 
 finish_couch_job(#{} = Job, #{} = JobData, FinishState, Result0) ->
-    #{?REP := #{<<"id">> := RepId}} = JobData,
-    case Result of
+    #{?REP := #{?REP_ID := RepId}} = JobData,
+    Result = case Result0 of
         null -> null;
         #{} -> Result0;
         <<_/binary>> -> Result0;
@@ -583,19 +579,17 @@ cancel_timer(#rep_state{timer = Timer} = State) ->
     State#rep_state{timer = nil}.
 
 
-init_state(#{} = Job, #{?REP =: Rep}} = JobData) ->
+init_state(#{} = Job, #{} = JobData) ->
+    #{?REP := Rep} = JobData,
     #{
-        <<"id">> := Id,
-        <<"base_id">> := BaseId,
+        ?REP_ID := Id,
+        ?BASE_ID := BaseId,
         ?SOURCE := Src0,
         ?TARGET := Tgt,
-        <<"type">> := Type,
-        <<"view">> := View,
-        <<"start_time">> := StartTime,
-        <<"stats">> := ArgStats0,
-        <<"options">> := OptionsMap,
-        <<"db_name">> := DbName,
-        <<"doc_id">> := DocId,
+        ?START_TIME := StartTime,
+        ?OPTIONS := OptionsMap,
+        ?DB_NAME := DbName,
+        ?DOC_ID := DocId
     } = Rep,
 
     Options = maps:fold(fun(K, V, Acc) ->
@@ -616,12 +610,13 @@ init_state(#{} = Job, #{?REP =: Rep}} = JobData) ->
 
     {StartSeq0, History} = compare_replication_logs(SourceLog, TargetLog),
 
-    ArgStats1 = couch_replicator_stats:new(ArgStats0),
+    #{?REP_STATS := Stats0} = JobData,
+    Stats1 = couch_replicator_stats:new(Stats0),
     HistoryStats = case History of
         [{[_ | _] = HProps} | _] -> couch_replicator_stats:new(HProps);
         _ -> couch_replicator_stats:new()
     end,
-    Stats = couch_replicator_stats:max_stats(ArgStats1, HistoryStats),
+    Stats2 = couch_replicator_stats:max_stats(Stats1, HistoryStats),
 
     StartSeq1 = get_value(since_seq, Options, StartSeq0),
     StartSeq = {0, StartSeq1},
@@ -652,16 +647,14 @@ init_state(#{} = Job, #{?REP =: Rep}} = JobData) ->
         source_seq = SourceSeq,
         use_checkpoints = get_value(use_checkpoints, Options),
         checkpoint_interval = get_value(checkpoint_interval, Options),
-        type = Type,
-        view = View,
-        stats = Stats
+        stats = Stats2,
         doc_id = DocId,
         db_name = DbName
     },
     State#rep_state{timer = start_timer(State)}.
 
 
-find_and_migrate_logs(DbList, #{<<"base_id">> := BaseId} = Rep) ->
+find_and_migrate_logs(DbList, #{?BASE_ID := BaseId} = Rep) ->
     LogId = ?l2b(?LOCAL_DOC_PREFIX ++ BaseId),
     fold_replication_logs(DbList, ?REP_ID_VERSION, LogId, LogId, State, []).
 
@@ -689,7 +682,7 @@ fold_replication_logs([Db | Rest] = Dbs, Vsn, LogId, NewId, #{} = Rep, Acc) ->
     end.
 
 
-maybe_save_migrated_log(#{<<"options">> = Options}, Db, #doc{} = Doc, OldId) ->
+maybe_save_migrated_log(#{?OPTIONS := Options}, Db, #doc{} = Doc, OldId) ->
     case maps:get(<<"use_checkpoints">>, Options) of
         true ->
             update_checkpoint(Db, Doc),
@@ -736,8 +729,7 @@ do_checkpoint(#rep_state{use_checkpoints=false} = State) ->
     NewState = State#rep_state{checkpoint_history = {[{<<"use_checkpoints">>, false}]} },
     {ok, NewState};
 do_checkpoint(#rep_state{current_through_seq=Seq, committed_seq=Seq} = State) ->
-    update_task(State),
-    {ok, State};
+    {ok, update_task(State)};
 do_checkpoint(State) ->
     #rep_state{
         source_name=SourceName,
@@ -818,8 +810,7 @@ do_checkpoint(State) ->
                 source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}},
                 target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}}
             },
-            update_task(NewState),
-            {ok, NewState}
+            {ok, update_task(NewState)}
         catch throw:{checkpoint_commit_failure, _} = Failure ->
             Failure
         end;
@@ -996,18 +987,35 @@ get_pending_count_int(#rep_state{source = Db}=St) ->
     Pending.
 
 
-update_task(State) ->
+update_task(#rep_state{} = State) ->
     #rep_state{
-        rep_details = #rep{id = JobId},
         current_through_seq = {_, ThroughSeq},
         highest_seq_done = {_, HighestSeq}
     } = State,
-    Status = rep_stats(State) ++ [
+    NewStats = rep_stats(State) ++ [
         {source_seq, HighestSeq},
         {through_seq, ThroughSeq}
     ],
-    couch_replicator_scheduler:update_job_stats(JobId, Status),
-    couch_task_status:update(Status).
+    {ok, NewState} = update_job_stats(State, NewStats),
+    couch_task_status:update(NewStats),
+    NewState.
+
+
+update_job_stats(#rep_state{} = State, NewStats) ->
+    #rep_state{
+        job = Job,
+        job_data = JobData
+    } = State,
+    JsonStats = couch_replicator_stats:to_json(NewStats),
+    JobData1 = JobData#{?REP_STATS => JsonStats},
+    case couch_jobs:update(undefined, Job, JobData1) of
+        {ok, Job1} ->
+            {ok, State#rep_state{job = Job1}};
+        {error, halt} ->
+            ErrMsg = "~p : job halted, replication id: ~p",
+            couch_log:error(ErrMsg, [?MODULE, State#rep_state.id]),
+            error({error, halt})
+    end.
 
 
 rep_stats(State) ->