Posted to commits@couchdb.apache.org by va...@apache.org on 2020/01/10 06:12:17 UTC

[couchdb] 01/01: WIP cleanup error reporting in replicator

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch improve-error-reporting-in-replicator
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit f907da52573cf41ba5be5fe76cdfd0e72972b356
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Fri Jan 10 01:10:57 2020 -0500

    WIP cleanup error reporting in replicator
    
    Issue: #2413
---
 .../src/couch_replicator_api_wrap.erl              |  4 ++-
 .../src/couch_replicator_scheduler.erl             |  8 ++++--
 .../src/couch_replicator_scheduler_job.erl         | 30 ++++++----------------
 .../src/couch_replicator_worker.erl                | 11 +++++++-
 4 files changed, 27 insertions(+), 26 deletions(-)

diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl b/src/couch_replicator/src/couch_replicator_api_wrap.erl
index ab1de7d..d5477fe 100644
--- a/src/couch_replicator/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl
@@ -408,7 +408,9 @@ update_docs(#httpdb{} = HttpDb, DocList, Options, UpdateType) ->
            (413, _, _) ->
                 {error, request_body_too_large};
            (417, _, Results) when is_list(Results) ->
-                {ok, bulk_results_to_errors(DocList, Results, remote)}
+                {ok, bulk_results_to_errors(DocList, Results, remote)};
+           (ErrCode, _, ErrMsg) when is_integer(ErrCode) ->
+                {error, {bulk_docs_failed, ErrCode, ErrMsg}}
         end).
 
 
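The new catch-all clause means an unexpected HTTP status from _bulk_docs
now comes back to the caller as a tagged error instead of falling through
the response handler. A minimal caller-side sketch (not part of the
patch): the arguments mirror the update_docs/4 head shown above, and
handle_doc_errors/1 and retry_with_smaller_batches/4 are made-up helpers
for illustration.

    %% Hypothetical caller, illustrative only.
    push_batch(Target, DocList, Options, UpdateType) ->
        case couch_replicator_api_wrap:update_docs(Target, DocList,
                Options, UpdateType) of
            {ok, Errors} when is_list(Errors) ->
                %% per-document results/errors (e.g. the 417 case above)
                handle_doc_errors(Errors);
            {error, request_body_too_large} ->
                %% 413: the request body was too large for the target
                retry_with_smaller_batches(Target, DocList, Options,
                    UpdateType);
            {error, {bulk_docs_failed, ErrCode, ErrMsg}} ->
                %% new clause: any other error status carries its HTTP
                %% code and response body
                exit({bulk_docs_failed, ErrCode, ErrMsg})
        end.
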
diff --git a/src/couch_replicator/src/couch_replicator_scheduler.erl b/src/couch_replicator/src/couch_replicator_scheduler.erl
index f84860c..53c040e 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler.erl
@@ -318,8 +318,12 @@ handle_info({'DOWN', _Ref, process, Pid, normal}, State) ->
     update_running_jobs_stats(State#state.stats_pid),
     {noreply, State};
 
-handle_info({'DOWN', _Ref, process, Pid, Reason}, State) ->
+handle_info({'DOWN', _Ref, process, Pid, Reason0}, State) ->
     {ok, Job} = job_by_pid(Pid),
+    Reason = case Reason0 of
+        {shutdown, ShutdownReason} -> ShutdownReason;
+        Other -> Other
+    end,
     ok = handle_crashed_job(Job, Reason, State),
     {noreply, State};
 
@@ -873,7 +877,7 @@ is_continuous(#job{rep = Rep}) ->
 % optimize some options to help the job make progress.
 -spec maybe_optimize_job_for_rate_limiting(#job{}) -> #job{}.
 maybe_optimize_job_for_rate_limiting(Job = #job{history =
-    [{{crashed, {shutdown, max_backoff}}, _} | _]}) ->
+    [{{crashed, max_backoff}, _} | _]}) ->
     Opts = [
         {checkpoint_interval, 5000},
         {worker_processes, 2},
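
Stripping the {shutdown, _} wrapper before handle_crashed_job/3 is also
why maybe_optimize_job_for_rate_limiting/1 now matches
{crashed, max_backoff} directly: a job that stops with
{stop, {shutdown, max_backoff}, State} delivers {shutdown, max_backoff}
in its 'DOWN' message, and only the inner reason ends up in the job
history. The same unwrapping written as a standalone helper (hypothetical
name, illustrative only):

    %% Hypothetical helper equivalent to the new case expression above:
    %% graceful {shutdown, Reason} exits are recorded by their inner
    %% reason, anything else is kept as-is.
    -spec strip_shutdown_wrapper(term()) -> term().
    strip_shutdown_wrapper({shutdown, Reason}) -> Reason;
    strip_shutdown_wrapper(Other) -> Other.
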
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index d69febb..54a69a2 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -73,8 +73,6 @@
     workers,
     stats = couch_replicator_stats:new(),
     session_id,
-    source_monitor = nil,
-    target_monitor = nil,
     source_seq = nil,
     use_checkpoints = true,
     checkpoint_interval = ?DEFAULT_CHECKPOINT_INTERVAL,
@@ -242,14 +240,6 @@ handle_cast({report_seq, Seq},
 handle_info(shutdown, St) ->
     {stop, shutdown, St};
 
-handle_info({'DOWN', Ref, _, _, Why}, #rep_state{source_monitor = Ref} = St) ->
-    couch_log:error("Source database is down. Reason: ~p", [Why]),
-    {stop, source_db_down, St};
-
-handle_info({'DOWN', Ref, _, _, Why}, #rep_state{target_monitor = Ref} = St) ->
-    couch_log:error("Target database is down. Reason: ~p", [Why]),
-    {stop, target_db_down, St};
-
 handle_info({'EXIT', Pid, max_backoff}, State) ->
     couch_log:error("Max backoff reached child process ~p", [Pid]),
     {stop, {shutdown, max_backoff}, State};
@@ -264,7 +254,7 @@ handle_info({'EXIT', Pid, normal}, #rep_state{changes_reader=Pid} = State) ->
 handle_info({'EXIT', Pid, Reason}, #rep_state{changes_reader=Pid} = State) ->
     couch_stats:increment_counter([couch_replicator, changes_reader_deaths]),
     couch_log:error("ChangesReader process died with reason: ~p", [Reason]),
-    {stop, changes_reader_died, cancel_timer(State)};
+    {stop, {shutdown, {changes_reader_died, Reason}}, cancel_timer(State)};
 
 handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager = Pid} = State) ->
     {noreply, State};
@@ -272,7 +262,7 @@ handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager = Pid} = State) ->
 handle_info({'EXIT', Pid, Reason}, #rep_state{changes_manager = Pid} = State) ->
     couch_stats:increment_counter([couch_replicator, changes_manager_deaths]),
     couch_log:error("ChangesManager process died with reason: ~p", [Reason]),
-    {stop, changes_manager_died, cancel_timer(State)};
+    {stop, {shutdown, {changes_manager_died, Reason}}, cancel_timer(State)};
 
 handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) ->
     {noreply, State};
@@ -280,7 +270,7 @@ handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) ->
 handle_info({'EXIT', Pid, Reason}, #rep_state{changes_queue=Pid} = State) ->
     couch_stats:increment_counter([couch_replicator, changes_queue_deaths]),
     couch_log:error("ChangesQueue process died with reason: ~p", [Reason]),
-    {stop, changes_queue_died, cancel_timer(State)};
+    {stop, {shutdown, {changes_queue_died, Reason}}, cancel_timer(State)};
 
 handle_info({'EXIT', Pid, normal}, #rep_state{workers = Workers} = State) ->
     case Workers -- [Pid] of
@@ -305,7 +295,11 @@ handle_info({'EXIT', Pid, Reason}, #rep_state{workers = Workers} = State) ->
     true ->
         couch_stats:increment_counter([couch_replicator, worker_deaths]),
         couch_log:error("Worker ~p died with reason: ~p", [Pid, Reason]),
-        {stop, {worker_died, Pid, Reason}, State2}
+        StopReason = case Reason of
+            {shutdown, _} = Err -> Err;
+            Other -> {worker_died, Pid, Other}
+        end,
+        {stop, StopReason, State2}
     end;
 
 handle_info(timeout, InitArgs) ->
@@ -592,8 +586,6 @@ init_state(Rep) ->
         src_starttime = get_value(<<"instance_start_time">>, SourceInfo),
         tgt_starttime = get_value(<<"instance_start_time">>, TargetInfo),
         session_id = couch_uuids:random(),
-        source_monitor = db_monitor(Source),
-        target_monitor = db_monitor(Target),
         source_seq = SourceSeq,
         use_checkpoints = get_value(use_checkpoints, Options, true),
         checkpoint_interval = get_value(checkpoint_interval, Options,
@@ -905,12 +897,6 @@ has_session_id(SessionId, [{Props} | Rest]) ->
     end.
 
 
-db_monitor(#httpdb{}) ->
-	nil;
-db_monitor(Db) ->
-	couch_db:monitor(Db).
-
-
 get_pending_count(St) ->
     Rep = St#rep_state.rep_details,
     Timeout = get_value(connection_timeout, Rep#rep.options),
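
Two things change throughout this file: the source/target database
monitors are removed (the deleted db_monitor/1 returned nil for remote
#httpdb{} endpoints, so the monitors only ever watched local databases),
and child-process deaths now stop the job with {shutdown, {..., Reason}}
tuples. OTP treats shutdown-tagged reasons as graceful termination, so
the job's gen_server no longer logs an error report for these exits,
while the inner reason still reaches the scheduler's 'DOWN' handler shown
earlier. The new StopReason selection for a dying worker, written as a
standalone helper (hypothetical name, not in the patch):

    %% Hypothetical helper mirroring the new StopReason case: reasons
    %% that are already {shutdown, _} (for example the bulk_docs_failed
    %% wrapper added in couch_replicator_worker below) pass through
    %% untouched; anything else is tagged with the dead worker's pid.
    -spec worker_stop_reason(pid(), term()) -> term().
    worker_stop_reason(_Pid, {shutdown, _} = Err) ->
        Err;
    worker_stop_reason(Pid, Reason) ->
        {worker_died, Pid, Reason}.
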
diff --git a/src/couch_replicator/src/couch_replicator_worker.erl b/src/couch_replicator/src/couch_replicator_worker.erl
index 3d80f58..26a0dcf 100644
--- a/src/couch_replicator/src/couch_replicator_worker.erl
+++ b/src/couch_replicator/src/couch_replicator_worker.erl
@@ -169,6 +169,9 @@ handle_info({'EXIT', Pid, normal}, #state{writer = nil} = State) ->
 handle_info({'EXIT', _Pid, max_backoff}, State) ->
     {stop, {shutdown, max_backoff}, State};
 
+handle_info({'EXIT', _Pid, {bulk_docs_failed, _, _} = Err}, State) ->
+    {stop, {shutdown, Err}, State};
+
 handle_info({'EXIT', Pid, Reason}, State) ->
    {stop, {process_died, Pid, Reason}, State}.
 
@@ -386,7 +389,13 @@ handle_flush_docs_result({ok, Errors}, Target, DocList) ->
     couch_replicator_stats:new([
         {docs_written, length(DocList) - length(Errors)},
         {doc_write_failures, length(Errors)}
-    ]).
+    ]);
+handle_flush_docs_result({error, {bulk_docs_failed, ErrCode, ErrMsg}}, Target,
+        _) ->
+    TgtUri = couch_replicator_api_wrap:db_uri(Target),
+    LogMsg = "Replicator: bulk docs to `~s` failed with ~p ~p",
+    couch_log:error(LogMsg, [TgtUri, ErrCode, ErrMsg]),
+    exit({bulk_docs_failed, ErrCode, ErrMsg}).
 
 
 flush_doc(Target, #doc{id = Id, revs = {Pos, [RevId | _]}} = Doc) ->
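
Taken together with the scheduler and scheduler_job changes above, a
failed _bulk_docs request now surfaces its HTTP status and response body
in the job's crash history instead of a generic worker_died or
changes_*_died atom. A hypothetical, self-contained sketch of the
propagation (not part of the patch; the gen_servers are collapsed into
plain processes and the 500 body is a placeholder):

    -module(bulk_docs_error_flow_demo).
    -export([demo/0]).

    %% "Scheduler": monitors the job and, like the scheduler's 'DOWN'
    %% handler, strips the {shutdown, _} wrapper before recording the
    %% crash reason.
    demo() ->
        {Job, Ref} = spawn_monitor(fun job/0),
        receive
            {'DOWN', Ref, process, Job, {shutdown, Reason}} ->
                {crashed, Reason}
        end.

    %% "Job" and "worker" collapsed into one process: a linked writer
    %% exits with the new bulk_docs_failed tuple, which is re-tagged as a
    %% graceful {shutdown, _} stop, as couch_replicator_worker and
    %% couch_replicator_scheduler_job now do.
    job() ->
        process_flag(trap_exit, true),
        Writer = spawn_link(fun() ->
            exit({bulk_docs_failed, 500, <<"internal_server_error">>})
        end),
        receive
            {'EXIT', Writer, {bulk_docs_failed, _, _} = Err} ->
                exit({shutdown, Err})
        end.

Calling bulk_docs_error_flow_demo:demo() returns
{crashed, {bulk_docs_failed, 500, <<"internal_server_error">>}}, which is
the shape the scheduler now records for this failure mode.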