You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2020/01/10 06:12:16 UTC

[couchdb] branch improve-error-reporting-in-replicator created (now f907da5)

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a change to branch improve-error-reporting-in-replicator
in repository https://gitbox.apache.org/repos/asf/couchdb.git.


      at f907da5  WIP cleanup error reporting in replicator

This branch includes the following new commits:

     new f907da5  WIP cleanup error reporting in replicator

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[couchdb] 01/01: WIP cleanup error reporting in replicator

Posted by va...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch improve-error-reporting-in-replicator
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit f907da52573cf41ba5be5fe76cdfd0e72972b356
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Fri Jan 10 01:10:57 2020 -0500

    WIP cleanup error reporting in replicator
    
    Issue: #2413
---
 .../src/couch_replicator_api_wrap.erl              |  4 ++-
 .../src/couch_replicator_scheduler.erl             |  8 ++++--
 .../src/couch_replicator_scheduler_job.erl         | 30 ++++++----------------
 .../src/couch_replicator_worker.erl                | 11 +++++++-
 4 files changed, 27 insertions(+), 26 deletions(-)

diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl b/src/couch_replicator/src/couch_replicator_api_wrap.erl
index ab1de7d..d5477fe 100644
--- a/src/couch_replicator/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl
@@ -408,7 +408,9 @@ update_docs(#httpdb{} = HttpDb, DocList, Options, UpdateType) ->
            (413, _, _) ->
                 {error, request_body_too_large};
            (417, _, Results) when is_list(Results) ->
-                {ok, bulk_results_to_errors(DocList, Results, remote)}
+                {ok, bulk_results_to_errors(DocList, Results, remote)};
+           (ErrCode, _, ErrMsg) when is_integer(ErrCode) ->
+                {error, {bulk_docs_failed, ErrCode, ErrMsg}}
         end).
 
 
diff --git a/src/couch_replicator/src/couch_replicator_scheduler.erl b/src/couch_replicator/src/couch_replicator_scheduler.erl
index f84860c..53c040e 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler.erl
@@ -318,8 +318,12 @@ handle_info({'DOWN', _Ref, process, Pid, normal}, State) ->
     update_running_jobs_stats(State#state.stats_pid),
     {noreply, State};
 
-handle_info({'DOWN', _Ref, process, Pid, Reason}, State) ->
+handle_info({'DOWN', _Ref, process, Pid, Reason0}, State) ->
     {ok, Job} = job_by_pid(Pid),
+    Reason = case Reason0 of
+        {shutdown, ShutdownReason} -> ShutdownReason;
+        Other -> Other
+    end,
     ok = handle_crashed_job(Job, Reason, State),
     {noreply, State};
 
@@ -873,7 +877,7 @@ is_continuous(#job{rep = Rep}) ->
 % optimize some options to help the job make progress.
 -spec maybe_optimize_job_for_rate_limiting(#job{}) -> #job{}.
 maybe_optimize_job_for_rate_limiting(Job = #job{history =
-    [{{crashed, {shutdown, max_backoff}}, _} | _]}) ->
+    [{{crashed, max_backoff}, _} | _]}) ->
     Opts = [
         {checkpoint_interval, 5000},
         {worker_processes, 2},
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index d69febb..54a69a2 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -73,8 +73,6 @@
     workers,
     stats = couch_replicator_stats:new(),
     session_id,
-    source_monitor = nil,
-    target_monitor = nil,
     source_seq = nil,
     use_checkpoints = true,
     checkpoint_interval = ?DEFAULT_CHECKPOINT_INTERVAL,
@@ -242,14 +240,6 @@ handle_cast({report_seq, Seq},
 handle_info(shutdown, St) ->
     {stop, shutdown, St};
 
-handle_info({'DOWN', Ref, _, _, Why}, #rep_state{source_monitor = Ref} = St) ->
-    couch_log:error("Source database is down. Reason: ~p", [Why]),
-    {stop, source_db_down, St};
-
-handle_info({'DOWN', Ref, _, _, Why}, #rep_state{target_monitor = Ref} = St) ->
-    couch_log:error("Target database is down. Reason: ~p", [Why]),
-    {stop, target_db_down, St};
-
 handle_info({'EXIT', Pid, max_backoff}, State) ->
     couch_log:error("Max backoff reached child process ~p", [Pid]),
     {stop, {shutdown, max_backoff}, State};
@@ -264,7 +254,7 @@ handle_info({'EXIT', Pid, normal}, #rep_state{changes_reader=Pid} = State) ->
 handle_info({'EXIT', Pid, Reason}, #rep_state{changes_reader=Pid} = State) ->
     couch_stats:increment_counter([couch_replicator, changes_reader_deaths]),
     couch_log:error("ChangesReader process died with reason: ~p", [Reason]),
-    {stop, changes_reader_died, cancel_timer(State)};
+    {stop, {shutdown, {changes_reader_died, Reason}}, cancel_timer(State)};
 
 handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager = Pid} = State) ->
     {noreply, State};
@@ -272,7 +262,7 @@ handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager = Pid} = State) ->
 handle_info({'EXIT', Pid, Reason}, #rep_state{changes_manager = Pid} = State) ->
     couch_stats:increment_counter([couch_replicator, changes_manager_deaths]),
     couch_log:error("ChangesManager process died with reason: ~p", [Reason]),
-    {stop, changes_manager_died, cancel_timer(State)};
+    {stop, {shutdown, {changes_manager_died, Reason}}, cancel_timer(State)};
 
 handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) ->
     {noreply, State};
@@ -280,7 +270,7 @@ handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) ->
 handle_info({'EXIT', Pid, Reason}, #rep_state{changes_queue=Pid} = State) ->
     couch_stats:increment_counter([couch_replicator, changes_queue_deaths]),
     couch_log:error("ChangesQueue process died with reason: ~p", [Reason]),
-    {stop, changes_queue_died, cancel_timer(State)};
+    {stop, {shutdown, {changes_queue_died, Reason}}, cancel_timer(State)};
 
 handle_info({'EXIT', Pid, normal}, #rep_state{workers = Workers} = State) ->
     case Workers -- [Pid] of
@@ -305,7 +295,11 @@ handle_info({'EXIT', Pid, Reason}, #rep_state{workers = Workers} = State) ->
     true ->
         couch_stats:increment_counter([couch_replicator, worker_deaths]),
         couch_log:error("Worker ~p died with reason: ~p", [Pid, Reason]),
-        {stop, {worker_died, Pid, Reason}, State2}
+        StopReason = case Reason of
+            {shutdown, _} = Err -> Err;
+            Other -> {worker_died, Pid, Other}
+         end,
+        {stop, StopReason, State2}
     end;
 
 handle_info(timeout, InitArgs) ->
@@ -592,8 +586,6 @@ init_state(Rep) ->
         src_starttime = get_value(<<"instance_start_time">>, SourceInfo),
         tgt_starttime = get_value(<<"instance_start_time">>, TargetInfo),
         session_id = couch_uuids:random(),
-        source_monitor = db_monitor(Source),
-        target_monitor = db_monitor(Target),
         source_seq = SourceSeq,
         use_checkpoints = get_value(use_checkpoints, Options, true),
         checkpoint_interval = get_value(checkpoint_interval, Options,
@@ -905,12 +897,6 @@ has_session_id(SessionId, [{Props} | Rest]) ->
     end.
 
 
-db_monitor(#httpdb{}) ->
-	nil;
-db_monitor(Db) ->
-	couch_db:monitor(Db).
-
-
 get_pending_count(St) ->
     Rep = St#rep_state.rep_details,
     Timeout = get_value(connection_timeout, Rep#rep.options),
diff --git a/src/couch_replicator/src/couch_replicator_worker.erl b/src/couch_replicator/src/couch_replicator_worker.erl
index 3d80f58..26a0dcf 100644
--- a/src/couch_replicator/src/couch_replicator_worker.erl
+++ b/src/couch_replicator/src/couch_replicator_worker.erl
@@ -169,6 +169,9 @@ handle_info({'EXIT', Pid, normal}, #state{writer = nil} = State) ->
 handle_info({'EXIT', _Pid, max_backoff}, State) ->
     {stop, {shutdown, max_backoff}, State};
 
+handle_info({'EXIT', _Pid, {bulk_docs_failed, _, _} = Err}, State) ->
+    {stop, {shutdown, Err}, State};
+
 handle_info({'EXIT', Pid, Reason}, State) ->
    {stop, {process_died, Pid, Reason}, State}.
 
@@ -386,7 +389,13 @@ handle_flush_docs_result({ok, Errors}, Target, DocList) ->
     couch_replicator_stats:new([
         {docs_written, length(DocList) - length(Errors)},
         {doc_write_failures, length(Errors)}
-    ]).
+    ]);
+handle_flush_docs_result({error, {bulk_docs_failed, ErrCode, ErrMsg}}, Target,
+        _) ->
+    TgtUri = couch_replicator_api_wrap:db_uri(Target),
+    LogMsg = "Replicator: bulk docs to `~s` failed with ~p ~p",
+    couch_log:error(LogMsg, [TgtUri, ErrCode, ErrMsg]),
+    exit({bulk_docs_failed, ErrCode, ErrMsg}).
 
 
 flush_doc(Target, #doc{id = Id, revs = {Pos, [RevId | _]}} = Doc) ->