You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by be...@apache.org on 2011/12/05 11:31:10 UTC
[1/2] git commit: add load test for buildbot.
Updated Branches:
refs/heads/master f913ca6e8 -> 1c0c183df
add load test for buildbot.
Project: http://git-wip-us.apache.org/repos/asf/couchdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb/commit/604f1a60
Tree: http://git-wip-us.apache.org/repos/asf/couchdb/tree/604f1a60
Diff: http://git-wip-us.apache.org/repos/asf/couchdb/diff/604f1a60
Branch: refs/heads/master
Commit: 604f1a60724737aee3a942ebe9e808c53d566276
Parents: f913ca6
Author: benoitc <be...@apache.org>
Authored: Mon Dec 5 11:03:12 2011 +0100
Committer: benoitc <be...@apache.org>
Committed: Mon Dec 5 11:16:29 2011 +0100
----------------------------------------------------------------------
src/couch_replicator/Makefile.am | 9 +-
src/couch_replicator/test/001-httpc-pool.t | 250 --------
.../test/002-replication-compact.t | 486 ---------------
.../test/003-replication-large-atts.t | 267 --------
.../test/004-replication-many-leaves.t | 216 -------
src/couch_replicator/test/01-load.t | 37 ++
src/couch_replicator/test/02-httpc-pool.t | 250 ++++++++
src/couch_replicator/test/03-replication-compact.t | 486 +++++++++++++++
.../test/04-replication-large-atts.t | 267 ++++++++
.../test/05-replication-many-leaves.t | 216 +++++++
10 files changed, 1261 insertions(+), 1223 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb/blob/604f1a60/src/couch_replicator/Makefile.am
----------------------------------------------------------------------
diff --git a/src/couch_replicator/Makefile.am b/src/couch_replicator/Makefile.am
index 169a7e3..37b3e3b 100644
--- a/src/couch_replicator/Makefile.am
+++ b/src/couch_replicator/Makefile.am
@@ -36,10 +36,11 @@ source_files = \
src/couch_replicator.erl
test_files = \
- test/001-httpc-pool.t \
- test/002-replication-compact.t \
- test/003-replication-large-atts.t \
- test/004-replication-many-leaves.t
+ test/01-load.t \
+ test/02-httpc-pool.t \
+ test/03-replication-compact.t \
+ test/04-replication-large-atts.t \
+ test/05-replication-many-leaves.t
compiled_files = \
ebin/couch_replicator_api_wrap.beam \
http://git-wip-us.apache.org/repos/asf/couchdb/blob/604f1a60/src/couch_replicator/test/001-httpc-pool.t
----------------------------------------------------------------------
diff --git a/src/couch_replicator/test/001-httpc-pool.t b/src/couch_replicator/test/001-httpc-pool.t
deleted file mode 100755
index a7bde6c..0000000
--- a/src/couch_replicator/test/001-httpc-pool.t
+++ /dev/null
@@ -1,250 +0,0 @@
-#!/usr/bin/env escript
-%% -*- erlang -*-
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
-main(_) ->
- test_util:init_code_path(),
-
- etap:plan(55),
- case (catch test()) of
- ok ->
- etap:end_tests();
- Other ->
- etap:diag(io_lib:format("Test died abnormally: ~p", [Other])),
- etap:bail(Other)
- end,
- ok.
-
-
-test() ->
- couch_server_sup:start_link(test_util:config_files()),
- ibrowse:start(),
-
- test_pool_full(),
- test_worker_dead_pool_non_full(),
- test_worker_dead_pool_full(),
-
- couch_server_sup:stop(),
- ok.
-
-
-test_pool_full() ->
- Pool = spawn_pool(),
- Client1 = spawn_client(Pool),
- Client2 = spawn_client(Pool),
- Client3 = spawn_client(Pool),
-
- etap:diag("Check that we can spawn the max number of connections."),
- etap:is(ping_client(Client1), ok, "Client 1 started ok."),
- etap:is(ping_client(Client2), ok, "Client 2 started ok."),
- etap:is(ping_client(Client3), ok, "Client 3 started ok."),
-
- Worker1 = get_client_worker(Client1, "1"),
- Worker2 = get_client_worker(Client2, "2"),
- Worker3 = get_client_worker(Client3, "3"),
- etap:is(is_process_alive(Worker1), true, "Client's 1 worker is alive."),
- etap:is(is_process_alive(Worker2), true, "Client's 2 worker is alive."),
- etap:is(is_process_alive(Worker3), true, "Client's 3 worker is alive."),
-
- etap:isnt(Worker1, Worker2, "Clients 1 and 2 got different workers."),
- etap:isnt(Worker2, Worker3, "Clients 2 and 3 got different workers."),
- etap:isnt(Worker1, Worker3, "Clients 1 and 3 got different workers."),
-
- etap:diag("Check that client 4 blocks waiting for a worker."),
- Client4 = spawn_client(Pool),
- etap:is(ping_client(Client4), timeout, "Client 4 blocked while waiting."),
-
- etap:diag("Check that stopping a client gives up its worker."),
- etap:is(stop_client(Client1), ok, "First client stopped."),
-
- etap:diag("And check that our blocked client has been unblocked."),
- etap:is(ping_client(Client4), ok, "Client 4 was unblocked."),
-
- Worker4 = get_client_worker(Client4, "4"),
- etap:is(is_process_alive(Worker4), true, "Client's 4 worker is alive."),
- etap:is(Worker4, Worker1, "Client 4 got worker that client 1 got before."),
-
- lists:foreach(fun(C) -> ok = stop_client(C) end, [Client2, Client3, Client4]),
- stop_pool(Pool).
-
-
-test_worker_dead_pool_non_full() ->
- Pool = spawn_pool(),
- Client1 = spawn_client(Pool),
-
- etap:is(ping_client(Client1), ok, "Client 1 started ok."),
- Worker1 = get_client_worker(Client1, "1"),
- etap:is(is_process_alive(Worker1), true, "Client's 1 worker is alive."),
-
- etap:diag("Kill client's 1 worker."),
- etap:is(kill_client_worker(Client1), ok, "Killed client's 1 worker."),
- etap:is(is_process_alive(Worker1), false, "Client's 1 worker process is dead."),
-
- etap:is(stop_client(Client1), ok, "First client stopped and released its worker."),
-
- Client2 = spawn_client(Pool),
- etap:is(ping_client(Client2), ok, "Client 2 started ok."),
- Worker2 = get_client_worker(Client2, "2"),
- etap:isnt(Worker2, Worker1, "Client 2 got a different worker from client 1"),
- etap:is(is_process_alive(Worker2), true, "Client's 2 worker is alive."),
-
- etap:is(stop_client(Client2), ok, "Second client stopped."),
- stop_pool(Pool).
-
-
-test_worker_dead_pool_full() ->
- Pool = spawn_pool(),
- Client1 = spawn_client(Pool),
- Client2 = spawn_client(Pool),
- Client3 = spawn_client(Pool),
-
- etap:diag("Check that we can spawn the max number of connections."),
- etap:is(ping_client(Client1), ok, "Client 1 started ok."),
- etap:is(ping_client(Client2), ok, "Client 2 started ok."),
- etap:is(ping_client(Client3), ok, "Client 3 started ok."),
-
- Worker1 = get_client_worker(Client1, "1"),
- Worker2 = get_client_worker(Client2, "2"),
- Worker3 = get_client_worker(Client3, "3"),
- etap:is(is_process_alive(Worker1), true, "Client's 1 worker is alive."),
- etap:is(is_process_alive(Worker2), true, "Client's 2 worker is alive."),
- etap:is(is_process_alive(Worker3), true, "Client's 3 worker is alive."),
-
- etap:isnt(Worker1, Worker2, "Clients 1 and 2 got different workers."),
- etap:isnt(Worker2, Worker3, "Clients 2 and 3 got different workers."),
- etap:isnt(Worker1, Worker3, "Clients 1 and 3 got different workers."),
-
- etap:diag("Check that client 4 blocks waiting for a worker."),
- Client4 = spawn_client(Pool),
- etap:is(ping_client(Client4), timeout, "Client 4 blocked while waiting."),
-
- etap:diag("Kill client's 1 worker."),
- etap:is(kill_client_worker(Client1), ok, "Killed client's 1 worker."),
- etap:is(is_process_alive(Worker1), false, "Client's 1 worker process is dead."),
-
- etap:diag("Check client 4 got unblocked after first worker's death"),
- etap:is(ping_client(Client4), ok, "Client 4 not blocked anymore."),
-
- Worker4 = get_client_worker(Client4, "4"),
- etap:is(is_process_alive(Worker4), true, "Client's 4 worker is alive."),
- etap:isnt(Worker4, Worker1, "Client 4 got a worker different from client 1."),
- etap:isnt(Worker4, Worker2, "Client 4 got a worker different from client 2."),
- etap:isnt(Worker4, Worker3, "Client 4 got a worker different from client 3."),
-
- etap:diag("Check that stopping client 1 is a noop."),
- etap:is(stop_client(Client1), ok, "First client stopped."),
-
- etap:is(is_process_alive(Worker2), true, "Client's 2 worker still alive."),
- etap:is(is_process_alive(Worker3), true, "Client's 3 worker still alive."),
- etap:is(is_process_alive(Worker4), true, "Client's 4 worker still alive."),
-
- etap:diag("Check that client 5 blocks waiting for a worker."),
- Client5 = spawn_client(Pool),
- etap:is(ping_client(Client5), timeout, "Client 5 blocked while waiting."),
-
- etap:diag("Check that stopping client 2 gives up its worker."),
- etap:is(stop_client(Client2), ok, "Second client stopped."),
-
- etap:diag("Now check that client 5 has been unblocked."),
- etap:is(ping_client(Client5), ok, "Client 5 was unblocked."),
-
- Worker5 = get_client_worker(Client5, "5"),
- etap:is(is_process_alive(Worker5), true, "Client's 5 worker is alive."),
- etap:isnt(Worker5, Worker1, "Client 5 got a worker different from client 1."),
- etap:is(Worker5, Worker2, "Client 5 got same worker as client 2."),
- etap:isnt(Worker5, Worker3, "Client 5 got a worker different from client 3."),
- etap:isnt(Worker5, Worker4, "Client 5 got a worker different from client 4."),
-
- etap:is(is_process_alive(Worker3), true, "Client's 3 worker still alive."),
- etap:is(is_process_alive(Worker4), true, "Client's 4 worker still alive."),
- etap:is(is_process_alive(Worker5), true, "Client's 5 worker still alive."),
-
- lists:foreach(fun(C) -> ok = stop_client(C) end, [Client3, Client4, Client5]),
- stop_pool(Pool).
-
-
-spawn_client(Pool) ->
- Parent = self(),
- Ref = make_ref(),
- Pid = spawn(fun() ->
- {ok, Worker} = couch_replicator_httpc_pool:get_worker(Pool),
- loop(Parent, Ref, Worker, Pool)
- end),
- {Pid, Ref}.
-
-
-ping_client({Pid, Ref}) ->
- Pid ! ping,
- receive
- {pong, Ref} ->
- ok
- after 3000 ->
- timeout
- end.
-
-
-get_client_worker({Pid, Ref}, ClientName) ->
- Pid ! get_worker,
- receive
- {worker, Ref, Worker} ->
- Worker
- after 3000 ->
- etap:bail("Timeout getting client " ++ ClientName ++ " worker.")
- end.
-
-
-stop_client({Pid, Ref}) ->
- Pid ! stop,
- receive
- {stop, Ref} ->
- ok
- after 3000 ->
- timeout
- end.
-
-
-kill_client_worker({Pid, Ref}) ->
- Pid ! get_worker,
- receive
- {worker, Ref, Worker} ->
- exit(Worker, kill),
- ok
- after 3000 ->
- timeout
- end.
-
-
-loop(Parent, Ref, Worker, Pool) ->
- receive
- ping ->
- Parent ! {pong, Ref},
- loop(Parent, Ref, Worker, Pool);
- get_worker ->
- Parent ! {worker, Ref, Worker},
- loop(Parent, Ref, Worker, Pool);
- stop ->
- couch_replicator_httpc_pool:release_worker(Pool, Worker),
- Parent ! {stop, Ref}
- end.
-
-
-spawn_pool() ->
- Host = couch_config:get("httpd", "bind_address", "127.0.0.1"),
- Port = couch_config:get("httpd", "port", "5984"),
- {ok, Pool} = couch_replicator_httpc_pool:start_link(
- "http://" ++ Host ++ ":5984", [{max_connections, 3}]),
- Pool.
-
-
-stop_pool(Pool) ->
- ok = couch_replicator_httpc_pool:stop(Pool).
http://git-wip-us.apache.org/repos/asf/couchdb/blob/604f1a60/src/couch_replicator/test/002-replication-compact.t
----------------------------------------------------------------------
diff --git a/src/couch_replicator/test/002-replication-compact.t b/src/couch_replicator/test/002-replication-compact.t
deleted file mode 100755
index c8b265e..0000000
--- a/src/couch_replicator/test/002-replication-compact.t
+++ /dev/null
@@ -1,486 +0,0 @@
-#!/usr/bin/env escript
-%% -*- erlang -*-
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
-% Verify that compacting databases that are being used as the source or
-% target of a replication doesn't affect the replication and that the
-% replication doesn't hold their reference counters forever.
-
--define(b2l(B), binary_to_list(B)).
-
--record(user_ctx, {
- name = null,
- roles = [],
- handler
-}).
-
--record(db, {
- main_pid = nil,
- update_pid = nil,
- compactor_pid = nil,
- instance_start_time, % number of microsecs since jan 1 1970 as a binary string
- fd,
- updater_fd,
- fd_ref_counter,
- header = nil,
- committed_update_seq,
- fulldocinfo_by_id_btree,
- docinfo_by_seq_btree,
- local_docs_btree,
- update_seq,
- name,
- filepath,
- validate_doc_funs = [],
- security = [],
- security_ptr = nil,
- user_ctx = #user_ctx{},
- waiting_delayed_commit = nil,
- revs_limit = 1000,
- fsync_options = [],
- options = [],
- compression
-}).
-
--record(rep, {
- id,
- source,
- target,
- options,
- user_ctx,
- doc_id
-}).
-
-
-source_db_name() -> <<"couch_test_rep_db_a">>.
-target_db_name() -> <<"couch_test_rep_db_b">>.
-
-
-main(_) ->
- test_util:init_code_path(),
-
- etap:plan(376),
- case (catch test()) of
- ok ->
- etap:end_tests();
- Other ->
- etap:diag(io_lib:format("Test died abnormally: ~p", [Other])),
- etap:bail(Other)
- end,
- ok.
-
-
-test() ->
- couch_server_sup:start_link(test_util:config_files()),
- ibrowse:start(),
-
- Pairs = [
- {source_db_name(), target_db_name()},
- {{remote, source_db_name()}, target_db_name()},
- {source_db_name(), {remote, target_db_name()}},
- {{remote, source_db_name()}, {remote, (target_db_name())}}
- ],
-
- lists:foreach(
- fun({Source, Target}) ->
- {ok, SourceDb} = create_db(source_db_name()),
- etap:is(couch_db:is_idle(SourceDb), true,
- "Source database is idle before starting replication"),
-
- {ok, TargetDb} = create_db(target_db_name()),
- etap:is(couch_db:is_idle(TargetDb), true,
- "Target database is idle before starting replication"),
-
- {ok, RepPid, RepId} = replicate(Source, Target),
- check_active_tasks(RepPid, RepId, Source, Target),
- {ok, DocsWritten} = populate_and_compact_test(
- RepPid, SourceDb, TargetDb),
-
- wait_target_in_sync(DocsWritten, TargetDb),
- check_active_tasks(RepPid, RepId, Source, Target),
- cancel_replication(RepId, RepPid),
- compare_dbs(SourceDb, TargetDb),
-
- delete_db(SourceDb),
- delete_db(TargetDb),
- couch_server_sup:stop(),
- ok = timer:sleep(1000),
- couch_server_sup:start_link(test_util:config_files())
- end,
- Pairs),
-
- couch_server_sup:stop(),
- ok.
-
-
-populate_and_compact_test(RepPid, SourceDb0, TargetDb0) ->
- etap:is(is_process_alive(RepPid), true, "Replication process is alive"),
- check_db_alive("source", SourceDb0),
- check_db_alive("target", TargetDb0),
-
- Writer = spawn_writer(SourceDb0),
-
- lists:foldl(
- fun(_, {SourceDb, TargetDb, DocCount}) ->
- pause_writer(Writer),
-
- compact_db("source", SourceDb),
- etap:is(is_process_alive(RepPid), true,
- "Replication process is alive after source database compaction"),
- check_db_alive("source", SourceDb),
- check_ref_counter("source", SourceDb),
-
- compact_db("target", TargetDb),
- etap:is(is_process_alive(RepPid), true,
- "Replication process is alive after target database compaction"),
- check_db_alive("target", TargetDb),
- check_ref_counter("target", TargetDb),
-
- {ok, SourceDb2} = reopen_db(SourceDb),
- {ok, TargetDb2} = reopen_db(TargetDb),
-
- resume_writer(Writer),
- wait_writer(Writer, DocCount),
-
- compact_db("source", SourceDb2),
- etap:is(is_process_alive(RepPid), true,
- "Replication process is alive after source database compaction"),
- check_db_alive("source", SourceDb2),
- pause_writer(Writer),
- check_ref_counter("source", SourceDb2),
- resume_writer(Writer),
-
- compact_db("target", TargetDb2),
- etap:is(is_process_alive(RepPid), true,
- "Replication process is alive after target database compaction"),
- check_db_alive("target", TargetDb2),
- pause_writer(Writer),
- check_ref_counter("target", TargetDb2),
- resume_writer(Writer),
-
- {ok, SourceDb3} = reopen_db(SourceDb2),
- {ok, TargetDb3} = reopen_db(TargetDb2),
- {SourceDb3, TargetDb3, DocCount + 50}
- end,
- {SourceDb0, TargetDb0, 50}, lists:seq(1, 5)),
-
- DocsWritten = stop_writer(Writer),
- {ok, DocsWritten}.
-
-
-check_db_alive(Type, #db{main_pid = Pid}) ->
- etap:is(is_process_alive(Pid), true,
- "Local " ++ Type ++ " database main pid is alive").
-
-
-compact_db(Type, #db{name = Name}) ->
- {ok, Db} = couch_db:open_int(Name, []),
- {ok, CompactPid} = couch_db:start_compact(Db),
- MonRef = erlang:monitor(process, CompactPid),
- receive
- {'DOWN', MonRef, process, CompactPid, normal} ->
- ok;
- {'DOWN', MonRef, process, CompactPid, Reason} ->
- etap:bail("Error compacting " ++ Type ++ " database " ++ ?b2l(Name) ++
- ": " ++ couch_util:to_list(Reason))
- after 30000 ->
- etap:bail("Compaction for " ++ Type ++ " database " ++ ?b2l(Name) ++
- " didn't finish")
- end,
- ok = couch_db:close(Db).
-
-
-check_ref_counter(Type, #db{name = Name, fd_ref_counter = OldRefCounter}) ->
- MonRef = erlang:monitor(process, OldRefCounter),
- receive
- {'DOWN', MonRef, process, OldRefCounter, _} ->
- etap:diag("Old " ++ Type ++ " database ref counter terminated")
- after 30000 ->
- etap:bail("Old " ++ Type ++ " database ref counter didn't terminate")
- end,
- {ok, #db{fd_ref_counter = NewRefCounter} = Db} = couch_db:open_int(Name, []),
- ok = couch_db:close(Db),
- etap:isnt(
- NewRefCounter, OldRefCounter, Type ++ " database has new ref counter").
-
-
-reopen_db(#db{name = Name}) ->
- {ok, Db} = couch_db:open_int(Name, []),
- ok = couch_db:close(Db),
- {ok, Db}.
-
-
-wait_target_in_sync(DocCount, #db{name = TargetName}) ->
- wait_target_in_sync_loop(DocCount, TargetName, 300).
-
-
-wait_target_in_sync_loop(_DocCount, _TargetName, 0) ->
- etap:bail("Could not get source and target databases in sync");
-wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft) ->
- {ok, Target} = couch_db:open_int(TargetName, []),
- {ok, TargetInfo} = couch_db:get_db_info(Target),
- ok = couch_db:close(Target),
- TargetDocCount = couch_util:get_value(doc_count, TargetInfo),
- case TargetDocCount == DocCount of
- true ->
- etap:diag("Source and target databases are in sync");
- false ->
- ok = timer:sleep(100),
- wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft - 1)
- end.
-
-
-compare_dbs(#db{name = SourceName}, #db{name = TargetName}) ->
- {ok, SourceDb} = couch_db:open_int(SourceName, []),
- {ok, TargetDb} = couch_db:open_int(TargetName, []),
- Fun = fun(FullDocInfo, _, Acc) ->
- {ok, Doc} = couch_db:open_doc(SourceDb, FullDocInfo),
- {Props} = DocJson = couch_doc:to_json_obj(Doc, [attachments]),
- DocId = couch_util:get_value(<<"_id">>, Props),
- DocTarget = case couch_db:open_doc(TargetDb, DocId) of
- {ok, DocT} ->
- DocT;
- Error ->
- etap:bail("Error opening document '" ++ ?b2l(DocId) ++
- "' from target: " ++ couch_util:to_list(Error))
- end,
- DocTargetJson = couch_doc:to_json_obj(DocTarget, [attachments]),
- case DocTargetJson of
- DocJson ->
- ok;
- _ ->
- etap:bail("Content from document '" ++ ?b2l(DocId) ++
- "' differs in target database")
- end,
- {ok, Acc}
- end,
- {ok, _, _} = couch_db:enum_docs(SourceDb, Fun, [], []),
- etap:diag("Target database has the same documents as the source database"),
- ok = couch_db:close(SourceDb),
- ok = couch_db:close(TargetDb).
-
-
-check_active_tasks(RepPid, {BaseId, Ext} = _RepId, Src, Tgt) ->
- Source = case Src of
- {remote, NameSrc} ->
- <<(db_url(NameSrc))/binary, $/>>;
- _ ->
- Src
- end,
- Target = case Tgt of
- {remote, NameTgt} ->
- <<(db_url(NameTgt))/binary, $/>>;
- _ ->
- Tgt
- end,
- FullRepId = list_to_binary(BaseId ++ Ext),
- Pid = list_to_binary(pid_to_list(RepPid)),
- [RepTask] = couch_task_status:all(),
- etap:is(couch_util:get_value(pid, RepTask), Pid,
- "_active_tasks entry has correct pid property"),
- etap:is(couch_util:get_value(replication_id, RepTask), FullRepId,
- "_active_tasks entry has right replication id"),
- etap:is(couch_util:get_value(continuous, RepTask), true,
- "_active_tasks entry has continuous property set to true"),
- etap:is(couch_util:get_value(source, RepTask), Source,
- "_active_tasks entry has correct source property"),
- etap:is(couch_util:get_value(target, RepTask), Target,
- "_active_tasks entry has correct target property"),
- etap:is(is_integer(couch_util:get_value(docs_read, RepTask)), true,
- "_active_tasks entry has integer docs_read property"),
- etap:is(is_integer(couch_util:get_value(docs_written, RepTask)), true,
- "_active_tasks entry has integer docs_written property"),
- etap:is(is_integer(couch_util:get_value(doc_write_failures, RepTask)), true,
- "_active_tasks entry has integer doc_write_failures property"),
- etap:is(is_integer(couch_util:get_value(revisions_checked, RepTask)), true,
- "_active_tasks entry has integer revisions_checked property"),
- etap:is(is_integer(couch_util:get_value(missing_revisions_found, RepTask)), true,
- "_active_tasks entry has integer missing_revisions_found property"),
- etap:is(is_integer(couch_util:get_value(checkpointed_source_seq, RepTask)), true,
- "_active_tasks entry has integer checkpointed_source_seq property"),
- etap:is(is_integer(couch_util:get_value(source_seq, RepTask)), true,
- "_active_tasks entry has integer source_seq property"),
- Progress = couch_util:get_value(progress, RepTask),
- etap:is(is_integer(Progress), true,
- "_active_tasks entry has an integer progress property"),
- etap:is(Progress =< 100, true, "Progress is not greater than 100%").
-
-
-wait_writer(Pid, NumDocs) ->
- case get_writer_num_docs_written(Pid) of
- N when N >= NumDocs ->
- ok;
- _ ->
- wait_writer(Pid, NumDocs)
- end.
-
-
-spawn_writer(Db) ->
- Parent = self(),
- Pid = spawn(fun() -> writer_loop(Db, Parent, 0) end),
- etap:diag("Started source database writer"),
- Pid.
-
-
-pause_writer(Pid) ->
- Ref = make_ref(),
- Pid ! {pause, Ref},
- receive
- {paused, Ref} ->
- ok
- after 30000 ->
- etap:bail("Failed to pause source database writer")
- end.
-
-
-resume_writer(Pid) ->
- Ref = make_ref(),
- Pid ! {continue, Ref},
- receive
- {ok, Ref} ->
- ok
- after 30000 ->
- etap:bail("Failed to unpause source database writer")
- end.
-
-
-get_writer_num_docs_written(Pid) ->
- Ref = make_ref(),
- Pid ! {get_count, Ref},
- receive
- {count, Ref, Count} ->
- Count
- after 30000 ->
- etap:bail("Timeout getting number of documents written from "
- "source database writer")
- end.
-
-
-stop_writer(Pid) ->
- Ref = make_ref(),
- Pid ! {stop, Ref},
- receive
- {stopped, Ref, DocsWritten} ->
- MonRef = erlang:monitor(process, Pid),
- receive
- {'DOWN', MonRef, process, Pid, _Reason} ->
- etap:diag("Stopped source database writer"),
- DocsWritten
- after 30000 ->
- etap:bail("Timeout stopping source database writer")
- end
- after 30000 ->
- etap:bail("Timeout stopping source database writer")
- end.
-
-
-writer_loop(#db{name = DbName}, Parent, Counter) ->
- maybe_pause(Parent, Counter),
- Doc = couch_doc:from_json_obj({[
- {<<"_id">>, list_to_binary(integer_to_list(Counter + 1))},
- {<<"value">>, Counter + 1},
- {<<"_attachments">>, {[
- {<<"icon1.png">>, {[
- {<<"data">>, base64:encode(att_data())},
- {<<"content_type">>, <<"image/png">>}
- ]}},
- {<<"icon2.png">>, {[
- {<<"data">>, base64:encode(iolist_to_binary(
- [att_data(), att_data()]))},
- {<<"content_type">>, <<"image/png">>}
- ]}}
- ]}}
- ]}),
- maybe_pause(Parent, Counter),
- {ok, Db} = couch_db:open_int(DbName, []),
- {ok, _} = couch_db:update_doc(Db, Doc, []),
- ok = couch_db:close(Db),
- receive
- {get_count, Ref} ->
- Parent ! {count, Ref, Counter + 1},
- writer_loop(Db, Parent, Counter + 1);
- {stop, Ref} ->
- Parent ! {stopped, Ref, Counter + 1}
- after 0 ->
- ok = timer:sleep(500),
- writer_loop(Db, Parent, Counter + 1)
- end.
-
-
-maybe_pause(Parent, Counter) ->
- receive
- {get_count, Ref} ->
- Parent ! {count, Ref, Counter};
- {pause, Ref} ->
- Parent ! {paused, Ref},
- receive {continue, Ref2} -> Parent ! {ok, Ref2} end
- after 0 ->
- ok
- end.
-
-
-db_url(DbName) ->
- iolist_to_binary([
- "http://", couch_config:get("httpd", "bind_address", "127.0.0.1"),
- ":", integer_to_list(mochiweb_socket_server:get(couch_httpd, port)),
- "/", DbName
- ]).
-
-
-create_db(DbName) ->
- {ok, Db} = couch_db:create(
- DbName,
- [{user_ctx, #user_ctx{roles = [<<"_admin">>]}}, overwrite]),
- couch_db:close(Db),
- {ok, Db}.
-
-
-delete_db(#db{name = DbName, main_pid = Pid}) ->
- ok = couch_server:delete(
- DbName, [{user_ctx, #user_ctx{roles = [<<"_admin">>]}}]),
- MonRef = erlang:monitor(process, Pid),
- receive
- {'DOWN', MonRef, process, Pid, _Reason} ->
- ok
- after 30000 ->
- etap:bail("Timeout deleting database")
- end.
-
-
-replicate({remote, Db}, Target) ->
- replicate(db_url(Db), Target);
-
-replicate(Source, {remote, Db}) ->
- replicate(Source, db_url(Db));
-
-replicate(Source, Target) ->
- RepObject = {[
- {<<"source">>, Source},
- {<<"target">>, Target},
- {<<"continuous">>, true}
- ]},
- {ok, Rep} = couch_replicator_utils:parse_rep_doc(
- RepObject, #user_ctx{roles = [<<"_admin">>]}),
- {ok, Pid} = couch_replicator:async_replicate(Rep),
- {ok, Pid, Rep#rep.id}.
-
-
-cancel_replication(RepId, RepPid) ->
- {ok, _} = couch_replicator:cancel_replication(RepId),
- etap:is(is_process_alive(RepPid), false,
- "Replication process is no longer alive after cancel").
-
-
-att_data() ->
- {ok, Data} = file:read_file(
- test_util:source_file("share/www/image/logo.png")),
- Data.
http://git-wip-us.apache.org/repos/asf/couchdb/blob/604f1a60/src/couch_replicator/test/003-replication-large-atts.t
----------------------------------------------------------------------
diff --git a/src/couch_replicator/test/003-replication-large-atts.t b/src/couch_replicator/test/003-replication-large-atts.t
deleted file mode 100755
index 5386179..0000000
--- a/src/couch_replicator/test/003-replication-large-atts.t
+++ /dev/null
@@ -1,267 +0,0 @@
-#!/usr/bin/env escript
-%% -*- erlang -*-
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
-% Test replication of large attachments. Verify that both source and
-% target have the same attachment data and metadata.
-
--define(b2l(Bin), binary_to_list(Bin)).
-
--record(user_ctx, {
- name = null,
- roles = [],
- handler
-}).
-
--record(doc, {
- id = <<"">>,
- revs = {0, []},
- body = {[]},
- atts = [],
- deleted = false,
- meta = []
-}).
-
--record(att, {
- name,
- type,
- att_len,
- disk_len,
- md5= <<>>,
- revpos=0,
- data,
- encoding=identity
-}).
-
-
-source_db_name() -> <<"couch_test_rep_db_a">>.
-target_db_name() -> <<"couch_test_rep_db_b">>.
-
-
-main(_) ->
- test_util:init_code_path(),
-
- etap:plan(1192),
- case (catch test()) of
- ok ->
- etap:end_tests();
- Other ->
- etap:diag(io_lib:format("Test died abnormally: ~p", [Other])),
- etap:bail(Other)
- end,
- ok.
-
-
-test() ->
- couch_server_sup:start_link(test_util:config_files()),
- ibrowse:start(),
- crypto:start(),
- couch_config:set("attachments", "compressible_types", "text/*", false),
-
- Pairs = [
- {source_db_name(), target_db_name()},
- {{remote, source_db_name()}, target_db_name()},
- {source_db_name(), {remote, target_db_name()}},
- {{remote, source_db_name()}, {remote, (target_db_name())}}
- ],
-
- {ok, SourceDb} = create_db(source_db_name()),
- etap:diag("Populating source database"),
- populate_db(SourceDb, 11),
- ok = couch_db:close(SourceDb),
-
- lists:foreach(
- fun({Source, Target}) ->
- etap:diag("Creating target database"),
- {ok, TargetDb} = create_db(target_db_name()),
-
- ok = couch_db:close(TargetDb),
- etap:diag("Triggering replication"),
- replicate(Source, Target),
- etap:diag("Replication finished, comparing source and target databases"),
- compare_dbs(SourceDb, TargetDb),
-
- etap:diag("Deleting target database"),
- delete_db(TargetDb),
- ok = timer:sleep(1000)
- end,
- Pairs),
-
- delete_db(SourceDb),
- couch_server_sup:stop(),
- ok.
-
-
-populate_db(Db, DocCount) ->
- Docs = lists:foldl(
- fun(DocIdCounter, Acc) ->
- Doc = #doc{
- id = iolist_to_binary(["doc", integer_to_list(DocIdCounter)]),
- body = {[]},
- atts = [
- att(<<"att1">>, 2 * 1024 * 1024, <<"text/plain">>),
- att(<<"att2">>, round(6.6 * 1024 * 1024), <<"app/binary">>)
- ]
- },
- [Doc | Acc]
- end,
- [], lists:seq(1, DocCount)),
- {ok, _} = couch_db:update_docs(Db, Docs, []).
-
-
-att(Name, Size, Type) ->
- #att{
- name = Name,
- type = Type,
- att_len = Size,
- data = fun(Count) -> crypto:rand_bytes(Count) end
- }.
-
-
-compare_dbs(Source, Target) ->
- {ok, SourceDb} = couch_db:open_int(couch_db:name(Source), []),
- {ok, TargetDb} = couch_db:open_int(couch_db:name(Target), []),
-
- Fun = fun(FullDocInfo, _, Acc) ->
- {ok, DocSource} = couch_db:open_doc(SourceDb, FullDocInfo),
- Id = DocSource#doc.id,
-
- etap:diag("Verifying document " ++ ?b2l(Id)),
-
- {ok, DocTarget} = couch_db:open_doc(TargetDb, Id),
- etap:is(DocTarget#doc.body, DocSource#doc.body,
- "Same body in source and target databases"),
-
- #doc{atts = SourceAtts} = DocSource,
- #doc{atts = TargetAtts} = DocTarget,
- etap:is(
- lists:sort([N || #att{name = N} <- SourceAtts]),
- lists:sort([N || #att{name = N} <- TargetAtts]),
- "Document has same number (and names) of attachments in "
- "source and target databases"),
-
- lists:foreach(
- fun(#att{name = AttName} = Att) ->
- etap:diag("Verifying attachment " ++ ?b2l(AttName)),
-
- {ok, AttTarget} = find_att(TargetAtts, AttName),
- SourceMd5 = att_md5(Att),
- TargetMd5 = att_md5(AttTarget),
- case AttName of
- <<"att1">> ->
- etap:is(Att#att.encoding, gzip,
- "Attachment is gzip encoded in source database"),
- etap:is(AttTarget#att.encoding, gzip,
- "Attachment is gzip encoded in target database"),
- DecSourceMd5 = att_decoded_md5(Att),
- DecTargetMd5 = att_decoded_md5(AttTarget),
- etap:is(DecTargetMd5, DecSourceMd5,
- "Same identity content in source and target databases");
- _ ->
- etap:is(Att#att.encoding, identity,
- "Attachment is not encoded in source database"),
- etap:is(AttTarget#att.encoding, identity,
- "Attachment is not encoded in target database")
- end,
- etap:is(TargetMd5, SourceMd5,
- "Same content in source and target databases"),
- etap:is(is_integer(Att#att.disk_len), true,
- "#att.disk_len is an integer in source database"),
- etap:is(is_integer(Att#att.att_len), true,
- "#att.att_len is an integer in source database"),
- etap:is(is_integer(AttTarget#att.disk_len), true,
- "#att.disk_len is an integer in target database"),
- etap:is(is_integer(AttTarget#att.att_len), true,
- "#att.att_len is an integer in target database"),
- etap:is(Att#att.disk_len, AttTarget#att.disk_len,
- "Same identity length in source and target databases"),
- etap:is(Att#att.att_len, AttTarget#att.att_len,
- "Same encoded length in source and target databases"),
- etap:is(Att#att.type, AttTarget#att.type,
- "Same type in source and target databases"),
- etap:is(Att#att.md5, SourceMd5, "Correct MD5 in source database"),
- etap:is(AttTarget#att.md5, SourceMd5, "Correct MD5 in target database")
- end,
- SourceAtts),
-
- {ok, Acc}
- end,
-
- {ok, _, _} = couch_db:enum_docs(SourceDb, Fun, [], []),
- ok = couch_db:close(SourceDb),
- ok = couch_db:close(TargetDb).
-
-
-find_att([], _Name) ->
- nil;
-find_att([#att{name = Name} = Att | _], Name) ->
- {ok, Att};
-find_att([_ | Rest], Name) ->
- find_att(Rest, Name).
-
-
-att_md5(Att) ->
- Md50 = couch_doc:att_foldl(
- Att,
- fun(Chunk, Acc) -> couch_util:md5_update(Acc, Chunk) end,
- couch_util:md5_init()),
- couch_util:md5_final(Md50).
-
-att_decoded_md5(Att) ->
- Md50 = couch_doc:att_foldl_decode(
- Att,
- fun(Chunk, Acc) -> couch_util:md5_update(Acc, Chunk) end,
- couch_util:md5_init()),
- couch_util:md5_final(Md50).
-
-
-db_url(DbName) ->
- iolist_to_binary([
- "http://", couch_config:get("httpd", "bind_address", "127.0.0.1"),
- ":", integer_to_list(mochiweb_socket_server:get(couch_httpd, port)),
- "/", DbName
- ]).
-
-
-create_db(DbName) ->
- couch_db:create(
- DbName,
- [{user_ctx, #user_ctx{roles = [<<"_admin">>]}}, overwrite]).
-
-
-delete_db(Db) ->
- ok = couch_server:delete(
- couch_db:name(Db), [{user_ctx, #user_ctx{roles = [<<"_admin">>]}}]).
-
-
-replicate({remote, Db}, Target) ->
- replicate(db_url(Db), Target);
-
-replicate(Source, {remote, Db}) ->
- replicate(Source, db_url(Db));
-
-replicate(Source, Target) ->
- RepObject = {[
- {<<"source">>, Source},
- {<<"target">>, Target}
- ]},
- {ok, Rep} = couch_replicator_utils:parse_rep_doc(
- RepObject, #user_ctx{roles = [<<"_admin">>]}),
- {ok, Pid} = couch_replicator:async_replicate(Rep),
- MonRef = erlang:monitor(process, Pid),
- receive
- {'DOWN', MonRef, process, Pid, Reason} ->
- etap:is(Reason, normal, "Replication finished successfully")
- after 300000 ->
- etap:bail("Timeout waiting for replication to finish")
- end.
http://git-wip-us.apache.org/repos/asf/couchdb/blob/604f1a60/src/couch_replicator/test/004-replication-many-leaves.t
----------------------------------------------------------------------
diff --git a/src/couch_replicator/test/004-replication-many-leaves.t b/src/couch_replicator/test/004-replication-many-leaves.t
deleted file mode 100755
index 52d2023..0000000
--- a/src/couch_replicator/test/004-replication-many-leaves.t
+++ /dev/null
@@ -1,216 +0,0 @@
-#!/usr/bin/env escript
-%% -*- erlang -*-
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
-% Test replication of documents with many leaf revisions.
-% Motivated by COUCHDB-1340 and other similar issues where a document
-% GET with a too long ?open_revs revision list doesn't work due to
-% maximum web server limits for the HTTP request path.
-
--record(user_ctx, {
- name = null,
- roles = [],
- handler
-}).
-
--record(doc, {
- id = <<"">>,
- revs = {0, []},
- body = {[]},
- atts = [],
- deleted = false,
- meta = []
-}).
-
--define(b2l(B), binary_to_list(B)).
--define(l2b(L), list_to_binary(L)).
--define(i2l(I), integer_to_list(I)).
-
-
-source_db_name() -> <<"couch_test_rep_db_a">>.
-target_db_name() -> <<"couch_test_rep_db_b">>.
-
-doc_ids() ->
- [<<"doc1">>, <<"doc2">>, <<"doc3">>].
-
-doc_num_conflicts(<<"doc1">>) -> 100;
-doc_num_conflicts(<<"doc2">>) -> 200;
-doc_num_conflicts(<<"doc3">>) -> 550.
-
-
-main(_) ->
- test_util:init_code_path(),
-
- etap:plan(16),
- case (catch test()) of
- ok ->
- etap:end_tests();
- Other ->
- etap:diag(io_lib:format("Test died abnormally: ~p", [Other])),
- etap:bail(Other)
- end,
- ok.
-
-
-test() ->
- couch_server_sup:start_link(test_util:config_files()),
- ibrowse:start(),
- crypto:start(),
-
- Pairs = [
- {source_db_name(), target_db_name()},
- {{remote, source_db_name()}, target_db_name()},
- {source_db_name(), {remote, target_db_name()}},
- {{remote, source_db_name()}, {remote, (target_db_name())}}
- ],
-
- {ok, SourceDb} = create_db(source_db_name()),
- etap:diag("Populating source database"),
- {ok, DocRevs} = populate_db(SourceDb),
- ok = couch_db:close(SourceDb),
-
- lists:foreach(
- fun({Source, Target}) ->
- etap:diag("Creating target database"),
- {ok, TargetDb} = create_db(target_db_name()),
-
- ok = couch_db:close(TargetDb),
- etap:diag("Triggering replication"),
- replicate(Source, Target),
- etap:diag("Replication finished, comparing source and target databases"),
- {ok, TargetDb2} = couch_db:open_int(target_db_name(), []),
- verify_target(TargetDb2, DocRevs),
- ok = couch_db:close(TargetDb2),
-
- etap:diag("Deleting target database"),
- delete_db(TargetDb),
- ok = timer:sleep(1000)
- end,
- Pairs),
-
- delete_db(SourceDb),
- couch_server_sup:stop(),
- ok.
-
-
-populate_db(Db) ->
- DocRevsDict = lists:foldl(
- fun(DocId, Acc) ->
- Value = <<"0">>,
- Doc = #doc{
- id = DocId,
- body = {[ {<<"value">>, Value} ]}
- },
- {ok, Rev} = couch_db:update_doc(Db, Doc, []),
- {ok, RevsDict} = add_doc_siblings(Db, DocId, doc_num_conflicts(DocId)),
- RevsDict2 = dict:store(Rev, Value, RevsDict),
- dict:store(DocId, RevsDict2, Acc)
- end,
- dict:new(), doc_ids()),
- {ok, dict:to_list(DocRevsDict)}.
-
-
-add_doc_siblings(Db, DocId, NumLeaves) when NumLeaves > 0 ->
- add_doc_siblings(Db, DocId, NumLeaves, [], dict:new()).
-
-
-add_doc_siblings(Db, _DocId, 0, AccDocs, RevsDict) ->
- {ok, []} = couch_db:update_docs(Db, AccDocs, [], replicated_changes),
- {ok, RevsDict};
-
-add_doc_siblings(Db, DocId, NumLeaves, AccDocs, RevsDict) ->
- Value = list_to_binary(integer_to_list(NumLeaves)),
- Rev = couch_util:md5(Value),
- RevsDict2 = dict:store({1, Rev}, Value, RevsDict),
- Doc = #doc{
- id = DocId,
- revs = {1, [Rev]},
- body = {[ {<<"value">>, Value} ]}
- },
- add_doc_siblings(Db, DocId, NumLeaves - 1, [Doc | AccDocs], RevsDict2).
-
-
-verify_target(_TargetDb, []) ->
- ok;
-
-verify_target(TargetDb, [{DocId, RevsDict} | Rest]) ->
- {ok, Lookups} = couch_db:open_doc_revs(
- TargetDb,
- DocId,
- [R || {R, _} <- dict:to_list(RevsDict)],
- [ejson_body]),
- Docs = [Doc || {ok, Doc} <- Lookups],
- Total = doc_num_conflicts(DocId) + 1,
- etap:is(
- length(Docs),
- Total,
- "Target has " ++ ?i2l(Total) ++ " leaf revisions of document " ++ ?b2l(DocId)),
- etap:diag("Verifying all revisions of document " ++ ?b2l(DocId)),
- lists:foreach(
- fun(#doc{revs = {Pos, [RevId]}, body = {Body}}) ->
- Rev = {Pos, RevId},
- {ok, Value} = dict:find(Rev, RevsDict),
- case couch_util:get_value(<<"value">>, Body) of
- Value ->
- ok;
- Other ->
- etap:bail("Wrong value for revision " ++
- ?b2l(couch_doc:rev_to_str(Rev)) ++ " of document " ++
- ?b2l(DocId) ++ ". Expected `" ++ couch_util:to_list(Value) ++
- "`, got `" ++ couch_util:to_list(Other) ++ "`")
- end
- end,
- Docs),
- verify_target(TargetDb, Rest).
-
-
-db_url(DbName) ->
- iolist_to_binary([
- "http://", couch_config:get("httpd", "bind_address", "127.0.0.1"),
- ":", integer_to_list(mochiweb_socket_server:get(couch_httpd, port)),
- "/", DbName
- ]).
-
-
-create_db(DbName) ->
- couch_db:create(
- DbName,
- [{user_ctx, #user_ctx{roles = [<<"_admin">>]}}, overwrite]).
-
-
-delete_db(Db) ->
- ok = couch_server:delete(
- couch_db:name(Db), [{user_ctx, #user_ctx{roles = [<<"_admin">>]}}]).
-
-
-replicate({remote, Db}, Target) ->
- replicate(db_url(Db), Target);
-
-replicate(Source, {remote, Db}) ->
- replicate(Source, db_url(Db));
-
-replicate(Source, Target) ->
- RepObject = {[
- {<<"source">>, Source},
- {<<"target">>, Target}
- ]},
- {ok, Rep} = couch_replicator_utils:parse_rep_doc(
- RepObject, #user_ctx{roles = [<<"_admin">>]}),
- {ok, Pid} = couch_replicator:async_replicate(Rep),
- MonRef = erlang:monitor(process, Pid),
- receive
- {'DOWN', MonRef, process, Pid, Reason} ->
- etap:is(Reason, normal, "Replication finished successfully")
- after 300000 ->
- etap:bail("Timeout waiting for replication to finish")
- end.
http://git-wip-us.apache.org/repos/asf/couchdb/blob/604f1a60/src/couch_replicator/test/01-load.t
----------------------------------------------------------------------
diff --git a/src/couch_replicator/test/01-load.t b/src/couch_replicator/test/01-load.t
new file mode 100644
index 0000000..07561a7
--- /dev/null
+++ b/src/couch_replicator/test/01-load.t
@@ -0,0 +1,37 @@
+#!/usr/bin/env escript
+%% -*- erlang -*-
+
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% Test that we can load each module.
+
+main(_) ->
+ test_util:init_code_path(),
+ Modules = [
+ couch_replicator_api_wrap,
+ couch_replicator_httpc,
+ couch_replicator_httpd,
+ couch_replicator_manager,
+ couch_replicator_notifier,
+ couch_replicator,
+ couch_replicator_worker,
+ couch_replicator_utils,
+ couch_replicator_job_sup
+ ],
+
+ etap:plan(length(Modules)),
+ lists:foreach(
+ fun(Module) ->
+ etap_can:loaded_ok(Module, lists:concat(["Loaded: ", Module]))
+ end, Modules),
+ etap:end_tests().
http://git-wip-us.apache.org/repos/asf/couchdb/blob/604f1a60/src/couch_replicator/test/02-httpc-pool.t
----------------------------------------------------------------------
diff --git a/src/couch_replicator/test/02-httpc-pool.t b/src/couch_replicator/test/02-httpc-pool.t
new file mode 100755
index 0000000..a7bde6c
--- /dev/null
+++ b/src/couch_replicator/test/02-httpc-pool.t
@@ -0,0 +1,250 @@
+#!/usr/bin/env escript
+%% -*- erlang -*-
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+main(_) ->
+ test_util:init_code_path(),
+
+ etap:plan(55),
+ case (catch test()) of
+ ok ->
+ etap:end_tests();
+ Other ->
+ etap:diag(io_lib:format("Test died abnormally: ~p", [Other])),
+ etap:bail(Other)
+ end,
+ ok.
+
+
+test() ->
+ couch_server_sup:start_link(test_util:config_files()),
+ ibrowse:start(),
+
+ test_pool_full(),
+ test_worker_dead_pool_non_full(),
+ test_worker_dead_pool_full(),
+
+ couch_server_sup:stop(),
+ ok.
+
+
+test_pool_full() ->
+ Pool = spawn_pool(),
+ Client1 = spawn_client(Pool),
+ Client2 = spawn_client(Pool),
+ Client3 = spawn_client(Pool),
+
+ etap:diag("Check that we can spawn the max number of connections."),
+ etap:is(ping_client(Client1), ok, "Client 1 started ok."),
+ etap:is(ping_client(Client2), ok, "Client 2 started ok."),
+ etap:is(ping_client(Client3), ok, "Client 3 started ok."),
+
+ Worker1 = get_client_worker(Client1, "1"),
+ Worker2 = get_client_worker(Client2, "2"),
+ Worker3 = get_client_worker(Client3, "3"),
+ etap:is(is_process_alive(Worker1), true, "Client's 1 worker is alive."),
+ etap:is(is_process_alive(Worker2), true, "Client's 2 worker is alive."),
+ etap:is(is_process_alive(Worker3), true, "Client's 3 worker is alive."),
+
+ etap:isnt(Worker1, Worker2, "Clients 1 and 2 got different workers."),
+ etap:isnt(Worker2, Worker3, "Clients 2 and 3 got different workers."),
+ etap:isnt(Worker1, Worker3, "Clients 1 and 3 got different workers."),
+
+ etap:diag("Check that client 4 blocks waiting for a worker."),
+ Client4 = spawn_client(Pool),
+ etap:is(ping_client(Client4), timeout, "Client 4 blocked while waiting."),
+
+ etap:diag("Check that stopping a client gives up its worker."),
+ etap:is(stop_client(Client1), ok, "First client stopped."),
+
+ etap:diag("And check that our blocked client has been unblocked."),
+ etap:is(ping_client(Client4), ok, "Client 4 was unblocked."),
+
+ Worker4 = get_client_worker(Client4, "4"),
+ etap:is(is_process_alive(Worker4), true, "Client's 4 worker is alive."),
+ etap:is(Worker4, Worker1, "Client 4 got worker that client 1 got before."),
+
+ lists:foreach(fun(C) -> ok = stop_client(C) end, [Client2, Client3, Client4]),
+ stop_pool(Pool).
+
+
+test_worker_dead_pool_non_full() ->
+ Pool = spawn_pool(),
+ Client1 = spawn_client(Pool),
+
+ etap:is(ping_client(Client1), ok, "Client 1 started ok."),
+ Worker1 = get_client_worker(Client1, "1"),
+ etap:is(is_process_alive(Worker1), true, "Client's 1 worker is alive."),
+
+ etap:diag("Kill client's 1 worker."),
+ etap:is(kill_client_worker(Client1), ok, "Killed client's 1 worker."),
+ etap:is(is_process_alive(Worker1), false, "Client's 1 worker process is dead."),
+
+ etap:is(stop_client(Client1), ok, "First client stopped and released its worker."),
+
+ Client2 = spawn_client(Pool),
+ etap:is(ping_client(Client2), ok, "Client 2 started ok."),
+ Worker2 = get_client_worker(Client2, "2"),
+ etap:isnt(Worker2, Worker1, "Client 2 got a different worker from client 1"),
+ etap:is(is_process_alive(Worker2), true, "Client's 2 worker is alive."),
+
+ etap:is(stop_client(Client2), ok, "Second client stopped."),
+ stop_pool(Pool).
+
+
+test_worker_dead_pool_full() ->
+ Pool = spawn_pool(),
+ Client1 = spawn_client(Pool),
+ Client2 = spawn_client(Pool),
+ Client3 = spawn_client(Pool),
+
+ etap:diag("Check that we can spawn the max number of connections."),
+ etap:is(ping_client(Client1), ok, "Client 1 started ok."),
+ etap:is(ping_client(Client2), ok, "Client 2 started ok."),
+ etap:is(ping_client(Client3), ok, "Client 3 started ok."),
+
+ Worker1 = get_client_worker(Client1, "1"),
+ Worker2 = get_client_worker(Client2, "2"),
+ Worker3 = get_client_worker(Client3, "3"),
+ etap:is(is_process_alive(Worker1), true, "Client's 1 worker is alive."),
+ etap:is(is_process_alive(Worker2), true, "Client's 2 worker is alive."),
+ etap:is(is_process_alive(Worker3), true, "Client's 3 worker is alive."),
+
+ etap:isnt(Worker1, Worker2, "Clients 1 and 2 got different workers."),
+ etap:isnt(Worker2, Worker3, "Clients 2 and 3 got different workers."),
+ etap:isnt(Worker1, Worker3, "Clients 1 and 3 got different workers."),
+
+ etap:diag("Check that client 4 blocks waiting for a worker."),
+ Client4 = spawn_client(Pool),
+ etap:is(ping_client(Client4), timeout, "Client 4 blocked while waiting."),
+
+ etap:diag("Kill client's 1 worker."),
+ etap:is(kill_client_worker(Client1), ok, "Killed client's 1 worker."),
+ etap:is(is_process_alive(Worker1), false, "Client's 1 worker process is dead."),
+
+ etap:diag("Check client 4 got unblocked after first worker's death"),
+ etap:is(ping_client(Client4), ok, "Client 4 not blocked anymore."),
+
+ Worker4 = get_client_worker(Client4, "4"),
+ etap:is(is_process_alive(Worker4), true, "Client's 4 worker is alive."),
+ etap:isnt(Worker4, Worker1, "Client 4 got a worker different from client 1."),
+ etap:isnt(Worker4, Worker2, "Client 4 got a worker different from client 2."),
+ etap:isnt(Worker4, Worker3, "Client 4 got a worker different from client 3."),
+
+ etap:diag("Check that stopping client 1 is a noop."),
+ etap:is(stop_client(Client1), ok, "First client stopped."),
+
+ etap:is(is_process_alive(Worker2), true, "Client's 2 worker still alive."),
+ etap:is(is_process_alive(Worker3), true, "Client's 3 worker still alive."),
+ etap:is(is_process_alive(Worker4), true, "Client's 4 worker still alive."),
+
+ etap:diag("Check that client 5 blocks waiting for a worker."),
+ Client5 = spawn_client(Pool),
+ etap:is(ping_client(Client5), timeout, "Client 5 blocked while waiting."),
+
+ etap:diag("Check that stopping client 2 gives up its worker."),
+ etap:is(stop_client(Client2), ok, "Second client stopped."),
+
+ etap:diag("Now check that client 5 has been unblocked."),
+ etap:is(ping_client(Client5), ok, "Client 5 was unblocked."),
+
+ Worker5 = get_client_worker(Client5, "5"),
+ etap:is(is_process_alive(Worker5), true, "Client's 5 worker is alive."),
+ etap:isnt(Worker5, Worker1, "Client 5 got a worker different from client 1."),
+ etap:is(Worker5, Worker2, "Client 5 got same worker as client 2."),
+ etap:isnt(Worker5, Worker3, "Client 5 got a worker different from client 3."),
+ etap:isnt(Worker5, Worker4, "Client 5 got a worker different from client 4."),
+
+ etap:is(is_process_alive(Worker3), true, "Client's 3 worker still alive."),
+ etap:is(is_process_alive(Worker4), true, "Client's 4 worker still alive."),
+ etap:is(is_process_alive(Worker5), true, "Client's 5 worker still alive."),
+
+ lists:foreach(fun(C) -> ok = stop_client(C) end, [Client3, Client4, Client5]),
+ stop_pool(Pool).
+
+
+spawn_client(Pool) ->
+ Parent = self(),
+ Ref = make_ref(),
+ Pid = spawn(fun() ->
+ {ok, Worker} = couch_replicator_httpc_pool:get_worker(Pool),
+ loop(Parent, Ref, Worker, Pool)
+ end),
+ {Pid, Ref}.
+
+
+ping_client({Pid, Ref}) ->
+ Pid ! ping,
+ receive
+ {pong, Ref} ->
+ ok
+ after 3000 ->
+ timeout
+ end.
+
+
+get_client_worker({Pid, Ref}, ClientName) ->
+ Pid ! get_worker,
+ receive
+ {worker, Ref, Worker} ->
+ Worker
+ after 3000 ->
+ etap:bail("Timeout getting client " ++ ClientName ++ " worker.")
+ end.
+
+
+stop_client({Pid, Ref}) ->
+ Pid ! stop,
+ receive
+ {stop, Ref} ->
+ ok
+ after 3000 ->
+ timeout
+ end.
+
+
+kill_client_worker({Pid, Ref}) ->
+ Pid ! get_worker,
+ receive
+ {worker, Ref, Worker} ->
+ exit(Worker, kill),
+ ok
+ after 3000 ->
+ timeout
+ end.
+
+
+loop(Parent, Ref, Worker, Pool) ->
+ receive
+ ping ->
+ Parent ! {pong, Ref},
+ loop(Parent, Ref, Worker, Pool);
+ get_worker ->
+ Parent ! {worker, Ref, Worker},
+ loop(Parent, Ref, Worker, Pool);
+ stop ->
+ couch_replicator_httpc_pool:release_worker(Pool, Worker),
+ Parent ! {stop, Ref}
+ end.
+
+
+spawn_pool() ->
+ Host = couch_config:get("httpd", "bind_address", "127.0.0.1"),
+ Port = couch_config:get("httpd", "port", "5984"),
+ {ok, Pool} = couch_replicator_httpc_pool:start_link(
+ "http://" ++ Host ++ ":5984", [{max_connections, 3}]),
+ Pool.
+
+
+stop_pool(Pool) ->
+ ok = couch_replicator_httpc_pool:stop(Pool).
http://git-wip-us.apache.org/repos/asf/couchdb/blob/604f1a60/src/couch_replicator/test/03-replication-compact.t
----------------------------------------------------------------------
diff --git a/src/couch_replicator/test/03-replication-compact.t b/src/couch_replicator/test/03-replication-compact.t
new file mode 100755
index 0000000..c8b265e
--- /dev/null
+++ b/src/couch_replicator/test/03-replication-compact.t
@@ -0,0 +1,486 @@
+#!/usr/bin/env escript
+%% -*- erlang -*-
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% Verify that compacting databases that are being used as the source or
+% target of a replication doesn't affect the replication and that the
+% replication doesn't hold their reference counters forever.
+
+-define(b2l(B), binary_to_list(B)).
+
+-record(user_ctx, {
+ name = null,
+ roles = [],
+ handler
+}).
+
+-record(db, {
+ main_pid = nil,
+ update_pid = nil,
+ compactor_pid = nil,
+ instance_start_time, % number of microsecs since jan 1 1970 as a binary string
+ fd,
+ updater_fd,
+ fd_ref_counter,
+ header = nil,
+ committed_update_seq,
+ fulldocinfo_by_id_btree,
+ docinfo_by_seq_btree,
+ local_docs_btree,
+ update_seq,
+ name,
+ filepath,
+ validate_doc_funs = [],
+ security = [],
+ security_ptr = nil,
+ user_ctx = #user_ctx{},
+ waiting_delayed_commit = nil,
+ revs_limit = 1000,
+ fsync_options = [],
+ options = [],
+ compression
+}).
+
+-record(rep, {
+ id,
+ source,
+ target,
+ options,
+ user_ctx,
+ doc_id
+}).
+
+
+source_db_name() -> <<"couch_test_rep_db_a">>.
+target_db_name() -> <<"couch_test_rep_db_b">>.
+
+
+main(_) ->
+ test_util:init_code_path(),
+
+ etap:plan(376),
+ case (catch test()) of
+ ok ->
+ etap:end_tests();
+ Other ->
+ etap:diag(io_lib:format("Test died abnormally: ~p", [Other])),
+ etap:bail(Other)
+ end,
+ ok.
+
+
+test() ->
+ couch_server_sup:start_link(test_util:config_files()),
+ ibrowse:start(),
+
+ Pairs = [
+ {source_db_name(), target_db_name()},
+ {{remote, source_db_name()}, target_db_name()},
+ {source_db_name(), {remote, target_db_name()}},
+ {{remote, source_db_name()}, {remote, (target_db_name())}}
+ ],
+
+ lists:foreach(
+ fun({Source, Target}) ->
+ {ok, SourceDb} = create_db(source_db_name()),
+ etap:is(couch_db:is_idle(SourceDb), true,
+ "Source database is idle before starting replication"),
+
+ {ok, TargetDb} = create_db(target_db_name()),
+ etap:is(couch_db:is_idle(TargetDb), true,
+ "Target database is idle before starting replication"),
+
+ {ok, RepPid, RepId} = replicate(Source, Target),
+ check_active_tasks(RepPid, RepId, Source, Target),
+ {ok, DocsWritten} = populate_and_compact_test(
+ RepPid, SourceDb, TargetDb),
+
+ wait_target_in_sync(DocsWritten, TargetDb),
+ check_active_tasks(RepPid, RepId, Source, Target),
+ cancel_replication(RepId, RepPid),
+ compare_dbs(SourceDb, TargetDb),
+
+ delete_db(SourceDb),
+ delete_db(TargetDb),
+ couch_server_sup:stop(),
+ ok = timer:sleep(1000),
+ couch_server_sup:start_link(test_util:config_files())
+ end,
+ Pairs),
+
+ couch_server_sup:stop(),
+ ok.
+
+
+populate_and_compact_test(RepPid, SourceDb0, TargetDb0) ->
+ etap:is(is_process_alive(RepPid), true, "Replication process is alive"),
+ check_db_alive("source", SourceDb0),
+ check_db_alive("target", TargetDb0),
+
+ Writer = spawn_writer(SourceDb0),
+
+ lists:foldl(
+ fun(_, {SourceDb, TargetDb, DocCount}) ->
+ pause_writer(Writer),
+
+ compact_db("source", SourceDb),
+ etap:is(is_process_alive(RepPid), true,
+ "Replication process is alive after source database compaction"),
+ check_db_alive("source", SourceDb),
+ check_ref_counter("source", SourceDb),
+
+ compact_db("target", TargetDb),
+ etap:is(is_process_alive(RepPid), true,
+ "Replication process is alive after target database compaction"),
+ check_db_alive("target", TargetDb),
+ check_ref_counter("target", TargetDb),
+
+ {ok, SourceDb2} = reopen_db(SourceDb),
+ {ok, TargetDb2} = reopen_db(TargetDb),
+
+ resume_writer(Writer),
+ wait_writer(Writer, DocCount),
+
+ compact_db("source", SourceDb2),
+ etap:is(is_process_alive(RepPid), true,
+ "Replication process is alive after source database compaction"),
+ check_db_alive("source", SourceDb2),
+ pause_writer(Writer),
+ check_ref_counter("source", SourceDb2),
+ resume_writer(Writer),
+
+ compact_db("target", TargetDb2),
+ etap:is(is_process_alive(RepPid), true,
+ "Replication process is alive after target database compaction"),
+ check_db_alive("target", TargetDb2),
+ pause_writer(Writer),
+ check_ref_counter("target", TargetDb2),
+ resume_writer(Writer),
+
+ {ok, SourceDb3} = reopen_db(SourceDb2),
+ {ok, TargetDb3} = reopen_db(TargetDb2),
+ {SourceDb3, TargetDb3, DocCount + 50}
+ end,
+ {SourceDb0, TargetDb0, 50}, lists:seq(1, 5)),
+
+ DocsWritten = stop_writer(Writer),
+ {ok, DocsWritten}.
+
+
+check_db_alive(Type, #db{main_pid = Pid}) ->
+ etap:is(is_process_alive(Pid), true,
+ "Local " ++ Type ++ " database main pid is alive").
+
+
+compact_db(Type, #db{name = Name}) ->
+ {ok, Db} = couch_db:open_int(Name, []),
+ {ok, CompactPid} = couch_db:start_compact(Db),
+ MonRef = erlang:monitor(process, CompactPid),
+ receive
+ {'DOWN', MonRef, process, CompactPid, normal} ->
+ ok;
+ {'DOWN', MonRef, process, CompactPid, Reason} ->
+ etap:bail("Error compacting " ++ Type ++ " database " ++ ?b2l(Name) ++
+ ": " ++ couch_util:to_list(Reason))
+ after 30000 ->
+ etap:bail("Compaction for " ++ Type ++ " database " ++ ?b2l(Name) ++
+ " didn't finish")
+ end,
+ ok = couch_db:close(Db).
+
+
+check_ref_counter(Type, #db{name = Name, fd_ref_counter = OldRefCounter}) ->
+ MonRef = erlang:monitor(process, OldRefCounter),
+ receive
+ {'DOWN', MonRef, process, OldRefCounter, _} ->
+ etap:diag("Old " ++ Type ++ " database ref counter terminated")
+ after 30000 ->
+ etap:bail("Old " ++ Type ++ " database ref counter didn't terminate")
+ end,
+ {ok, #db{fd_ref_counter = NewRefCounter} = Db} = couch_db:open_int(Name, []),
+ ok = couch_db:close(Db),
+ etap:isnt(
+ NewRefCounter, OldRefCounter, Type ++ " database has new ref counter").
+
+
+reopen_db(#db{name = Name}) ->
+ {ok, Db} = couch_db:open_int(Name, []),
+ ok = couch_db:close(Db),
+ {ok, Db}.
+
+
+wait_target_in_sync(DocCount, #db{name = TargetName}) ->
+ wait_target_in_sync_loop(DocCount, TargetName, 300).
+
+
+wait_target_in_sync_loop(_DocCount, _TargetName, 0) ->
+ etap:bail("Could not get source and target databases in sync");
+wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft) ->
+ {ok, Target} = couch_db:open_int(TargetName, []),
+ {ok, TargetInfo} = couch_db:get_db_info(Target),
+ ok = couch_db:close(Target),
+ TargetDocCount = couch_util:get_value(doc_count, TargetInfo),
+ case TargetDocCount == DocCount of
+ true ->
+ etap:diag("Source and target databases are in sync");
+ false ->
+ ok = timer:sleep(100),
+ wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft - 1)
+ end.
+
+
+compare_dbs(#db{name = SourceName}, #db{name = TargetName}) ->
+ {ok, SourceDb} = couch_db:open_int(SourceName, []),
+ {ok, TargetDb} = couch_db:open_int(TargetName, []),
+ Fun = fun(FullDocInfo, _, Acc) ->
+ {ok, Doc} = couch_db:open_doc(SourceDb, FullDocInfo),
+ {Props} = DocJson = couch_doc:to_json_obj(Doc, [attachments]),
+ DocId = couch_util:get_value(<<"_id">>, Props),
+ DocTarget = case couch_db:open_doc(TargetDb, DocId) of
+ {ok, DocT} ->
+ DocT;
+ Error ->
+ etap:bail("Error opening document '" ++ ?b2l(DocId) ++
+ "' from target: " ++ couch_util:to_list(Error))
+ end,
+ DocTargetJson = couch_doc:to_json_obj(DocTarget, [attachments]),
+ case DocTargetJson of
+ DocJson ->
+ ok;
+ _ ->
+ etap:bail("Content from document '" ++ ?b2l(DocId) ++
+ "' differs in target database")
+ end,
+ {ok, Acc}
+ end,
+ {ok, _, _} = couch_db:enum_docs(SourceDb, Fun, [], []),
+ etap:diag("Target database has the same documents as the source database"),
+ ok = couch_db:close(SourceDb),
+ ok = couch_db:close(TargetDb).
+
+
+check_active_tasks(RepPid, {BaseId, Ext} = _RepId, Src, Tgt) ->
+ Source = case Src of
+ {remote, NameSrc} ->
+ <<(db_url(NameSrc))/binary, $/>>;
+ _ ->
+ Src
+ end,
+ Target = case Tgt of
+ {remote, NameTgt} ->
+ <<(db_url(NameTgt))/binary, $/>>;
+ _ ->
+ Tgt
+ end,
+ FullRepId = list_to_binary(BaseId ++ Ext),
+ Pid = list_to_binary(pid_to_list(RepPid)),
+ [RepTask] = couch_task_status:all(),
+ etap:is(couch_util:get_value(pid, RepTask), Pid,
+ "_active_tasks entry has correct pid property"),
+ etap:is(couch_util:get_value(replication_id, RepTask), FullRepId,
+ "_active_tasks entry has right replication id"),
+ etap:is(couch_util:get_value(continuous, RepTask), true,
+ "_active_tasks entry has continuous property set to true"),
+ etap:is(couch_util:get_value(source, RepTask), Source,
+ "_active_tasks entry has correct source property"),
+ etap:is(couch_util:get_value(target, RepTask), Target,
+ "_active_tasks entry has correct target property"),
+ etap:is(is_integer(couch_util:get_value(docs_read, RepTask)), true,
+ "_active_tasks entry has integer docs_read property"),
+ etap:is(is_integer(couch_util:get_value(docs_written, RepTask)), true,
+ "_active_tasks entry has integer docs_written property"),
+ etap:is(is_integer(couch_util:get_value(doc_write_failures, RepTask)), true,
+ "_active_tasks entry has integer doc_write_failures property"),
+ etap:is(is_integer(couch_util:get_value(revisions_checked, RepTask)), true,
+ "_active_tasks entry has integer revisions_checked property"),
+ etap:is(is_integer(couch_util:get_value(missing_revisions_found, RepTask)), true,
+ "_active_tasks entry has integer missing_revisions_found property"),
+ etap:is(is_integer(couch_util:get_value(checkpointed_source_seq, RepTask)), true,
+ "_active_tasks entry has integer checkpointed_source_seq property"),
+ etap:is(is_integer(couch_util:get_value(source_seq, RepTask)), true,
+ "_active_tasks entry has integer source_seq property"),
+ Progress = couch_util:get_value(progress, RepTask),
+ etap:is(is_integer(Progress), true,
+ "_active_tasks entry has an integer progress property"),
+ etap:is(Progress =< 100, true, "Progress is not greater than 100%").
+
+
+wait_writer(Pid, NumDocs) ->
+ case get_writer_num_docs_written(Pid) of
+ N when N >= NumDocs ->
+ ok;
+ _ ->
+ wait_writer(Pid, NumDocs)
+ end.
+
+
+spawn_writer(Db) ->
+ Parent = self(),
+ Pid = spawn(fun() -> writer_loop(Db, Parent, 0) end),
+ etap:diag("Started source database writer"),
+ Pid.
+
+
+pause_writer(Pid) ->
+ Ref = make_ref(),
+ Pid ! {pause, Ref},
+ receive
+ {paused, Ref} ->
+ ok
+ after 30000 ->
+ etap:bail("Failed to pause source database writer")
+ end.
+
+
+resume_writer(Pid) ->
+ Ref = make_ref(),
+ Pid ! {continue, Ref},
+ receive
+ {ok, Ref} ->
+ ok
+ after 30000 ->
+ etap:bail("Failed to unpause source database writer")
+ end.
+
+
+get_writer_num_docs_written(Pid) ->
+ Ref = make_ref(),
+ Pid ! {get_count, Ref},
+ receive
+ {count, Ref, Count} ->
+ Count
+ after 30000 ->
+ etap:bail("Timeout getting number of documents written from "
+ "source database writer")
+ end.
+
+
+stop_writer(Pid) ->
+ Ref = make_ref(),
+ Pid ! {stop, Ref},
+ receive
+ {stopped, Ref, DocsWritten} ->
+ MonRef = erlang:monitor(process, Pid),
+ receive
+ {'DOWN', MonRef, process, Pid, _Reason} ->
+ etap:diag("Stopped source database writer"),
+ DocsWritten
+ after 30000 ->
+ etap:bail("Timeout stopping source database writer")
+ end
+ after 30000 ->
+ etap:bail("Timeout stopping source database writer")
+ end.
+
+
+writer_loop(#db{name = DbName}, Parent, Counter) ->
+ maybe_pause(Parent, Counter),
+ Doc = couch_doc:from_json_obj({[
+ {<<"_id">>, list_to_binary(integer_to_list(Counter + 1))},
+ {<<"value">>, Counter + 1},
+ {<<"_attachments">>, {[
+ {<<"icon1.png">>, {[
+ {<<"data">>, base64:encode(att_data())},
+ {<<"content_type">>, <<"image/png">>}
+ ]}},
+ {<<"icon2.png">>, {[
+ {<<"data">>, base64:encode(iolist_to_binary(
+ [att_data(), att_data()]))},
+ {<<"content_type">>, <<"image/png">>}
+ ]}}
+ ]}}
+ ]}),
+ maybe_pause(Parent, Counter),
+ {ok, Db} = couch_db:open_int(DbName, []),
+ {ok, _} = couch_db:update_doc(Db, Doc, []),
+ ok = couch_db:close(Db),
+ receive
+ {get_count, Ref} ->
+ Parent ! {count, Ref, Counter + 1},
+ writer_loop(Db, Parent, Counter + 1);
+ {stop, Ref} ->
+ Parent ! {stopped, Ref, Counter + 1}
+ after 0 ->
+ ok = timer:sleep(500),
+ writer_loop(Db, Parent, Counter + 1)
+ end.
+
+
+maybe_pause(Parent, Counter) ->
+ receive
+ {get_count, Ref} ->
+ Parent ! {count, Ref, Counter};
+ {pause, Ref} ->
+ Parent ! {paused, Ref},
+ receive {continue, Ref2} -> Parent ! {ok, Ref2} end
+ after 0 ->
+ ok
+ end.
+
+
+db_url(DbName) ->
+ iolist_to_binary([
+ "http://", couch_config:get("httpd", "bind_address", "127.0.0.1"),
+ ":", integer_to_list(mochiweb_socket_server:get(couch_httpd, port)),
+ "/", DbName
+ ]).
+
+
+create_db(DbName) ->
+ {ok, Db} = couch_db:create(
+ DbName,
+ [{user_ctx, #user_ctx{roles = [<<"_admin">>]}}, overwrite]),
+ couch_db:close(Db),
+ {ok, Db}.
+
+
+delete_db(#db{name = DbName, main_pid = Pid}) ->
+ ok = couch_server:delete(
+ DbName, [{user_ctx, #user_ctx{roles = [<<"_admin">>]}}]),
+ MonRef = erlang:monitor(process, Pid),
+ receive
+ {'DOWN', MonRef, process, Pid, _Reason} ->
+ ok
+ after 30000 ->
+ etap:bail("Timeout deleting database")
+ end.
+
+
+replicate({remote, Db}, Target) ->
+ replicate(db_url(Db), Target);
+
+replicate(Source, {remote, Db}) ->
+ replicate(Source, db_url(Db));
+
+replicate(Source, Target) ->
+ RepObject = {[
+ {<<"source">>, Source},
+ {<<"target">>, Target},
+ {<<"continuous">>, true}
+ ]},
+ {ok, Rep} = couch_replicator_utils:parse_rep_doc(
+ RepObject, #user_ctx{roles = [<<"_admin">>]}),
+ {ok, Pid} = couch_replicator:async_replicate(Rep),
+ {ok, Pid, Rep#rep.id}.
+
+
+cancel_replication(RepId, RepPid) ->
+ {ok, _} = couch_replicator:cancel_replication(RepId),
+ etap:is(is_process_alive(RepPid), false,
+ "Replication process is no longer alive after cancel").
+
+
+att_data() ->
+ {ok, Data} = file:read_file(
+ test_util:source_file("share/www/image/logo.png")),
+ Data.
http://git-wip-us.apache.org/repos/asf/couchdb/blob/604f1a60/src/couch_replicator/test/04-replication-large-atts.t
----------------------------------------------------------------------
diff --git a/src/couch_replicator/test/04-replication-large-atts.t b/src/couch_replicator/test/04-replication-large-atts.t
new file mode 100755
index 0000000..5386179
--- /dev/null
+++ b/src/couch_replicator/test/04-replication-large-atts.t
@@ -0,0 +1,267 @@
+#!/usr/bin/env escript
+%% -*- erlang -*-
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% Test replication of large attachments. Verify that both source and
+% target have the same attachment data and metadata.
+
+-define(b2l(Bin), binary_to_list(Bin)).
+
+-record(user_ctx, {
+ name = null,
+ roles = [],
+ handler
+}).
+
+-record(doc, {
+ id = <<"">>,
+ revs = {0, []},
+ body = {[]},
+ atts = [],
+ deleted = false,
+ meta = []
+}).
+
+-record(att, {
+ name,
+ type,
+ att_len,
+ disk_len,
+ md5= <<>>,
+ revpos=0,
+ data,
+ encoding=identity
+}).
+
+
+source_db_name() -> <<"couch_test_rep_db_a">>.
+target_db_name() -> <<"couch_test_rep_db_b">>.
+
+
+main(_) ->
+ test_util:init_code_path(),
+
+ etap:plan(1192),
+ case (catch test()) of
+ ok ->
+ etap:end_tests();
+ Other ->
+ etap:diag(io_lib:format("Test died abnormally: ~p", [Other])),
+ etap:bail(Other)
+ end,
+ ok.
+
+
+test() ->
+ couch_server_sup:start_link(test_util:config_files()),
+ ibrowse:start(),
+ crypto:start(),
+ couch_config:set("attachments", "compressible_types", "text/*", false),
+
+ Pairs = [
+ {source_db_name(), target_db_name()},
+ {{remote, source_db_name()}, target_db_name()},
+ {source_db_name(), {remote, target_db_name()}},
+ {{remote, source_db_name()}, {remote, (target_db_name())}}
+ ],
+
+ {ok, SourceDb} = create_db(source_db_name()),
+ etap:diag("Populating source database"),
+ populate_db(SourceDb, 11),
+ ok = couch_db:close(SourceDb),
+
+ lists:foreach(
+ fun({Source, Target}) ->
+ etap:diag("Creating target database"),
+ {ok, TargetDb} = create_db(target_db_name()),
+
+ ok = couch_db:close(TargetDb),
+ etap:diag("Triggering replication"),
+ replicate(Source, Target),
+ etap:diag("Replication finished, comparing source and target databases"),
+ compare_dbs(SourceDb, TargetDb),
+
+ etap:diag("Deleting target database"),
+ delete_db(TargetDb),
+ ok = timer:sleep(1000)
+ end,
+ Pairs),
+
+ delete_db(SourceDb),
+ couch_server_sup:stop(),
+ ok.
+
+
+populate_db(Db, DocCount) ->
+ Docs = lists:foldl(
+ fun(DocIdCounter, Acc) ->
+ Doc = #doc{
+ id = iolist_to_binary(["doc", integer_to_list(DocIdCounter)]),
+ body = {[]},
+ atts = [
+ att(<<"att1">>, 2 * 1024 * 1024, <<"text/plain">>),
+ att(<<"att2">>, round(6.6 * 1024 * 1024), <<"app/binary">>)
+ ]
+ },
+ [Doc | Acc]
+ end,
+ [], lists:seq(1, DocCount)),
+ {ok, _} = couch_db:update_docs(Db, Docs, []).
+
+
+att(Name, Size, Type) ->
+ #att{
+ name = Name,
+ type = Type,
+ att_len = Size,
+ data = fun(Count) -> crypto:rand_bytes(Count) end
+ }.
+
+
+compare_dbs(Source, Target) ->
+ {ok, SourceDb} = couch_db:open_int(couch_db:name(Source), []),
+ {ok, TargetDb} = couch_db:open_int(couch_db:name(Target), []),
+
+ Fun = fun(FullDocInfo, _, Acc) ->
+ {ok, DocSource} = couch_db:open_doc(SourceDb, FullDocInfo),
+ Id = DocSource#doc.id,
+
+ etap:diag("Verifying document " ++ ?b2l(Id)),
+
+ {ok, DocTarget} = couch_db:open_doc(TargetDb, Id),
+ etap:is(DocTarget#doc.body, DocSource#doc.body,
+ "Same body in source and target databases"),
+
+ #doc{atts = SourceAtts} = DocSource,
+ #doc{atts = TargetAtts} = DocTarget,
+ etap:is(
+ lists:sort([N || #att{name = N} <- SourceAtts]),
+ lists:sort([N || #att{name = N} <- TargetAtts]),
+ "Document has same number (and names) of attachments in "
+ "source and target databases"),
+
+ lists:foreach(
+ fun(#att{name = AttName} = Att) ->
+ etap:diag("Verifying attachment " ++ ?b2l(AttName)),
+
+ {ok, AttTarget} = find_att(TargetAtts, AttName),
+ SourceMd5 = att_md5(Att),
+ TargetMd5 = att_md5(AttTarget),
+ case AttName of
+ <<"att1">> ->
+ etap:is(Att#att.encoding, gzip,
+ "Attachment is gzip encoded in source database"),
+ etap:is(AttTarget#att.encoding, gzip,
+ "Attachment is gzip encoded in target database"),
+ DecSourceMd5 = att_decoded_md5(Att),
+ DecTargetMd5 = att_decoded_md5(AttTarget),
+ etap:is(DecTargetMd5, DecSourceMd5,
+ "Same identity content in source and target databases");
+ _ ->
+ etap:is(Att#att.encoding, identity,
+ "Attachment is not encoded in source database"),
+ etap:is(AttTarget#att.encoding, identity,
+ "Attachment is not encoded in target database")
+ end,
+ etap:is(TargetMd5, SourceMd5,
+ "Same content in source and target databases"),
+ etap:is(is_integer(Att#att.disk_len), true,
+ "#att.disk_len is an integer in source database"),
+ etap:is(is_integer(Att#att.att_len), true,
+ "#att.att_len is an integer in source database"),
+ etap:is(is_integer(AttTarget#att.disk_len), true,
+ "#att.disk_len is an integer in target database"),
+ etap:is(is_integer(AttTarget#att.att_len), true,
+ "#att.att_len is an integer in target database"),
+ etap:is(Att#att.disk_len, AttTarget#att.disk_len,
+ "Same identity length in source and target databases"),
+ etap:is(Att#att.att_len, AttTarget#att.att_len,
+ "Same encoded length in source and target databases"),
+ etap:is(Att#att.type, AttTarget#att.type,
+ "Same type in source and target databases"),
+ etap:is(Att#att.md5, SourceMd5, "Correct MD5 in source database"),
+ etap:is(AttTarget#att.md5, SourceMd5, "Correct MD5 in target database")
+ end,
+ SourceAtts),
+
+ {ok, Acc}
+ end,
+
+ {ok, _, _} = couch_db:enum_docs(SourceDb, Fun, [], []),
+ ok = couch_db:close(SourceDb),
+ ok = couch_db:close(TargetDb).
+
+
+find_att([], _Name) ->
+ nil;
+find_att([#att{name = Name} = Att | _], Name) ->
+ {ok, Att};
+find_att([_ | Rest], Name) ->
+ find_att(Rest, Name).
+
+
+att_md5(Att) ->
+ Md50 = couch_doc:att_foldl(
+ Att,
+ fun(Chunk, Acc) -> couch_util:md5_update(Acc, Chunk) end,
+ couch_util:md5_init()),
+ couch_util:md5_final(Md50).
+
+att_decoded_md5(Att) ->
+ Md50 = couch_doc:att_foldl_decode(
+ Att,
+ fun(Chunk, Acc) -> couch_util:md5_update(Acc, Chunk) end,
+ couch_util:md5_init()),
+ couch_util:md5_final(Md50).
+
+
+db_url(DbName) ->
+ iolist_to_binary([
+ "http://", couch_config:get("httpd", "bind_address", "127.0.0.1"),
+ ":", integer_to_list(mochiweb_socket_server:get(couch_httpd, port)),
+ "/", DbName
+ ]).
+
+
+create_db(DbName) ->
+ couch_db:create(
+ DbName,
+ [{user_ctx, #user_ctx{roles = [<<"_admin">>]}}, overwrite]).
+
+
+delete_db(Db) ->
+ ok = couch_server:delete(
+ couch_db:name(Db), [{user_ctx, #user_ctx{roles = [<<"_admin">>]}}]).
+
+
+replicate({remote, Db}, Target) ->
+ replicate(db_url(Db), Target);
+
+replicate(Source, {remote, Db}) ->
+ replicate(Source, db_url(Db));
+
+replicate(Source, Target) ->
+ RepObject = {[
+ {<<"source">>, Source},
+ {<<"target">>, Target}
+ ]},
+ {ok, Rep} = couch_replicator_utils:parse_rep_doc(
+ RepObject, #user_ctx{roles = [<<"_admin">>]}),
+ {ok, Pid} = couch_replicator:async_replicate(Rep),
+ MonRef = erlang:monitor(process, Pid),
+ receive
+ {'DOWN', MonRef, process, Pid, Reason} ->
+ etap:is(Reason, normal, "Replication finished successfully")
+ after 300000 ->
+ etap:bail("Timeout waiting for replication to finish")
+ end.
http://git-wip-us.apache.org/repos/asf/couchdb/blob/604f1a60/src/couch_replicator/test/05-replication-many-leaves.t
----------------------------------------------------------------------
diff --git a/src/couch_replicator/test/05-replication-many-leaves.t b/src/couch_replicator/test/05-replication-many-leaves.t
new file mode 100755
index 0000000..52d2023
--- /dev/null
+++ b/src/couch_replicator/test/05-replication-many-leaves.t
@@ -0,0 +1,216 @@
+#!/usr/bin/env escript
+%% -*- erlang -*-
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% Test replication of documents with many leaf revisions.
+% Motivated by COUCHDB-1340 and other similar issues where a document
+% GET with a too long ?open_revs revision list doesn't work due to
+% maximum web server limits for the HTTP request path.
+
+-record(user_ctx, {
+ name = null,
+ roles = [],
+ handler
+}).
+
+-record(doc, {
+ id = <<"">>,
+ revs = {0, []},
+ body = {[]},
+ atts = [],
+ deleted = false,
+ meta = []
+}).
+
+-define(b2l(B), binary_to_list(B)).
+-define(l2b(L), list_to_binary(L)).
+-define(i2l(I), integer_to_list(I)).
+
+
+source_db_name() -> <<"couch_test_rep_db_a">>.
+target_db_name() -> <<"couch_test_rep_db_b">>.
+
+doc_ids() ->
+ [<<"doc1">>, <<"doc2">>, <<"doc3">>].
+
+doc_num_conflicts(<<"doc1">>) -> 100;
+doc_num_conflicts(<<"doc2">>) -> 200;
+doc_num_conflicts(<<"doc3">>) -> 550.
+
+
+main(_) ->
+ test_util:init_code_path(),
+
+ etap:plan(16),
+ case (catch test()) of
+ ok ->
+ etap:end_tests();
+ Other ->
+ etap:diag(io_lib:format("Test died abnormally: ~p", [Other])),
+ etap:bail(Other)
+ end,
+ ok.
+
+
+test() ->
+ couch_server_sup:start_link(test_util:config_files()),
+ ibrowse:start(),
+ crypto:start(),
+
+ Pairs = [
+ {source_db_name(), target_db_name()},
+ {{remote, source_db_name()}, target_db_name()},
+ {source_db_name(), {remote, target_db_name()}},
+ {{remote, source_db_name()}, {remote, (target_db_name())}}
+ ],
+
+ {ok, SourceDb} = create_db(source_db_name()),
+ etap:diag("Populating source database"),
+ {ok, DocRevs} = populate_db(SourceDb),
+ ok = couch_db:close(SourceDb),
+
+ lists:foreach(
+ fun({Source, Target}) ->
+ etap:diag("Creating target database"),
+ {ok, TargetDb} = create_db(target_db_name()),
+
+ ok = couch_db:close(TargetDb),
+ etap:diag("Triggering replication"),
+ replicate(Source, Target),
+ etap:diag("Replication finished, comparing source and target databases"),
+ {ok, TargetDb2} = couch_db:open_int(target_db_name(), []),
+ verify_target(TargetDb2, DocRevs),
+ ok = couch_db:close(TargetDb2),
+
+ etap:diag("Deleting target database"),
+ delete_db(TargetDb),
+ ok = timer:sleep(1000)
+ end,
+ Pairs),
+
+ delete_db(SourceDb),
+ couch_server_sup:stop(),
+ ok.
+
+
+populate_db(Db) ->
+ DocRevsDict = lists:foldl(
+ fun(DocId, Acc) ->
+ Value = <<"0">>,
+ Doc = #doc{
+ id = DocId,
+ body = {[ {<<"value">>, Value} ]}
+ },
+ {ok, Rev} = couch_db:update_doc(Db, Doc, []),
+ {ok, RevsDict} = add_doc_siblings(Db, DocId, doc_num_conflicts(DocId)),
+ RevsDict2 = dict:store(Rev, Value, RevsDict),
+ dict:store(DocId, RevsDict2, Acc)
+ end,
+ dict:new(), doc_ids()),
+ {ok, dict:to_list(DocRevsDict)}.
+
+
+add_doc_siblings(Db, DocId, NumLeaves) when NumLeaves > 0 ->
+ add_doc_siblings(Db, DocId, NumLeaves, [], dict:new()).
+
+
+add_doc_siblings(Db, _DocId, 0, AccDocs, RevsDict) ->
+ {ok, []} = couch_db:update_docs(Db, AccDocs, [], replicated_changes),
+ {ok, RevsDict};
+
+add_doc_siblings(Db, DocId, NumLeaves, AccDocs, RevsDict) ->
+ Value = list_to_binary(integer_to_list(NumLeaves)),
+ Rev = couch_util:md5(Value),
+ RevsDict2 = dict:store({1, Rev}, Value, RevsDict),
+ Doc = #doc{
+ id = DocId,
+ revs = {1, [Rev]},
+ body = {[ {<<"value">>, Value} ]}
+ },
+ add_doc_siblings(Db, DocId, NumLeaves - 1, [Doc | AccDocs], RevsDict2).
+
+
+verify_target(_TargetDb, []) ->
+ ok;
+
+verify_target(TargetDb, [{DocId, RevsDict} | Rest]) ->
+ {ok, Lookups} = couch_db:open_doc_revs(
+ TargetDb,
+ DocId,
+ [R || {R, _} <- dict:to_list(RevsDict)],
+ [ejson_body]),
+ Docs = [Doc || {ok, Doc} <- Lookups],
+ Total = doc_num_conflicts(DocId) + 1,
+ etap:is(
+ length(Docs),
+ Total,
+ "Target has " ++ ?i2l(Total) ++ " leaf revisions of document " ++ ?b2l(DocId)),
+ etap:diag("Verifying all revisions of document " ++ ?b2l(DocId)),
+ lists:foreach(
+ fun(#doc{revs = {Pos, [RevId]}, body = {Body}}) ->
+ Rev = {Pos, RevId},
+ {ok, Value} = dict:find(Rev, RevsDict),
+ case couch_util:get_value(<<"value">>, Body) of
+ Value ->
+ ok;
+ Other ->
+ etap:bail("Wrong value for revision " ++
+ ?b2l(couch_doc:rev_to_str(Rev)) ++ " of document " ++
+ ?b2l(DocId) ++ ". Expected `" ++ couch_util:to_list(Value) ++
+ "`, got `" ++ couch_util:to_list(Other) ++ "`")
+ end
+ end,
+ Docs),
+ verify_target(TargetDb, Rest).
+
+
+db_url(DbName) ->
+ iolist_to_binary([
+ "http://", couch_config:get("httpd", "bind_address", "127.0.0.1"),
+ ":", integer_to_list(mochiweb_socket_server:get(couch_httpd, port)),
+ "/", DbName
+ ]).
+
+
+create_db(DbName) ->
+ couch_db:create(
+ DbName,
+ [{user_ctx, #user_ctx{roles = [<<"_admin">>]}}, overwrite]).
+
+
+delete_db(Db) ->
+ ok = couch_server:delete(
+ couch_db:name(Db), [{user_ctx, #user_ctx{roles = [<<"_admin">>]}}]).
+
+
+replicate({remote, Db}, Target) ->
+ replicate(db_url(Db), Target);
+
+replicate(Source, {remote, Db}) ->
+ replicate(Source, db_url(Db));
+
+replicate(Source, Target) ->
+ RepObject = {[
+ {<<"source">>, Source},
+ {<<"target">>, Target}
+ ]},
+ {ok, Rep} = couch_replicator_utils:parse_rep_doc(
+ RepObject, #user_ctx{roles = [<<"_admin">>]}),
+ {ok, Pid} = couch_replicator:async_replicate(Rep),
+ MonRef = erlang:monitor(process, Pid),
+ receive
+ {'DOWN', MonRef, process, Pid, Reason} ->
+ etap:is(Reason, normal, "Replication finished successfully")
+ after 300000 ->
+ etap:bail("Timeout waiting for replication to finish")
+ end.