You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2019/02/21 18:16:27 UTC
[couchdb] branch shard-split updated: Fix db deletion race in
reshard API tests
This is an automated email from the ASF dual-hosted git repository.
vatamane pushed a commit to branch shard-split
in repository https://gitbox.apache.org/repos/asf/couchdb.git
The following commit(s) were added to refs/heads/shard-split by this push:
new 4d3907f Fix db deletion race in reshard API tests
4d3907f is described below
commit 4d3907f4409281b0f200eecf6eb3d50bf2303507
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Thu Feb 21 13:16:15 2019 -0500
Fix db deletion race in reshard API tests
---
src/mem3/test/mem3_reshard_api_test.erl | 205 ++++++++++++++++----------------
1 file changed, 105 insertions(+), 100 deletions(-)
diff --git a/src/mem3/test/mem3_reshard_api_test.erl b/src/mem3/test/mem3_reshard_api_test.erl
index 70640f3..ccf75cc 100644
--- a/src/mem3/test/mem3_reshard_api_test.erl
+++ b/src/mem3/test/mem3_reshard_api_test.erl
@@ -22,9 +22,6 @@
-define(PASS, "pass").
-define(AUTH, {basic_auth, {?USER, ?PASS}}).
-define(JSON, {"Content-Type", "application/json"}).
--define(DB1, "mem3_reshard_api_test_db1").
--define(DB2, "mem3_reshard_api_test_db2").
--define(DB3, "mem3_reshard_api_test_db3").
-define(RESHARD, "_reshard/").
-define(JOBS, "_reshard/jobs/").
-define(STATE, "_reshard/state").
@@ -38,17 +35,18 @@ setup() ->
Addr = config:get("chttpd", "bind_address", "127.0.0.1"),
Port = mochiweb_socket_server:get(chttpd, port),
Url = lists:concat(["http://", Addr, ":", Port, "/"]),
- create_db(Url, ?DB1, "?q=1&n=1"),
- create_db(Url, ?DB2, "?q=1&n=1"),
- create_db(Url, ?DB3, "?q=2&n=1"),
- Url.
+ {Db1, Db2, Db3} = {?tempdb(), ?tempdb(), ?tempdb()},
+ create_db(Url, Db1, "?q=1&n=1"),
+ create_db(Url, Db2, "?q=1&n=1"),
+ create_db(Url, Db3, "?q=2&n=1"),
+ {Url, {Db1, Db2, Db3}}.
-teardown(Url) ->
+teardown({Url, {Db1, Db2, Db3}}) ->
mem3_reshard:reset_state(),
- delete_db(Url, ?DB1),
- delete_db(Url, ?DB2),
- delete_db(Url, ?DB3),
+ delete_db(Url, Db1),
+ delete_db(Url, Db2),
+ delete_db(Url, Db3),
ok = config:delete("mem3_reshard", "max_jobs", _Persist=false),
ok = config:delete("admins", ?USER, _Persist=false),
meck:unload().
@@ -109,7 +107,7 @@ mem3_reshard_api_test_() ->
}.
-basics(Top) ->
+basics({Top, _}) ->
?_test(begin
% GET /_reshard
?assertMatch({200, #{
@@ -140,10 +138,10 @@ basics(Top) ->
end).
-create_job_basic(Top) ->
+create_job_basic({Top, {Db1, _, _}}) ->
?_test(begin
% POST /_reshard/jobs
- {C1, R1} = req(post, Top ++ ?JOBS, #{type => split, db => <<?DB1>>}),
+ {C1, R1} = req(post, Top ++ ?JOBS, #{type => split, db => Db1}),
?assertEqual(201, C1),
?assertMatch([#{?OK := true, ?ID := J, <<"shard">> := S}]
when is_binary(J) andalso is_binary(S), R1),
@@ -195,14 +193,14 @@ create_job_basic(Top) ->
end).
-create_two_jobs(Top) ->
+create_two_jobs({Top, {Db1, Db2, _}}) ->
?_test(begin
Jobs = Top ++ ?JOBS,
?assertMatch({201, [#{?OK := true}]},
- req(post, Jobs, #{type => split, db => <<?DB1>>})),
+ req(post, Jobs, #{type => split, db => Db1})),
?assertMatch({201, [#{?OK := true}]},
- req(post, Jobs, #{type => split, db => <<?DB2>>})),
+ req(post, Jobs, #{type => split, db => Db2})),
?assertMatch({200, #{<<"total">> := 2}}, req(get, Top ++ ?RESHARD)),
@@ -221,16 +219,16 @@ create_two_jobs(Top) ->
end).
-create_multiple_jobs_from_one_post(Top) ->
+create_multiple_jobs_from_one_post({Top, {_, _, Db3}}) ->
?_test(begin
Jobs = Top ++ ?JOBS,
- {C1, R1} = req(post, Jobs, #{type => split, db => <<?DB3>>}),
+ {C1, R1} = req(post, Jobs, #{type => split, db => Db3}),
?assertMatch({201, [#{?OK := true}, #{?OK := true}]}, {C1, R1}),
?assertMatch({200, #{<<"total">> := 2}}, req(get, Top ++ ?RESHARD))
end).
-start_stop_cluster_basic(Top) ->
+start_stop_cluster_basic({Top, _}) ->
?_test(begin
Url = Top ++ ?STATE,
@@ -264,7 +262,7 @@ start_stop_cluster_basic(Top) ->
end).
-start_stop_cluster_with_a_job(Top) ->
+start_stop_cluster_with_a_job({Top, {Db1, _, _}}) ->
?_test(begin
Url = Top ++ ?STATE,
@@ -272,7 +270,7 @@ start_stop_cluster_with_a_job(Top) ->
?assertMatch({200, #{<<"state">> := <<"stopped">>}}, req(get, Url)),
% Can add jobs with global state stopped, they just won't be running
- {201, R1} = req(post, Top ++ ?JOBS, #{type => split, db => <<?DB1>>}),
+ {201, R1} = req(post, Top ++ ?JOBS, #{type => split, db => Db1}),
?assertMatch([#{?OK := true}], R1),
[#{?ID := Id1}] = R1,
{200, J1} = req(get, Top ++ ?JOBS ++ ?b2l(Id1)),
@@ -296,7 +294,7 @@ start_stop_cluster_with_a_job(Top) ->
% Add same job again
{201, [#{?ID := Id2}]} = req(post, Top ++ ?JOBS, #{type => split,
- db => <<?DB1>>}),
+ db => Db1}),
?assertMatch({200, #{?ID := Id2, <<"job_state">> := <<"stopped">>}},
req(get, Top ++ ?JOBS ++ ?b2l(Id2))),
@@ -307,11 +305,11 @@ start_stop_cluster_with_a_job(Top) ->
end).
-individual_job_start_stop(Top) ->
+individual_job_start_stop({Top, {Db1, _, _}}) ->
?_test(begin
intercept_state(topoff1),
- Body = #{type => split, db => <<?DB1>>},
+ Body = #{type => split, db => Db1},
{201, [#{?ID := Id}]} = req(post, Top ++ ?JOBS, Body),
JobUrl = Top ++ ?JOBS ++ ?b2l(Id),
@@ -343,11 +341,11 @@ individual_job_start_stop(Top) ->
end).
-individual_job_stop_when_cluster_stopped(Top) ->
+individual_job_stop_when_cluster_stopped({Top, {Db1, _, _}}) ->
?_test(begin
intercept_state(topoff1),
- Body = #{type => split, db => <<?DB1>>},
+ Body = #{type => split, db => Db1},
{201, [#{?ID := Id}]} = req(post, Top ++ ?JOBS, Body),
JobUrl = Top ++ ?JOBS ++ ?b2l(Id),
@@ -387,7 +385,7 @@ individual_job_stop_when_cluster_stopped(Top) ->
end).
-create_job_with_invalid_arguments(Top) ->
+create_job_with_invalid_arguments({Top, {Db1, _, _}}) ->
?_test(begin
Jobs = Top ++ ?JOBS,
@@ -395,7 +393,7 @@ create_job_with_invalid_arguments(Top) ->
?assertMatch({400, _}, req(post, Jobs, #{})),
% Missing type
- ?assertMatch({400, _}, req(post, Jobs, #{db => <<?DB1>>})),
+ ?assertMatch({400, _}, req(post, Jobs, #{db => Db1})),
% Have type but no db and no shard
?assertMatch({400, _}, req(post, Jobs, #{type => split})),
@@ -409,40 +407,40 @@ create_job_with_invalid_arguments(Top) ->
shard => <<"shards/80000000-ffffffff/baddb.1549492084">>})),
% Bad range values, too large, different types, inverted
- ?assertMatch({400, _}, req(post, Jobs, #{db => <<?DB1>>, range => 42,
+ ?assertMatch({400, _}, req(post, Jobs, #{db => Db1, range => 42,
type => split})),
- ?assertMatch({400, _}, req(post, Jobs, #{db => <<?DB1>>,
+ ?assertMatch({400, _}, req(post, Jobs, #{db => Db1,
range => <<"x">>, type => split})),
- ?assertMatch({400, _}, req(post, Jobs, #{db => <<?DB1>>,
+ ?assertMatch({400, _}, req(post, Jobs, #{db => Db1,
range => <<"ffffffff-80000000">>, type => split})),
- ?assertMatch({400, _}, req(post, Jobs, #{db => <<?DB1>>,
+ ?assertMatch({400, _}, req(post, Jobs, #{db => Db1,
range => <<"00000000-fffffffff">>, type => split})),
% Can't have both db and shard
- ?assertMatch({400, _}, req(post, Jobs, #{type => split, db => <<?DB1>>,
+ ?assertMatch({400, _}, req(post, Jobs, #{type => split, db => Db1,
shard => <<"blah">>}))
end).
-create_job_with_db(Top) ->
+create_job_with_db({Top, {Db1, _, _}}) ->
?_test(begin
Jobs = Top ++ ?JOBS,
- Db1 = #{type => split, db => <<?DB1>>},
+ Body1 = #{type => split, db => Db1},
% Node with db
N = atom_to_binary(node(), utf8),
- {C1, R1} = req(post, Jobs, Db1#{node => N}),
+ {C1, R1} = req(post, Jobs, Body1#{node => N}),
?assertMatch({201, [#{?OK := true}]}, {C1, R1}),
wait_to_complete_then_cleanup(Top, R1),
% Range and db
- {C2, R2} = req(post, Jobs, Db1#{range => <<"00000000-7fffffff">>}),
+ {C2, R2} = req(post, Jobs, Body1#{range => <<"00000000-7fffffff">>}),
?assertMatch({201, [#{?OK := true}]}, {C2, R2}),
wait_to_complete_then_cleanup(Top, R2),
% Node, range and db
Range = <<"80000000-ffffffff">>,
- {C3, R3} = req(post, Jobs, Db1#{range => Range, node => N}),
+ {C3, R3} = req(post, Jobs, Body1#{range => Range, node => N}),
?assertMatch({201, [#{?OK := true}]}, {C3, R3}),
wait_to_complete_then_cleanup(Top, R3),
@@ -451,14 +449,14 @@ create_job_with_db(Top) ->
[16#40000000, 16#7fffffff],
[16#80000000, 16#bfffffff],
[16#c0000000, 16#ffffffff]
- ], [mem3:range(S) || S <- lists:sort(mem3:shards(<<?DB1>>))])
+ ], [mem3:range(S) || S <- lists:sort(mem3:shards(Db1))])
end).
-create_job_with_shard_name(Top) ->
+create_job_with_shard_name({Top, {_, _, Db3}}) ->
?_test(begin
Jobs = Top ++ ?JOBS,
- [S1, S2] = [mem3:name(S) || S <- lists:sort(mem3:shards(<<?DB3>>))],
+ [S1, S2] = [mem3:name(S) || S <- lists:sort(mem3:shards(Db3))],
% Shard only
{C1, R1} = req(post, Jobs, #{type => split, shard => S1}),
@@ -476,16 +474,16 @@ create_job_with_shard_name(Top) ->
[16#40000000, 16#7fffffff],
[16#80000000, 16#bfffffff],
[16#c0000000, 16#ffffffff]
- ], [mem3:range(S) || S <- lists:sort(mem3:shards(<<?DB3>>))])
+ ], [mem3:range(S) || S <- lists:sort(mem3:shards(Db3))])
end).
-completed_job_handling(Top) ->
+completed_job_handling({Top, {Db1, _, _}}) ->
?_test(begin
Jobs = Top ++ ?JOBS,
% Run job to completion
- {C1, R1} = req(post, Jobs, #{type => split, db => <<?DB1>>}),
+ {C1, R1} = req(post, Jobs, #{type => split, db => Db1}),
?assertMatch({201, [#{?OK := true}]}, {C1, R1}),
[#{?ID := Id}] = R1,
wait_to_complete(Top, R1),
@@ -532,141 +530,141 @@ completed_job_handling(Top) ->
end).
-handle_db_deletion_in_topoff1(Top) ->
+handle_db_deletion_in_topoff1({Top, {Db1, _, _}}) ->
?_test(begin
- JobId = delete_source_in_state(Top, ?DB1, topoff1),
+ JobId = delete_source_in_state(Top, Db1, topoff1),
wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"failed">>)
end).
-handle_db_deletion_in_initial_copy(Top) ->
+handle_db_deletion_in_initial_copy({Top, {Db1, _, _}}) ->
?_test(begin
- JobId = delete_source_in_state(Top, ?DB1, initial_copy),
+ JobId = delete_source_in_state(Top, Db1, initial_copy),
wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"failed">>)
end).
-handle_db_deletion_in_copy_local_docs(Top) ->
+handle_db_deletion_in_copy_local_docs({Top, {Db1, _, _}}) ->
?_test(begin
- JobId = delete_source_in_state(Top, ?DB1, copy_local_docs),
+ JobId = delete_source_in_state(Top, Db1, copy_local_docs),
wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"failed">>)
end).
-handle_db_deletion_in_build_indices(Top) ->
+handle_db_deletion_in_build_indices({Top, {Db1, _, _}}) ->
?_test(begin
- JobId = delete_source_in_state(Top, ?DB1, build_indices),
+ JobId = delete_source_in_state(Top, Db1, build_indices),
wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"failed">>)
end).
-handle_db_deletion_in_update_shard_map(Top) ->
+handle_db_deletion_in_update_shard_map({Top, {Db1, _, _}}) ->
?_test(begin
- JobId = delete_source_in_state(Top, ?DB1, update_shardmap),
+ JobId = delete_source_in_state(Top, Db1, update_shardmap),
wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"failed">>)
end).
-handle_db_deletion_in_wait_source_close(Top) ->
+handle_db_deletion_in_wait_source_close({Top, {Db1, _, _}}) ->
?_test(begin
- JobId = delete_source_in_state(Top, ?DB1, wait_source_close),
+ JobId = delete_source_in_state(Top, Db1, wait_source_close),
wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"failed">>)
end).
-handle_db_deletion_in_topoff3(Top) ->
+handle_db_deletion_in_topoff3({Top, {Db1, _, _}}) ->
?_test(begin
- JobId = delete_source_in_state(Top, ?DB1, topoff3),
+ JobId = delete_source_in_state(Top, Db1, topoff3),
wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"failed">>)
end).
-handle_db_deletion_in_source_delete(Top) ->
+handle_db_deletion_in_source_delete({Top, {Db1, _, _}}) ->
?_test(begin
- JobId = delete_source_in_state(Top, ?DB1, source_delete),
+ JobId = delete_source_in_state(Top, Db1, source_delete),
wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"failed">>)
end).
-recover_in_topoff1(Top) ->
+recover_in_topoff1({Top, {Db1, _, _}}) ->
?_test(begin
- JobId = recover_in_state(Top, ?DB1, topoff1),
+ JobId = recover_in_state(Top, Db1, topoff1),
wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"completed">>)
end).
-recover_in_initial_copy(Top) ->
+recover_in_initial_copy({Top, {Db1, _, _}}) ->
{timeout, 60, ?_test(begin
- JobId = recover_in_state(Top, ?DB1, initial_copy),
+ JobId = recover_in_state(Top, Db1, initial_copy),
wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"completed">>)
end)}.
-recover_in_copy_local_docs(Top) ->
+recover_in_copy_local_docs({Top, {Db1, _, _}}) ->
?_test(begin
- JobId = recover_in_state(Top, ?DB1, copy_local_docs),
+ JobId = recover_in_state(Top, Db1, copy_local_docs),
wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"completed">>)
end).
-recover_in_build_indices(Top) ->
+recover_in_build_indices({Top, {Db1, _, _}}) ->
?_test(begin
- JobId = recover_in_state(Top, ?DB1, build_indices),
+ JobId = recover_in_state(Top, Db1, build_indices),
wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"completed">>)
end).
-recover_in_update_shard_map(Top) ->
+recover_in_update_shard_map({Top, {Db1, _, _}}) ->
?_test(begin
- JobId = recover_in_state(Top, ?DB1, update_shardmap),
+ JobId = recover_in_state(Top, Db1, update_shardmap),
wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"completed">>)
end).
-recover_in_wait_source_close(Top) ->
+recover_in_wait_source_close({Top, {Db1, _, _}}) ->
?_test(begin
- JobId = recover_in_state(Top, ?DB1, wait_source_close),
+ JobId = recover_in_state(Top, Db1, wait_source_close),
wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"completed">>)
end).
-recover_in_topoff3(Top) ->
+recover_in_topoff3({Top, {Db1, _, _}}) ->
?_test(begin
- JobId = recover_in_state(Top, ?DB1, topoff3),
+ JobId = recover_in_state(Top, Db1, topoff3),
wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"completed">>)
end).
-recover_in_source_delete(Top) ->
+recover_in_source_delete({Top, {Db1, _, _}}) ->
?_test(begin
- JobId = recover_in_state(Top, ?DB1, source_delete),
+ JobId = recover_in_state(Top, Db1, source_delete),
wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"completed">>)
end).
-check_max_jobs(Top) ->
+check_max_jobs({Top, {Db1, Db2, _}}) ->
?_test(begin
Jobs = Top ++ ?JOBS,
config:set("mem3_reshard", "max_jobs", "0", _Persist=false),
- {C1, R1} = req(post, Jobs, #{type => split, db => <<?DB1>>}),
+ {C1, R1} = req(post, Jobs, #{type => split, db => Db1}),
?assertMatch({500, [#{<<"error">> := <<"max_jobs_exceeded">>}]}, {C1, R1}),
config:set("mem3_reshard", "max_jobs", "1", _Persist=false),
- {201, R2} = req(post, Jobs, #{type => split, db => <<?DB1>>}),
+ {201, R2} = req(post, Jobs, #{type => split, db => Db1}),
wait_to_complete(Top, R2),
% Stop clustering so jobs are not started anymore and ensure max jobs
% is enforced even if jobs are stopped
?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => stopped})),
- {C3, R3} = req(post, Jobs, #{type => split, db => <<?DB2>>}),
+ {C3, R3} = req(post, Jobs, #{type => split, db => Db2}),
?assertMatch({500, [#{<<"error">> := <<"max_jobs_exceeded">>}]}, {C3, R3}),
% Allow the job to be created by raising max_jobs
config:set("mem3_reshard", "max_jobs", "2", _Persist=false),
- {C4, R4} = req(post, Jobs, #{type => split, db => <<?DB2>>}),
+ {C4, R4} = req(post, Jobs, #{type => split, db => Db2}),
?assertEqual(201, C4),
% Lower max_jobs after job is created but it's not running
@@ -681,13 +679,13 @@ check_max_jobs(Top) ->
end).
-cleanup_completed_jobs(Top) ->
+cleanup_completed_jobs({Top, {Db1, _, _}}) ->
?_test(begin
- Body = #{type => split, db => <<?DB1>>},
+ Body = #{type => split, db => Db1},
{201, [#{?ID := Id}]} = req(post, Top ++ ?JOBS, Body),
JobUrl = Top ++ ?JOBS ++ ?b2l(Id),
wait_state(JobUrl ++ "/state", <<"completed">>),
- delete_db(Top, ?DB1),
+ delete_db(Top, Db1),
wait_for_http_code(JobUrl, 404)
end).
@@ -750,19 +748,19 @@ wait_for_http_code(Url, Code) when is_integer(Code) ->
end, 30000).
-delete_source_in_state(Top, Db, State) when is_atom(State) ->
+delete_source_in_state(Top, Db, State) when is_atom(State), is_binary(Db) ->
intercept_state(State),
- Body = #{type => split, db => list_to_binary(Db)},
+ Body = #{type => split, db => Db},
{201, [#{?ID := Id}]} = req(post, Top ++ ?JOBS, Body),
receive {JobPid, State} -> ok end,
- delete_db(Top, Db),
+ sync_delete_db(Top, Db),
JobPid ! continue,
Id.
recover_in_state(Top, Db, State) when is_atom(State) ->
intercept_state(State),
- Body = #{type => split, db => list_to_binary(Db)},
+ Body = #{type => split, db => Db},
{201, [#{?ID := Id}]} = req(post, Top ++ ?JOBS, Body),
receive {JobPid, State} -> ok end,
% Job is now stuck in running since we prevented it from executing
@@ -775,25 +773,32 @@ recover_in_state(Top, Db, State) when is_atom(State) ->
Id.
-create_db(Top, Db, QArgs) ->
- Url = Top ++ Db ++ QArgs,
+create_db(Top, Db, QArgs) when is_binary(Db) ->
+ Url = Top ++ binary_to_list(Db) ++ QArgs,
{ok, Status, _, _} = test_request:put(Url, [?JSON, ?AUTH], "{}"),
?assert(Status =:= 201 orelse Status =:= 202).
-delete_db(Top, Db) ->
- Url = Top ++ Db,
+delete_db(Top, Db) when is_binary(Db) ->
+ Url = Top ++ binary_to_list(Db),
case test_request:get(Url, [?AUTH]) of
{ok, 404, _, _} ->
- ok;
+ not_found;
{ok, 200, _, _} ->
- Shards = mem3:local_shards(list_to_binary(Db)),
- ShardNames = [mem3:name(S) || S <- Shards],
{ok, 200, _, _} = test_request:delete(Url, [?AUTH]),
- % delete is asynchronous (db doc is deleted but shards are deleted in
- % spawned workers after the client gets a response so we resort to
- % directly cleaning up the shards as well
- [couch_server:delete(N, [?ADMIN_CTX]) || N <- ShardNames],
+ ok
+ end.
+
+
+sync_delete_db(Top, Db) when is_binary(Db) ->
+ delete_db(Top, Db),
+ try
+ Shards = mem3:local_shards(Db),
+ ShardNames = [mem3:name(S) || S <- Shards],
+ [couch_server:delete(N, [?ADMIN_CTX]) || N <- ShardNames],
+ ok
+ catch
+ error:database_does_not_exist ->
ok
end.