You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2017/04/18 15:54:43 UTC

[couchdb] branch COUCHDB-3376-fix-mem3-shards updated (a9b2ef2 -> 3746e3c)

This is an automated email from the ASF dual-hosted git repository.

davisp pushed a change to branch COUCHDB-3376-fix-mem3-shards
in repository https://gitbox.apache.org/repos/asf/couchdb.git.

      from  a9b2ef2   Use a temporary process when caching shard maps
       new  6bd2d63   Add unit tests for mem3_shards race condition and writer process
       new  3746e3c   Correctly delete writer information from ?OPENERS in mem3_shards

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "adds" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/mem3/src/mem3_shards.erl | 222 ++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 210 insertions(+), 12 deletions(-)

-- 
To stop receiving notification emails like this one, please contact
['"commits@couchdb.apache.org" <co...@couchdb.apache.org>'].

[couchdb] 02/02: Correctly delete writer information from ?OPENERS in mem3_shards

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch COUCHDB-3376-fix-mem3-shards
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 3746e3c44a905f06f85d77930015a62f4a52e00c
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Tue Apr 18 01:16:21 2017 -0400

    Correctly delete writer information from ?OPENERS in mem3_shards
    
    `?OPENERS` is an ETS table of type bag. To delete one specific object have to
    use `ets:delete_object(Tab, Object)`
    
    Without this fix writers were never cleaned up and no new writers could be
    spawned.
    
    COUCHDB-3376
---
 src/mem3/src/mem3_shards.erl | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/mem3/src/mem3_shards.erl b/src/mem3/src/mem3_shards.erl
index af9417a..5cdbc6c 100644
--- a/src/mem3/src/mem3_shards.erl
+++ b/src/mem3/src/mem3_shards.erl
@@ -480,7 +480,7 @@ shard_writer(DbName, Shards, IdleTimeout) ->
             ok
         end
     after
-        true = ets:delete(?OPENERS, {DbName, self()})
+        true = ets:delete_object(?OPENERS, {DbName, self()})
     end.
 
 flush_write(DbName, Writer, WriteTimeout) ->

-- 
To stop receiving notification emails like this one, please contact
"commits@couchdb.apache.org" <co...@couchdb.apache.org>.

[couchdb] 01/02: Add unit tests for mem3_shards race condition and writer process

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch COUCHDB-3376-fix-mem3-shards
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 6bd2d63f101c2a5f4bc0587e3809cdabd1a00693
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Tue Apr 18 01:14:20 2017 -0400

    Add unit tests for mem3_shards race condition and writer process
    
    COUCHDB-3376
---
 src/mem3/src/mem3_shards.erl | 220 ++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 209 insertions(+), 11 deletions(-)

diff --git a/src/mem3/src/mem3_shards.erl b/src/mem3/src/mem3_shards.erl
index 7336668..af9417a 100644
--- a/src/mem3/src/mem3_shards.erl
+++ b/src/mem3/src/mem3_shards.erl
@@ -334,7 +334,7 @@ changes_callback({change, {Change}, _}, _) ->
                     [DbName, Reason]);
             {Doc} ->
                 Shards = mem3_util:build_ordered_shards(DbName, Doc),
-                Writer = spawn_shard_writer(DbName, Shards),
+                Writer = spawn_shard_writer(DbName, Shards, ?WRITE_IDLE_TIMEOUT),
                 ets:insert(?OPENERS, {DbName, Writer}),
                 Msg = {cache_insert_change, DbName, Writer, Seq},
                 gen_server:cast(?MODULE, Msg),
@@ -362,7 +362,7 @@ load_shards_from_db(#db{} = ShardDb, DbName) ->
     {ok, #doc{body = {Props}}} ->
         Seq = couch_db:get_update_seq(ShardDb),
         Shards = mem3_util:build_ordered_shards(DbName, Props),
-        case maybe_spawn_shard_writer(DbName, Shards) of
+        case maybe_spawn_shard_writer(DbName, Shards, ?WRITE_IDLE_TIMEOUT) of
             Writer when is_pid(Writer) ->
                 case ets:insert_new(?OPENERS, {DbName, Writer}) of
                     true ->
@@ -407,7 +407,7 @@ create_if_missing(Name) ->
 cache_insert(#st{cur_size=Cur}=St, DbName, Writer) ->
     NewATime = now(),
     true = ets:delete(?SHARDS, DbName),
-    flush_write(DbName, Writer),
+    flush_write(DbName, Writer, ?WRITE_TIMEOUT),
     case ets:lookup(?DBS, DbName) of
         [{DbName, ATime}] ->
             true = ets:delete(?ATIMES, ATime),
@@ -458,32 +458,32 @@ cache_clear(St) ->
     true = ets:delete_all_objects(?ATIMES),
     St#st{cur_size=0}.
 
-maybe_spawn_shard_writer(DbName, Shards) ->
+maybe_spawn_shard_writer(DbName, Shards, IdleTimeout) ->
     case ets:member(?OPENERS, DbName) of
         true ->
             ignore;
         false ->
-            spawn_shard_writer(DbName, Shards)
+            spawn_shard_writer(DbName, Shards, IdleTimeout)
     end.
 
-spawn_shard_writer(DbName, Shards) ->
-    erlang:spawn(fun() -> shard_writer(DbName, Shards) end).
+spawn_shard_writer(DbName, Shards, IdleTimeout) ->
+    erlang:spawn(fun() -> shard_writer(DbName, Shards, IdleTimeout) end).
 
-shard_writer(DbName, Shards) ->
+shard_writer(DbName, Shards, IdleTimeout) ->
     try
         receive
             write ->
                 true = ets:insert(?SHARDS, Shards);
             cancel ->
                 ok
-        after ?WRITE_IDLE_TIMEOUT ->
+        after IdleTimeout ->
             ok
         end
     after
         true = ets:delete(?OPENERS, {DbName, self()})
     end.
 
-flush_write(DbName, Writer) ->
+flush_write(DbName, Writer, WriteTimeout) ->
     Ref = erlang:monitor(process, Writer),
     Writer ! write,
     receive
@@ -491,7 +491,7 @@ flush_write(DbName, Writer) ->
             ok;
         {'DOWN', Ref, _, _, Error} ->
             erlang:exit({mem3_shards_bad_write, Error})
-    after ?WRITE_TIMEOUT ->
+    after WriteTimeout ->
         erlang:exit({mem3_shards_write_timeout, DbName})
     end.
 
@@ -506,3 +506,201 @@ filter_shards_by_name(Name, Matches, [#shard{name=Name}=S|Ss]) ->
     filter_shards_by_name(Name, [S|Matches], Ss);
 filter_shards_by_name(Name, Matches, [_|Ss]) ->
     filter_shards_by_name(Name, Matches, Ss).
+
+
+% Export all functions for unit tests
+
+-ifdef(TEST).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-define(DB, <<"eunit_db_name">>).
+-define(INFINITY, 99999999).
+
+
+mem3_shards_test_() ->
+    {
+        foreach,
+        fun setup/0,
+        fun teardown/1,
+        [
+            t_maybe_spawn_shard_writer_already_exists(),
+            t_maybe_spawn_shard_writer_new(),
+            t_flush_writer_exists_normal(),
+            t_flush_writer_times_out(),
+            t_flush_writer_crashes(),
+            t_writer_deletes_itself_when_done(),
+            t_writer_does_not_delete_other_writers_for_same_shard(),
+            t_spawn_writer_in_load_shards_from_db(),
+            t_cache_insert_takes_new_update(),
+            t_cache_insert_ignores_stale_update_and_kills_worker()
+        ]
+    }.
+
+
+setup() ->
+    ets:new(?SHARDS, [bag, public, named_table, {keypos, #shard.dbname}]),
+    ets:new(?OPENERS, [bag, public, named_table]),
+    ets:new(?DBS, [set, public, named_table]),
+    ets:new(?ATIMES, [ordered_set, public, named_table]),
+    meck:expect(config, get, ["mem3", "shards_db", '_'], "_dbs"),
+    ok.
+
+
+teardown(_) ->
+    meck:unload(),
+    ets:delete(?ATIMES),
+    ets:delete(?DBS),
+    ets:delete(?OPENERS),
+    ets:delete(?SHARDS).
+
+
+t_maybe_spawn_shard_writer_already_exists() ->
+    ?_test(begin
+        ets:insert(?OPENERS, {?DB, self()}),
+        Shards = mock_shards(),
+        WRes = maybe_spawn_shard_writer(?DB, Shards, ?INFINITY),
+        ?assertEqual(ignore, WRes)
+    end).
+
+
+t_maybe_spawn_shard_writer_new() ->
+    ?_test(begin
+        Shards = mock_shards(),
+        WPid = maybe_spawn_shard_writer(?DB, Shards, 1000),
+        WRef = erlang:monitor(process, WPid),
+        ?assert(is_pid(WPid)),
+        ?assert(is_process_alive(WPid)),
+        WPid ! write,
+        ?assertEqual(normal, wait_writer_result(WRef)),
+        ?assertEqual(mock_shards(), ets:tab2list(?SHARDS))
+    end).
+
+
+t_flush_writer_exists_normal() ->
+    ?_test(begin
+        WPid = spawn_link_mock_writer(?INFINITY),
+        ?assertEqual(ok, flush_write(?DB, WPid, ?INFINITY)),
+        ?assertEqual(mock_shards(), ets:tab2list(?SHARDS))
+    end).
+
+
+t_flush_writer_times_out() ->
+    ?_test(begin
+        WPid = spawn(fun() -> receive will_never_receive_this -> ok end end),
+        Error = {mem3_shards_write_timeout, ?DB},
+        ?assertExit(Error, flush_write(?DB, WPid, 100)),
+        exit(WPid, kill)
+    end).
+
+
+t_flush_writer_crashes() ->
+    ?_test(begin
+        WPid = spawn(fun() -> receive write -> exit('kapow!') end end),
+        Error = {mem3_shards_bad_write, 'kapow!'},
+        ?assertExit(Error, flush_write(?DB, WPid, 1000))
+    end).
+
+
+t_writer_deletes_itself_when_done() ->
+    ?_test(begin
+        WPid = spawn_link_mock_writer(?INFINITY),
+        WRef = erlang:monitor(process, WPid),
+        ets:insert(?OPENERS, {?DB, WPid}),
+        WPid ! write,
+        ?assertEqual(normal, wait_writer_result(WRef)),
+        ?assertEqual(mock_shards(), ets:tab2list(?SHARDS)),
+        ?assertEqual([], ets:tab2list(?OPENERS))
+    end).
+
+
+t_writer_does_not_delete_other_writers_for_same_shard() ->
+    ?_test(begin
+        WPid = spawn_link_mock_writer(?INFINITY),
+        WRef = erlang:monitor(process, WPid),
+        ets:insert(?OPENERS, {?DB, WPid}),
+        ets:insert(?OPENERS, {?DB, self()}),  % should not be deleted
+        WPid ! write,
+        ?assertEqual(normal, wait_writer_result(WRef)),
+        ?assertEqual(mock_shards(), ets:tab2list(?SHARDS)),
+        ?assertEqual(1, ets:info(?OPENERS, size)),
+        ?assertEqual([{?DB, self()}], ets:tab2list(?OPENERS))
+    end).
+
+
+t_spawn_writer_in_load_shards_from_db() ->
+    ?_test(begin
+        meck:expect(couch_db, open_doc, 3, {ok, #doc{body = {[]}}}),
+        meck:expect(couch_db, get_update_seq, 1, 1),
+        meck:expect(mem3_util, build_ordered_shards, 2, mock_shards()),
+        erlang:register(?MODULE, self()), % register to get cache_insert cast
+        load_shards_from_db(#db{name = <<"testdb">>}, ?DB),
+        meck:validate(couch_db),
+        meck:validate(mem3_util),
+        Cast = receive
+                {'$gen_cast', Msg} -> Msg
+            after 1000 ->
+                timeout
+        end,
+        ?assertMatch({cache_insert, ?DB, Pid, 1} when is_pid(Pid), Cast),
+        {cache_insert, _, WPid, _} = Cast,
+        exit(WPid, kill),
+        ?assertEqual([{?DB, WPid}], ets:tab2list(?OPENERS))
+    end).
+
+
+t_cache_insert_takes_new_update() ->
+    ?_test(begin
+        WPid = spawn_link_mock_writer(?INFINITY),
+        Msg = {cache_insert, ?DB, WPid, 2},
+        {noreply, NewState} = handle_cast(Msg, mock_state(1)),
+        ?assertMatch(#st{cur_size = 1}, NewState),
+        ?assertEqual(mock_shards(), ets:tab2list(?SHARDS)),
+        ?assertEqual([], ets:tab2list(?OPENERS))
+    end).
+
+
+t_cache_insert_ignores_stale_update_and_kills_worker() ->
+    ?_test(begin
+        WPid = spawn_link_mock_writer(?INFINITY),
+        WRef = erlang:monitor(process, WPid),
+        Msg = {cache_insert, ?DB, WPid, 1},
+        {noreply, NewState} = handle_cast(Msg, mock_state(2)),
+        ?assertEqual(normal, wait_writer_result(WRef)),
+        ?assertMatch(#st{cur_size = 0}, NewState),
+        ?assertEqual([], ets:tab2list(?SHARDS)),
+        ?assertEqual([], ets:tab2list(?OPENERS))
+    end).
+
+
+mock_state(UpdateSeq) ->
+    #st{changes_pid = self(), update_seq = UpdateSeq}.
+
+
+mock_shards() ->
+    [
+        #ordered_shard{
+            name = <<"testshardname">>,
+            node = node(),
+            dbname = ?DB,
+            range = [0,1],
+            order = 1
+        }
+    ].
+
+
+wait_writer_result(WRef) ->
+    receive
+        {'DOWN', WRef, _, _, Result} ->
+            Result
+        after 1000 ->
+            timeout
+    end.
+
+
+spawn_link_mock_writer(Timeout) ->
+    Shards = mock_shards(),
+    erlang:spawn_link(fun() -> shard_writer(?DB, Shards, Timeout) end).
+
+
+-endif.

-- 
To stop receiving notification emails like this one, please contact
"commits@couchdb.apache.org" <co...@couchdb.apache.org>.