You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2017/04/21 16:35:11 UTC

[couchdb] 01/01: Add unit tests for mem3_shards

This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch COUCHDB-3376-fix-mem3-shards
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 32e5006f8d7e08579dbe9fab0e7e7ac6d78d7d40
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Tue Apr 18 01:14:20 2017 -0400

    Add unit tests for mem3_shards
    
    COUCHDB-3376
---
 src/mem3/src/mem3_shards.erl | 234 +++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 226 insertions(+), 8 deletions(-)

diff --git a/src/mem3/src/mem3_shards.erl b/src/mem3/src/mem3_shards.erl
index bbdc3b5..38440ae 100644
--- a/src/mem3/src/mem3_shards.erl
+++ b/src/mem3/src/mem3_shards.erl
@@ -476,32 +476,32 @@ cache_clear(St) ->
     true = ets:delete_all_objects(?ATIMES),
     St#st{cur_size=0}.
 
-maybe_spawn_shard_writer(DbName, Shards) ->
+maybe_spawn_shard_writer(DbName, Shards, IdleTimeout) ->  % ignore | pid()
     case ets:member(?OPENERS, DbName) of
         true ->
             ignore;
         false ->
-            spawn_shard_writer(DbName, Shards)
+            spawn_shard_writer(DbName, Shards, IdleTimeout)
     end.
 
-spawn_shard_writer(DbName, Shards) ->
-    erlang:spawn(fun() -> shard_writer(DbName, Shards) end).
+spawn_shard_writer(DbName, Shards, IdleTimeout) ->  % unlinked; returns pid
+    erlang:spawn(fun() -> shard_writer(DbName, Shards, IdleTimeout) end).
 
-shard_writer(DbName, Shards) ->
+shard_writer(DbName, Shards, IdleTimeout) ->  % IdleTimeout: max wait in ms
     try
         receive
             write ->
                 true = ets:insert(?SHARDS, Shards);
             cancel ->
                 ok
-        after ?WRITE_IDLE_TIMEOUT ->
+        after IdleTimeout ->  % neither write nor cancel arrived in time
             ok
         end
     after
         true = ets:delete_object(?OPENERS, {DbName, self()})
     end.
 
-flush_write(DbName, Writer) ->
+flush_write(DbName, Writer, WriteTimeout) ->
     Ref = erlang:monitor(process, Writer),
     Writer ! write,
     receive
@@ -509,7 +509,7 @@ flush_write(DbName, Writer) ->
             ok;
         {'DOWN', Ref, _, _, Error} ->
             erlang:exit({mem3_shards_bad_write, Error})
-    after ?WRITE_TIMEOUT ->
+    after WriteTimeout ->
         erlang:exit({mem3_shards_write_timeout, DbName})
     end.
 
@@ -524,3 +524,221 @@ filter_shards_by_name(Name, Matches, [#shard{name=Name}=S|Ss]) ->
     filter_shards_by_name(Name, [S|Matches], Ss);
 filter_shards_by_name(Name, Matches, [_|Ss]) ->
     filter_shards_by_name(Name, Matches, Ss).
+
+
+-ifdef(TEST).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-define(DB, <<"eunit_db_name">>).
+-define(INFINITY, 99999999).  % large finite timeout (ms), not the atom infinity
+
+%% EUnit generator: each listed test runs between its own setup/0 and teardown/1.
+mem3_shards_test_() ->
+    {
+        foreach,
+        fun setup/0,
+        fun teardown/1,
+        [
+            t_maybe_spawn_shard_writer_already_exists(),
+            t_maybe_spawn_shard_writer_new(),
+            t_flush_writer_exists_normal(),
+            t_flush_writer_times_out(),
+            t_flush_writer_crashes(),
+            t_writer_deletes_itself_when_done(),
+            t_writer_does_not_delete_other_writers_for_same_shard(),
+            t_spawn_writer_in_load_shards_from_db(),
+            t_cache_insert_takes_new_update(),
+            t_cache_insert_ignores_stale_update_and_kills_worker()
+        ]
+    }.
+
+%% Create the four named ETS tables and stub config:get("mem3", "shards_db", _).
+setup() ->
+    ets:new(?SHARDS, [bag, public, named_table, {keypos, #shard.dbname}]),
+    ets:new(?OPENERS, [bag, public, named_table]),
+    ets:new(?DBS, [set, public, named_table]),
+    ets:new(?ATIMES, [ordered_set, public, named_table]),
+    meck:expect(config, get, ["mem3", "shards_db", '_'], "_dbs"),
+    ok.
+
+%% Unload every meck mock, then delete the ETS tables created by setup/0.
+teardown(_) ->
+    meck:unload(),
+    ets:delete(?ATIMES),
+    ets:delete(?DBS),
+    ets:delete(?OPENERS),
+    ets:delete(?SHARDS).
+
+%% An existing ?OPENERS entry for the db makes maybe_spawn_shard_writer/3 ignore.
+t_maybe_spawn_shard_writer_already_exists() ->
+    ?_test(begin
+        ets:insert(?OPENERS, {?DB, self()}),
+        Shards = mock_shards(),
+        WRes = maybe_spawn_shard_writer(?DB, Shards, ?INFINITY),
+        ?assertEqual(ignore, WRes)
+    end).
+
+%% With no opener entry a writer is spawned; on write it stores shards and exits.
+t_maybe_spawn_shard_writer_new() ->
+    ?_test(begin
+        Shards = mock_shards(),
+        WPid = maybe_spawn_shard_writer(?DB, Shards, 1000),
+        WRef = erlang:monitor(process, WPid),
+        ?assert(is_pid(WPid)),
+        ?assert(is_process_alive(WPid)),
+        WPid ! write,
+        ?assertEqual(normal, wait_writer_result(WRef)),
+        ?assertEqual(Shards, ets:tab2list(?SHARDS))
+    end).
+
+%% Expects flush_write/3 on a live writer to return ok with the shards stored.
+t_flush_writer_exists_normal() ->
+    ?_test(begin
+        Shards = mock_shards(),
+        WPid = spawn_link_mock_writer(?DB, Shards, ?INFINITY),
+        ?assertEqual(ok, flush_write(?DB, WPid, ?INFINITY)),
+        ?assertEqual(Shards, ets:tab2list(?SHARDS))
+    end).
+
+%% A writer that never handles write makes flush_write/3 exit with a timeout.
+t_flush_writer_times_out() ->
+    ?_test(begin
+        WPid = spawn(fun() -> receive will_never_receive_this -> ok end end),
+        Error = {mem3_shards_write_timeout, ?DB},
+        ?assertExit(Error, flush_write(?DB, WPid, 100)),
+        exit(WPid, kill)  % the stand-in writer would otherwise block forever
+    end).
+
+%% A writer that dies on write makes flush_write/3 exit with mem3_shards_bad_write.
+t_flush_writer_crashes() ->
+    ?_test(begin
+        WPid = spawn(fun() -> receive write -> exit('kapow!') end end),
+        Error = {mem3_shards_bad_write, 'kapow!'},
+        ?assertExit(Error, flush_write(?DB, WPid, 1000))
+    end).
+
+%% After handling write, the writer removes its own {Db, Pid} entry from ?OPENERS.
+t_writer_deletes_itself_when_done() ->
+    ?_test(begin
+        Shards = mock_shards(),
+        WPid = spawn_link_mock_writer(?DB, Shards, ?INFINITY),
+        WRef = erlang:monitor(process, WPid),
+        ets:insert(?OPENERS, {?DB, WPid}),
+        WPid ! write,
+        ?assertEqual(normal, wait_writer_result(WRef)),
+        ?assertEqual(Shards, ets:tab2list(?SHARDS)),
+        ?assertEqual([], ets:tab2list(?OPENERS))
+    end).
+
+%% Writer cleanup removes only its own ?OPENERS entry, not others for the same db.
+t_writer_does_not_delete_other_writers_for_same_shard() ->
+    ?_test(begin
+        Shards = mock_shards(),
+        WPid = spawn_link_mock_writer(?DB, Shards, ?INFINITY),
+        WRef = erlang:monitor(process, WPid),
+        ets:insert(?OPENERS, {?DB, WPid}),
+        ets:insert(?OPENERS, {?DB, self()}),  % should not be deleted
+        WPid ! write,
+        ?assertEqual(normal, wait_writer_result(WRef)),
+        ?assertEqual(Shards, ets:tab2list(?SHARDS)),
+        ?assertEqual(1, ets:info(?OPENERS, size)),
+        ?assertEqual([{?DB, self()}], ets:tab2list(?OPENERS))
+    end).
+
+%% Expects load_shards_from_db/2 (deps mocked) to cast cache_insert with a writer.
+t_spawn_writer_in_load_shards_from_db() ->
+    ?_test(begin
+        meck:expect(couch_db, open_doc, 3, {ok, #doc{body = {[]}}}),
+        meck:expect(couch_db, get_update_seq, 1, 1),
+        meck:expect(mem3_util, build_ordered_shards, 2, mock_shards()),
+        erlang:register(?MODULE, self()), % register to get cache_insert cast
+        load_shards_from_db(#db{name = <<"testdb">>}, ?DB),
+        meck:validate(couch_db),
+        meck:validate(mem3_util),
+        Cast = receive
+                {'$gen_cast', Msg} -> Msg
+            after 1000 ->
+                timeout
+        end,
+        ?assertMatch({cache_insert, ?DB, Pid, 1} when is_pid(Pid), Cast),
+        {cache_insert, _, WPid, _} = Cast,
+        exit(WPid, kill),  % kill bypasses the writer's own ?OPENERS cleanup
+        ?assertEqual([{?DB, WPid}], ets:tab2list(?OPENERS))
+    end).
+
+%% cache_insert with seq 2 against state seq 1 is expected to store the shards.
+t_cache_insert_takes_new_update() ->
+    ?_test(begin
+        Shards = mock_shards(),
+        WPid = spawn_link_mock_writer(?DB, Shards, ?INFINITY),
+        Msg = {cache_insert, ?DB, WPid, 2},
+        {noreply, NewState} = handle_cast(Msg, mock_state(1)),
+        ?assertMatch(#st{cur_size = 1}, NewState),
+        ?assertEqual(Shards, ets:tab2list(?SHARDS)),
+        ?assertEqual([], ets:tab2list(?OPENERS))
+    end).
+
+%% cache_insert with seq 1 against state seq 2 should stop the writer, no shards.
+t_cache_insert_ignores_stale_update_and_kills_worker() ->
+    ?_test(begin
+        Shards = mock_shards(),
+        WPid = spawn_link_mock_writer(?DB, Shards, ?INFINITY),
+        WRef = erlang:monitor(process, WPid),
+        Msg = {cache_insert, ?DB, WPid, 1},
+        {noreply, NewState} = handle_cast(Msg, mock_state(2)),
+        ?assertEqual(normal, wait_writer_result(WRef)),  % writer exits normally
+        ?assertMatch(#st{cur_size = 0}, NewState),
+        ?assertEqual([], ets:tab2list(?SHARDS)),
+        ?assertEqual([], ets:tab2list(?OPENERS))
+    end).
+
+%% Build an #st{} with the given update_seq and the caller as changes_pid.
+mock_state(UpdateSeq) ->
+    #st{
+        update_seq = UpdateSeq,
+        changes_pid = self(),
+        write_timeout = 1000
+    }.
+
+%% One-element list holding a single #ordered_shard{} for ?DB on the local node.
+mock_shards() ->
+    [
+        #ordered_shard{
+            name = <<"testshardname">>,
+            node = node(),
+            dbname = ?DB,
+            range = [0,1],
+            order = 1
+        }
+    ].
+
+%% Return the exit reason from the 'DOWN' message for WRef, or timeout after 1 s.
+wait_writer_result(WRef) ->
+    receive
+        {'DOWN', WRef, _, _, Result} ->
+            Result
+        after 1000 ->
+            timeout
+    end.
+
+%% Run shard_writer/3 in a new process linked to the caller; returns its pid.
+spawn_link_mock_writer(Db, Shards, Timeout) ->
+    erlang:spawn_link(fun() -> shard_writer(Db, Shards, Timeout) end).
+
+%% Spawn a process that monitors the caller, then mock_changes_listener_loop/2.
+mock_changes_listener() ->
+    Self = self(),
+    erlang:spawn(fun() ->  % unlinked; its loop returns once the caller is down
+        Ref = erlang:monitor(process, Self),
+        mock_changes_listener_loop(Self, Ref)
+    end).
+
+%% Block until the 'DOWN' message for the monitored Pid arrives, then return ok.
+mock_changes_listener_loop(Pid, Ref) ->
+    receive
+        {'DOWN', Ref, process, Pid, _} -> ok
+    end.
+
+
+-endif.

-- 
To stop receiving notification emails like this one, please contact
"commits@couchdb.apache.org" <co...@couchdb.apache.org>.