You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by be...@apache.org on 2011/12/05 11:31:10 UTC

[1/2] git commit: add load test for buildbot.

Updated Branches:
  refs/heads/master f913ca6e8 -> 1c0c183df


add load test for buildbot.


Project: http://git-wip-us.apache.org/repos/asf/couchdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb/commit/604f1a60
Tree: http://git-wip-us.apache.org/repos/asf/couchdb/tree/604f1a60
Diff: http://git-wip-us.apache.org/repos/asf/couchdb/diff/604f1a60

Branch: refs/heads/master
Commit: 604f1a60724737aee3a942ebe9e808c53d566276
Parents: f913ca6
Author: benoitc <be...@apache.org>
Authored: Mon Dec 5 11:03:12 2011 +0100
Committer: benoitc <be...@apache.org>
Committed: Mon Dec 5 11:16:29 2011 +0100

----------------------------------------------------------------------
 src/couch_replicator/Makefile.am                   |    9 +-
 src/couch_replicator/test/001-httpc-pool.t         |  250 --------
 .../test/002-replication-compact.t                 |  486 ---------------
 .../test/003-replication-large-atts.t              |  267 --------
 .../test/004-replication-many-leaves.t             |  216 -------
 src/couch_replicator/test/01-load.t                |   37 ++
 src/couch_replicator/test/02-httpc-pool.t          |  250 ++++++++
 src/couch_replicator/test/03-replication-compact.t |  486 +++++++++++++++
 .../test/04-replication-large-atts.t               |  267 ++++++++
 .../test/05-replication-many-leaves.t              |  216 +++++++
 10 files changed, 1261 insertions(+), 1223 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb/blob/604f1a60/src/couch_replicator/Makefile.am
----------------------------------------------------------------------
diff --git a/src/couch_replicator/Makefile.am b/src/couch_replicator/Makefile.am
index 169a7e3..37b3e3b 100644
--- a/src/couch_replicator/Makefile.am
+++ b/src/couch_replicator/Makefile.am
@@ -36,10 +36,11 @@ source_files = \
 	src/couch_replicator.erl
 
 test_files = \
-	test/001-httpc-pool.t \
-	test/002-replication-compact.t \
-	test/003-replication-large-atts.t \
-	test/004-replication-many-leaves.t
+	test/01-load.t \
+	test/02-httpc-pool.t \
+	test/03-replication-compact.t \
+	test/04-replication-large-atts.t \
+	test/05-replication-many-leaves.t
 
 compiled_files = \
 	ebin/couch_replicator_api_wrap.beam \

http://git-wip-us.apache.org/repos/asf/couchdb/blob/604f1a60/src/couch_replicator/test/001-httpc-pool.t
----------------------------------------------------------------------
diff --git a/src/couch_replicator/test/001-httpc-pool.t b/src/couch_replicator/test/001-httpc-pool.t
deleted file mode 100755
index a7bde6c..0000000
--- a/src/couch_replicator/test/001-httpc-pool.t
+++ /dev/null
@@ -1,250 +0,0 @@
-#!/usr/bin/env escript
-%% -*- erlang -*-
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
-main(_) ->
-    test_util:init_code_path(),
-
-    etap:plan(55),
-    case (catch test()) of
-        ok ->
-            etap:end_tests();
-        Other ->
-            etap:diag(io_lib:format("Test died abnormally: ~p", [Other])),
-            etap:bail(Other)
-    end,
-    ok.
-
-
-test() ->
-    couch_server_sup:start_link(test_util:config_files()),
-    ibrowse:start(),
-
-    test_pool_full(),
-    test_worker_dead_pool_non_full(),
-    test_worker_dead_pool_full(),
-
-    couch_server_sup:stop(),
-    ok.
-
-
-test_pool_full() ->
-    Pool = spawn_pool(),
-    Client1 = spawn_client(Pool),
-    Client2 = spawn_client(Pool),
-    Client3 = spawn_client(Pool),
-
-    etap:diag("Check that we can spawn the max number of connections."),
-    etap:is(ping_client(Client1), ok, "Client 1 started ok."),
-    etap:is(ping_client(Client2), ok, "Client 2 started ok."),
-    etap:is(ping_client(Client3), ok, "Client 3 started ok."),
-
-    Worker1 = get_client_worker(Client1, "1"),
-    Worker2 = get_client_worker(Client2, "2"),
-    Worker3 = get_client_worker(Client3, "3"),
-    etap:is(is_process_alive(Worker1), true, "Client's 1 worker is alive."),
-    etap:is(is_process_alive(Worker2), true, "Client's 2 worker is alive."),
-    etap:is(is_process_alive(Worker3), true, "Client's 3 worker is alive."),
-
-    etap:isnt(Worker1, Worker2, "Clients 1 and 2 got different workers."),
-    etap:isnt(Worker2, Worker3, "Clients 2 and 3 got different workers."),
-    etap:isnt(Worker1, Worker3, "Clients 1 and 3 got different workers."),
-
-    etap:diag("Check that client 4 blocks waiting for a worker."),
-    Client4 = spawn_client(Pool),
-    etap:is(ping_client(Client4), timeout, "Client 4 blocked while waiting."),
-
-    etap:diag("Check that stopping a client gives up its worker."),
-    etap:is(stop_client(Client1), ok, "First client stopped."),
-
-    etap:diag("And check that our blocked client has been unblocked."),
-    etap:is(ping_client(Client4), ok, "Client 4 was unblocked."),
-
-    Worker4 = get_client_worker(Client4, "4"),
-    etap:is(is_process_alive(Worker4), true, "Client's 4 worker is alive."),
-    etap:is(Worker4, Worker1, "Client 4 got worker that client 1 got before."),
-
-    lists:foreach(fun(C) -> ok = stop_client(C) end, [Client2, Client3, Client4]),
-    stop_pool(Pool).
-
-
-test_worker_dead_pool_non_full() ->
-    Pool = spawn_pool(),
-    Client1 = spawn_client(Pool),
-
-    etap:is(ping_client(Client1), ok, "Client 1 started ok."),
-    Worker1 = get_client_worker(Client1, "1"),
-    etap:is(is_process_alive(Worker1), true, "Client's 1 worker is alive."),
-
-    etap:diag("Kill client's 1 worker."),
-    etap:is(kill_client_worker(Client1), ok, "Killed client's 1 worker."),
-    etap:is(is_process_alive(Worker1), false, "Client's 1 worker process is dead."),
-
-    etap:is(stop_client(Client1), ok, "First client stopped and released its worker."),
-
-    Client2 = spawn_client(Pool),
-    etap:is(ping_client(Client2), ok, "Client 2 started ok."),
-    Worker2 = get_client_worker(Client2, "2"),
-    etap:isnt(Worker2, Worker1, "Client 2 got a different worker from client 1"),
-    etap:is(is_process_alive(Worker2), true, "Client's 2 worker is alive."),
-
-    etap:is(stop_client(Client2), ok, "Second client stopped."),
-    stop_pool(Pool).
-
-
-test_worker_dead_pool_full() ->
-    Pool = spawn_pool(),
-    Client1 = spawn_client(Pool),
-    Client2 = spawn_client(Pool),
-    Client3 = spawn_client(Pool),
-
-    etap:diag("Check that we can spawn the max number of connections."),
-    etap:is(ping_client(Client1), ok, "Client 1 started ok."),
-    etap:is(ping_client(Client2), ok, "Client 2 started ok."),
-    etap:is(ping_client(Client3), ok, "Client 3 started ok."),
-
-    Worker1 = get_client_worker(Client1, "1"),
-    Worker2 = get_client_worker(Client2, "2"),
-    Worker3 = get_client_worker(Client3, "3"),
-    etap:is(is_process_alive(Worker1), true, "Client's 1 worker is alive."),
-    etap:is(is_process_alive(Worker2), true, "Client's 2 worker is alive."),
-    etap:is(is_process_alive(Worker3), true, "Client's 3 worker is alive."),
-
-    etap:isnt(Worker1, Worker2, "Clients 1 and 2 got different workers."),
-    etap:isnt(Worker2, Worker3, "Clients 2 and 3 got different workers."),
-    etap:isnt(Worker1, Worker3, "Clients 1 and 3 got different workers."),
-
-    etap:diag("Check that client 4 blocks waiting for a worker."),
-    Client4 = spawn_client(Pool),
-    etap:is(ping_client(Client4), timeout, "Client 4 blocked while waiting."),
-
-    etap:diag("Kill client's 1 worker."),
-    etap:is(kill_client_worker(Client1), ok, "Killed client's 1 worker."),
-    etap:is(is_process_alive(Worker1), false, "Client's 1 worker process is dead."),
-
-    etap:diag("Check client 4 got unblocked after first worker's death"),
-    etap:is(ping_client(Client4), ok, "Client 4 not blocked anymore."),
-
-    Worker4 = get_client_worker(Client4, "4"),
-    etap:is(is_process_alive(Worker4), true, "Client's 4 worker is alive."),
-    etap:isnt(Worker4, Worker1, "Client 4 got a worker different from client 1."),
-    etap:isnt(Worker4, Worker2, "Client 4 got a worker different from client 2."),
-    etap:isnt(Worker4, Worker3, "Client 4 got a worker different from client 3."),
-
-    etap:diag("Check that stopping client 1 is a noop."),
-    etap:is(stop_client(Client1), ok, "First client stopped."),
-
-    etap:is(is_process_alive(Worker2), true, "Client's 2 worker still alive."),
-    etap:is(is_process_alive(Worker3), true, "Client's 3 worker still alive."),
-    etap:is(is_process_alive(Worker4), true, "Client's 4 worker still alive."),
-
-    etap:diag("Check that client 5 blocks waiting for a worker."),
-    Client5 = spawn_client(Pool),
-    etap:is(ping_client(Client5), timeout, "Client 5 blocked while waiting."),
-
-    etap:diag("Check that stopping client 2 gives up its worker."),
-    etap:is(stop_client(Client2), ok, "Second client stopped."),
-
-    etap:diag("Now check that client 5 has been unblocked."),
-    etap:is(ping_client(Client5), ok, "Client 5 was unblocked."),
-
-    Worker5 = get_client_worker(Client5, "5"),
-    etap:is(is_process_alive(Worker5), true, "Client's 5 worker is alive."),
-    etap:isnt(Worker5, Worker1, "Client 5 got a worker different from client 1."),
-    etap:is(Worker5, Worker2, "Client 5 got same worker as client 2."),
-    etap:isnt(Worker5, Worker3, "Client 5 got a worker different from client 3."),
-    etap:isnt(Worker5, Worker4, "Client 5 got a worker different from client 4."),
-
-    etap:is(is_process_alive(Worker3), true, "Client's 3 worker still alive."),
-    etap:is(is_process_alive(Worker4), true, "Client's 4 worker still alive."),
-    etap:is(is_process_alive(Worker5), true, "Client's 5 worker still alive."),
-
-    lists:foreach(fun(C) -> ok = stop_client(C) end, [Client3, Client4, Client5]),
-    stop_pool(Pool).
-
-
-spawn_client(Pool) ->
-    Parent = self(),
-    Ref = make_ref(),
-    Pid = spawn(fun() ->
-        {ok, Worker} = couch_replicator_httpc_pool:get_worker(Pool),
-        loop(Parent, Ref, Worker, Pool)
-    end),
-    {Pid, Ref}.
-
-
-ping_client({Pid, Ref}) ->
-    Pid ! ping,
-    receive
-        {pong, Ref} ->
-            ok
-    after 3000 ->
-        timeout
-    end.
-
-
-get_client_worker({Pid, Ref}, ClientName) ->
-    Pid ! get_worker,
-    receive
-        {worker, Ref, Worker} ->
-            Worker
-    after 3000 ->
-        etap:bail("Timeout getting client " ++ ClientName ++ " worker.")
-    end.
-
-
-stop_client({Pid, Ref}) ->
-    Pid ! stop,
-    receive
-        {stop, Ref} ->
-            ok
-    after 3000 ->
-        timeout
-    end.
-
-
-kill_client_worker({Pid, Ref}) ->
-    Pid ! get_worker,
-    receive
-        {worker, Ref, Worker} ->
-            exit(Worker, kill),
-            ok
-    after 3000 ->
-        timeout
-    end.
-
-
-loop(Parent, Ref, Worker, Pool) ->
-    receive
-        ping ->
-            Parent ! {pong, Ref},
-            loop(Parent, Ref, Worker, Pool);
-        get_worker  ->
-            Parent ! {worker, Ref, Worker},
-            loop(Parent, Ref, Worker, Pool);
-        stop ->
-            couch_replicator_httpc_pool:release_worker(Pool, Worker),
-            Parent ! {stop, Ref}
-    end.
-
-
-spawn_pool() ->
-    Host = couch_config:get("httpd", "bind_address", "127.0.0.1"),
-    Port = couch_config:get("httpd", "port", "5984"),
-    {ok, Pool} = couch_replicator_httpc_pool:start_link(
-        "http://" ++ Host ++ ":5984", [{max_connections, 3}]),
-    Pool.
-
-
-stop_pool(Pool) ->
-    ok = couch_replicator_httpc_pool:stop(Pool).

http://git-wip-us.apache.org/repos/asf/couchdb/blob/604f1a60/src/couch_replicator/test/002-replication-compact.t
----------------------------------------------------------------------
diff --git a/src/couch_replicator/test/002-replication-compact.t b/src/couch_replicator/test/002-replication-compact.t
deleted file mode 100755
index c8b265e..0000000
--- a/src/couch_replicator/test/002-replication-compact.t
+++ /dev/null
@@ -1,486 +0,0 @@
-#!/usr/bin/env escript
-%% -*- erlang -*-
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
-% Verify that compacting databases that are being used as the source or
-% target of a replication doesn't affect the replication and that the
-% replication doesn't hold their reference counters forever.
-
--define(b2l(B), binary_to_list(B)).
-
--record(user_ctx, {
-    name = null,
-    roles = [],
-    handler
-}).
-
--record(db, {
-    main_pid = nil,
-    update_pid = nil,
-    compactor_pid = nil,
-    instance_start_time, % number of microsecs since jan 1 1970 as a binary string
-    fd,
-    updater_fd,
-    fd_ref_counter,
-    header = nil,
-    committed_update_seq,
-    fulldocinfo_by_id_btree,
-    docinfo_by_seq_btree,
-    local_docs_btree,
-    update_seq,
-    name,
-    filepath,
-    validate_doc_funs = [],
-    security = [],
-    security_ptr = nil,
-    user_ctx = #user_ctx{},
-    waiting_delayed_commit = nil,
-    revs_limit = 1000,
-    fsync_options = [],
-    options = [],
-    compression
-}).
-
--record(rep, {
-    id,
-    source,
-    target,
-    options,
-    user_ctx,
-    doc_id
-}).
-
-
-source_db_name() -> <<"couch_test_rep_db_a">>.
-target_db_name() -> <<"couch_test_rep_db_b">>.
-
-
-main(_) ->
-    test_util:init_code_path(),
-
-    etap:plan(376),
-    case (catch test()) of
-        ok ->
-            etap:end_tests();
-        Other ->
-            etap:diag(io_lib:format("Test died abnormally: ~p", [Other])),
-            etap:bail(Other)
-    end,
-    ok.
-
-
-test() ->
-    couch_server_sup:start_link(test_util:config_files()),
-    ibrowse:start(),
-
-    Pairs = [
-        {source_db_name(), target_db_name()},
-        {{remote, source_db_name()}, target_db_name()},
-        {source_db_name(), {remote, target_db_name()}},
-        {{remote, source_db_name()}, {remote, (target_db_name())}}
-    ],
-
-    lists:foreach(
-        fun({Source, Target}) ->
-            {ok, SourceDb} = create_db(source_db_name()),
-            etap:is(couch_db:is_idle(SourceDb), true,
-                "Source database is idle before starting replication"),
-
-            {ok, TargetDb} = create_db(target_db_name()),
-            etap:is(couch_db:is_idle(TargetDb), true,
-                "Target database is idle before starting replication"),
-
-            {ok, RepPid, RepId} = replicate(Source, Target),
-            check_active_tasks(RepPid, RepId, Source, Target),
-            {ok, DocsWritten} = populate_and_compact_test(
-                RepPid, SourceDb, TargetDb),
-
-            wait_target_in_sync(DocsWritten, TargetDb),
-            check_active_tasks(RepPid, RepId, Source, Target),
-            cancel_replication(RepId, RepPid),
-            compare_dbs(SourceDb, TargetDb),
-
-            delete_db(SourceDb),
-            delete_db(TargetDb),
-            couch_server_sup:stop(),
-            ok = timer:sleep(1000),
-            couch_server_sup:start_link(test_util:config_files())
-        end,
-        Pairs),
-
-    couch_server_sup:stop(),
-    ok.
-
-
-populate_and_compact_test(RepPid, SourceDb0, TargetDb0) ->
-    etap:is(is_process_alive(RepPid), true, "Replication process is alive"),
-    check_db_alive("source", SourceDb0),
-    check_db_alive("target", TargetDb0),
-
-    Writer = spawn_writer(SourceDb0),
-
-    lists:foldl(
-        fun(_, {SourceDb, TargetDb, DocCount}) ->
-            pause_writer(Writer),
-
-            compact_db("source", SourceDb),
-            etap:is(is_process_alive(RepPid), true,
-                "Replication process is alive after source database compaction"),
-            check_db_alive("source", SourceDb),
-            check_ref_counter("source", SourceDb),
-
-            compact_db("target", TargetDb),
-            etap:is(is_process_alive(RepPid), true,
-                "Replication process is alive after target database compaction"),
-            check_db_alive("target", TargetDb),
-            check_ref_counter("target", TargetDb),
-
-            {ok, SourceDb2} = reopen_db(SourceDb),
-            {ok, TargetDb2} = reopen_db(TargetDb),
-
-            resume_writer(Writer),
-            wait_writer(Writer, DocCount),
-
-            compact_db("source", SourceDb2),
-            etap:is(is_process_alive(RepPid), true,
-                "Replication process is alive after source database compaction"),
-            check_db_alive("source", SourceDb2),
-            pause_writer(Writer),
-            check_ref_counter("source", SourceDb2),
-            resume_writer(Writer),
-
-            compact_db("target", TargetDb2),
-            etap:is(is_process_alive(RepPid), true,
-                "Replication process is alive after target database compaction"),
-            check_db_alive("target", TargetDb2),
-            pause_writer(Writer),
-            check_ref_counter("target", TargetDb2),
-            resume_writer(Writer),
-
-            {ok, SourceDb3} = reopen_db(SourceDb2),
-            {ok, TargetDb3} = reopen_db(TargetDb2),
-            {SourceDb3, TargetDb3, DocCount + 50}
-        end,
-        {SourceDb0, TargetDb0, 50}, lists:seq(1, 5)),
-
-    DocsWritten = stop_writer(Writer),
-    {ok, DocsWritten}.
-
-
-check_db_alive(Type, #db{main_pid = Pid}) ->
-    etap:is(is_process_alive(Pid), true,
-        "Local " ++ Type ++ " database main pid is alive").
-
-
-compact_db(Type, #db{name = Name}) ->
-    {ok, Db} = couch_db:open_int(Name, []),
-    {ok, CompactPid} = couch_db:start_compact(Db),
-    MonRef = erlang:monitor(process, CompactPid),
-    receive
-    {'DOWN', MonRef, process, CompactPid, normal} ->
-        ok;
-    {'DOWN', MonRef, process, CompactPid, Reason} ->
-        etap:bail("Error compacting " ++ Type ++ " database " ++ ?b2l(Name) ++
-            ": " ++ couch_util:to_list(Reason))
-    after 30000 ->
-        etap:bail("Compaction for " ++ Type ++ " database " ++ ?b2l(Name) ++
-            " didn't finish")
-    end,
-    ok = couch_db:close(Db).
-
-
-check_ref_counter(Type, #db{name = Name, fd_ref_counter = OldRefCounter}) ->
-    MonRef = erlang:monitor(process, OldRefCounter),
-    receive
-    {'DOWN', MonRef, process, OldRefCounter, _} ->
-        etap:diag("Old " ++ Type ++ " database ref counter terminated")
-    after 30000 ->
-        etap:bail("Old " ++ Type ++ " database ref counter didn't terminate")
-    end,
-    {ok, #db{fd_ref_counter = NewRefCounter} = Db} = couch_db:open_int(Name, []),
-    ok = couch_db:close(Db),
-    etap:isnt(
-        NewRefCounter, OldRefCounter, Type ++ " database has new ref counter").
-
-
-reopen_db(#db{name = Name}) ->
-    {ok, Db} = couch_db:open_int(Name, []),
-    ok = couch_db:close(Db),
-    {ok, Db}.
-
-
-wait_target_in_sync(DocCount, #db{name = TargetName}) ->
-    wait_target_in_sync_loop(DocCount, TargetName, 300).
-
-
-wait_target_in_sync_loop(_DocCount, _TargetName, 0) ->
-    etap:bail("Could not get source and target databases in sync");
-wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft) ->
-    {ok, Target} = couch_db:open_int(TargetName, []),
-    {ok, TargetInfo} = couch_db:get_db_info(Target),
-    ok = couch_db:close(Target),
-    TargetDocCount = couch_util:get_value(doc_count, TargetInfo),
-    case TargetDocCount == DocCount of
-    true ->
-        etap:diag("Source and target databases are in sync");
-    false ->
-        ok = timer:sleep(100),
-        wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft - 1)
-    end.
-
-
-compare_dbs(#db{name = SourceName}, #db{name = TargetName}) ->
-    {ok, SourceDb} = couch_db:open_int(SourceName, []),
-    {ok, TargetDb} = couch_db:open_int(TargetName, []),
-    Fun = fun(FullDocInfo, _, Acc) ->
-        {ok, Doc} = couch_db:open_doc(SourceDb, FullDocInfo),
-        {Props} = DocJson = couch_doc:to_json_obj(Doc, [attachments]),
-        DocId = couch_util:get_value(<<"_id">>, Props),
-        DocTarget = case couch_db:open_doc(TargetDb, DocId) of
-        {ok, DocT} ->
-            DocT;
-        Error ->
-            etap:bail("Error opening document '" ++ ?b2l(DocId) ++
-                "' from target: " ++ couch_util:to_list(Error))
-        end,
-        DocTargetJson = couch_doc:to_json_obj(DocTarget, [attachments]),
-        case DocTargetJson of
-        DocJson ->
-            ok;
-        _ ->
-            etap:bail("Content from document '" ++ ?b2l(DocId) ++
-                "' differs in target database")
-        end,
-        {ok, Acc}
-    end,
-    {ok, _, _} = couch_db:enum_docs(SourceDb, Fun, [], []),
-    etap:diag("Target database has the same documents as the source database"),
-    ok = couch_db:close(SourceDb),
-    ok = couch_db:close(TargetDb).
-
-
-check_active_tasks(RepPid, {BaseId, Ext} = _RepId, Src, Tgt) ->
-    Source = case Src of
-    {remote, NameSrc} ->
-        <<(db_url(NameSrc))/binary, $/>>;
-    _ ->
-        Src
-    end,
-    Target = case Tgt of
-    {remote, NameTgt} ->
-        <<(db_url(NameTgt))/binary, $/>>;
-    _ ->
-        Tgt
-    end,
-    FullRepId = list_to_binary(BaseId ++ Ext),
-    Pid = list_to_binary(pid_to_list(RepPid)),
-    [RepTask] = couch_task_status:all(),
-    etap:is(couch_util:get_value(pid, RepTask), Pid,
-        "_active_tasks entry has correct pid property"),
-    etap:is(couch_util:get_value(replication_id, RepTask), FullRepId,
-        "_active_tasks entry has right replication id"),
-    etap:is(couch_util:get_value(continuous, RepTask), true,
-        "_active_tasks entry has continuous property set to true"),
-    etap:is(couch_util:get_value(source, RepTask), Source,
-        "_active_tasks entry has correct source property"),
-    etap:is(couch_util:get_value(target, RepTask), Target,
-        "_active_tasks entry has correct target property"),
-    etap:is(is_integer(couch_util:get_value(docs_read, RepTask)), true,
-        "_active_tasks entry has integer docs_read property"),
-    etap:is(is_integer(couch_util:get_value(docs_written, RepTask)), true,
-        "_active_tasks entry has integer docs_written property"),
-    etap:is(is_integer(couch_util:get_value(doc_write_failures, RepTask)), true,
-        "_active_tasks entry has integer doc_write_failures property"),
-    etap:is(is_integer(couch_util:get_value(revisions_checked, RepTask)), true,
-        "_active_tasks entry has integer revisions_checked property"),
-    etap:is(is_integer(couch_util:get_value(missing_revisions_found, RepTask)), true,
-        "_active_tasks entry has integer missing_revisions_found property"),
-    etap:is(is_integer(couch_util:get_value(checkpointed_source_seq, RepTask)), true,
-        "_active_tasks entry has integer checkpointed_source_seq property"),
-    etap:is(is_integer(couch_util:get_value(source_seq, RepTask)), true,
-        "_active_tasks entry has integer source_seq property"),
-    Progress = couch_util:get_value(progress, RepTask),
-    etap:is(is_integer(Progress), true,
-        "_active_tasks entry has an integer progress property"),
-    etap:is(Progress =< 100, true, "Progress is not greater than 100%").
-
-
-wait_writer(Pid, NumDocs) ->
-    case get_writer_num_docs_written(Pid) of
-    N when N >= NumDocs ->
-        ok;
-    _ ->
-        wait_writer(Pid, NumDocs)
-    end.
-
-
-spawn_writer(Db) ->
-    Parent = self(),
-    Pid = spawn(fun() -> writer_loop(Db, Parent, 0) end),
-    etap:diag("Started source database writer"),
-    Pid.
-
-
-pause_writer(Pid) ->
-    Ref = make_ref(),
-    Pid ! {pause, Ref},
-    receive
-    {paused, Ref} ->
-        ok
-    after 30000 ->
-        etap:bail("Failed to pause source database writer")
-    end.
-
-
-resume_writer(Pid) ->
-    Ref = make_ref(),
-    Pid ! {continue, Ref},
-    receive
-    {ok, Ref} ->
-        ok
-    after 30000 ->
-        etap:bail("Failed to unpause source database writer")
-    end.
-
-
-get_writer_num_docs_written(Pid) ->
-    Ref = make_ref(),
-    Pid ! {get_count, Ref},
-    receive
-    {count, Ref, Count} ->
-        Count
-    after 30000 ->
-        etap:bail("Timeout getting number of documents written from "
-            "source database writer")
-    end.
-
-
-stop_writer(Pid) ->
-    Ref = make_ref(),
-    Pid ! {stop, Ref},
-    receive
-    {stopped, Ref, DocsWritten} ->
-        MonRef = erlang:monitor(process, Pid),
-        receive
-        {'DOWN', MonRef, process, Pid, _Reason} ->
-            etap:diag("Stopped source database writer"),
-            DocsWritten
-        after 30000 ->
-            etap:bail("Timeout stopping source database writer")
-        end
-    after 30000 ->
-        etap:bail("Timeout stopping source database writer")
-    end.
-
-
-writer_loop(#db{name = DbName}, Parent, Counter) ->
-    maybe_pause(Parent, Counter),
-    Doc = couch_doc:from_json_obj({[
-        {<<"_id">>, list_to_binary(integer_to_list(Counter + 1))},
-        {<<"value">>, Counter + 1},
-        {<<"_attachments">>, {[
-            {<<"icon1.png">>, {[
-                {<<"data">>, base64:encode(att_data())},
-                {<<"content_type">>, <<"image/png">>}
-            ]}},
-            {<<"icon2.png">>, {[
-                {<<"data">>, base64:encode(iolist_to_binary(
-                    [att_data(), att_data()]))},
-                {<<"content_type">>, <<"image/png">>}
-            ]}}
-        ]}}
-    ]}),
-    maybe_pause(Parent, Counter),
-    {ok, Db} = couch_db:open_int(DbName, []),
-    {ok, _} = couch_db:update_doc(Db, Doc, []),
-    ok = couch_db:close(Db),
-    receive
-    {get_count, Ref} ->
-        Parent ! {count, Ref, Counter + 1},
-        writer_loop(Db, Parent, Counter + 1);
-    {stop, Ref} ->
-        Parent ! {stopped, Ref, Counter + 1}
-    after 0 ->
-        ok = timer:sleep(500),
-        writer_loop(Db, Parent, Counter + 1)
-    end.
-
-
-maybe_pause(Parent, Counter) ->
-    receive
-    {get_count, Ref} ->
-        Parent ! {count, Ref, Counter};
-    {pause, Ref} ->
-        Parent ! {paused, Ref},
-        receive {continue, Ref2} -> Parent ! {ok, Ref2} end
-    after 0 ->
-        ok
-    end.
-
-
-db_url(DbName) ->
-    iolist_to_binary([
-        "http://", couch_config:get("httpd", "bind_address", "127.0.0.1"),
-        ":", integer_to_list(mochiweb_socket_server:get(couch_httpd, port)),
-        "/", DbName
-    ]).
-
-
-create_db(DbName) ->
-    {ok, Db} = couch_db:create(
-        DbName,
-        [{user_ctx, #user_ctx{roles = [<<"_admin">>]}}, overwrite]),
-    couch_db:close(Db),
-    {ok, Db}.
-
-
-delete_db(#db{name = DbName, main_pid = Pid}) ->
-    ok = couch_server:delete(
-        DbName, [{user_ctx, #user_ctx{roles = [<<"_admin">>]}}]),
-    MonRef = erlang:monitor(process, Pid),
-    receive
-    {'DOWN', MonRef, process, Pid, _Reason} ->
-        ok
-    after 30000 ->
-        etap:bail("Timeout deleting database")
-    end.
-
-
-replicate({remote, Db}, Target) ->
-    replicate(db_url(Db), Target);
-
-replicate(Source, {remote, Db}) ->
-    replicate(Source, db_url(Db));
-
-replicate(Source, Target) ->
-    RepObject = {[
-        {<<"source">>, Source},
-        {<<"target">>, Target},
-        {<<"continuous">>, true}
-    ]},
-    {ok, Rep} = couch_replicator_utils:parse_rep_doc(
-        RepObject, #user_ctx{roles = [<<"_admin">>]}),
-    {ok, Pid} = couch_replicator:async_replicate(Rep),
-    {ok, Pid, Rep#rep.id}.
-
-
-cancel_replication(RepId, RepPid) ->
-    {ok, _} = couch_replicator:cancel_replication(RepId),
-    etap:is(is_process_alive(RepPid), false,
-        "Replication process is no longer alive after cancel").
-
-
-att_data() ->
-    {ok, Data} = file:read_file(
-        test_util:source_file("share/www/image/logo.png")),
-    Data.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/604f1a60/src/couch_replicator/test/003-replication-large-atts.t
----------------------------------------------------------------------
diff --git a/src/couch_replicator/test/003-replication-large-atts.t b/src/couch_replicator/test/003-replication-large-atts.t
deleted file mode 100755
index 5386179..0000000
--- a/src/couch_replicator/test/003-replication-large-atts.t
+++ /dev/null
@@ -1,267 +0,0 @@
-#!/usr/bin/env escript
-%% -*- erlang -*-
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
-% Test replication of large attachments. Verify that both source and
-% target have the same attachment data and metadata.
-
--define(b2l(Bin), binary_to_list(Bin)).
-
--record(user_ctx, {
-    name = null,
-    roles = [],
-    handler
-}).
-
--record(doc, {
-    id = <<"">>,
-    revs = {0, []},
-    body = {[]},
-    atts = [],
-    deleted = false,
-    meta = []
-}).
-
--record(att, {
-    name,
-    type,
-    att_len,
-    disk_len,
-    md5= <<>>,
-    revpos=0,
-    data,
-    encoding=identity
-}).
-
-
-source_db_name() -> <<"couch_test_rep_db_a">>.
-target_db_name() -> <<"couch_test_rep_db_b">>.
-
-
-main(_) ->
-    test_util:init_code_path(),
-
-    etap:plan(1192),
-    case (catch test()) of
-        ok ->
-            etap:end_tests();
-        Other ->
-            etap:diag(io_lib:format("Test died abnormally: ~p", [Other])),
-            etap:bail(Other)
-    end,
-    ok.
-
-
-test() ->
-    couch_server_sup:start_link(test_util:config_files()),
-    ibrowse:start(),
-    crypto:start(),
-    couch_config:set("attachments", "compressible_types", "text/*", false),
-
-    Pairs = [
-        {source_db_name(), target_db_name()},
-        {{remote, source_db_name()}, target_db_name()},
-        {source_db_name(), {remote, target_db_name()}},
-        {{remote, source_db_name()}, {remote, (target_db_name())}}
-    ],
-
-    {ok, SourceDb} = create_db(source_db_name()),
-    etap:diag("Populating source database"),
-    populate_db(SourceDb, 11),
-    ok = couch_db:close(SourceDb),
-
-    lists:foreach(
-        fun({Source, Target}) ->
-            etap:diag("Creating target database"),
-            {ok, TargetDb} = create_db(target_db_name()),
-
-            ok = couch_db:close(TargetDb),
-            etap:diag("Triggering replication"),
-            replicate(Source, Target),
-            etap:diag("Replication finished, comparing source and target databases"),
-            compare_dbs(SourceDb, TargetDb),
-
-            etap:diag("Deleting target database"),
-            delete_db(TargetDb),
-            ok = timer:sleep(1000)
-        end,
-        Pairs),
-
-    delete_db(SourceDb),
-    couch_server_sup:stop(),
-    ok.
-
-
-populate_db(Db, DocCount) ->
-    Docs = lists:foldl(
-        fun(DocIdCounter, Acc) ->
-            Doc = #doc{
-                id = iolist_to_binary(["doc", integer_to_list(DocIdCounter)]),
-                body = {[]},
-                atts = [
-                    att(<<"att1">>, 2 * 1024 * 1024, <<"text/plain">>),
-                    att(<<"att2">>, round(6.6 * 1024 * 1024), <<"app/binary">>)
-                ]
-            },
-            [Doc | Acc]
-        end,
-        [], lists:seq(1, DocCount)),
-    {ok, _} = couch_db:update_docs(Db, Docs, []).
-
-
-att(Name, Size, Type) ->
-    #att{
-        name = Name,
-        type = Type,
-        att_len = Size,
-        data = fun(Count) -> crypto:rand_bytes(Count) end
-    }.
-
-
-compare_dbs(Source, Target) ->
-    {ok, SourceDb} = couch_db:open_int(couch_db:name(Source), []),
-    {ok, TargetDb} = couch_db:open_int(couch_db:name(Target), []),
-
-    Fun = fun(FullDocInfo, _, Acc) ->
-        {ok, DocSource} = couch_db:open_doc(SourceDb, FullDocInfo),
-        Id = DocSource#doc.id,
-
-        etap:diag("Verifying document " ++ ?b2l(Id)),
-
-        {ok, DocTarget} = couch_db:open_doc(TargetDb, Id),
-        etap:is(DocTarget#doc.body, DocSource#doc.body,
-            "Same body in source and target databases"),
-
-        #doc{atts = SourceAtts} = DocSource,
-        #doc{atts = TargetAtts} = DocTarget,
-        etap:is(
-            lists:sort([N || #att{name = N} <- SourceAtts]),
-            lists:sort([N || #att{name = N} <- TargetAtts]),
-            "Document has same number (and names) of attachments in "
-            "source and target databases"),
-
-        lists:foreach(
-            fun(#att{name = AttName} = Att) ->
-                etap:diag("Verifying attachment " ++ ?b2l(AttName)),
-
-                {ok, AttTarget} = find_att(TargetAtts, AttName),
-                SourceMd5 = att_md5(Att),
-                TargetMd5 = att_md5(AttTarget),
-                case AttName of
-                <<"att1">> ->
-                    etap:is(Att#att.encoding, gzip,
-                        "Attachment is gzip encoded in source database"),
-                    etap:is(AttTarget#att.encoding, gzip,
-                        "Attachment is gzip encoded in target database"),
-                    DecSourceMd5 = att_decoded_md5(Att),
-                    DecTargetMd5 = att_decoded_md5(AttTarget),
-                    etap:is(DecTargetMd5, DecSourceMd5,
-                        "Same identity content in source and target databases");
-                _ ->
-                    etap:is(Att#att.encoding, identity,
-                        "Attachment is not encoded in source database"),
-                    etap:is(AttTarget#att.encoding, identity,
-                        "Attachment is not encoded in target database")
-                end,
-                etap:is(TargetMd5, SourceMd5,
-                    "Same content in source and target databases"),
-                etap:is(is_integer(Att#att.disk_len), true,
-                    "#att.disk_len is an integer in source database"),
-                etap:is(is_integer(Att#att.att_len), true,
-                    "#att.att_len is an integer in source database"),
-                etap:is(is_integer(AttTarget#att.disk_len), true,
-                    "#att.disk_len is an integer in target database"),
-                etap:is(is_integer(AttTarget#att.att_len), true,
-                    "#att.att_len is an integer in target database"),
-                etap:is(Att#att.disk_len, AttTarget#att.disk_len,
-                    "Same identity length in source and target databases"),
-                etap:is(Att#att.att_len, AttTarget#att.att_len,
-                    "Same encoded length in source and target databases"),
-                etap:is(Att#att.type, AttTarget#att.type,
-                    "Same type in source and target databases"),
-                etap:is(Att#att.md5, SourceMd5, "Correct MD5 in source database"),
-                etap:is(AttTarget#att.md5, SourceMd5, "Correct MD5 in target database")
-            end,
-            SourceAtts),
-
-        {ok, Acc}
-    end,
-
-    {ok, _, _} = couch_db:enum_docs(SourceDb, Fun, [], []),
-    ok = couch_db:close(SourceDb),
-    ok = couch_db:close(TargetDb).
-
-
-find_att([], _Name) ->
-    nil;
-find_att([#att{name = Name} = Att | _], Name) ->
-    {ok, Att};
-find_att([_ | Rest], Name) ->
-    find_att(Rest, Name).
-
-
-att_md5(Att) ->
-    Md50 = couch_doc:att_foldl(
-        Att,
-        fun(Chunk, Acc) -> couch_util:md5_update(Acc, Chunk) end,
-        couch_util:md5_init()),
-    couch_util:md5_final(Md50).
-
-att_decoded_md5(Att) ->
-    Md50 = couch_doc:att_foldl_decode(
-        Att,
-        fun(Chunk, Acc) -> couch_util:md5_update(Acc, Chunk) end,
-        couch_util:md5_init()),
-    couch_util:md5_final(Md50).
-
-
-db_url(DbName) ->
-    iolist_to_binary([
-        "http://", couch_config:get("httpd", "bind_address", "127.0.0.1"),
-        ":", integer_to_list(mochiweb_socket_server:get(couch_httpd, port)),
-        "/", DbName
-    ]).
-
-
-create_db(DbName) ->
-    couch_db:create(
-        DbName,
-        [{user_ctx, #user_ctx{roles = [<<"_admin">>]}}, overwrite]).
-
-
-delete_db(Db) ->
-    ok = couch_server:delete(
-        couch_db:name(Db), [{user_ctx, #user_ctx{roles = [<<"_admin">>]}}]).
-
-
-replicate({remote, Db}, Target) ->
-    replicate(db_url(Db), Target);
-
-replicate(Source, {remote, Db}) ->
-    replicate(Source, db_url(Db));
-
-replicate(Source, Target) ->
-    RepObject = {[
-        {<<"source">>, Source},
-        {<<"target">>, Target}
-    ]},
-    {ok, Rep} = couch_replicator_utils:parse_rep_doc(
-        RepObject, #user_ctx{roles = [<<"_admin">>]}),
-    {ok, Pid} = couch_replicator:async_replicate(Rep),
-    MonRef = erlang:monitor(process, Pid),
-    receive
-    {'DOWN', MonRef, process, Pid, Reason} ->
-        etap:is(Reason, normal, "Replication finished successfully")
-    after 300000 ->
-        etap:bail("Timeout waiting for replication to finish")
-    end.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/604f1a60/src/couch_replicator/test/004-replication-many-leaves.t
----------------------------------------------------------------------
diff --git a/src/couch_replicator/test/004-replication-many-leaves.t b/src/couch_replicator/test/004-replication-many-leaves.t
deleted file mode 100755
index 52d2023..0000000
--- a/src/couch_replicator/test/004-replication-many-leaves.t
+++ /dev/null
@@ -1,216 +0,0 @@
-#!/usr/bin/env escript
-%% -*- erlang -*-
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
-% Test replication of documents with many leaf revisions.
-% Motivated by COUCHDB-1340 and other similar issues where a document
-% GET with a too long ?open_revs revision list doesn't work due to
-% maximum web server limits for the HTTP request path.
-
--record(user_ctx, {
-    name = null,
-    roles = [],
-    handler
-}).
-
--record(doc, {
-    id = <<"">>,
-    revs = {0, []},
-    body = {[]},
-    atts = [],
-    deleted = false,
-    meta = []
-}).
-
--define(b2l(B), binary_to_list(B)).
--define(l2b(L), list_to_binary(L)).
--define(i2l(I), integer_to_list(I)).
-
-
-source_db_name() -> <<"couch_test_rep_db_a">>.
-target_db_name() -> <<"couch_test_rep_db_b">>.
-
-doc_ids() ->
-    [<<"doc1">>, <<"doc2">>, <<"doc3">>].
-
-doc_num_conflicts(<<"doc1">>) -> 100;
-doc_num_conflicts(<<"doc2">>) -> 200;
-doc_num_conflicts(<<"doc3">>) -> 550.
-
-
-main(_) ->
-    test_util:init_code_path(),
-
-    etap:plan(16),
-    case (catch test()) of
-        ok ->
-            etap:end_tests();
-        Other ->
-            etap:diag(io_lib:format("Test died abnormally: ~p", [Other])),
-            etap:bail(Other)
-    end,
-    ok.
-
-
-test() ->
-    couch_server_sup:start_link(test_util:config_files()),
-    ibrowse:start(),
-    crypto:start(),
-
-    Pairs = [
-        {source_db_name(), target_db_name()},
-        {{remote, source_db_name()}, target_db_name()},
-        {source_db_name(), {remote, target_db_name()}},
-        {{remote, source_db_name()}, {remote, (target_db_name())}}
-    ],
-
-    {ok, SourceDb} = create_db(source_db_name()),
-    etap:diag("Populating source database"),
-    {ok, DocRevs} = populate_db(SourceDb),
-    ok = couch_db:close(SourceDb),
-
-    lists:foreach(
-        fun({Source, Target}) ->
-            etap:diag("Creating target database"),
-            {ok, TargetDb} = create_db(target_db_name()),
-
-            ok = couch_db:close(TargetDb),
-            etap:diag("Triggering replication"),
-            replicate(Source, Target),
-            etap:diag("Replication finished, comparing source and target databases"),
-            {ok, TargetDb2} = couch_db:open_int(target_db_name(), []),
-            verify_target(TargetDb2, DocRevs),
-            ok = couch_db:close(TargetDb2),
-
-            etap:diag("Deleting target database"),
-            delete_db(TargetDb),
-            ok = timer:sleep(1000)
-        end,
-        Pairs),
-
-    delete_db(SourceDb),
-    couch_server_sup:stop(),
-    ok.
-
-
-populate_db(Db) ->
-    DocRevsDict = lists:foldl(
-        fun(DocId, Acc) ->
-            Value = <<"0">>,
-            Doc = #doc{
-                id = DocId,
-                body = {[ {<<"value">>, Value} ]}
-            },
-            {ok, Rev} = couch_db:update_doc(Db, Doc, []),
-            {ok, RevsDict} = add_doc_siblings(Db, DocId, doc_num_conflicts(DocId)),
-            RevsDict2 = dict:store(Rev, Value, RevsDict),
-            dict:store(DocId, RevsDict2, Acc)
-        end,
-        dict:new(), doc_ids()),
-    {ok, dict:to_list(DocRevsDict)}.
-
-
-add_doc_siblings(Db, DocId, NumLeaves) when NumLeaves > 0 ->
-    add_doc_siblings(Db, DocId, NumLeaves, [], dict:new()).
-
-
-add_doc_siblings(Db, _DocId, 0, AccDocs, RevsDict) ->
-    {ok, []} = couch_db:update_docs(Db, AccDocs, [], replicated_changes),
-    {ok, RevsDict};
-
-add_doc_siblings(Db, DocId, NumLeaves, AccDocs, RevsDict) ->
-    Value = list_to_binary(integer_to_list(NumLeaves)),
-    Rev = couch_util:md5(Value),
-    RevsDict2 = dict:store({1, Rev}, Value, RevsDict),
-    Doc = #doc{
-        id = DocId,
-        revs = {1, [Rev]},
-        body = {[ {<<"value">>, Value} ]}
-    },
-    add_doc_siblings(Db, DocId, NumLeaves - 1, [Doc | AccDocs], RevsDict2).
-
-
-verify_target(_TargetDb, []) ->
-    ok;
-
-verify_target(TargetDb, [{DocId, RevsDict} | Rest]) ->
-    {ok, Lookups} = couch_db:open_doc_revs(
-        TargetDb,
-        DocId,
-        [R || {R, _} <- dict:to_list(RevsDict)],
-        [ejson_body]),
-    Docs = [Doc || {ok, Doc} <- Lookups],
-    Total = doc_num_conflicts(DocId) + 1,
-    etap:is(
-        length(Docs),
-        Total,
-        "Target has " ++ ?i2l(Total) ++ " leaf revisions of document " ++ ?b2l(DocId)),
-    etap:diag("Verifying all revisions of document " ++ ?b2l(DocId)),
-    lists:foreach(
-        fun(#doc{revs = {Pos, [RevId]}, body = {Body}}) ->
-            Rev = {Pos, RevId},
-            {ok, Value} = dict:find(Rev, RevsDict),
-            case couch_util:get_value(<<"value">>, Body) of
-            Value ->
-                ok;
-            Other ->
-                etap:bail("Wrong value for revision " ++
-                    ?b2l(couch_doc:rev_to_str(Rev)) ++ " of document " ++
-                    ?b2l(DocId) ++ ". Expected `" ++ couch_util:to_list(Value) ++
-                    "`, got `" ++ couch_util:to_list(Other) ++ "`")
-            end
-        end,
-        Docs),
-    verify_target(TargetDb, Rest).
-
-
-db_url(DbName) ->
-    iolist_to_binary([
-        "http://", couch_config:get("httpd", "bind_address", "127.0.0.1"),
-        ":", integer_to_list(mochiweb_socket_server:get(couch_httpd, port)),
-        "/", DbName
-    ]).
-
-
-create_db(DbName) ->
-    couch_db:create(
-        DbName,
-        [{user_ctx, #user_ctx{roles = [<<"_admin">>]}}, overwrite]).
-
-
-delete_db(Db) ->
-    ok = couch_server:delete(
-        couch_db:name(Db), [{user_ctx, #user_ctx{roles = [<<"_admin">>]}}]).
-
-
-replicate({remote, Db}, Target) ->
-    replicate(db_url(Db), Target);
-
-replicate(Source, {remote, Db}) ->
-    replicate(Source, db_url(Db));
-
-replicate(Source, Target) ->
-    RepObject = {[
-        {<<"source">>, Source},
-        {<<"target">>, Target}
-    ]},
-    {ok, Rep} = couch_replicator_utils:parse_rep_doc(
-        RepObject, #user_ctx{roles = [<<"_admin">>]}),
-    {ok, Pid} = couch_replicator:async_replicate(Rep),
-    MonRef = erlang:monitor(process, Pid),
-    receive
-    {'DOWN', MonRef, process, Pid, Reason} ->
-        etap:is(Reason, normal, "Replication finished successfully")
-    after 300000 ->
-        etap:bail("Timeout waiting for replication to finish")
-    end.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/604f1a60/src/couch_replicator/test/01-load.t
----------------------------------------------------------------------
diff --git a/src/couch_replicator/test/01-load.t b/src/couch_replicator/test/01-load.t
new file mode 100644
index 0000000..07561a7
--- /dev/null
+++ b/src/couch_replicator/test/01-load.t
@@ -0,0 +1,37 @@
+#!/usr/bin/env escript
+%% -*- erlang -*-
+
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% Test that we can load each module.
+
+main(_) ->
+    test_util:init_code_path(),
+    Modules = [
+        couch_replicator_api_wrap,
+        couch_replicator_httpc,
+        couch_replicator_httpd,
+        couch_replicator_manager,
+        couch_replicator_notifier,
+        couch_replicator,
+        couch_replicator_worker,
+        couch_replicator_utils,
+        couch_replicator_job_sup
+    ],
+
+    etap:plan(length(Modules)),
+    lists:foreach(
+        fun(Module) ->
+            etap_can:loaded_ok(Module, lists:concat(["Loaded: ", Module]))
+        end, Modules),
+    etap:end_tests().

http://git-wip-us.apache.org/repos/asf/couchdb/blob/604f1a60/src/couch_replicator/test/02-httpc-pool.t
----------------------------------------------------------------------
diff --git a/src/couch_replicator/test/02-httpc-pool.t b/src/couch_replicator/test/02-httpc-pool.t
new file mode 100755
index 0000000..a7bde6c
--- /dev/null
+++ b/src/couch_replicator/test/02-httpc-pool.t
@@ -0,0 +1,250 @@
+#!/usr/bin/env escript
+%% -*- erlang -*-
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+main(_) ->
+    test_util:init_code_path(),
+
+    etap:plan(55),
+    case (catch test()) of
+        ok ->
+            etap:end_tests();
+        Other ->
+            etap:diag(io_lib:format("Test died abnormally: ~p", [Other])),
+            etap:bail(Other)
+    end,
+    ok.
+
+
+test() ->
+    couch_server_sup:start_link(test_util:config_files()),
+    ibrowse:start(),
+
+    test_pool_full(),
+    test_worker_dead_pool_non_full(),
+    test_worker_dead_pool_full(),
+
+    couch_server_sup:stop(),
+    ok.
+
+
+test_pool_full() ->
+    Pool = spawn_pool(),
+    Client1 = spawn_client(Pool),
+    Client2 = spawn_client(Pool),
+    Client3 = spawn_client(Pool),
+
+    etap:diag("Check that we can spawn the max number of connections."),
+    etap:is(ping_client(Client1), ok, "Client 1 started ok."),
+    etap:is(ping_client(Client2), ok, "Client 2 started ok."),
+    etap:is(ping_client(Client3), ok, "Client 3 started ok."),
+
+    Worker1 = get_client_worker(Client1, "1"),
+    Worker2 = get_client_worker(Client2, "2"),
+    Worker3 = get_client_worker(Client3, "3"),
+    etap:is(is_process_alive(Worker1), true, "Client's 1 worker is alive."),
+    etap:is(is_process_alive(Worker2), true, "Client's 2 worker is alive."),
+    etap:is(is_process_alive(Worker3), true, "Client's 3 worker is alive."),
+
+    etap:isnt(Worker1, Worker2, "Clients 1 and 2 got different workers."),
+    etap:isnt(Worker2, Worker3, "Clients 2 and 3 got different workers."),
+    etap:isnt(Worker1, Worker3, "Clients 1 and 3 got different workers."),
+
+    etap:diag("Check that client 4 blocks waiting for a worker."),
+    Client4 = spawn_client(Pool),
+    etap:is(ping_client(Client4), timeout, "Client 4 blocked while waiting."),
+
+    etap:diag("Check that stopping a client gives up its worker."),
+    etap:is(stop_client(Client1), ok, "First client stopped."),
+
+    etap:diag("And check that our blocked client has been unblocked."),
+    etap:is(ping_client(Client4), ok, "Client 4 was unblocked."),
+
+    Worker4 = get_client_worker(Client4, "4"),
+    etap:is(is_process_alive(Worker4), true, "Client's 4 worker is alive."),
+    etap:is(Worker4, Worker1, "Client 4 got worker that client 1 got before."),
+
+    lists:foreach(fun(C) -> ok = stop_client(C) end, [Client2, Client3, Client4]),
+    stop_pool(Pool).
+
+
+test_worker_dead_pool_non_full() ->
+    Pool = spawn_pool(),
+    Client1 = spawn_client(Pool),
+
+    etap:is(ping_client(Client1), ok, "Client 1 started ok."),
+    Worker1 = get_client_worker(Client1, "1"),
+    etap:is(is_process_alive(Worker1), true, "Client's 1 worker is alive."),
+
+    etap:diag("Kill client's 1 worker."),
+    etap:is(kill_client_worker(Client1), ok, "Killed client's 1 worker."),
+    etap:is(is_process_alive(Worker1), false, "Client's 1 worker process is dead."),
+
+    etap:is(stop_client(Client1), ok, "First client stopped and released its worker."),
+
+    Client2 = spawn_client(Pool),
+    etap:is(ping_client(Client2), ok, "Client 2 started ok."),
+    Worker2 = get_client_worker(Client2, "2"),
+    etap:isnt(Worker2, Worker1, "Client 2 got a different worker from client 1"),
+    etap:is(is_process_alive(Worker2), true, "Client's 2 worker is alive."),
+
+    etap:is(stop_client(Client2), ok, "Second client stopped."),
+    stop_pool(Pool).
+
+
+test_worker_dead_pool_full() ->
+    Pool = spawn_pool(),
+    Client1 = spawn_client(Pool),
+    Client2 = spawn_client(Pool),
+    Client3 = spawn_client(Pool),
+
+    etap:diag("Check that we can spawn the max number of connections."),
+    etap:is(ping_client(Client1), ok, "Client 1 started ok."),
+    etap:is(ping_client(Client2), ok, "Client 2 started ok."),
+    etap:is(ping_client(Client3), ok, "Client 3 started ok."),
+
+    Worker1 = get_client_worker(Client1, "1"),
+    Worker2 = get_client_worker(Client2, "2"),
+    Worker3 = get_client_worker(Client3, "3"),
+    etap:is(is_process_alive(Worker1), true, "Client's 1 worker is alive."),
+    etap:is(is_process_alive(Worker2), true, "Client's 2 worker is alive."),
+    etap:is(is_process_alive(Worker3), true, "Client's 3 worker is alive."),
+
+    etap:isnt(Worker1, Worker2, "Clients 1 and 2 got different workers."),
+    etap:isnt(Worker2, Worker3, "Clients 2 and 3 got different workers."),
+    etap:isnt(Worker1, Worker3, "Clients 1 and 3 got different workers."),
+
+    etap:diag("Check that client 4 blocks waiting for a worker."),
+    Client4 = spawn_client(Pool),
+    etap:is(ping_client(Client4), timeout, "Client 4 blocked while waiting."),
+
+    etap:diag("Kill client's 1 worker."),
+    etap:is(kill_client_worker(Client1), ok, "Killed client's 1 worker."),
+    etap:is(is_process_alive(Worker1), false, "Client's 1 worker process is dead."),
+
+    etap:diag("Check client 4 got unblocked after first worker's death"),
+    etap:is(ping_client(Client4), ok, "Client 4 not blocked anymore."),
+
+    Worker4 = get_client_worker(Client4, "4"),
+    etap:is(is_process_alive(Worker4), true, "Client's 4 worker is alive."),
+    etap:isnt(Worker4, Worker1, "Client 4 got a worker different from client 1."),
+    etap:isnt(Worker4, Worker2, "Client 4 got a worker different from client 2."),
+    etap:isnt(Worker4, Worker3, "Client 4 got a worker different from client 3."),
+
+    etap:diag("Check that stopping client 1 is a noop."),
+    etap:is(stop_client(Client1), ok, "First client stopped."),
+
+    etap:is(is_process_alive(Worker2), true, "Client's 2 worker still alive."),
+    etap:is(is_process_alive(Worker3), true, "Client's 3 worker still alive."),
+    etap:is(is_process_alive(Worker4), true, "Client's 4 worker still alive."),
+
+    etap:diag("Check that client 5 blocks waiting for a worker."),
+    Client5 = spawn_client(Pool),
+    etap:is(ping_client(Client5), timeout, "Client 5 blocked while waiting."),
+
+    etap:diag("Check that stopping client 2 gives up its worker."),
+    etap:is(stop_client(Client2), ok, "Second client stopped."),
+
+    etap:diag("Now check that client 5 has been unblocked."),
+    etap:is(ping_client(Client5), ok, "Client 5 was unblocked."),
+
+    Worker5 = get_client_worker(Client5, "5"),
+    etap:is(is_process_alive(Worker5), true, "Client's 5 worker is alive."),
+    etap:isnt(Worker5, Worker1, "Client 5 got a worker different from client 1."),
+    etap:is(Worker5, Worker2, "Client 5 got same worker as client 2."),
+    etap:isnt(Worker5, Worker3, "Client 5 got a worker different from client 3."),
+    etap:isnt(Worker5, Worker4, "Client 5 got a worker different from client 4."),
+
+    etap:is(is_process_alive(Worker3), true, "Client's 3 worker still alive."),
+    etap:is(is_process_alive(Worker4), true, "Client's 4 worker still alive."),
+    etap:is(is_process_alive(Worker5), true, "Client's 5 worker still alive."),
+
+    lists:foreach(fun(C) -> ok = stop_client(C) end, [Client3, Client4, Client5]),
+    stop_pool(Pool).
+
+
+spawn_client(Pool) -> % Start a client process that takes one worker from Pool; returns {Pid, Ref}.
+    Parent = self(),
+    Ref = make_ref(), % tag carried by every reply this client sends to the test process
+    Pid = spawn(fun() ->
+        {ok, Worker} = couch_replicator_httpc_pool:get_worker(Pool),
+        loop(Parent, Ref, Worker, Pool) % reached only once get_worker has returned a worker
+    end),
+    {Pid, Ref}.
+
+
+ping_client({Pid, Ref}) -> % Returns ok if the client answers within 3 s, timeout otherwise.
+    Pid ! ping,
+    receive
+        {pong, Ref} ->
+            ok
+    after 3000 ->
+        timeout % no reply; presumably the client has not entered loop/4 yet (still in get_worker)
+    end.
+
+
+get_client_worker({Pid, Ref}, ClientName) -> % Ask the client which worker it holds and return it.
+    Pid ! get_worker,
+    receive
+        {worker, Ref, Worker} ->
+            Worker
+    after 3000 ->
+        etap:bail("Timeout getting client " ++ ClientName ++ " worker.") % ClientName is only used in this message
+    end.
+
+
+stop_client({Pid, Ref}) -> % Tell the client to release its worker and exit; ok on ack, timeout after 3 s.
+    Pid ! stop,
+    receive
+        {stop, Ref} ->
+            ok % loop/4 sends this ack after calling release_worker
+    after 3000 ->
+        timeout
+    end.
+
+
+kill_client_worker({Pid, Ref}) -> % Kill the worker held by the client; ok, or timeout if the client is silent.
+    Pid ! get_worker,
+    receive
+        {worker, Ref, Worker} ->
+            exit(Worker, kill), % untrappable exit signal; the client process itself keeps running
+            ok
+    after 3000 ->
+        timeout
+    end.
+
+
+loop(Parent, Ref, Worker, Pool) -> % Client process body: serve requests from the test process until stop.
+    receive
+        ping ->
+            Parent ! {pong, Ref}, % liveness reply, matched by ping_client/1
+            loop(Parent, Ref, Worker, Pool);
+        get_worker  ->
+            Parent ! {worker, Ref, Worker}, % report the worker pid this client holds
+            loop(Parent, Ref, Worker, Pool);
+        stop ->
+            couch_replicator_httpc_pool:release_worker(Pool, Worker), % hand the worker back before acking
+            Parent ! {stop, Ref} % no recursive call here, so the client process ends
+    end.
+
+
+spawn_pool() -> % Start an httpc pool (max 3 connections) for the local HTTP server; returns its pid.
+    Host = couch_config:get("httpd", "bind_address", "127.0.0.1"),
+    Port = integer_to_list(mochiweb_socket_server:get(couch_httpd, port)), % port actually bound
+    {ok, Pool} = couch_replicator_httpc_pool:start_link(
+        "http://" ++ Host ++ ":" ++ Port, [{max_connections, 3}]),
+    Pool.
+
+
+stop_pool(Pool) ->
+    ok = couch_replicator_httpc_pool:stop(Pool).

http://git-wip-us.apache.org/repos/asf/couchdb/blob/604f1a60/src/couch_replicator/test/03-replication-compact.t
----------------------------------------------------------------------
diff --git a/src/couch_replicator/test/03-replication-compact.t b/src/couch_replicator/test/03-replication-compact.t
new file mode 100755
index 0000000..c8b265e
--- /dev/null
+++ b/src/couch_replicator/test/03-replication-compact.t
@@ -0,0 +1,486 @@
+#!/usr/bin/env escript
+%% -*- erlang -*-
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% Verify that compacting databases that are being used as the source or
+% target of a replication doesn't affect the replication and that the
+% replication doesn't hold their reference counters forever.
+
+-define(b2l(B), binary_to_list(B)).
+
+-record(user_ctx, {
+    name = null,
+    roles = [],
+    handler
+}).
+
+-record(db, {
+    main_pid = nil,
+    update_pid = nil,
+    compactor_pid = nil,
+    instance_start_time, % number of microsecs since jan 1 1970 as a binary string
+    fd,
+    updater_fd,
+    fd_ref_counter,
+    header = nil,
+    committed_update_seq,
+    fulldocinfo_by_id_btree,
+    docinfo_by_seq_btree,
+    local_docs_btree,
+    update_seq,
+    name,
+    filepath,
+    validate_doc_funs = [],
+    security = [],
+    security_ptr = nil,
+    user_ctx = #user_ctx{},
+    waiting_delayed_commit = nil,
+    revs_limit = 1000,
+    fsync_options = [],
+    options = [],
+    compression
+}).
+
+-record(rep, {
+    id,
+    source,
+    target,
+    options,
+    user_ctx,
+    doc_id
+}).
+
+
+% Name of the database used as replication source.
+source_db_name() ->
+    <<"couch_test_rep_db_a">>.
+% Name of the database used as replication target.
+target_db_name() ->
+    <<"couch_test_rep_db_b">>.
+
+
+% escript entry point: announces 376 planned etap assertions, runs test/0
+% and reports any abnormal termination through etap before returning `ok`.
+main(_Args) ->
+    test_util:init_code_path(),
+
+    etap:plan(376),
+    Outcome = (catch test()),
+    case Outcome of
+        ok ->
+            etap:end_tests();
+        Failure ->
+            etap:diag(io_lib:format("Test died abnormally: ~p", [Failure])),
+            etap:bail(Failure)
+    end,
+    ok.
+
+
+% Runs the "compact while replicating" scenario once for each of the four
+% combinations of local and remote ({remote, Name}) source/target endpoints.
+% Returns `ok` after all scenarios ran and the server was stopped.
+test() ->
+    couch_server_sup:start_link(test_util:config_files()),
+    ibrowse:start(),
+
+    % {remote, Name} endpoints are turned into HTTP URLs by replicate/2.
+    Pairs = [
+        {source_db_name(), target_db_name()},
+        {{remote, source_db_name()}, target_db_name()},
+        {source_db_name(), {remote, target_db_name()}},
+        {{remote, source_db_name()}, {remote, (target_db_name())}}
+    ],
+
+    lists:foreach(
+        fun({Source, Target}) ->
+            % Both databases are always created locally, whatever the
+            % endpoint kind used for the replication itself.
+            {ok, SourceDb} = create_db(source_db_name()),
+            etap:is(couch_db:is_idle(SourceDb), true,
+                "Source database is idle before starting replication"),
+
+            {ok, TargetDb} = create_db(target_db_name()),
+            etap:is(couch_db:is_idle(TargetDb), true,
+                "Target database is idle before starting replication"),
+
+            {ok, RepPid, RepId} = replicate(Source, Target),
+            check_active_tasks(RepPid, RepId, Source, Target),
+            {ok, DocsWritten} = populate_and_compact_test(
+                RepPid, SourceDb, TargetDb),
+
+            % Wait until the target holds every document the writer
+            % produced before comparing contents.
+            wait_target_in_sync(DocsWritten, TargetDb),
+            check_active_tasks(RepPid, RepId, Source, Target),
+            cancel_replication(RepId, RepPid),
+            compare_dbs(SourceDb, TargetDb),
+
+            delete_db(SourceDb),
+            delete_db(TargetDb),
+            % Restart the server so the next scenario starts from scratch.
+            couch_server_sup:stop(),
+            ok = timer:sleep(1000),
+            couch_server_sup:start_link(test_util:config_files())
+        end,
+        Pairs),
+
+    couch_server_sup:stop(),
+    ok.
+
+
+% Writes documents to the source database from a background writer while
+% repeatedly compacting source and target, asserting after every compaction
+% that the replication process and the database main pids are still alive
+% and that the pre-compaction ref counter terminated.
+% Runs 5 rounds; returns {ok, DocsWritten} with the count reported by the
+% writer when it is stopped.
+populate_and_compact_test(RepPid, SourceDb0, TargetDb0) ->
+    etap:is(is_process_alive(RepPid), true, "Replication process is alive"),
+    check_db_alive("source", SourceDb0),
+    check_db_alive("target", TargetDb0),
+
+    Writer = spawn_writer(SourceDb0),
+
+    lists:foldl(
+        fun(_, {SourceDb, TargetDb, DocCount}) ->
+            % First half of the round: compact both databases while the
+            % writer is paused.
+            pause_writer(Writer),
+
+            compact_db("source", SourceDb),
+            etap:is(is_process_alive(RepPid), true,
+                "Replication process is alive after source database compaction"),
+            check_db_alive("source", SourceDb),
+            check_ref_counter("source", SourceDb),
+
+            compact_db("target", TargetDb),
+            etap:is(is_process_alive(RepPid), true,
+                "Replication process is alive after target database compaction"),
+            check_db_alive("target", TargetDb),
+            check_ref_counter("target", TargetDb),
+
+            % Refresh the handles so the next ref counter check looks at
+            % the post-compaction ref counter.
+            {ok, SourceDb2} = reopen_db(SourceDb),
+            {ok, TargetDb2} = reopen_db(TargetDb),
+
+            resume_writer(Writer),
+            wait_writer(Writer, DocCount),
+
+            % Second half: compact while the writer is running; it is only
+            % paused around the ref counter checks.
+            compact_db("source", SourceDb2),
+            etap:is(is_process_alive(RepPid), true,
+                "Replication process is alive after source database compaction"),
+            check_db_alive("source", SourceDb2),
+            pause_writer(Writer),
+            check_ref_counter("source", SourceDb2),
+            resume_writer(Writer),
+
+            compact_db("target", TargetDb2),
+            etap:is(is_process_alive(RepPid), true,
+                "Replication process is alive after target database compaction"),
+            check_db_alive("target", TargetDb2),
+            pause_writer(Writer),
+            check_ref_counter("target", TargetDb2),
+            resume_writer(Writer),
+
+            {ok, SourceDb3} = reopen_db(SourceDb2),
+            {ok, TargetDb3} = reopen_db(TargetDb2),
+            % The next round waits for 50 more documents.
+            {SourceDb3, TargetDb3, DocCount + 50}
+        end,
+        {SourceDb0, TargetDb0, 50}, lists:seq(1, 5)),
+
+    DocsWritten = stop_writer(Writer),
+    {ok, DocsWritten}.
+
+
+% Asserts (through etap) that the main pid stored in the database handle
+% is alive. Type ("source" or "target") only labels the assertion.
+check_db_alive(Type, #db{main_pid = MainPid}) ->
+    Description = "Local " ++ Type ++ " database main pid is alive",
+    etap:is(is_process_alive(MainPid), true, Description).
+
+
+% Starts a compaction of the named database and blocks until the compactor
+% process exits. A non-normal exit, or no exit within 30 seconds, bails out
+% of the test run. The handle opened for the compaction is closed at the end.
+compact_db(Type, #db{name = DbName}) ->
+    {ok, Db} = couch_db:open_int(DbName, []),
+    {ok, Compactor} = couch_db:start_compact(Db),
+    Monitor = erlang:monitor(process, Compactor),
+    receive
+    {'DOWN', Monitor, process, Compactor, normal} ->
+        ok;
+    {'DOWN', Monitor, process, Compactor, Failure} ->
+        etap:bail(
+            "Error compacting " ++ Type ++ " database " ++ ?b2l(DbName) ++
+            ": " ++ couch_util:to_list(Failure))
+    after 30000 ->
+        etap:bail(
+            "Compaction for " ++ Type ++ " database " ++ ?b2l(DbName) ++
+            " didn't finish")
+    end,
+    ok = couch_db:close(Db).
+
+
+% Waits (up to 30 seconds) for the ref counter process recorded in the given,
+% possibly stale, database handle to terminate, then reopens the database and
+% asserts that its current ref counter differs from the old one.
+check_ref_counter(Type, #db{name = DbName, fd_ref_counter = StaleCounter}) ->
+    Monitor = erlang:monitor(process, StaleCounter),
+    receive
+    {'DOWN', Monitor, process, StaleCounter, _Why} ->
+        etap:diag("Old " ++ Type ++ " database ref counter terminated")
+    after 30000 ->
+        etap:bail("Old " ++ Type ++ " database ref counter didn't terminate")
+    end,
+    {ok, Reopened} = couch_db:open_int(DbName, []),
+    #db{fd_ref_counter = FreshCounter} = Reopened,
+    ok = couch_db:close(Reopened),
+    etap:isnt(
+        FreshCounter, StaleCounter, Type ++ " database has new ref counter").
+
+
+% Opens the database by name, closes it again right away and returns
+% {ok, Db} with the freshly read handle record.
+reopen_db(#db{name = DbName}) ->
+    {ok, Handle} = Result = couch_db:open_int(DbName, []),
+    ok = couch_db:close(Handle),
+    Result.
+
+
+% Polls the target database until it contains DocCount documents, giving
+% up after 300 attempts (see wait_target_in_sync_loop/3).
+wait_target_in_sync(DocCount, #db{name = DbName}) ->
+    MaxRetries = 300,
+    wait_target_in_sync_loop(DocCount, DbName, MaxRetries).
+
+
+% Retry loop behind wait_target_in_sync/2. Each attempt reads the target's
+% doc_count from its db info; on a mismatch it sleeps 100 ms and retries.
+% Bails out of the test run once the retry budget reaches 0.
+wait_target_in_sync_loop(_DocCount, _TargetName, 0) ->
+    etap:bail("Could not get source and target databases in sync");
+wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft) ->
+    {ok, Target} = couch_db:open_int(TargetName, []),
+    {ok, Info} = couch_db:get_db_info(Target),
+    ok = couch_db:close(Target),
+    Current = couch_util:get_value(doc_count, Info),
+    if
+        Current == DocCount ->
+            etap:diag("Source and target databases are in sync");
+        true ->
+            ok = timer:sleep(100),
+            wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft - 1)
+    end.
+
+
+% Walks every document of the source database and checks that the target
+% holds a document with the same id whose JSON form (attachments included)
+% is identical. Any missing or differing document bails out of the test run.
+compare_dbs(#db{name = SourceName}, #db{name = TargetName}) ->
+    {ok, SourceDb} = couch_db:open_int(SourceName, []),
+    {ok, TargetDb} = couch_db:open_int(TargetName, []),
+    % Callback run by couch_db:enum_docs/4 for each source document.
+    Fun = fun(FullDocInfo, _, Acc) ->
+        {ok, Doc} = couch_db:open_doc(SourceDb, FullDocInfo),
+        {Props} = DocJson = couch_doc:to_json_obj(Doc, [attachments]),
+        DocId = couch_util:get_value(<<"_id">>, Props),
+        DocTarget = case couch_db:open_doc(TargetDb, DocId) of
+        {ok, DocT} ->
+            DocT;
+        Error ->
+            etap:bail("Error opening document '" ++ ?b2l(DocId) ++
+                "' from target: " ++ couch_util:to_list(Error))
+        end,
+        DocTargetJson = couch_doc:to_json_obj(DocTarget, [attachments]),
+        % DocJson is already bound, so this clause only matches when the
+        % target JSON is equal to the source JSON.
+        case DocTargetJson of
+        DocJson ->
+            ok;
+        _ ->
+            etap:bail("Content from document '" ++ ?b2l(DocId) ++
+                "' differs in target database")
+        end,
+        {ok, Acc}
+    end,
+    {ok, _, _} = couch_db:enum_docs(SourceDb, Fun, [], []),
+    etap:diag("Target database has the same documents as the source database"),
+    ok = couch_db:close(SourceDb),
+    ok = couch_db:close(TargetDb).
+
+
+% Asserts that couch_task_status reports exactly one task and that this
+% task describes the given replication: pid, replication id, continuous
+% flag, source and target, plus integer-valued statistics and a progress
+% value that is not above 100. Emits 14 etap assertions.
+check_active_tasks(RepPid, {BaseId, Ext} = _RepId, Src, Tgt) ->
+    % Remote endpoints are expected to be reported as the database URL
+    % followed by a trailing slash; local ones as the plain name.
+    Source = case Src of
+    {remote, NameSrc} ->
+        <<(db_url(NameSrc))/binary, $/>>;
+    _ ->
+        Src
+    end,
+    Target = case Tgt of
+    {remote, NameTgt} ->
+        <<(db_url(NameTgt))/binary, $/>>;
+    _ ->
+        Tgt
+    end,
+    FullRepId = list_to_binary(BaseId ++ Ext),
+    Pid = list_to_binary(pid_to_list(RepPid)),
+    % Crashes with badmatch unless exactly one task is registered.
+    [RepTask] = couch_task_status:all(),
+    etap:is(couch_util:get_value(pid, RepTask), Pid,
+        "_active_tasks entry has correct pid property"),
+    etap:is(couch_util:get_value(replication_id, RepTask), FullRepId,
+        "_active_tasks entry has right replication id"),
+    etap:is(couch_util:get_value(continuous, RepTask), true,
+        "_active_tasks entry has continuous property set to true"),
+    etap:is(couch_util:get_value(source, RepTask), Source,
+        "_active_tasks entry has correct source property"),
+    etap:is(couch_util:get_value(target, RepTask), Target,
+        "_active_tasks entry has correct target property"),
+    etap:is(is_integer(couch_util:get_value(docs_read, RepTask)), true,
+        "_active_tasks entry has integer docs_read property"),
+    etap:is(is_integer(couch_util:get_value(docs_written, RepTask)), true,
+        "_active_tasks entry has integer docs_written property"),
+    etap:is(is_integer(couch_util:get_value(doc_write_failures, RepTask)), true,
+        "_active_tasks entry has integer doc_write_failures property"),
+    etap:is(is_integer(couch_util:get_value(revisions_checked, RepTask)), true,
+        "_active_tasks entry has integer revisions_checked property"),
+    etap:is(is_integer(couch_util:get_value(missing_revisions_found, RepTask)), true,
+        "_active_tasks entry has integer missing_revisions_found property"),
+    etap:is(is_integer(couch_util:get_value(checkpointed_source_seq, RepTask)), true,
+        "_active_tasks entry has integer checkpointed_source_seq property"),
+    etap:is(is_integer(couch_util:get_value(source_seq, RepTask)), true,
+        "_active_tasks entry has integer source_seq property"),
+    Progress = couch_util:get_value(progress, RepTask),
+    etap:is(is_integer(Progress), true,
+        "_active_tasks entry has an integer progress property"),
+    etap:is(Progress =< 100, true, "Progress is not greater than 100%").
+
+
+% Keeps asking the writer for its document count until it reports at
+% least NumDocs written documents, then returns `ok`.
+wait_writer(Writer, Wanted) ->
+    Written = get_writer_num_docs_written(Writer),
+    if
+        Written >= Wanted ->
+            ok;
+        true ->
+            wait_writer(Writer, Wanted)
+    end.
+
+
+% Spawns the background process that keeps writing documents to the given
+% database (see writer_loop/3) and returns its pid. The calling process is
+% registered as the writer's parent and receives all of its replies.
+spawn_writer(Db) ->
+    Owner = self(),
+    Writer = spawn(fun() -> writer_loop(Db, Owner, 0) end),
+    etap:diag("Started source database writer"),
+    Writer.
+
+
+% Asks the writer to pause and waits up to 30 seconds for its
+% `{paused, Ref}` acknowledgement; bails out of the test run on timeout.
+pause_writer(Writer) ->
+    Tag = make_ref(),
+    Writer ! {pause, Tag},
+    receive
+    {paused, Tag} ->
+        ok
+    after 30000 ->
+        etap:bail("Failed to pause source database writer")
+    end.
+
+
+% Asks a paused writer to continue and waits up to 30 seconds for its
+% `{ok, Ref}` acknowledgement; bails out of the test run on timeout.
+resume_writer(Writer) ->
+    Tag = make_ref(),
+    Writer ! {continue, Tag},
+    receive
+    {ok, Tag} ->
+        ok
+    after 30000 ->
+        etap:bail("Failed to unpause source database writer")
+    end.
+
+
+% Returns the number of documents the writer reports as written. Bails out
+% of the test run if the writer does not answer within 30 seconds.
+get_writer_num_docs_written(Writer) ->
+    Tag = make_ref(),
+    Writer ! {get_count, Tag},
+    receive
+    {count, Tag, Written} ->
+        Written
+    after 30000 ->
+        etap:bail("Timeout getting number of documents written from "
+            "source database writer")
+    end.
+
+
+% Tells the writer to stop, waits for its final `{stopped, Ref, Count}`
+% message and then for the process itself to go down. Returns the final
+% document count; either wait bails out of the test run after 30 seconds.
+stop_writer(Writer) ->
+    Tag = make_ref(),
+    Writer ! {stop, Tag},
+    receive
+    {stopped, Tag, FinalCount} ->
+        % Monitoring an already dead process still delivers a 'DOWN'.
+        Monitor = erlang:monitor(process, Writer),
+        receive
+        {'DOWN', Monitor, process, Writer, _Why} ->
+            etap:diag("Stopped source database writer"),
+            FinalCount
+        after 30000 ->
+            etap:bail("Timeout stopping source database writer")
+        end
+    after 30000 ->
+        etap:bail("Timeout stopping source database writer")
+    end.
+
+
+% Body of the background writer process. Each iteration writes one document
+% (id and value are Counter + 1, with two PNG attachments) to the database
+% and then serves at most one pending request from Parent:
+%   {get_count, Ref} -> replies {count, Ref, Counter + 1} and continues
+%   {stop, Ref}      -> replies {stopped, Ref, Counter + 1} and returns
+% With no pending request it sleeps 500 ms before the next iteration.
+% Pause requests are only honoured at the two maybe_pause/2 calls.
+writer_loop(#db{name = DbName}, Parent, Counter) ->
+    maybe_pause(Parent, Counter),
+    Doc = couch_doc:from_json_obj({[
+        {<<"_id">>, list_to_binary(integer_to_list(Counter + 1))},
+        {<<"value">>, Counter + 1},
+        {<<"_attachments">>, {[
+            {<<"icon1.png">>, {[
+                {<<"data">>, base64:encode(att_data())},
+                {<<"content_type">>, <<"image/png">>}
+            ]}},
+            {<<"icon2.png">>, {[
+                % Second attachment carries the same image data twice.
+                {<<"data">>, base64:encode(iolist_to_binary(
+                    [att_data(), att_data()]))},
+                {<<"content_type">>, <<"image/png">>}
+            ]}}
+        ]}}
+    ]}),
+    % Second chance to pause, right before touching the database.
+    maybe_pause(Parent, Counter),
+    {ok, Db} = couch_db:open_int(DbName, []),
+    {ok, _} = couch_db:update_doc(Db, Doc, []),
+    ok = couch_db:close(Db),
+    % Db is passed on only so the next iteration can read its name.
+    receive
+    {get_count, Ref} ->
+        Parent ! {count, Ref, Counter + 1},
+        writer_loop(Db, Parent, Counter + 1);
+    {stop, Ref} ->
+        Parent ! {stopped, Ref, Counter + 1}
+    after 0 ->
+        ok = timer:sleep(500),
+        writer_loop(Db, Parent, Counter + 1)
+    end.
+
+
+% Non-blocking check of the writer's mailbox for one control message:
+%   {pause, Ref}     -> acknowledges with {paused, Ref}, then blocks until a
+%                       {continue, Ref2} arrives and acknowledges it with
+%                       {ok, Ref2}
+%   {get_count, Ref} -> replies {count, Ref, Counter}
+% Returns immediately with `ok` when neither message is waiting.
+maybe_pause(Parent, Counter) ->
+    receive
+    {pause, PauseRef} ->
+        Parent ! {paused, PauseRef},
+        receive {continue, ContinueRef} -> Parent ! {ok, ContinueRef} end;
+    {get_count, CountRef} ->
+        Parent ! {count, CountRef, Counter}
+    after 0 ->
+        ok
+    end.
+
+
+% Builds, as a binary, the HTTP URL of a database on the local server:
+% the configured bind address (default 127.0.0.1) plus the port the
+% couch_httpd listener reports.
+db_url(DbName) ->
+    Host = couch_config:get("httpd", "bind_address", "127.0.0.1"),
+    Port = mochiweb_socket_server:get(couch_httpd, port),
+    iolist_to_binary(["http://", Host, ":", integer_to_list(Port), "/", DbName]).
+
+
+% Creates the database as an admin (with the `overwrite` option), closes
+% the handle right away and returns {ok, Db} with that handle record.
+create_db(DbName) ->
+    AdminCtx = #user_ctx{roles = [<<"_admin">>]},
+    {ok, Db} = Created = couch_db:create(
+        DbName, [{user_ctx, AdminCtx}, overwrite]),
+    couch_db:close(Db),
+    Created.
+
+
+% Deletes the database as an admin and waits up to 30 seconds for its main
+% pid to go down; bails out of the test run on timeout.
+delete_db(#db{name = DbName, main_pid = MainPid}) ->
+    AdminCtx = #user_ctx{roles = [<<"_admin">>]},
+    ok = couch_server:delete(DbName, [{user_ctx, AdminCtx}]),
+    Monitor = erlang:monitor(process, MainPid),
+    receive
+    {'DOWN', Monitor, process, MainPid, _Why} ->
+        ok
+    after 30000 ->
+        etap:bail("Timeout deleting database")
+    end.
+
+
+% Starts a continuous replication from Source to Target as an admin user.
+% {remote, Name} endpoints are first rewritten to the database's HTTP URL.
+% Returns {ok, ReplicationPid, ReplicationId}.
+replicate({remote, SourceName}, Target) ->
+    replicate(db_url(SourceName), Target);
+
+replicate(Source, {remote, TargetName}) ->
+    replicate(Source, db_url(TargetName));
+
+replicate(Source, Target) ->
+    AdminCtx = #user_ctx{roles = [<<"_admin">>]},
+    RepDoc = {[
+        {<<"source">>, Source},
+        {<<"target">>, Target},
+        {<<"continuous">>, true}
+    ]},
+    {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepDoc, AdminCtx),
+    {ok, RepPid} = couch_replicator:async_replicate(Rep),
+    {ok, RepPid, Rep#rep.id}.
+
+
+% Cancels the replication by id and asserts (through etap) that the
+% replication process is no longer alive afterwards.
+cancel_replication(RepId, RepPid) ->
+    {ok, _} = couch_replicator:cancel_replication(RepId),
+    StillAlive = is_process_alive(RepPid),
+    etap:is(StillAlive, false,
+        "Replication process is no longer alive after cancel").
+
+
+% Reads the logo PNG from the source tree and returns its contents as a
+% binary; used as attachment payload by the writer.
+att_data() ->
+    LogoPath = test_util:source_file("share/www/image/logo.png"),
+    {ok, Bytes} = file:read_file(LogoPath),
+    Bytes.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/604f1a60/src/couch_replicator/test/04-replication-large-atts.t
----------------------------------------------------------------------
diff --git a/src/couch_replicator/test/04-replication-large-atts.t b/src/couch_replicator/test/04-replication-large-atts.t
new file mode 100755
index 0000000..5386179
--- /dev/null
+++ b/src/couch_replicator/test/04-replication-large-atts.t
@@ -0,0 +1,267 @@
+#!/usr/bin/env escript
+%% -*- erlang -*-
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% Test replication of large attachments. Verify that both source and
+% target have the same attachment data and metadata.
+
+-define(b2l(Bin), binary_to_list(Bin)).
+
+-record(user_ctx, {
+    name = null,
+    roles = [],
+    handler
+}).
+
+-record(doc, {
+    id = <<"">>,
+    revs = {0, []},
+    body = {[]},
+    atts = [],
+    deleted = false,
+    meta = []
+}).
+
+-record(att, {
+    name,
+    type,
+    att_len,
+    disk_len,
+    md5= <<>>,
+    revpos=0,
+    data,
+    encoding=identity
+}).
+
+
+% Name of the database used as replication source.
+source_db_name() ->
+    <<"couch_test_rep_db_a">>.
+% Name of the database used as replication target.
+target_db_name() ->
+    <<"couch_test_rep_db_b">>.
+
+
+% escript entry point: announces 1192 planned etap assertions, runs test/0
+% and reports any abnormal termination through etap before returning `ok`.
+main(_Args) ->
+    test_util:init_code_path(),
+
+    etap:plan(1192),
+    Outcome = (catch test()),
+    case Outcome of
+        ok ->
+            etap:end_tests();
+        Failure ->
+            etap:diag(io_lib:format("Test died abnormally: ~p", [Failure])),
+            etap:bail(Failure)
+    end,
+    ok.
+
+
+% Populates the source database once with 11 documents carrying large
+% attachments, then for each of the four local/remote endpoint combinations
+% replicates it into a freshly created target and compares both databases.
+% Returns `ok` after cleaning up and stopping the server.
+test() ->
+    couch_server_sup:start_link(test_util:config_files()),
+    ibrowse:start(),
+    crypto:start(),
+    % Make text/* attachments compressible; compare_dbs/2 expects the
+    % text/plain attachment to be stored gzip encoded.
+    % NOTE(review): the final `false` presumably means "do not persist
+    % to the ini file" - confirm against couch_config:set/4.
+    couch_config:set("attachments", "compressible_types", "text/*", false),
+
+    % {remote, Name} endpoints are turned into HTTP URLs by replicate/2.
+    Pairs = [
+        {source_db_name(), target_db_name()},
+        {{remote, source_db_name()}, target_db_name()},
+        {source_db_name(), {remote, target_db_name()}},
+        {{remote, source_db_name()}, {remote, (target_db_name())}}
+    ],
+
+    {ok, SourceDb} = create_db(source_db_name()),
+    etap:diag("Populating source database"),
+    populate_db(SourceDb, 11),
+    ok = couch_db:close(SourceDb),
+
+    lists:foreach(
+        fun({Source, Target}) ->
+            etap:diag("Creating target database"),
+            {ok, TargetDb} = create_db(target_db_name()),
+
+            ok = couch_db:close(TargetDb),
+            etap:diag("Triggering replication"),
+            % replicate/2 blocks until the replication process exits.
+            replicate(Source, Target),
+            etap:diag("Replication finished, comparing source and target databases"),
+            % The closed handles are only used for their database names.
+            compare_dbs(SourceDb, TargetDb),
+
+            etap:diag("Deleting target database"),
+            delete_db(TargetDb),
+            ok = timer:sleep(1000)
+        end,
+        Pairs),
+
+    delete_db(SourceDb),
+    couch_server_sup:stop(),
+    ok.
+
+
+% Writes DocCount documents ("doc1" .. "doc<DocCount>") to Db in a single
+% update_docs call. Every document has an empty body and two attachments:
+% a 2 MiB text/plain one and a roughly 6.6 MiB app/binary one.
+populate_db(Db, DocCount) ->
+    MakeDoc = fun(N) ->
+        #doc{
+            id = iolist_to_binary(["doc", integer_to_list(N)]),
+            body = {[]},
+            atts = [
+                att(<<"att1">>, 2 * 1024 * 1024, <<"text/plain">>),
+                att(<<"att2">>, round(6.6 * 1024 * 1024), <<"app/binary">>)
+            ]
+        }
+    end,
+    % Highest-numbered document first, matching a prepend-style accumulation.
+    Docs = lists:reverse([MakeDoc(N) || N <- lists:seq(1, DocCount)]),
+    {ok, _} = couch_db:update_docs(Db, Docs, []).
+
+
+% Builds an attachment record of the given name, declared length and
+% content type. Its data is a fun that returns the requested number of
+% random bytes from crypto.
+att(Name, Size, Type) ->
+    RandomData = fun crypto:rand_bytes/1,
+    #att{name = Name, type = Type, att_len = Size, data = RandomData}.
+
+
+% Compares every document of the source database with its counterpart in
+% the target: body, attachment names, attachment encoding, content MD5s,
+% lengths and types. Results are reported through etap assertions.
+% Only the names of the Source and Target handles are used; both
+% databases are reopened here and closed again at the end.
+compare_dbs(Source, Target) ->
+    {ok, SourceDb} = couch_db:open_int(couch_db:name(Source), []),
+    {ok, TargetDb} = couch_db:open_int(couch_db:name(Target), []),
+
+    % Callback run by couch_db:enum_docs/4 for each source document.
+    Fun = fun(FullDocInfo, _, Acc) ->
+        {ok, DocSource} = couch_db:open_doc(SourceDb, FullDocInfo),
+        Id = DocSource#doc.id,
+
+        etap:diag("Verifying document " ++ ?b2l(Id)),
+
+        {ok, DocTarget} = couch_db:open_doc(TargetDb, Id),
+        etap:is(DocTarget#doc.body, DocSource#doc.body,
+            "Same body in source and target databases"),
+
+        #doc{atts = SourceAtts} = DocSource,
+        #doc{atts = TargetAtts} = DocTarget,
+        etap:is(
+            lists:sort([N || #att{name = N} <- SourceAtts]),
+            lists:sort([N || #att{name = N} <- TargetAtts]),
+            "Document has same number (and names) of attachments in "
+            "source and target databases"),
+
+        lists:foreach(
+            fun(#att{name = AttName} = Att) ->
+                etap:diag("Verifying attachment " ++ ?b2l(AttName)),
+
+                % Crashes with badmatch if the target lacks the attachment
+                % (find_att/2 returns nil in that case).
+                {ok, AttTarget} = find_att(TargetAtts, AttName),
+                % MD5 of the attachment data as stored (possibly encoded).
+                SourceMd5 = att_md5(Att),
+                TargetMd5 = att_md5(AttTarget),
+                case AttName of
+                <<"att1">> ->
+                    % att1 is the text/plain attachment, which is expected
+                    % to be gzip encoded on both sides.
+                    etap:is(Att#att.encoding, gzip,
+                        "Attachment is gzip encoded in source database"),
+                    etap:is(AttTarget#att.encoding, gzip,
+                        "Attachment is gzip encoded in target database"),
+                    DecSourceMd5 = att_decoded_md5(Att),
+                    DecTargetMd5 = att_decoded_md5(AttTarget),
+                    etap:is(DecTargetMd5, DecSourceMd5,
+                        "Same identity content in source and target databases");
+                _ ->
+                    etap:is(Att#att.encoding, identity,
+                        "Attachment is not encoded in source database"),
+                    etap:is(AttTarget#att.encoding, identity,
+                        "Attachment is not encoded in target database")
+                end,
+                etap:is(TargetMd5, SourceMd5,
+                    "Same content in source and target databases"),
+                etap:is(is_integer(Att#att.disk_len), true,
+                    "#att.disk_len is an integer in source database"),
+                etap:is(is_integer(Att#att.att_len), true,
+                    "#att.att_len is an integer in source database"),
+                etap:is(is_integer(AttTarget#att.disk_len), true,
+                    "#att.disk_len is an integer in target database"),
+                etap:is(is_integer(AttTarget#att.att_len), true,
+                    "#att.att_len is an integer in target database"),
+                etap:is(Att#att.disk_len, AttTarget#att.disk_len,
+                    "Same identity length in source and target databases"),
+                etap:is(Att#att.att_len, AttTarget#att.att_len,
+                    "Same encoded length in source and target databases"),
+                etap:is(Att#att.type, AttTarget#att.type,
+                    "Same type in source and target databases"),
+                % The MD5 recorded in each attachment record must equal the
+                % MD5 computed from the source's stored data.
+                etap:is(Att#att.md5, SourceMd5, "Correct MD5 in source database"),
+                etap:is(AttTarget#att.md5, SourceMd5, "Correct MD5 in target database")
+            end,
+            SourceAtts),
+
+        {ok, Acc}
+    end,
+
+    {ok, _, _} = couch_db:enum_docs(SourceDb, Fun, [], []),
+    ok = couch_db:close(SourceDb),
+    ok = couch_db:close(TargetDb).
+
+
+% Finds the first attachment record with the given name in a list.
+% Returns {ok, Att} when found and `nil` when the list has no such entry.
+find_att([#att{name = Wanted} = Found | _Others], Wanted) ->
+    {ok, Found};
+find_att([_Other | Remaining], Wanted) ->
+    find_att(Remaining, Wanted);
+find_att([], _Wanted) ->
+    nil.
+
+
+% Computes the MD5 digest of an attachment's data by folding over its
+% chunks with couch_doc:att_foldl/3.
+att_md5(Att) ->
+    Feed = fun(Chunk, Ctx) -> couch_util:md5_update(Ctx, Chunk) end,
+    Ctx0 = couch_util:md5_init(),
+    CtxN = couch_doc:att_foldl(Att, Feed, Ctx0),
+    couch_util:md5_final(CtxN).
+
+% Computes the MD5 digest of an attachment's data as produced by
+% couch_doc:att_foldl_decode/3, i.e. folding over the decoded chunks.
+att_decoded_md5(Att) ->
+    Feed = fun(Chunk, Ctx) -> couch_util:md5_update(Ctx, Chunk) end,
+    Ctx0 = couch_util:md5_init(),
+    CtxN = couch_doc:att_foldl_decode(Att, Feed, Ctx0),
+    couch_util:md5_final(CtxN).
+
+
+% Builds, as a binary, the HTTP URL of a database on the local server:
+% the configured bind address (default 127.0.0.1) plus the port the
+% couch_httpd listener reports.
+db_url(DbName) ->
+    Host = couch_config:get("httpd", "bind_address", "127.0.0.1"),
+    Port = mochiweb_socket_server:get(couch_httpd, port),
+    iolist_to_binary(["http://", Host, ":", integer_to_list(Port), "/", DbName]).
+
+
+% Creates the database as an admin with the `overwrite` option and returns
+% whatever couch_db:create/2 returns (the handle is left open).
+create_db(DbName) ->
+    AdminCtx = #user_ctx{roles = [<<"_admin">>]},
+    couch_db:create(DbName, [{user_ctx, AdminCtx}, overwrite]).
+
+
+% Deletes, as an admin, the database the given handle refers to; crashes
+% with badmatch unless the deletion answers `ok`.
+delete_db(Db) ->
+    AdminCtx = #user_ctx{roles = [<<"_admin">>]},
+    ok = couch_server:delete(couch_db:name(Db), [{user_ctx, AdminCtx}]).
+
+
+% Runs a one-shot replication from Source to Target as an admin user and
+% blocks until the replication process exits, asserting through etap that
+% it exited with reason `normal`. {remote, Name} endpoints are first
+% rewritten to the database's HTTP URL. Bails out after 300 seconds.
+replicate({remote, SourceName}, Target) ->
+    replicate(db_url(SourceName), Target);
+
+replicate(Source, {remote, TargetName}) ->
+    replicate(Source, db_url(TargetName));
+
+replicate(Source, Target) ->
+    AdminCtx = #user_ctx{roles = [<<"_admin">>]},
+    RepDoc = {[
+        {<<"source">>, Source},
+        {<<"target">>, Target}
+    ]},
+    {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepDoc, AdminCtx),
+    {ok, RepPid} = couch_replicator:async_replicate(Rep),
+    Monitor = erlang:monitor(process, RepPid),
+    receive
+    {'DOWN', Monitor, process, RepPid, ExitReason} ->
+        etap:is(ExitReason, normal, "Replication finished successfully")
+    after 300000 ->
+        etap:bail("Timeout waiting for replication to finish")
+    end.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/604f1a60/src/couch_replicator/test/05-replication-many-leaves.t
----------------------------------------------------------------------
diff --git a/src/couch_replicator/test/05-replication-many-leaves.t b/src/couch_replicator/test/05-replication-many-leaves.t
new file mode 100755
index 0000000..52d2023
--- /dev/null
+++ b/src/couch_replicator/test/05-replication-many-leaves.t
@@ -0,0 +1,216 @@
+#!/usr/bin/env escript
+%% -*- erlang -*-
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% Test replication of documents with many leaf revisions.
+% Motivated by COUCHDB-1340 and other similar issues where a document
+% GET with a too long ?open_revs revision list doesn't work due to
+% maximum web server limits for the HTTP request path.
+
+-record(user_ctx, {
+    name = null,
+    roles = [],
+    handler
+}).
+
+-record(doc, {
+    id = <<"">>,
+    revs = {0, []},
+    body = {[]},
+    atts = [],
+    deleted = false,
+    meta = []
+}).
+
+-define(b2l(B), binary_to_list(B)).
+-define(l2b(L), list_to_binary(L)).
+-define(i2l(I), integer_to_list(I)).
+
+
+% Name of the database used as replication source.
+source_db_name() ->
+    <<"couch_test_rep_db_a">>.
+% Name of the database used as replication target.
+target_db_name() ->
+    <<"couch_test_rep_db_b">>.
+
+% Ids of the documents created in the source database.
+doc_ids() ->
+    [
+        <<"doc1">>,
+        <<"doc2">>,
+        <<"doc3">>
+    ].
+
+% Number of extra sibling (conflicting) revisions created for each
+% document id. Unknown ids raise function_clause.
+doc_num_conflicts(<<"doc1">>) ->
+    100;
+doc_num_conflicts(<<"doc2">>) ->
+    200;
+doc_num_conflicts(<<"doc3">>) ->
+    550.
+
+
+% escript entry point: announces 16 planned etap assertions, runs test/0
+% and reports any abnormal termination through etap before returning `ok`.
+main(_Args) ->
+    test_util:init_code_path(),
+
+    etap:plan(16),
+    Outcome = (catch test()),
+    case Outcome of
+        ok ->
+            etap:end_tests();
+        Failure ->
+            etap:diag(io_lib:format("Test died abnormally: ~p", [Failure])),
+            etap:bail(Failure)
+    end,
+    ok.
+
+
+% Populates the source database once with documents that have many leaf
+% revisions, then for each of the four local/remote endpoint combinations
+% replicates it into a freshly created target and verifies that every
+% recorded revision arrived. Returns `ok` after cleanup.
+test() ->
+    couch_server_sup:start_link(test_util:config_files()),
+    ibrowse:start(),
+    crypto:start(),
+
+    % {remote, Name} endpoints are turned into HTTP URLs by replicate/2.
+    Pairs = [
+        {source_db_name(), target_db_name()},
+        {{remote, source_db_name()}, target_db_name()},
+        {source_db_name(), {remote, target_db_name()}},
+        {{remote, source_db_name()}, {remote, (target_db_name())}}
+    ],
+
+    {ok, SourceDb} = create_db(source_db_name()),
+    etap:diag("Populating source database"),
+    % DocRevs is a list of {DocId, RevsDict}; each RevsDict maps a
+    % revision to the value stored in that revision's body.
+    {ok, DocRevs} = populate_db(SourceDb),
+    ok = couch_db:close(SourceDb),
+
+    lists:foreach(
+        fun({Source, Target}) ->
+            etap:diag("Creating target database"),
+            {ok, TargetDb} = create_db(target_db_name()),
+
+            ok = couch_db:close(TargetDb),
+            etap:diag("Triggering replication"),
+            % replicate/2 blocks until the replication process exits.
+            replicate(Source, Target),
+            etap:diag("Replication finished, comparing source and target databases"),
+            % Reopen the target to read its contents after replication.
+            {ok, TargetDb2} = couch_db:open_int(target_db_name(), []),
+            verify_target(TargetDb2, DocRevs),
+            ok = couch_db:close(TargetDb2),
+
+            etap:diag("Deleting target database"),
+            delete_db(TargetDb),
+            ok = timer:sleep(1000)
+        end,
+        Pairs),
+
+    delete_db(SourceDb),
+    couch_server_sup:stop(),
+    ok.
+
+
+% Creates every document listed by doc_ids/0 with value <<"0">> and then
+% adds doc_num_conflicts(DocId) sibling revisions to it.
+% Returns {ok, [{DocId, RevsDict}]}, where each RevsDict maps a revision
+% to the value stored in that revision's body.
+populate_db(Db) ->
+    StoreDoc = fun(DocId, DocsAcc) ->
+        Value = <<"0">>,
+        FirstDoc = #doc{
+            id = DocId,
+            body = {[ {<<"value">>, Value} ]}
+        },
+        {ok, Rev} = couch_db:update_doc(Db, FirstDoc, []),
+        {ok, Siblings} = add_doc_siblings(Db, DocId, doc_num_conflicts(DocId)),
+        dict:store(DocId, dict:store(Rev, Value, Siblings), DocsAcc)
+    end,
+    AllRevs = lists:foldl(StoreDoc, dict:new(), doc_ids()),
+    {ok, dict:to_list(AllRevs)}.
+
+
+% Adds NumLeaves (must be positive) sibling revisions to document DocId.
+% Returns {ok, RevsDict} mapping each new revision to its body value.
+add_doc_siblings(Db, DocId, NumLeaves) when NumLeaves > 0 ->
+    NoDocs = [],
+    NoRevs = dict:new(),
+    add_doc_siblings(Db, DocId, NumLeaves, NoDocs, NoRevs).
+
+
+% Accumulating worker behind add_doc_siblings/3. For each remaining leaf
+% it builds a position-1 revision whose id is the MD5 of the leaf number
+% rendered as text, and records it in RevsDict. When the counter reaches 0
+% all accumulated documents are written in one update_docs call using
+% `replicated_changes`, and {ok, RevsDict} is returned.
+add_doc_siblings(Db, _DocId, 0, Pending, Revs) ->
+    {ok, []} = couch_db:update_docs(Db, Pending, [], replicated_changes),
+    {ok, Revs};
+
+add_doc_siblings(Db, DocId, LeavesLeft, Pending, Revs) ->
+    Value = ?l2b(?i2l(LeavesLeft)),
+    RevId = couch_util:md5(Value),
+    Sibling = #doc{
+        id = DocId,
+        revs = {1, [RevId]},
+        body = {[ {<<"value">>, Value} ]}
+    },
+    add_doc_siblings(
+        Db, DocId, LeavesLeft - 1,
+        [Sibling | Pending], dict:store({1, RevId}, Value, Revs)).
+
+
+% For every {DocId, RevsDict} entry, opens all recorded revisions of the
+% document in the target database, asserts (through etap) that the number
+% of successfully opened revisions is doc_num_conflicts(DocId) + 1, and
+% checks that each revision's body carries the value recorded for it.
+% A wrong value bails out of the test run. Returns `ok` when the list is
+% exhausted.
+verify_target(_TargetDb, []) ->
+    ok;
+
+verify_target(TargetDb, [{DocId, RevsDict} | Rest]) ->
+    {ok, Lookups} = couch_db:open_doc_revs(
+        TargetDb,
+        DocId,
+        [R || {R, _} <- dict:to_list(RevsDict)],
+        [ejson_body]),
+    % Keep only the revisions that were opened successfully.
+    Docs = [Doc || {ok, Doc} <- Lookups],
+    % The sibling revisions plus the initially created revision.
+    Total = doc_num_conflicts(DocId) + 1,
+    etap:is(
+        length(Docs),
+        Total,
+        "Target has " ++ ?i2l(Total) ++ " leaf revisions of document " ++ ?b2l(DocId)),
+    etap:diag("Verifying all revisions of document " ++ ?b2l(DocId)),
+    lists:foreach(
+        fun(#doc{revs = {Pos, [RevId]}, body = {Body}}) ->
+            Rev = {Pos, RevId},
+            % Crashes with badmatch if the revision was never recorded.
+            {ok, Value} = dict:find(Rev, RevsDict),
+            % Value is already bound, so the first clause only matches
+            % when the stored value equals the expected one.
+            case couch_util:get_value(<<"value">>, Body) of
+            Value ->
+                ok;
+            Other ->
+                etap:bail("Wrong value for revision " ++
+                    ?b2l(couch_doc:rev_to_str(Rev)) ++ " of document " ++
+                    ?b2l(DocId) ++ ". Expected `" ++ couch_util:to_list(Value) ++
+                    "`, got `" ++ couch_util:to_list(Other) ++ "`")
+            end
+        end,
+        Docs),
+    verify_target(TargetDb, Rest).
+
+
+% Builds, as a binary, the HTTP URL of a database on the local server:
+% the configured bind address (default 127.0.0.1) plus the port the
+% couch_httpd listener reports.
+db_url(DbName) ->
+    Host = couch_config:get("httpd", "bind_address", "127.0.0.1"),
+    Port = mochiweb_socket_server:get(couch_httpd, port),
+    iolist_to_binary(["http://", Host, ":", integer_to_list(Port), "/", DbName]).
+
+
+% Creates the database as an admin with the `overwrite` option and returns
+% whatever couch_db:create/2 returns (the handle is left open).
+create_db(DbName) ->
+    AdminCtx = #user_ctx{roles = [<<"_admin">>]},
+    couch_db:create(DbName, [{user_ctx, AdminCtx}, overwrite]).
+
+
+% Deletes, as an admin, the database the given handle refers to; crashes
+% with badmatch unless the deletion answers `ok`.
+delete_db(Db) ->
+    AdminCtx = #user_ctx{roles = [<<"_admin">>]},
+    ok = couch_server:delete(couch_db:name(Db), [{user_ctx, AdminCtx}]).
+
+
+% Runs a one-shot replication from Source to Target as an admin user and
+% blocks until the replication process exits, asserting through etap that
+% it exited with reason `normal`. {remote, Name} endpoints are first
+% rewritten to the database's HTTP URL. Bails out after 300 seconds.
+replicate({remote, SourceName}, Target) ->
+    replicate(db_url(SourceName), Target);
+
+replicate(Source, {remote, TargetName}) ->
+    replicate(Source, db_url(TargetName));
+
+replicate(Source, Target) ->
+    AdminCtx = #user_ctx{roles = [<<"_admin">>]},
+    RepDoc = {[
+        {<<"source">>, Source},
+        {<<"target">>, Target}
+    ]},
+    {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepDoc, AdminCtx),
+    {ok, RepPid} = couch_replicator:async_replicate(Rep),
+    Monitor = erlang:monitor(process, RepPid),
+    receive
+    {'DOWN', Monitor, process, RepPid, ExitReason} ->
+        etap:is(ExitReason, normal, "Replication finished successfully")
+    after 300000 ->
+        etap:bail("Timeout waiting for replication to finish")
+    end.