Posted to commits@couchdb.apache.org by va...@apache.org on 2020/01/13 17:43:08 UTC

[couchdb] branch improve-error-reporting-in-replicator updated (f907da5 -> d036552)

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a change to branch improve-error-reporting-in-replicator
in repository https://gitbox.apache.org/repos/asf/couchdb.git.


 discard f907da5  WIP cleanup error reporting in replicator
     add 24201a3  Remove debug logging from test/javascript/run
     add dd1b281  When shard splitting make sure to reset the targets before any retries
     add 8d92f21  Reset a view shard if the signature is wrong
     add c8a75fa  Debug mem3 eunit error
     add e10498f  Merge pull request #2438 from apache/reset-corrupt-view-index
     new d036552  Improve replicator error reporting

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (f907da5)
            \
             N -- N -- N   refs/heads/improve-error-reporting-in-replicator (d036552)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/couch_mrview/src/couch_mrview_index.erl        |   6 +
 .../src/couch_replicator_api_wrap.erl              |   8 +-
 .../src/couch_replicator_scheduler_job.erl         |  27 +-
 .../src/couch_replicator_worker.erl                |  20 +-
 .../couch_replicator_error_reporting_tests.erl     | 271 +++++++++++++++++++++
 src/mem3/src/mem3_reshard_job.erl                  |  14 +-
 src/mem3/src/mem3_sync_event_listener.erl          |   2 +-
 src/mem3/test/eunit/mem3_reshard_test.erl          |   6 +-
 test/javascript/run                                |   1 -
 9 files changed, 324 insertions(+), 31 deletions(-)
 create mode 100644 src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl


[couchdb] 01/01: Improve replicator error reporting

Posted by va...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch improve-error-reporting-in-replicator
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit d036552980f0448fe7e6ea9f700dc6b4ef31857f
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Mon Jan 13 12:29:49 2020 -0500

    Improve replicator error reporting
    
    Previously, many HTTP request failures surfaced as noisy `function_clause`
    errors. Expect some of those failures and handle them better. There are
    mainly three types of improvements:
    
     1) Error messages are shorter. Instead of `function_clause` with cryptic
     internal fun names, return a simple marker like `bulk_docs_failed`.
    
     2) Include the error body if one was returned. Besides the error code, HTTP
     failures may contain useful information in the body to help debug the failure.
    
     3) Do not log or include the stack trace in the message. The error names are
     enough to identify the place where they are generated, so avoid spamming the
     user and the logs with them. This is done by using `{shutdown, Error}` tuples
     to bubble the error up to the replication scheduler (a minimal sketch of this
     convention follows below).
    
    Fixes: https://github.com/apache/couchdb/issues/2413
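
For readers skimming the patch: below is a minimal, illustrative Erlang sketch
of the `{shutdown, Error}` convention from point 3. The module and function
names (shutdown_error_sketch, wrap_error/1, unwrap_error/1) are hypothetical;
the real logic is inlined in couch_replicator_worker,
couch_replicator_scheduler_job and couch_replicator_scheduler in the diff below.

    %% Hypothetical helper; not part of this commit.
    -module(shutdown_error_sketch).
    -export([wrap_error/1, unwrap_error/1]).

    %% Wrap a tagged replication error so the owning process exits with
    %% {shutdown, _}. OTP treats shutdown exits as expected, so no crash
    %% report or stack trace is logged, yet the reason still reaches the
    %% monitoring scheduler in its 'DOWN' message.
    wrap_error({bulk_docs_failed, _Code, _Body} = Err) -> {shutdown, Err};
    wrap_error({revs_diff_failed, _Code, _Body} = Err) -> {shutdown, Err};
    wrap_error({changes_req_failed, _Code, _Body} = Err) -> {shutdown, Err};
    wrap_error(Other) -> Other.

    %% The scheduler strips the wrapper before recording the crash, so the
    %% reported reason is the short marker plus HTTP code and body rather
    %% than a function_clause error with a stack trace.
    unwrap_error({shutdown, Reason}) -> Reason;
    unwrap_error(Reason) -> Reason.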
---
 .../src/couch_replicator_api_wrap.erl              |  12 +-
 .../src/couch_replicator_scheduler.erl             |   8 +-
 .../src/couch_replicator_scheduler_job.erl         |  51 ++--
 .../src/couch_replicator_worker.erl                |  19 +-
 .../couch_replicator_error_reporting_tests.erl     | 271 +++++++++++++++++++++
 5 files changed, 330 insertions(+), 31 deletions(-)

diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl b/src/couch_replicator/src/couch_replicator_api_wrap.erl
index ab1de7d..a21de42 100644
--- a/src/couch_replicator/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl
@@ -186,7 +186,9 @@ get_missing_revs(#httpdb{} = Db, IdRevs) ->
                 ),
                 {Id, MissingRevs, PossibleAncestors}
             end,
-            {ok, lists:map(ConvertToNativeFun, Props)}
+            {ok, lists:map(ConvertToNativeFun, Props)};
+        (ErrCode, _, ErrMsg) when is_integer(ErrCode) ->
+            {error, {revs_diff_failed, ErrCode, ErrMsg}}
         end).
 
 
@@ -408,7 +410,9 @@ update_docs(#httpdb{} = HttpDb, DocList, Options, UpdateType) ->
            (413, _, _) ->
                 {error, request_body_too_large};
            (417, _, Results) when is_list(Results) ->
-                {ok, bulk_results_to_errors(DocList, Results, remote)}
+                {ok, bulk_results_to_errors(DocList, Results, remote)};
+           (ErrCode, _, ErrMsg) when is_integer(ErrCode) ->
+                {error, {bulk_docs_failed, ErrCode, ErrMsg}}
         end).
 
 
@@ -466,7 +470,9 @@ changes_since(#httpdb{headers = Headers1, timeout = InactiveTimeout} = HttpDb,
                             end,
                             parse_changes_feed(Options, UserFun2,
                                 DataStreamFun2)
-                        end)
+                        end);
+                 (ErrCode, _, ErrMsg) when is_integer(ErrCode) ->
+                    throw({retry_limit, {changes_req_failed, ErrCode, ErrMsg}})
             end)
     catch
         exit:{http_request_failed, _, _, max_backoff} ->
diff --git a/src/couch_replicator/src/couch_replicator_scheduler.erl b/src/couch_replicator/src/couch_replicator_scheduler.erl
index f84860c..53c040e 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler.erl
@@ -318,8 +318,12 @@ handle_info({'DOWN', _Ref, process, Pid, normal}, State) ->
     update_running_jobs_stats(State#state.stats_pid),
     {noreply, State};
 
-handle_info({'DOWN', _Ref, process, Pid, Reason}, State) ->
+handle_info({'DOWN', _Ref, process, Pid, Reason0}, State) ->
     {ok, Job} = job_by_pid(Pid),
+    Reason = case Reason0 of
+        {shutdown, ShutdownReason} -> ShutdownReason;
+        Other -> Other
+    end,
     ok = handle_crashed_job(Job, Reason, State),
     {noreply, State};
 
@@ -873,7 +877,7 @@ is_continuous(#job{rep = Rep}) ->
 % optimize some options to help the job make progress.
 -spec maybe_optimize_job_for_rate_limiting(#job{}) -> #job{}.
 maybe_optimize_job_for_rate_limiting(Job = #job{history =
-    [{{crashed, {shutdown, max_backoff}}, _} | _]}) ->
+    [{{crashed, max_backoff}, _} | _]}) ->
     Opts = [
         {checkpoint_interval, 5000},
         {worker_processes, 2},
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index d69febb..a0ee29f 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -73,8 +73,6 @@
     workers,
     stats = couch_replicator_stats:new(),
     session_id,
-    source_monitor = nil,
-    target_monitor = nil,
     source_seq = nil,
     use_checkpoints = true,
     checkpoint_interval = ?DEFAULT_CHECKPOINT_INTERVAL,
@@ -242,14 +240,6 @@ handle_cast({report_seq, Seq},
 handle_info(shutdown, St) ->
     {stop, shutdown, St};
 
-handle_info({'DOWN', Ref, _, _, Why}, #rep_state{source_monitor = Ref} = St) ->
-    couch_log:error("Source database is down. Reason: ~p", [Why]),
-    {stop, source_db_down, St};
-
-handle_info({'DOWN', Ref, _, _, Why}, #rep_state{target_monitor = Ref} = St) ->
-    couch_log:error("Target database is down. Reason: ~p", [Why]),
-    {stop, target_db_down, St};
-
 handle_info({'EXIT', Pid, max_backoff}, State) ->
     couch_log:error("Max backoff reached child process ~p", [Pid]),
     {stop, {shutdown, max_backoff}, State};
@@ -261,10 +251,20 @@ handle_info({'EXIT', Pid, {shutdown, max_backoff}}, State) ->
 handle_info({'EXIT', Pid, normal}, #rep_state{changes_reader=Pid} = State) ->
     {noreply, State};
 
-handle_info({'EXIT', Pid, Reason}, #rep_state{changes_reader=Pid} = State) ->
+handle_info({'EXIT', Pid, Reason0}, #rep_state{changes_reader=Pid} = State) ->
     couch_stats:increment_counter([couch_replicator, changes_reader_deaths]),
+    Reason = case Reason0 of
+        {changes_req_failed, _, _} = HttpFail ->
+            HttpFail;
+        {http_request_failed, _, _, {error, {code, Code}}} ->
+            {changes_req_failed, Code};
+        {http_request_failed, _, _, {error, Err}} ->
+            {changes_req_failed, Err};
+        Other ->
+            {changes_reader_died, Other}
+    end,
     couch_log:error("ChangesReader process died with reason: ~p", [Reason]),
-    {stop, changes_reader_died, cancel_timer(State)};
+    {stop, {shutdown, Reason}, cancel_timer(State)};
 
 handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager = Pid} = State) ->
     {noreply, State};
@@ -272,7 +272,7 @@ handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager = Pid} = State) ->
 handle_info({'EXIT', Pid, Reason}, #rep_state{changes_manager = Pid} = State) ->
     couch_stats:increment_counter([couch_replicator, changes_manager_deaths]),
     couch_log:error("ChangesManager process died with reason: ~p", [Reason]),
-    {stop, changes_manager_died, cancel_timer(State)};
+    {stop, {shutdown, {changes_manager_died, Reason}}, cancel_timer(State)};
 
 handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) ->
     {noreply, State};
@@ -280,7 +280,7 @@ handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) ->
 handle_info({'EXIT', Pid, Reason}, #rep_state{changes_queue=Pid} = State) ->
     couch_stats:increment_counter([couch_replicator, changes_queue_deaths]),
     couch_log:error("ChangesQueue process died with reason: ~p", [Reason]),
-    {stop, changes_queue_died, cancel_timer(State)};
+    {stop, {shutdown, {changes_queue_died, Reason}}, cancel_timer(State)};
 
 handle_info({'EXIT', Pid, normal}, #rep_state{workers = Workers} = State) ->
     case Workers -- [Pid] of
@@ -304,8 +304,14 @@ handle_info({'EXIT', Pid, Reason}, #rep_state{workers = Workers} = State) ->
         {stop, {unknown_process_died, Pid, Reason}, State2};
     true ->
         couch_stats:increment_counter([couch_replicator, worker_deaths]),
-        couch_log:error("Worker ~p died with reason: ~p", [Pid, Reason]),
-        {stop, {worker_died, Pid, Reason}, State2}
+        StopReason = case Reason of
+            {shutdown, _} = Err ->
+                Err;
+            Other ->
+                couch_log:error("Worker ~p died with reason: ~p", [Pid, Reason]),
+                {worker_died, Pid, Other}
+         end,
+        {stop, StopReason, State2}
     end;
 
 handle_info(timeout, InitArgs) ->
@@ -380,6 +386,11 @@ terminate({shutdown, max_backoff}, State) ->
     terminate_cleanup(State),
     couch_replicator_notifier:notify({error, RepId, max_backoff});
 
+terminate({shutdown, Reason}, State) ->
+    % Unwrap so when reporting we don't have an extra {shutdown, ...} tuple
+    % wrapped around the message
+    terminate(Reason, State);
+
 terminate(Reason, State) ->
 #rep_state{
         source_name = Source,
@@ -592,8 +603,6 @@ init_state(Rep) ->
         src_starttime = get_value(<<"instance_start_time">>, SourceInfo),
         tgt_starttime = get_value(<<"instance_start_time">>, TargetInfo),
         session_id = couch_uuids:random(),
-        source_monitor = db_monitor(Source),
-        target_monitor = db_monitor(Target),
         source_seq = SourceSeq,
         use_checkpoints = get_value(use_checkpoints, Options, true),
         checkpoint_interval = get_value(checkpoint_interval, Options,
@@ -905,12 +914,6 @@ has_session_id(SessionId, [{Props} | Rest]) ->
     end.
 
 
-db_monitor(#httpdb{}) ->
-	nil;
-db_monitor(Db) ->
-	couch_db:monitor(Db).
-
-
 get_pending_count(St) ->
     Rep = St#rep_state.rep_details,
     Timeout = get_value(connection_timeout, Rep#rep.options),
diff --git a/src/couch_replicator/src/couch_replicator_worker.erl b/src/couch_replicator/src/couch_replicator_worker.erl
index 3d80f58..d708675 100644
--- a/src/couch_replicator/src/couch_replicator_worker.erl
+++ b/src/couch_replicator/src/couch_replicator_worker.erl
@@ -169,6 +169,15 @@ handle_info({'EXIT', Pid, normal}, #state{writer = nil} = State) ->
 handle_info({'EXIT', _Pid, max_backoff}, State) ->
     {stop, {shutdown, max_backoff}, State};
 
+handle_info({'EXIT', _Pid, {bulk_docs_failed, _, _} = Err}, State) ->
+    {stop, {shutdown, Err}, State};
+
+handle_info({'EXIT', _Pid, {revs_diff_failed, _, _} = Err}, State) ->
+    {stop, {shutdown, Err}, State};
+
+handle_info({'EXIT', _Pid, {http_request_failed, _, _, _} = Err}, State) ->
+    {stop, {shutdown, Err}, State};
+
 handle_info({'EXIT', Pid, Reason}, State) ->
    {stop, {process_died, Pid, Reason}, State}.
 
@@ -386,7 +395,10 @@ handle_flush_docs_result({ok, Errors}, Target, DocList) ->
     couch_replicator_stats:new([
         {docs_written, length(DocList) - length(Errors)},
         {doc_write_failures, length(Errors)}
-    ]).
+    ]);
+handle_flush_docs_result({error, {bulk_docs_failed, _ErrCode, _ErrMsg} = Err},
+        _Target, _) ->
+    exit(Err).
 
 
 flush_doc(Target, #doc{id = Id, revs = {Pos, [RevId | _]}} = Doc) ->
@@ -425,7 +437,10 @@ find_missing(DocInfos, Target) ->
             end, {[], 0}, DocInfos),
 
 
-    {ok, Missing} = couch_replicator_api_wrap:get_missing_revs(Target, IdRevs),
+    Missing = case couch_replicator_api_wrap:get_missing_revs(Target, IdRevs) of
+        {ok, Result} -> Result;
+        {error, Error} -> exit(Error)
+    end,
     MissingRevsCount = lists:foldl(
         fun({_Id, MissingRevs, _PAs}, Acc) -> Acc + length(MissingRevs) end,
         0, Missing),
diff --git a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
new file mode 100644
index 0000000..6b4f95c
--- /dev/null
+++ b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
@@ -0,0 +1,271 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_error_reporting_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_replicator/src/couch_replicator.hrl").
+
+
+setup_all() ->
+    test_util:start_couch([couch_replicator, chttpd, mem3, fabric]).
+
+
+teardown_all(Ctx) ->
+    ok = test_util:stop_couch(Ctx).
+
+
+setup() ->
+    meck:unload(),
+    Source = setup_db(),
+    Target = setup_db(),
+    {Source, Target}.
+
+
+teardown({Source, Target}) ->
+    meck:unload(),
+    teardown_db(Source),
+    teardown_db(Target),
+    ok.
+
+
+error_reporting_test_() ->
+    {
+        setup,
+        fun setup_all/0,
+        fun teardown_all/1,
+        {
+            foreach,
+            fun setup/0,
+            fun teardown/1,
+            [
+                fun t_fail_bulk_docs/1,
+                fun t_fail_changes_reader/1,
+                fun t_fail_revs_diff/1,
+                fun t_fail_changes_queue/1,
+                fun t_fail_changes_manager/1,
+                fun t_fail_changes_reader_proc/1
+            ]
+        }
+    }.
+
+
+t_fail_bulk_docs({Source, Target}) ->
+    ?_test(begin
+        populate_db(Source, 1, 5),
+        {ok, RepId} = replicate(Source, Target),
+        wait_target_in_sync(Source, Target),
+
+        {ok, Listener} = rep_result_listener(RepId),
+        mock_fail_req("/_bulk_docs", {ok, "403", [], [<<"{\"x\":\"y\"}">>]}),
+        populate_db(Source, 6, 6),
+
+        {error, Result} = wait_rep_result(RepId),
+        ?assertEqual({bulk_docs_failed, 403, {[{<<"x">>, <<"y">>}]}}, Result),
+
+        couch_replicator_notifier:stop(Listener)
+    end).
+
+
+t_fail_changes_reader({Source, Target}) ->
+    ?_test(begin
+        populate_db(Source, 1, 5),
+        {ok, RepId} = replicate(Source, Target),
+        wait_target_in_sync(Source, Target),
+
+        {ok, Listener} = rep_result_listener(RepId),
+        mock_fail_req("/_changes", {ok, "418", [], [<<"{\"x\":\"y\"}">>]}),
+        populate_db(Source, 6, 6),
+
+        {error, Result} = wait_rep_result(RepId),
+        ?assertEqual({changes_req_failed, 418, {[{<<"x">>, <<"y">>}]}}, Result),
+
+        couch_replicator_notifier:stop(Listener)
+    end).
+
+
+t_fail_revs_diff({Source, Target}) ->
+    ?_test(begin
+        populate_db(Source, 1, 5),
+        {ok, RepId} = replicate(Source, Target),
+        wait_target_in_sync(Source, Target),
+
+        {ok, Listener} = rep_result_listener(RepId),
+        mock_fail_req("/_revs_diff", {ok, "407", [], [<<"{\"x\":\"y\"}">>]}),
+        populate_db(Source, 6, 6),
+
+        {error, Result} = wait_rep_result(RepId),
+        ?assertEqual({revs_diff_failed, 407, {[{<<"x">>, <<"y">>}]}}, Result),
+
+        couch_replicator_notifier:stop(Listener)
+    end).
+
+
+t_fail_changes_queue({Source, Target}) ->
+    ?_test(begin
+        populate_db(Source, 1, 5),
+        {ok, RepId} = replicate(Source, Target),
+        wait_target_in_sync(Source, Target),
+
+        RepPid = couch_replicator_test_helper:get_pid(RepId),
+        State = sys:get_state(RepPid),
+        ChangesQueue = element(20, State),
+        ?assert(is_process_alive(ChangesQueue)),
+
+        {ok, Listener} = rep_result_listener(RepId),
+        exit(ChangesQueue, boom),
+
+        {error, Result} = wait_rep_result(RepId),
+        ?assertEqual({changes_queue_died, boom}, Result),
+        couch_replicator_notifier:stop(Listener)
+    end).
+
+
+t_fail_changes_manager({Source, Target}) ->
+    ?_test(begin
+        populate_db(Source, 1, 5),
+        {ok, RepId} = replicate(Source, Target),
+        wait_target_in_sync(Source, Target),
+
+        RepPid = couch_replicator_test_helper:get_pid(RepId),
+        State = sys:get_state(RepPid),
+        ChangesManager = element(21, State),
+        ?assert(is_process_alive(ChangesManager)),
+
+        {ok, Listener} = rep_result_listener(RepId),
+        exit(ChangesManager, bam),
+
+        {error, Result} = wait_rep_result(RepId),
+        ?assertEqual({changes_manager_died, bam}, Result),
+        couch_replicator_notifier:stop(Listener)
+    end).
+
+
+t_fail_changes_reader_proc({Source, Target}) ->
+    ?_test(begin
+        populate_db(Source, 1, 5),
+        {ok, RepId} = replicate(Source, Target),
+        wait_target_in_sync(Source, Target),
+
+        RepPid = couch_replicator_test_helper:get_pid(RepId),
+        State = sys:get_state(RepPid),
+        ChangesReader = element(22, State),
+        ?assert(is_process_alive(ChangesReader)),
+
+        {ok, Listener} = rep_result_listener(RepId),
+        exit(ChangesReader, kapow),
+
+        {error, Result} = wait_rep_result(RepId),
+        ?assertEqual({changes_reader_died, kapow}, Result),
+        couch_replicator_notifier:stop(Listener)
+    end).
+
+
+mock_fail_req(Path, Return) ->
+    meck:expect(ibrowse, send_req_direct,
+        fun(W, Url, Headers, Meth, Body, Opts, TOut) ->
+            Args = [W, Url, Headers, Meth, Body, Opts, TOut],
+            {ok, {_, _, _, _, UPath, _}} = http_uri:parse(Url),
+            case lists:suffix(Path, UPath) of
+                true -> Return;
+                false -> meck:passthrough(Args)
+            end
+        end).
+
+
+rep_result_listener(RepId) ->
+    ReplyTo = self(),
+    {ok, _Listener} = couch_replicator_notifier:start_link(
+        fun({_, RepId2, _} = Ev) when RepId2 =:= RepId ->
+                ReplyTo ! Ev;
+            (_) ->
+                ok
+        end).
+
+
+wait_rep_result(RepId) ->
+    receive
+        {finished, RepId, RepResult} -> {ok, RepResult};
+        {error, RepId, Reason} -> {error, Reason}
+    end.
+
+
+
+setup_db() ->
+    DbName = ?tempdb(),
+    {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
+    ok = couch_db:close(Db),
+    DbName.
+
+
+teardown_db(DbName) ->
+    ok = couch_server:delete(DbName, [?ADMIN_CTX]).
+
+
+populate_db(DbName, Start, End) ->
+    {ok, Db} = couch_db:open_int(DbName, []),
+    Docs = lists:foldl(
+        fun(DocIdCounter, Acc) ->
+            Id = integer_to_binary(DocIdCounter),
+            Doc = #doc{id = Id, body = {[]}},
+            [Doc | Acc]
+        end,
+        [], lists:seq(Start, End)),
+    {ok, _} = couch_db:update_docs(Db, Docs, []),
+    ok = couch_db:close(Db).
+
+
+wait_target_in_sync(Source, Target) ->
+    {ok, SourceDb} = couch_db:open_int(Source, []),
+    {ok, SourceInfo} = couch_db:get_db_info(SourceDb),
+    ok = couch_db:close(SourceDb),
+    SourceDocCount = couch_util:get_value(doc_count, SourceInfo),
+    wait_target_in_sync_loop(SourceDocCount, Target, 300).
+
+
+wait_target_in_sync_loop(_DocCount, _TargetName, 0) ->
+    erlang:error({assertion_failed, [
+          {module, ?MODULE}, {line, ?LINE},
+          {reason, "Could not get source and target databases in sync"}
+    ]});
+
+wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft) ->
+    {ok, Target} = couch_db:open_int(TargetName, []),
+    {ok, TargetInfo} = couch_db:get_db_info(Target),
+    ok = couch_db:close(Target),
+    TargetDocCount = couch_util:get_value(doc_count, TargetInfo),
+    case TargetDocCount == DocCount of
+        true ->
+            true;
+        false ->
+            ok = timer:sleep(500),
+            wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft - 1)
+    end.
+
+
+replicate(Source, Target) ->
+    SrcUrl = couch_replicator_test_helper:db_url(Source),
+    TgtUrl = couch_replicator_test_helper:db_url(Target),
+    RepObject = {[
+        {<<"source">>, SrcUrl},
+        {<<"target">>, TgtUrl},
+        {<<"continuous">>, true},
+        {<<"worker_processes">>, 1},
+        {<<"retries_per_request">>, 1},
+        % Low connection timeout so _changes feed gets restarted quicker
+        {<<"connection_timeout">>, 3000}
+    ]},
+    {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
+    ok = couch_replicator_scheduler:add_job(Rep),
+    couch_replicator_scheduler:reschedule(),
+    {ok, Rep#rep.id}.