You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2017/04/18 15:54:43 UTC
[couchdb] branch COUCHDB-3376-fix-mem3-shards updated (a9b2ef2 -> 3746e3c)
This is an automated email from the ASF dual-hosted git repository.
davisp pushed a change to branch COUCHDB-3376-fix-mem3-shards
in repository https://gitbox.apache.org/repos/asf/couchdb.git.
from a9b2ef2 Use a temporary process when caching shard maps
new 6bd2d63 Add unit tests for mem3_shards race condition and writer process
new 3746e3c Correctly delete writer information from ?OPENERS in mem3_shards
The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "adds" were already present in the repository and have only
been added to this reference.
Summary of changes:
src/mem3/src/mem3_shards.erl | 222 ++++++++++++++++++++++++++++++++++++++++---
1 file changed, 210 insertions(+), 12 deletions(-)
--
To stop receiving notification emails like this one, please contact
"commits@couchdb.apache.org" <co...@couchdb.apache.org>.
[couchdb] 02/02: Correctly delete writer information from ?OPENERS in mem3_shards
Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
davisp pushed a commit to branch COUCHDB-3376-fix-mem3-shards
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 3746e3c44a905f06f85d77930015a62f4a52e00c
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Tue Apr 18 01:16:21 2017 -0400
Correctly delete writer information from ?OPENERS in mem3_shards
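`?OPENERS` is an ETS table of type bag. To delete one specific object, we have to
use `ets:delete_object(Tab, Object)`. `ets:delete(Tab, Key)` expects a key, and the
key of each entry here is `DbName`, so passing the whole `{DbName, self()}` tuple
matched (and deleted) nothing.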
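Without this fix, writers were never cleaned up and no new writers could be
spawned: `maybe_spawn_shard_writer` returns `ignore` while any `?OPENERS` entry
for the database still exists.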
COUCHDB-3376
---
src/mem3/src/mem3_shards.erl | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/src/mem3/src/mem3_shards.erl b/src/mem3/src/mem3_shards.erl
index af9417a..5cdbc6c 100644
--- a/src/mem3/src/mem3_shards.erl
+++ b/src/mem3/src/mem3_shards.erl
@@ -480,7 +480,7 @@ shard_writer(DbName, Shards, IdleTimeout) ->
ok
end
after
- true = ets:delete(?OPENERS, {DbName, self()})
+ true = ets:delete_object(?OPENERS, {DbName, self()})
end.
flush_write(DbName, Writer, WriteTimeout) ->
--
To stop receiving notification emails like this one, please contact
"commits@couchdb.apache.org" <co...@couchdb.apache.org>.
[couchdb] 01/02: Add unit tests for mem3_shards race condition and writer process
Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
davisp pushed a commit to branch COUCHDB-3376-fix-mem3-shards
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 6bd2d63f101c2a5f4bc0587e3809cdabd1a00693
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Tue Apr 18 01:14:20 2017 -0400
Add unit tests for mem3_shards race condition and writer process
COUCHDB-3376
---
src/mem3/src/mem3_shards.erl | 220 ++++++++++++++++++++++++++++++++++++++++---
1 file changed, 209 insertions(+), 11 deletions(-)
diff --git a/src/mem3/src/mem3_shards.erl b/src/mem3/src/mem3_shards.erl
index 7336668..af9417a 100644
--- a/src/mem3/src/mem3_shards.erl
+++ b/src/mem3/src/mem3_shards.erl
@@ -334,7 +334,7 @@ changes_callback({change, {Change}, _}, _) ->
[DbName, Reason]);
{Doc} ->
Shards = mem3_util:build_ordered_shards(DbName, Doc),
- Writer = spawn_shard_writer(DbName, Shards),
+ Writer = spawn_shard_writer(DbName, Shards, ?WRITE_IDLE_TIMEOUT),
ets:insert(?OPENERS, {DbName, Writer}),
Msg = {cache_insert_change, DbName, Writer, Seq},
gen_server:cast(?MODULE, Msg),
@@ -362,7 +362,7 @@ load_shards_from_db(#db{} = ShardDb, DbName) ->
{ok, #doc{body = {Props}}} ->
Seq = couch_db:get_update_seq(ShardDb),
Shards = mem3_util:build_ordered_shards(DbName, Props),
- case maybe_spawn_shard_writer(DbName, Shards) of
+ case maybe_spawn_shard_writer(DbName, Shards, ?WRITE_IDLE_TIMEOUT) of
Writer when is_pid(Writer) ->
case ets:insert_new(?OPENERS, {DbName, Writer}) of
true ->
@@ -407,7 +407,7 @@ create_if_missing(Name) ->
cache_insert(#st{cur_size=Cur}=St, DbName, Writer) ->
NewATime = now(),
true = ets:delete(?SHARDS, DbName),
- flush_write(DbName, Writer),
+ flush_write(DbName, Writer, ?WRITE_TIMEOUT),
case ets:lookup(?DBS, DbName) of
[{DbName, ATime}] ->
true = ets:delete(?ATIMES, ATime),
@@ -458,32 +458,32 @@ cache_clear(St) ->
true = ets:delete_all_objects(?ATIMES),
St#st{cur_size=0}.
-maybe_spawn_shard_writer(DbName, Shards) ->
+maybe_spawn_shard_writer(DbName, Shards, IdleTimeout) ->
case ets:member(?OPENERS, DbName) of
true ->
ignore;
false ->
- spawn_shard_writer(DbName, Shards)
+ spawn_shard_writer(DbName, Shards, IdleTimeout)
end.
-spawn_shard_writer(DbName, Shards) ->
- erlang:spawn(fun() -> shard_writer(DbName, Shards) end).
+spawn_shard_writer(DbName, Shards, IdleTimeout) ->
+ erlang:spawn(fun() -> shard_writer(DbName, Shards, IdleTimeout) end).
-shard_writer(DbName, Shards) ->
+shard_writer(DbName, Shards, IdleTimeout) ->
try
receive
write ->
true = ets:insert(?SHARDS, Shards);
cancel ->
ok
- after ?WRITE_IDLE_TIMEOUT ->
+ after IdleTimeout ->
ok
end
after
true = ets:delete(?OPENERS, {DbName, self()})
end.
-flush_write(DbName, Writer) ->
+flush_write(DbName, Writer, WriteTimeout) ->
Ref = erlang:monitor(process, Writer),
Writer ! write,
receive
@@ -491,7 +491,7 @@ flush_write(DbName, Writer) ->
ok;
{'DOWN', Ref, _, _, Error} ->
erlang:exit({mem3_shards_bad_write, Error})
- after ?WRITE_TIMEOUT ->
+ after WriteTimeout ->
erlang:exit({mem3_shards_write_timeout, DbName})
end.
@@ -506,3 +506,201 @@ filter_shards_by_name(Name, Matches, [#shard{name=Name}=S|Ss]) ->
filter_shards_by_name(Name, [S|Matches], Ss);
filter_shards_by_name(Name, Matches, [_|Ss]) ->
filter_shards_by_name(Name, Matches, Ss).
+
+
+% Export all functions for unit tests
+
+-ifdef(TEST).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-define(DB, <<"eunit_db_name">>).
+-define(INFINITY, 99999999).
+
+
+mem3_shards_test_() ->
+ {
+ foreach,
+ fun setup/0,
+ fun teardown/1,
+ [
+ t_maybe_spawn_shard_writer_already_exists(),
+ t_maybe_spawn_shard_writer_new(),
+ t_flush_writer_exists_normal(),
+ t_flush_writer_times_out(),
+ t_flush_writer_crashes(),
+ t_writer_deletes_itself_when_done(),
+ t_writer_does_not_delete_other_writers_for_same_shard(),
+ t_spawn_writer_in_load_shards_from_db(),
+ t_cache_insert_takes_new_update(),
+ t_cache_insert_ignores_stale_update_and_kills_worker()
+ ]
+ }.
+
+
+setup() ->
+ ets:new(?SHARDS, [bag, public, named_table, {keypos, #shard.dbname}]),
+ ets:new(?OPENERS, [bag, public, named_table]),
+ ets:new(?DBS, [set, public, named_table]),
+ ets:new(?ATIMES, [ordered_set, public, named_table]),
+ meck:expect(config, get, ["mem3", "shards_db", '_'], "_dbs"),
+ ok.
+
+
+teardown(_) ->
+ meck:unload(),
+ ets:delete(?ATIMES),
+ ets:delete(?DBS),
+ ets:delete(?OPENERS),
+ ets:delete(?SHARDS).
+
+
+t_maybe_spawn_shard_writer_already_exists() ->
+ ?_test(begin
+ ets:insert(?OPENERS, {?DB, self()}),
+ Shards = mock_shards(),
+ WRes = maybe_spawn_shard_writer(?DB, Shards, ?INFINITY),
+ ?assertEqual(ignore, WRes)
+ end).
+
+
+t_maybe_spawn_shard_writer_new() ->
+ ?_test(begin
+ Shards = mock_shards(),
+ WPid = maybe_spawn_shard_writer(?DB, Shards, 1000),
+ WRef = erlang:monitor(process, WPid),
+ ?assert(is_pid(WPid)),
+ ?assert(is_process_alive(WPid)),
+ WPid ! write,
+ ?assertEqual(normal, wait_writer_result(WRef)),
+ ?assertEqual(mock_shards(), ets:tab2list(?SHARDS))
+ end).
+
+
+t_flush_writer_exists_normal() ->
+ ?_test(begin
+ WPid = spawn_link_mock_writer(?INFINITY),
+ ?assertEqual(ok, flush_write(?DB, WPid, ?INFINITY)),
+ ?assertEqual(mock_shards(), ets:tab2list(?SHARDS))
+ end).
+
+
+t_flush_writer_times_out() ->
+ ?_test(begin
+ WPid = spawn(fun() -> receive will_never_receive_this -> ok end end),
+ Error = {mem3_shards_write_timeout, ?DB},
+ ?assertExit(Error, flush_write(?DB, WPid, 100)),
+ exit(WPid, kill)
+ end).
+
+
+t_flush_writer_crashes() ->
+ ?_test(begin
+ WPid = spawn(fun() -> receive write -> exit('kapow!') end end),
+ Error = {mem3_shards_bad_write, 'kapow!'},
+ ?assertExit(Error, flush_write(?DB, WPid, 1000))
+ end).
+
+
+t_writer_deletes_itself_when_done() ->
+ ?_test(begin
+ WPid = spawn_link_mock_writer(?INFINITY),
+ WRef = erlang:monitor(process, WPid),
+ ets:insert(?OPENERS, {?DB, WPid}),
+ WPid ! write,
+ ?assertEqual(normal, wait_writer_result(WRef)),
+ ?assertEqual(mock_shards(), ets:tab2list(?SHARDS)),
+ ?assertEqual([], ets:tab2list(?OPENERS))
+ end).
+
+
+t_writer_does_not_delete_other_writers_for_same_shard() ->
+ ?_test(begin
+ WPid = spawn_link_mock_writer(?INFINITY),
+ WRef = erlang:monitor(process, WPid),
+ ets:insert(?OPENERS, {?DB, WPid}),
+ ets:insert(?OPENERS, {?DB, self()}), % should not be deleted
+ WPid ! write,
+ ?assertEqual(normal, wait_writer_result(WRef)),
+ ?assertEqual(mock_shards(), ets:tab2list(?SHARDS)),
+ ?assertEqual(1, ets:info(?OPENERS, size)),
+ ?assertEqual([{?DB, self()}], ets:tab2list(?OPENERS))
+ end).
+
+
+t_spawn_writer_in_load_shards_from_db() ->
+ ?_test(begin
+ meck:expect(couch_db, open_doc, 3, {ok, #doc{body = {[]}}}),
+ meck:expect(couch_db, get_update_seq, 1, 1),
+ meck:expect(mem3_util, build_ordered_shards, 2, mock_shards()),
+ erlang:register(?MODULE, self()), % register to get cache_insert cast
+ load_shards_from_db(#db{name = <<"testdb">>}, ?DB),
+ meck:validate(couch_db),
+ meck:validate(mem3_util),
+ Cast = receive
+ {'$gen_cast', Msg} -> Msg
+ after 1000 ->
+ timeout
+ end,
+ ?assertMatch({cache_insert, ?DB, Pid, 1} when is_pid(Pid), Cast),
+ {cache_insert, _, WPid, _} = Cast,
+ exit(WPid, kill),
+ ?assertEqual([{?DB, WPid}], ets:tab2list(?OPENERS))
+ end).
+
+
+t_cache_insert_takes_new_update() ->
+ ?_test(begin
+ WPid = spawn_link_mock_writer(?INFINITY),
+ Msg = {cache_insert, ?DB, WPid, 2},
+ {noreply, NewState} = handle_cast(Msg, mock_state(1)),
+ ?assertMatch(#st{cur_size = 1}, NewState),
+ ?assertEqual(mock_shards(), ets:tab2list(?SHARDS)),
+ ?assertEqual([], ets:tab2list(?OPENERS))
+ end).
+
+
+t_cache_insert_ignores_stale_update_and_kills_worker() ->
+ ?_test(begin
+ WPid = spawn_link_mock_writer(?INFINITY),
+ WRef = erlang:monitor(process, WPid),
+ Msg = {cache_insert, ?DB, WPid, 1},
+ {noreply, NewState} = handle_cast(Msg, mock_state(2)),
+ ?assertEqual(normal, wait_writer_result(WRef)),
+ ?assertMatch(#st{cur_size = 0}, NewState),
+ ?assertEqual([], ets:tab2list(?SHARDS)),
+ ?assertEqual([], ets:tab2list(?OPENERS))
+ end).
+
+
+mock_state(UpdateSeq) ->
+ #st{changes_pid = self(), update_seq = UpdateSeq}.
+
+
+mock_shards() ->
+ [
+ #ordered_shard{
+ name = <<"testshardname">>,
+ node = node(),
+ dbname = ?DB,
+ range = [0,1],
+ order = 1
+ }
+ ].
+
+
+wait_writer_result(WRef) ->
+ receive
+ {'DOWN', WRef, _, _, Result} ->
+ Result
+ after 1000 ->
+ timeout
+ end.
+
+
+spawn_link_mock_writer(Timeout) ->
+ Shards = mock_shards(),
+ erlang:spawn_link(fun() -> shard_writer(?DB, Shards, Timeout) end).
+
+
+-endif.
--
To stop receiving notification emails like this one, please contact
"commits@couchdb.apache.org" <co...@couchdb.apache.org>.