Posted to commits@couchdb.apache.org by va...@apache.org on 2020/01/04 22:15:07 UTC

[couchdb] 01/01: Lock shard splitting targets during the initial copy phase

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch add-locking-for-shard-splitting-targets
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 442280dcc0a72c47dc84584793ce243ebb860785
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Sat Jan 4 17:08:05 2020 -0500

    Lock shard splitting targets during the initial copy phase
    
    During the initial copy phase, target shards are opened outside the
    couch_server. Previously, it was possible to manually open the same targets
    through the couch_server (via remsh, for instance) using the
    `couch_db:open/2` API. That could lead to corruption, as there would be two
    writers for the same DB file.
    
    In order to prevent such a scenario, introduce a mechanism for the shard
    splitter to lock the target shards such that any regular open call would fail
    during the initial copy phase.
    
    The locking mechanism is generic and would allow locking shards locally for
    other reasons in the future as well.
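    
    To illustrate the intended call pattern, here is a rough sketch based on
    the couch_db_split changes below (not a verbatim excerpt; DbName is
    assumed to be bound to a target shard name):
    
        %% Lock the target before opening it outside the couch_server.
        case couch_server:lock(DbName, <<"shard splitting">>) of
            ok ->
                %% While locked, couch_db:open/2 and couch_db:create/2 on this
                %% shard return {error, {locked, <<"shard splitting">>}}, so
                %% the splitter can safely open the file outside the
                %% couch_server and perform the initial copy.
                ok;
            {error, already_opened} ->
                %% The shard is already open through the couch_server, so
                %% opening it directly is not safe; the splitter treats this
                %% as a target creation error.
                {error, already_opened}
        end,
        %% Once the initial copy phase is done (or on cleanup), release the
        %% lock so regular opens work again.
        couch_server:unlock(DbName).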
---
 src/couch/src/couch_db_split.erl              | 11 +++++++-
 src/couch/src/couch_server.erl                | 39 +++++++++++++++++++++++++--
 src/couch/test/eunit/couch_db_split_tests.erl | 35 +++++++++++++++++++++---
 src/couch/test/eunit/couch_db_tests.erl       | 29 +++++++++++++++++++-
 src/mem3/test/eunit/mem3_reshard_test.erl     | 35 +++++++++++++++++++++++-
 5 files changed, 141 insertions(+), 8 deletions(-)

diff --git a/src/couch/src/couch_db_split.erl b/src/couch/src/couch_db_split.erl
index 5bf98b6..3a1f98d 100644
--- a/src/couch/src/couch_db_split.erl
+++ b/src/couch/src/couch_db_split.erl
@@ -132,6 +132,12 @@ split(SourceDb, Partitioned, Engine, Targets0, PickFun, {M, F, A} = HashFun) ->
             {error, E} ->
                 throw({target_create_error, DbName, E, Map})
         end,
+        case couch_server:lock(DbName, <<"shard splitting">>) of
+            ok ->
+                ok;
+            {error, Err} ->
+                throw({target_create_error, DbName, Err, Map})
+        end,
         {ok, Filepath} = couch_server:get_engine_path(DbName, Engine),
         Opts = [create, ?ADMIN_CTX] ++ case Partitioned of
             true -> [{props, [{partitioned, true}, {hash, [M, F, A]}]}];
@@ -164,7 +170,9 @@ split(SourceDb, Partitioned, Engine, Targets0, PickFun, {M, F, A} = HashFun) ->
 cleanup_targets(#{} = Targets, Engine) ->
     maps:map(fun(_, #target{db = Db} = T) ->
         ok = stop_target_db(Db),
-        delete_target(couch_db:name(Db), Engine),
+        DbName = couch_db:name(Db),
+        delete_target(DbName, Engine),
+        couch_server:unlock(DbName),
         T
     end, Targets).
 
@@ -182,6 +190,7 @@ stop_target_db(Db) ->
     Pid = couch_db:get_pid(Db),
     catch unlink(Pid),
     catch exit(Pid, kill),
+    couch_server:unlock(couch_db:name(Db)),
     ok.
 
 
diff --git a/src/couch/src/couch_server.erl b/src/couch/src/couch_server.erl
index ab0122e..eaca3ee 100644
--- a/src/couch/src/couch_server.erl
+++ b/src/couch/src/couch_server.erl
@@ -26,6 +26,7 @@
 -export([exists/1]).
 -export([get_engine_extensions/0]).
 -export([get_engine_path/2]).
+-export([lock/2, unlock/1]).
 
 % config_listener api
 -export([handle_config_change/5, handle_config_terminate/3]).
@@ -77,8 +78,15 @@ get_stats() ->
 sup_start_link() ->
     gen_server:start_link({local, couch_server}, couch_server, [], []).
 
+open(DbName, Options) ->
+    case ets:lookup(couch_dbs_locks, DbName) of
+        [] ->
+            open_int(DbName, Options);
+        [{DbName, Reason}] ->
+            {error, {locked, Reason}}
+    end.
 
-open(DbName, Options0) ->
+open_int(DbName, Options0) ->
     Ctx = couch_util:get_value(user_ctx, Options0, #user_ctx{}),
     case ets:lookup(couch_dbs, DbName) of
     [#entry{db = Db0, lock = Lock} = Entry] when Lock =/= locked ->
@@ -115,7 +123,15 @@ update_lru(DbName, Options) ->
 close_lru() ->
     gen_server:call(couch_server, close_lru).
 
-create(DbName, Options0) ->
+create(DbName, Options) ->
+    case ets:lookup(couch_dbs_locks, DbName) of
+        [] ->
+            create_int(DbName, Options);
+        [{DbName, Reason}] ->
+            {error, {locked, Reason}}
+    end.
+
+create_int(DbName, Options0) ->
     Options = maybe_add_sys_db_callbacks(DbName, Options0),
     couch_partition:validate_dbname(DbName, Options),
     case gen_server:call(couch_server, {create, DbName, Options}, infinity) of
@@ -251,6 +267,12 @@ init([]) ->
         {read_concurrency, true}
     ]),
     ets:new(couch_dbs_pid_to_name, [set, protected, named_table]),
+    ets:new(couch_dbs_locks, [
+        set,
+        public,
+        named_table,
+        {read_concurrency, true}
+    ]),
     process_flag(trap_exit, true),
     {ok, #server{root_dir=RootDir,
                 engines = Engines,
@@ -774,6 +796,19 @@ get_engine_path(DbName, Engine) when is_binary(DbName), is_atom(Engine) ->
             {error, {invalid_engine, Engine}}
     end.
 
+lock(DbName, Reason) when is_binary(DbName), is_binary(Reason) ->
+    case ets:lookup(couch_dbs, DbName) of
+        [] ->
+            true = ets:insert(couch_dbs_locks, {DbName, Reason}),
+            ok;
+        [#entry{}] ->
+            {error, already_opened}
+    end.
+
+unlock(DbName) when is_binary(DbName) ->
+    true = ets:delete(couch_dbs_locks, DbName),
+    ok.
+
 
 -ifdef(TEST).
 -include_lib("eunit/include/eunit.hrl").
diff --git a/src/couch/test/eunit/couch_db_split_tests.erl b/src/couch/test/eunit/couch_db_split_tests.erl
index 8e64c39..6e24c36 100644
--- a/src/couch/test/eunit/couch_db_split_tests.erl
+++ b/src/couch/test/eunit/couch_db_split_tests.erl
@@ -56,7 +56,8 @@ split_test_() ->
                     fun should_fail_on_missing_source/1,
                     fun should_fail_on_existing_target/1,
                     fun should_fail_on_invalid_target_name/1,
-                    fun should_crash_on_invalid_tmap/1
+                    fun should_crash_on_invalid_tmap/1,
+                    fun should_fail_on_opened_target/1
                 ]
             }
         ]
@@ -104,9 +105,23 @@ should_fail_on_missing_source(_DbName) ->
 
 should_fail_on_existing_target(DbName) ->
     Ranges = make_ranges(2),
-    TMap = maps:map(fun(_, _) -> DbName end, make_targets(Ranges)),
+    TMap = maps:map(fun(_, TName) ->
+        % We create the target but make sure to remove it from the cache so we
+        % hit the eexist error instead of already_opened
+        {ok, Db} = couch_db:create(TName, [?ADMIN_CTX]),
+        Pid = couch_db:get_pid(Db),
+        ok = couch_db:close(Db),
+        exit(Pid, kill),
+        test_util:wait(fun() ->
+            case ets:lookup(couch_dbs, TName) of
+                [] -> ok;
+                [_ | _] -> wait
+            end
+        end),
+        TName
+    end, make_targets(Ranges)),
     Response = couch_db_split:split(DbName, TMap, fun fake_pickfun/3),
-    ?_assertMatch({error, {target_create_error, DbName, eexist}}, Response).
+    ?_assertMatch({error, {target_create_error, _, eexist}}, Response).
 
 
 should_fail_on_invalid_target_name(DbName) ->
@@ -127,6 +142,20 @@ should_crash_on_invalid_tmap(DbName) ->
         couch_db_split:split(DbName, TMap, fun fake_pickfun/3)).
 
 
+should_fail_on_opened_target(DbName) ->
+    Ranges = make_ranges(2),
+    TMap = maps:map(fun(_, TName) ->
+        % We create and keep the target open but delete
+        % its file on disk so we don't fail with eexist
+        {ok, Db} = couch_db:create(TName, [?ADMIN_CTX]),
+        FilePath = couch_db:get_filepath(Db),
+        ok = file:delete(FilePath),
+        TName
+    end, make_targets(Ranges)),
+    ?_assertMatch({error, {target_create_error, _, already_opened}},
+         couch_db_split:split(DbName, TMap, fun fake_pickfun/3)).
+
+
 copy_local_docs_test_() ->
     Cases = [
         {"Should work with no docs", 0, 2},
diff --git a/src/couch/test/eunit/couch_db_tests.erl b/src/couch/test/eunit/couch_db_tests.erl
index d64f7c6..dd2cb42 100644
--- a/src/couch/test/eunit/couch_db_tests.erl
+++ b/src/couch/test/eunit/couch_db_tests.erl
@@ -80,7 +80,8 @@ open_db_test_()->
                 fun() -> ?tempdb() end,
                 [
                     fun should_create_db_if_missing/1,
-                    fun should_open_db_if_exists/1
+                    fun should_open_db_if_exists/1,
+                    fun locking_should_work/1
                 ]
             }
         }
@@ -157,6 +158,32 @@ should_open_db_if_exists(DbName) ->
         ?assert(lists:member(DbName, After))
     end).
 
+locking_should_work(DbName) ->
+    ?_test(begin
+        ?assertEqual(ok, couch_server:lock(DbName, <<"x">>)),
+        ?assertEqual({error, {locked, <<"x">>}}, couch_db:create(DbName, [])),
+        ?assertEqual(ok, couch_server:unlock(DbName)),
+        {ok, Db} = couch_db:create(DbName, []),
+        ?assertEqual({error, already_opened},
+            couch_server:lock(DbName, <<>>)),
+
+        ok = couch_db:close(Db),
+        catch exit(couch_db:get_pid(Db), kill),
+        test_util:wait(fun() ->
+            case ets:lookup(couch_dbs, DbName) of
+                [] -> ok;
+                [_ | _] -> wait
+            end
+         end),
+
+        ?assertEqual(ok, couch_server:lock(DbName, <<"y">>)),
+        ?assertEqual({error, {locked, <<"y">>}},
+            couch_db:open(DbName, [])),
+
+        couch_server:unlock(DbName),
+        {ok, Db1} = couch_db:open(DbName, [{create_if_missing, true}]),
+        ok = couch_db:close(Db1)
+    end).
 
 create_db(DbName) ->
     create_db(DbName, []).
diff --git a/src/mem3/test/eunit/mem3_reshard_test.erl b/src/mem3/test/eunit/mem3_reshard_test.erl
index ab62021..1e89755 100644
--- a/src/mem3/test/eunit/mem3_reshard_test.erl
+++ b/src/mem3/test/eunit/mem3_reshard_test.erl
@@ -72,7 +72,8 @@ mem3_reshard_db_test_() ->
                     fun couch_events_are_emitted/1,
                     fun retries_work/1,
                     fun target_reset_in_initial_copy/1,
-                    fun split_an_incomplete_shard_map/1
+                    fun split_an_incomplete_shard_map/1,
+                    fun target_shards_are_locked/1
                 ]
             }
         }
@@ -479,6 +480,38 @@ split_an_incomplete_shard_map(#{db1 := Db}) ->
     end)}.
 
 
+% Opening a target db in the initial copy phase will fail with an error
+target_shards_are_locked(#{db1 := Db}) ->
+    {timeout, ?TIMEOUT, ?_test(begin
+        add_test_docs(Db, #{docs => 10}),
+
+        % Make the job stop right before it starts copying the docs
+        TestPid = self(),
+        meck:new(couch_db, [passthrough]),
+        meck:expect(couch_db, start_link, fun(Engine, TName, FilePath, Opts) ->
+            TestPid ! {start_link, self(), TName},
+            receive
+                continue ->
+                    meck:passthrough([Engine, TName, FilePath, Opts])
+            end
+        end),
+
+        [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)),
+        {ok, JobId} = mem3_reshard:start_split_job(Shard),
+        {Target0, JobPid} = receive
+            {start_link, Pid, TName} -> {TName, Pid}
+        end,
+        ?assertEqual({error, {locked, <<"shard splitting">>}},
+            couch_db:open_int(Target0, [])),
+
+        % Send two continues for two targets
+        JobPid ! continue,
+        JobPid ! continue,
+
+        wait_state(JobId, completed)
+    end)}.
+
+
 intercept_state(State) ->
     TestPid = self(),
     meck:new(mem3_reshard_job, [passthrough]),