You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2020/01/13 17:43:09 UTC

[couchdb] 01/01: Improve replicator error reporting

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch improve-error-reporting-in-replicator
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit d036552980f0448fe7e6ea9f700dc6b4ef31857f
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Mon Jan 13 12:29:49 2020 -0500

    Improve replicator error reporting
    
    Previously many HTTP requests failed noisily with `function_clause` errors.
    Expect some of those failures and handle them better. There are mainly 3 types
    of improvements:
    
     1) Error messages are shorter. Instead of `function_clause` errors with
     cryptic internal function names, return a simple marker like `bulk_docs_failed`.
    
     2) Include the error body if one was returned. Besides the status code,
     HTTP failure responses may contain useful information in the body to help
     debug the failure.
    
     3) Do not log or include the stack trace in the message. The error names are
     enough to identify where they are generated, so avoid spamming the user and
     the logs with stack traces. This is done by using `{shutdown, Error}` tuples
     to bubble the error up to the replication scheduler.
    
    Fixes: https://github.com/apache/couchdb/issues/2413
---
 .../src/couch_replicator_api_wrap.erl              |  12 +-
 .../src/couch_replicator_scheduler.erl             |   8 +-
 .../src/couch_replicator_scheduler_job.erl         |  51 ++--
 .../src/couch_replicator_worker.erl                |  19 +-
 .../couch_replicator_error_reporting_tests.erl     | 271 +++++++++++++++++++++
 5 files changed, 330 insertions(+), 31 deletions(-)

diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl b/src/couch_replicator/src/couch_replicator_api_wrap.erl
index ab1de7d..a21de42 100644
--- a/src/couch_replicator/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl
@@ -186,7 +186,9 @@ get_missing_revs(#httpdb{} = Db, IdRevs) ->
                 ),
                 {Id, MissingRevs, PossibleAncestors}
             end,
-            {ok, lists:map(ConvertToNativeFun, Props)}
+            {ok, lists:map(ConvertToNativeFun, Props)};
+        (ErrCode, _, ErrMsg) when is_integer(ErrCode) ->
+            {error, {revs_diff_failed, ErrCode, ErrMsg}}
         end).
 
 
@@ -408,7 +410,9 @@ update_docs(#httpdb{} = HttpDb, DocList, Options, UpdateType) ->
            (413, _, _) ->
                 {error, request_body_too_large};
            (417, _, Results) when is_list(Results) ->
-                {ok, bulk_results_to_errors(DocList, Results, remote)}
+                {ok, bulk_results_to_errors(DocList, Results, remote)};
+           (ErrCode, _, ErrMsg) when is_integer(ErrCode) ->
+                {error, {bulk_docs_failed, ErrCode, ErrMsg}}
         end).
 
 
@@ -466,7 +470,9 @@ changes_since(#httpdb{headers = Headers1, timeout = InactiveTimeout} = HttpDb,
                             end,
                             parse_changes_feed(Options, UserFun2,
                                 DataStreamFun2)
-                        end)
+                        end);
+                 (ErrCode, _, ErrMsg) when is_integer(ErrCode) ->
+                    throw({retry_limit, {changes_req_failed, ErrCode, ErrMsg}})
             end)
     catch
         exit:{http_request_failed, _, _, max_backoff} ->
diff --git a/src/couch_replicator/src/couch_replicator_scheduler.erl b/src/couch_replicator/src/couch_replicator_scheduler.erl
index f84860c..53c040e 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler.erl
@@ -318,8 +318,12 @@ handle_info({'DOWN', _Ref, process, Pid, normal}, State) ->
     update_running_jobs_stats(State#state.stats_pid),
     {noreply, State};
 
-handle_info({'DOWN', _Ref, process, Pid, Reason}, State) ->
+handle_info({'DOWN', _Ref, process, Pid, Reason0}, State) ->
     {ok, Job} = job_by_pid(Pid),
+    Reason = case Reason0 of
+        {shutdown, ShutdownReason} -> ShutdownReason;
+        Other -> Other
+    end,
     ok = handle_crashed_job(Job, Reason, State),
     {noreply, State};
 
@@ -873,7 +877,7 @@ is_continuous(#job{rep = Rep}) ->
 % optimize some options to help the job make progress.
 -spec maybe_optimize_job_for_rate_limiting(#job{}) -> #job{}.
 maybe_optimize_job_for_rate_limiting(Job = #job{history =
-    [{{crashed, {shutdown, max_backoff}}, _} | _]}) ->
+    [{{crashed, max_backoff}, _} | _]}) ->
     Opts = [
         {checkpoint_interval, 5000},
         {worker_processes, 2},
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index d69febb..a0ee29f 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -73,8 +73,6 @@
     workers,
     stats = couch_replicator_stats:new(),
     session_id,
-    source_monitor = nil,
-    target_monitor = nil,
     source_seq = nil,
     use_checkpoints = true,
     checkpoint_interval = ?DEFAULT_CHECKPOINT_INTERVAL,
@@ -242,14 +240,6 @@ handle_cast({report_seq, Seq},
 handle_info(shutdown, St) ->
     {stop, shutdown, St};
 
-handle_info({'DOWN', Ref, _, _, Why}, #rep_state{source_monitor = Ref} = St) ->
-    couch_log:error("Source database is down. Reason: ~p", [Why]),
-    {stop, source_db_down, St};
-
-handle_info({'DOWN', Ref, _, _, Why}, #rep_state{target_monitor = Ref} = St) ->
-    couch_log:error("Target database is down. Reason: ~p", [Why]),
-    {stop, target_db_down, St};
-
 handle_info({'EXIT', Pid, max_backoff}, State) ->
     couch_log:error("Max backoff reached child process ~p", [Pid]),
     {stop, {shutdown, max_backoff}, State};
@@ -261,10 +251,20 @@ handle_info({'EXIT', Pid, {shutdown, max_backoff}}, State) ->
 handle_info({'EXIT', Pid, normal}, #rep_state{changes_reader=Pid} = State) ->
     {noreply, State};
 
-handle_info({'EXIT', Pid, Reason}, #rep_state{changes_reader=Pid} = State) ->
+handle_info({'EXIT', Pid, Reason0}, #rep_state{changes_reader=Pid} = State) ->
     couch_stats:increment_counter([couch_replicator, changes_reader_deaths]),
+    Reason = case Reason0 of  % map HTTP failures to short error markers
+        {changes_req_failed, _, _} = HttpFail ->
+            HttpFail;
+        {http_request_failed, _, _, {error, {code, Code}}} ->
+            {changes_req_failed, Code};
+        {http_request_failed, _, _, {error, Err}} ->
+            {changes_req_failed, Err};
+        Other ->
+            {changes_reader_died, Other}
+    end,
     couch_log:error("ChangesReader process died with reason: ~p", [Reason]),
-    {stop, changes_reader_died, cancel_timer(State)};
+    {stop, {shutdown, Reason}, cancel_timer(State)};  % {shutdown, _}: no stack
 
 handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager = Pid} = State) ->
     {noreply, State};
@@ -272,7 +272,7 @@ handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager = Pid} = State) ->
 handle_info({'EXIT', Pid, Reason}, #rep_state{changes_manager = Pid} = State) ->
     couch_stats:increment_counter([couch_replicator, changes_manager_deaths]),
     couch_log:error("ChangesManager process died with reason: ~p", [Reason]),
-    {stop, changes_manager_died, cancel_timer(State)};
+    {stop, {shutdown, {changes_manager_died, Reason}}, cancel_timer(State)};
 
 handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) ->
     {noreply, State};
@@ -280,7 +280,7 @@ handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) ->
 handle_info({'EXIT', Pid, Reason}, #rep_state{changes_queue=Pid} = State) ->
     couch_stats:increment_counter([couch_replicator, changes_queue_deaths]),
     couch_log:error("ChangesQueue process died with reason: ~p", [Reason]),
-    {stop, changes_queue_died, cancel_timer(State)};
+    {stop, {shutdown, {changes_queue_died, Reason}}, cancel_timer(State)};
 
 handle_info({'EXIT', Pid, normal}, #rep_state{workers = Workers} = State) ->
     case Workers -- [Pid] of
@@ -304,8 +304,14 @@ handle_info({'EXIT', Pid, Reason}, #rep_state{workers = Workers} = State) ->
         {stop, {unknown_process_died, Pid, Reason}, State2};
     true ->
         couch_stats:increment_counter([couch_replicator, worker_deaths]),
-        couch_log:error("Worker ~p died with reason: ~p", [Pid, Reason]),
-        {stop, {worker_died, Pid, Reason}, State2}
+        StopReason = case Reason of
+            {shutdown, _} = Err ->
+                Err;
+            Other ->
+                couch_log:error("Worker ~p died with reason: ~p", [Pid, Reason]),
+                {worker_died, Pid, Other}
+         end,
+        {stop, StopReason, State2}
     end;
 
 handle_info(timeout, InitArgs) ->
@@ -380,6 +386,11 @@ terminate({shutdown, max_backoff}, State) ->
     terminate_cleanup(State),
     couch_replicator_notifier:notify({error, RepId, max_backoff});
 
+terminate({shutdown, Reason}, State) ->
+    % Unwrap so when reporting we don't have an extra {shutdown, ...} tuple
+    % wrapped around the message
+    terminate(Reason, State);
+
 terminate(Reason, State) ->
 #rep_state{
         source_name = Source,
@@ -592,8 +603,6 @@ init_state(Rep) ->
         src_starttime = get_value(<<"instance_start_time">>, SourceInfo),
         tgt_starttime = get_value(<<"instance_start_time">>, TargetInfo),
         session_id = couch_uuids:random(),
-        source_monitor = db_monitor(Source),
-        target_monitor = db_monitor(Target),
         source_seq = SourceSeq,
         use_checkpoints = get_value(use_checkpoints, Options, true),
         checkpoint_interval = get_value(checkpoint_interval, Options,
@@ -905,12 +914,6 @@ has_session_id(SessionId, [{Props} | Rest]) ->
     end.
 
 
-db_monitor(#httpdb{}) ->
-	nil;
-db_monitor(Db) ->
-	couch_db:monitor(Db).
-
-
 get_pending_count(St) ->
     Rep = St#rep_state.rep_details,
     Timeout = get_value(connection_timeout, Rep#rep.options),
diff --git a/src/couch_replicator/src/couch_replicator_worker.erl b/src/couch_replicator/src/couch_replicator_worker.erl
index 3d80f58..d708675 100644
--- a/src/couch_replicator/src/couch_replicator_worker.erl
+++ b/src/couch_replicator/src/couch_replicator_worker.erl
@@ -169,6 +169,15 @@ handle_info({'EXIT', Pid, normal}, #state{writer = nil} = State) ->
 handle_info({'EXIT', _Pid, max_backoff}, State) ->
     {stop, {shutdown, max_backoff}, State};
 
+handle_info({'EXIT', _Pid, {bulk_docs_failed, _, _} = Err}, State) ->
+    {stop, {shutdown, Err}, State};
+
+handle_info({'EXIT', _Pid, {revs_diff_failed, _, _} = Err}, State) ->
+    {stop, {shutdown, Err}, State};
+
+handle_info({'EXIT', _Pid, {http_request_failed, _, _, _} = Err}, State) ->
+    {stop, {shutdown, Err}, State};
+
 handle_info({'EXIT', Pid, Reason}, State) ->
    {stop, {process_died, Pid, Reason}, State}.
 
@@ -386,7 +395,10 @@ handle_flush_docs_result({ok, Errors}, Target, DocList) ->
     couch_replicator_stats:new([
         {docs_written, length(DocList) - length(Errors)},
         {doc_write_failures, length(Errors)}
-    ]).
+    ]);
+handle_flush_docs_result({error, {bulk_docs_failed, _, _} = Err},
+        _Target, _DocList) ->
+    exit(Err).  % Err becomes the exit reason of the calling process
 
 
 flush_doc(Target, #doc{id = Id, revs = {Pos, [RevId | _]}} = Doc) ->
@@ -425,7 +437,10 @@ find_missing(DocInfos, Target) ->
             end, {[], 0}, DocInfos),
 
 
-    {ok, Missing} = couch_replicator_api_wrap:get_missing_revs(Target, IdRevs),
+    Missing = case couch_replicator_api_wrap:get_missing_revs(Target, IdRevs) of
+        {ok, Result} -> Result;
+        {error, Error} -> exit(Error)
+    end,
     MissingRevsCount = lists:foldl(
         fun({_Id, MissingRevs, _PAs}, Acc) -> Acc + length(MissingRevs) end,
         0, Missing),
diff --git a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
new file mode 100644
index 0000000..6b4f95c
--- /dev/null
+++ b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
@@ -0,0 +1,271 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_error_reporting_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_replicator/src/couch_replicator.hrl").
+
+% Start couch and the listed applications the replicator tests rely on.
+setup_all() ->
+    Apps = [couch_replicator, chttpd, mem3, fabric],
+    test_util:start_couch(Apps).
+
+% Stop everything that setup_all/0 started.
+teardown_all(Ctx) ->
+    ok = test_util:stop_couch(Ctx).
+
+% Per-test fixture: drop leftover mocks and create two fresh, empty
+% databases. Returns {SourceDbName, TargetDbName}.
+setup() ->
+    meck:unload(),
+    [Source, Target] = [setup_db() || _ <- [source, target]],
+    {Source, Target}.
+
+% Undo setup/0: unload mocks and delete both databases.
+teardown({Source, Target}) ->
+    meck:unload(),
+    lists:foreach(fun teardown_db/1, [Source, Target]),
+    ok.
+
+% Suite entry point: couch is started once via setup_all/0, and every
+% test receives its own freshly created source/target pair from setup/0,
+% which teardown/1 deletes afterwards.
+error_reporting_test_() ->
+    Tests = [
+        % An HTTP endpoint answers with an error status.
+        fun t_fail_bulk_docs/1,
+        fun t_fail_changes_reader/1,
+        fun t_fail_revs_diff/1,
+        % One of the job's helper processes is killed.
+        fun t_fail_changes_queue/1,
+        fun t_fail_changes_manager/1,
+        fun t_fail_changes_reader_proc/1
+    ],
+    PerTest = {foreach, fun setup/0, fun teardown/1, Tests},
+    {setup,
+        fun setup_all/0,
+        fun teardown_all/1,
+        PerTest
+    }.
+
+% HTTP failure tests: after an initial sync one endpoint is mocked to
+% answer with an error status, and the job must report a short
+% {Marker, StatusCode, DecodedBody} error (see check_http_failure/5).
+t_fail_bulk_docs({Source, Target}) ->
+    ?_test(check_http_failure(
+        Source,
+        Target,
+        "/_bulk_docs",
+        "403",
+        {bulk_docs_failed, 403, {[{<<"x">>, <<"y">>}]}}
+    )).
+
+
+t_fail_changes_reader({Source, Target}) ->
+    ?_test(check_http_failure(
+        Source,
+        Target,
+        "/_changes",
+        "418",
+        {changes_req_failed, 418, {[{<<"x">>, <<"y">>}]}}
+    )).
+
+
+t_fail_revs_diff({Source, Target}) ->
+    ?_test(check_http_failure(
+        Source,
+        Target,
+        "/_revs_diff",
+        "407",
+        {revs_diff_failed, 407, {[{<<"x">>, <<"y">>}]}}
+    )).
+
+
+% Sync docs 1..5, mock requests whose path ends with Path to return status
+% Code with body {"x":"y"}, write doc 6 and check the job reports Expected.
+% The notifier listener is stopped even if the match or assertion fails.
+check_http_failure(Source, Target, Path, Code, Expected) ->
+    populate_db(Source, 1, 5),
+    {ok, RepId} = replicate(Source, Target),
+    wait_target_in_sync(Source, Target),
+
+    {ok, Listener} = rep_result_listener(RepId),
+    try
+        mock_fail_req(Path, {ok, Code, [], [<<"{\"x\":\"y\"}">>]}),
+        populate_db(Source, 6, 6),
+        {error, Result} = wait_rep_result(RepId),
+        ?assertEqual(Expected, Result)
+    after
+        couch_replicator_notifier:stop(Listener)
+    end.
+
+% Process failure tests: after an initial sync one of the replication
+% job's helper processes is killed, and the job must report
+% {<helper>_died, KillReason} (see check_process_failure/5).
+t_fail_changes_queue({Source, Target}) ->
+    ?_test(check_process_failure(
+        Source,
+        Target,
+        20,
+        boom,
+        {changes_queue_died, boom}
+    )).
+
+
+t_fail_changes_manager({Source, Target}) ->
+    ?_test(check_process_failure(
+        Source,
+        Target,
+        21,
+        bam,
+        {changes_manager_died, bam}
+    )).
+
+
+t_fail_changes_reader_proc({Source, Target}) ->
+    ?_test(check_process_failure(
+        Source,
+        Target,
+        22,
+        kapow,
+        {changes_reader_died, kapow}
+    )).
+
+
+% Shared body of the three process failure tests above.
+% Sync docs 1..5, then kill the pid found at tuple position Pos of the
+% job's state (as returned by sys:get_state/1) with KillReason and check
+% that the job reports Expected. The notifier listener is stopped even if
+% the match or the assertion fails.
+% NOTE(review): Pos hard-codes the field order of #rep_state{} in
+% couch_replicator_scheduler_job (20, 21 and 22 are taken to be the
+% changes queue, manager and reader); update it if that record changes.
+check_process_failure(Source, Target, Pos, KillReason, Expected) ->
+    populate_db(Source, 1, 5),
+    {ok, RepId} = replicate(Source, Target),
+    wait_target_in_sync(Source, Target),
+
+    RepPid = couch_replicator_test_helper:get_pid(RepId),
+    State = sys:get_state(RepPid),
+    Victim = element(Pos, State),
+    ?assert(is_process_alive(Victim)),
+
+    {ok, Listener} = rep_result_listener(RepId),
+    try
+        exit(Victim, KillReason),
+        {error, Result} = wait_rep_result(RepId),
+        ?assertEqual(Expected, Result)
+    after
+        couch_replicator_notifier:stop(Listener)
+    end.
+
+% Make ibrowse return Return for every request whose URL path ends with
+% Path; all other requests are passed through to the real ibrowse.
+mock_fail_req(Path, Return) ->
+    FakeSend = fun(W, Url, Hdrs, Meth, Body, Opts, TOut) ->
+        {ok, {_, _, _, _, UPath, _}} = http_uri:parse(Url),
+        case lists:suffix(Path, UPath) of
+            true -> Return;
+            false -> meck:passthrough([W, Url, Hdrs, Meth, Body, Opts, TOut])
+        end
+    end,
+    meck:expect(ibrowse, send_req_direct, FakeSend).
+
+% Forward every {_, RepId, _} notifier event to the calling process.
+rep_result_listener(RepId) ->
+    Self = self(),
+    Forward = fun
+        ({_, Id, _} = Event) when Id =:= RepId -> Self ! Event;
+        (_) -> ok
+    end,
+    {ok, _Listener} = couch_replicator_notifier:start_link(Forward).
+
+% Block, with no timeout, until a finished or error event for RepId
+% arrives; returns {ok, Result} or {error, Reason} respectively.
+wait_rep_result(RepId) ->
+    receive
+        {error, RepId, Reason} -> {error, Reason};
+        {finished, RepId, RepResult} -> {ok, RepResult}
+    end.
+
+
+
+% Create an empty database named by ?tempdb() and return that name.
+setup_db() ->
+    Name = ?tempdb(),
+    {ok, Handle} = couch_db:create(Name, [?ADMIN_CTX]),
+    ok = couch_db:close(Handle),
+    Name.
+
+
+% Delete a database previously created by setup_db/0.
+teardown_db(Name) ->
+    ok = couch_server:delete(Name, [?ADMIN_CTX]).
+
+
+% Write docs with empty bodies whose ids are the integers Start..End
+% rendered as binaries (e.g. <<"6">>), all in one update_docs/3 call.
+populate_db(DbName, Start, End) ->
+    {ok, Db} = couch_db:open_int(DbName, []),
+    % Docs are handed over in descending id order.
+    Ids = lists:reverse(lists:seq(Start, End)),
+    Docs = [#doc{id = integer_to_binary(N), body = {[]}} || N <- Ids],
+    {ok, _} = couch_db:update_docs(Db, Docs, []),
+    ok = couch_db:close(Db).
+
+
+% Wait until Target holds as many docs as Source holds right now.
+wait_target_in_sync(Source, Target) ->
+    wait_target_in_sync_loop(db_doc_count(Source), Target, 300).
+
+
+% Poll TargetName every 500 ms until its doc_count equals DocCount, at
+% most RetriesLeft times; returns true, or raises assertion_failed.
+wait_target_in_sync_loop(_DocCount, _TargetName, 0) ->
+    erlang:error({assertion_failed, [
+          {module, ?MODULE},
+          {line, ?LINE},
+          {reason, "Could not get source and target databases in sync"}
+    ]});
+wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft) ->
+    case db_doc_count(TargetName) of
+        DocCount -> true;
+        _Other ->
+            ok = timer:sleep(500),
+            wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft - 1)
+    end.
+
+% Open DbName, read doc_count from its db info and close it again.
+db_doc_count(DbName) ->
+    {ok, Db} = couch_db:open_int(DbName, []),
+    {ok, Info} = couch_db:get_db_info(Db),
+    ok = couch_db:close(Db),
+    couch_util:get_value(doc_count, Info).
+
+% Submit a continuous Source -> Target replication; returns {ok, RepId}.
+replicate(Source, Target) ->
+    [SrcUrl, TgtUrl] =
+        [couch_replicator_test_helper:db_url(Db) || Db <- [Source, Target]],
+    {ok, Rep} = couch_replicator_utils:parse_rep_doc({[
+        {<<"source">>, SrcUrl},
+        {<<"target">>, TgtUrl},
+        {<<"continuous">>, true},
+        {<<"worker_processes">>, 1},
+        {<<"retries_per_request">>, 1},
+        % Low connection timeout so _changes feed gets restarted quicker
+        {<<"connection_timeout">>, 3000}
+    ]}, ?ADMIN_USER),
+    ok = couch_replicator_scheduler:add_job(Rep),
+    couch_replicator_scheduler:reschedule(),
+    {ok, Rep#rep.id}.