You are viewing a plain-text version of this content. The link to the canonical version was a hyperlink and is not preserved in this plain-text copy.
Posted to commits@couchdb.apache.org by va...@apache.org on 2020/01/10 06:12:17 UTC
[couchdb] 01/01: WIP cleanup error reporting in replicator
This is an automated email from the ASF dual-hosted git repository.
vatamane pushed a commit to branch improve-error-reporting-in-replicator
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit f907da52573cf41ba5be5fe76cdfd0e72972b356
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Fri Jan 10 01:10:57 2020 -0500
WIP cleanup error reporting in replicator
Issue: #2413
---
.../src/couch_replicator_api_wrap.erl | 4 ++-
.../src/couch_replicator_scheduler.erl | 8 ++++--
.../src/couch_replicator_scheduler_job.erl | 30 ++++++----------------
.../src/couch_replicator_worker.erl | 11 +++++++-
4 files changed, 27 insertions(+), 26 deletions(-)
diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl b/src/couch_replicator/src/couch_replicator_api_wrap.erl
index ab1de7d..d5477fe 100644
--- a/src/couch_replicator/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl
@@ -408,7 +408,9 @@ update_docs(#httpdb{} = HttpDb, DocList, Options, UpdateType) ->
(413, _, _) ->
{error, request_body_too_large};
(417, _, Results) when is_list(Results) ->
- {ok, bulk_results_to_errors(DocList, Results, remote)}
+ {ok, bulk_results_to_errors(DocList, Results, remote)};
+ (ErrCode, _, ErrMsg) when is_integer(ErrCode) ->
+ {error, {bulk_docs_failed, ErrCode, ErrMsg}}
end).
diff --git a/src/couch_replicator/src/couch_replicator_scheduler.erl b/src/couch_replicator/src/couch_replicator_scheduler.erl
index f84860c..53c040e 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler.erl
@@ -318,8 +318,12 @@ handle_info({'DOWN', _Ref, process, Pid, normal}, State) ->
update_running_jobs_stats(State#state.stats_pid),
{noreply, State};
-handle_info({'DOWN', _Ref, process, Pid, Reason}, State) ->
+handle_info({'DOWN', _Ref, process, Pid, Reason0}, State) ->
{ok, Job} = job_by_pid(Pid),
+ Reason = case Reason0 of
+ {shutdown, ShutdownReason} -> ShutdownReason;
+ Other -> Other
+ end,
ok = handle_crashed_job(Job, Reason, State),
{noreply, State};
@@ -873,7 +877,7 @@ is_continuous(#job{rep = Rep}) ->
% optimize some options to help the job make progress.
-spec maybe_optimize_job_for_rate_limiting(#job{}) -> #job{}.
maybe_optimize_job_for_rate_limiting(Job = #job{history =
- [{{crashed, {shutdown, max_backoff}}, _} | _]}) ->
+ [{{crashed, max_backoff}, _} | _]}) ->
Opts = [
{checkpoint_interval, 5000},
{worker_processes, 2},
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index d69febb..54a69a2 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -73,8 +73,6 @@
workers,
stats = couch_replicator_stats:new(),
session_id,
- source_monitor = nil,
- target_monitor = nil,
source_seq = nil,
use_checkpoints = true,
checkpoint_interval = ?DEFAULT_CHECKPOINT_INTERVAL,
@@ -242,14 +240,6 @@ handle_cast({report_seq, Seq},
handle_info(shutdown, St) ->
{stop, shutdown, St};
-handle_info({'DOWN', Ref, _, _, Why}, #rep_state{source_monitor = Ref} = St) ->
- couch_log:error("Source database is down. Reason: ~p", [Why]),
- {stop, source_db_down, St};
-
-handle_info({'DOWN', Ref, _, _, Why}, #rep_state{target_monitor = Ref} = St) ->
- couch_log:error("Target database is down. Reason: ~p", [Why]),
- {stop, target_db_down, St};
-
handle_info({'EXIT', Pid, max_backoff}, State) ->
couch_log:error("Max backoff reached child process ~p", [Pid]),
{stop, {shutdown, max_backoff}, State};
@@ -264,7 +254,7 @@ handle_info({'EXIT', Pid, normal}, #rep_state{changes_reader=Pid} = State) ->
handle_info({'EXIT', Pid, Reason}, #rep_state{changes_reader=Pid} = State) ->
couch_stats:increment_counter([couch_replicator, changes_reader_deaths]),
couch_log:error("ChangesReader process died with reason: ~p", [Reason]),
- {stop, changes_reader_died, cancel_timer(State)};
+ {stop, {shutdown, {changes_reader_died, Reason}}, cancel_timer(State)};
handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager = Pid} = State) ->
{noreply, State};
@@ -272,7 +262,7 @@ handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager = Pid} = State) ->
handle_info({'EXIT', Pid, Reason}, #rep_state{changes_manager = Pid} = State) ->
couch_stats:increment_counter([couch_replicator, changes_manager_deaths]),
couch_log:error("ChangesManager process died with reason: ~p", [Reason]),
- {stop, changes_manager_died, cancel_timer(State)};
+ {stop, {shutdown, {changes_manager_died, Reason}}, cancel_timer(State)};
handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) ->
{noreply, State};
@@ -280,7 +270,7 @@ handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) ->
handle_info({'EXIT', Pid, Reason}, #rep_state{changes_queue=Pid} = State) ->
couch_stats:increment_counter([couch_replicator, changes_queue_deaths]),
couch_log:error("ChangesQueue process died with reason: ~p", [Reason]),
- {stop, changes_queue_died, cancel_timer(State)};
+ {stop, {shutdown, {changes_queue_died, Reason}}, cancel_timer(State)};
handle_info({'EXIT', Pid, normal}, #rep_state{workers = Workers} = State) ->
case Workers -- [Pid] of
@@ -305,7 +295,11 @@ handle_info({'EXIT', Pid, Reason}, #rep_state{workers = Workers} = State) ->
true ->
couch_stats:increment_counter([couch_replicator, worker_deaths]),
couch_log:error("Worker ~p died with reason: ~p", [Pid, Reason]),
- {stop, {worker_died, Pid, Reason}, State2}
+ StopReason = case Reason of
+ {shutdown, _} = Err -> Err;
+ Other -> {worker_died, Pid, Other}
+ end,
+ {stop, StopReason, State2}
end;
handle_info(timeout, InitArgs) ->
@@ -592,8 +586,6 @@ init_state(Rep) ->
src_starttime = get_value(<<"instance_start_time">>, SourceInfo),
tgt_starttime = get_value(<<"instance_start_time">>, TargetInfo),
session_id = couch_uuids:random(),
- source_monitor = db_monitor(Source),
- target_monitor = db_monitor(Target),
source_seq = SourceSeq,
use_checkpoints = get_value(use_checkpoints, Options, true),
checkpoint_interval = get_value(checkpoint_interval, Options,
@@ -905,12 +897,6 @@ has_session_id(SessionId, [{Props} | Rest]) ->
end.
-db_monitor(#httpdb{}) ->
- nil;
-db_monitor(Db) ->
- couch_db:monitor(Db).
-
-
get_pending_count(St) ->
Rep = St#rep_state.rep_details,
Timeout = get_value(connection_timeout, Rep#rep.options),
diff --git a/src/couch_replicator/src/couch_replicator_worker.erl b/src/couch_replicator/src/couch_replicator_worker.erl
index 3d80f58..26a0dcf 100644
--- a/src/couch_replicator/src/couch_replicator_worker.erl
+++ b/src/couch_replicator/src/couch_replicator_worker.erl
@@ -169,6 +169,9 @@ handle_info({'EXIT', Pid, normal}, #state{writer = nil} = State) ->
handle_info({'EXIT', _Pid, max_backoff}, State) ->
{stop, {shutdown, max_backoff}, State};
+handle_info({'EXIT', _Pid, {bulk_docs_failed, _, _} = Err}, State) ->
+ {stop, {shutdown, Err}, State};
+
handle_info({'EXIT', Pid, Reason}, State) ->
{stop, {process_died, Pid, Reason}, State}.
@@ -386,7 +389,13 @@ handle_flush_docs_result({ok, Errors}, Target, DocList) ->
couch_replicator_stats:new([
{docs_written, length(DocList) - length(Errors)},
{doc_write_failures, length(Errors)}
- ]).
+ ]);
+handle_flush_docs_result({error, {bulk_docs_failed, ErrCode, ErrMsg}}, Target,
+ _) ->
+ TgtUri = couch_replicator_api_wrap:db_uri(Target),
+ LogMsg = "Replicator: bulk docs to `~s` failed with ~p ~p",
+ couch_log:error(LogMsg, [TgtUri, ErrCode, ErrMsg]),
+ exit({bulk_docs_failed, ErrCode, ErrMsg}).
flush_doc(Target, #doc{id = Id, revs = {Pos, [RevId | _]}} = Doc) ->