You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2017/03/14 19:26:36 UTC
[41/50] couch-replicator commit: updated refs/heads/63012-scheduler
to 27a5eae
Use mem3 in couch_multidb_changes to discover _replicator shards
This is a forward-port of a corresponding commit in master:
"Use mem3 to discover all _replicator shards in replicator manager"
https://github.com/apache/couchdb-couch-replicator/commit/b281d2bb320ed6e6d8226765315a40637ba91a46
This wasn't a direct merge, as replicator shard discovery and traversal are slightly
different.
`couch_multidb_changes` is more generic and takes a db suffix and a callback
module, so `<<"_replicator">>` is not hard-coded in the multidb changes module.
`couch_replicator_manager` handles the local `_replicator` db by directly
creating it and launching a changes feed for it. In the scheduling replicator,
creation is separate from monitoring. The logic is handled in the `scan_all_dbs`
function, which always first checks whether a local db matching the suffix is
present; if so, a `{resume_scan, DbName}` message is sent to the main process.
Due to supervisor ordering, the local replicator db will already have been
created by the time that code runs.
COUCHDB-3277
Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/fb77cbc4
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/fb77cbc4
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/fb77cbc4
Branch: refs/heads/63012-scheduler
Commit: fb77cbc463caa573a51f971243a5cb18ee8b2e9a
Parents: c813d0e
Author: Nick Vatamaniuc <va...@apache.org>
Authored: Wed Jan 25 01:28:18 2017 -0500
Committer: Nick Vatamaniuc <va...@apache.org>
Committed: Wed Jan 25 01:28:18 2017 -0500
----------------------------------------------------------------------
src/couch_multidb_changes.erl | 169 ++++++++++++++++++++++++-------------
1 file changed, 111 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/fb77cbc4/src/couch_multidb_changes.erl
----------------------------------------------------------------------
diff --git a/src/couch_multidb_changes.erl b/src/couch_multidb_changes.erl
index b9c9ad5..af85a78 100644
--- a/src/couch_multidb_changes.erl
+++ b/src/couch_multidb_changes.erl
@@ -21,6 +21,7 @@
-export([changes_reader/3, changes_reader_cb/3]).
-include_lib("couch/include/couch_db.hrl").
+-include_lib("mem3/include/mem3.hrl").
-define(CTX, {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}}).
@@ -249,34 +250,44 @@ changes_reader_cb(_, _, Acc) ->
scan_all_dbs(Server, DbSuffix) when is_pid(Server) ->
- Root = config:get("couchdb", "database_dir", "."),
- NormRoot = couch_util:normpath(Root),
- Pat = io_lib:format("~s(\\.[0-9]{10,})?.couch$", [DbSuffix]),
- filelib:fold_files(Root, lists:flatten(Pat), true,
- fun(Filename, Acc) ->
- % shamelessly stolen from couch_server.erl
- NormFilename = couch_util:normpath(Filename),
- case NormFilename -- NormRoot of
- [$/ | RelativeFilename] -> ok;
- RelativeFilename -> ok
- end,
- DbName = ?l2b(filename:rootname(RelativeFilename, ".couch")),
- Jitter = jitter(Acc),
- spawn_link(fun() ->
- timer:sleep(Jitter),
- gen_server:cast(Server, {resume_scan, DbName})
- end),
- Acc + 1
- end, 1).
+ ok = scan_local_db(Server, DbSuffix),
+ {ok, Db} = mem3_util:ensure_exists(
+ config:get("mem3", "shards_db", "_dbs")),
+ ChangesFun = couch_changes:handle_changes(#changes_args{}, nil, Db, nil),
+ ChangesFun(fun({change, {Change}, _}, _) ->
+ DbName = couch_util:get_value(<<"id">>, Change),
+ case DbName of <<"_design/", _/binary>> -> ok; _Else ->
+ case couch_replicator_utils:is_deleted(Change) of
+ true ->
+ ok;
+ false ->
+ [gen_server:cast(Server, {resume_scan, ShardName})
+ || ShardName <- filter_shards(DbName, DbSuffix)],
+ ok
+ end
+ end;
+ (_, _) -> ok
+ end),
+ couch_db:close(Db).
+
+
+filter_shards(DbName, DbSuffix) ->
+ case DbSuffix =:= couch_db:dbname_suffix(DbName) of
+ false ->
+ [];
+ true ->
+ [ShardName || #shard{name = ShardName} <- mem3:local_shards(DbName)]
+ end.
-% calculate random delay proportional to the number of replications
-% on current node, in order to prevent a stampede:
-% - when a source with multiple replication targets fails
-% - when we restart couch_replication_manager
-jitter(N) ->
- Range = min(2 * N * ?AVG_DELAY_MSEC, ?MAX_DELAY_MSEC),
- random:uniform(Range).
+scan_local_db(Server, DbSuffix) when is_pid(Server) ->
+ case couch_db:open_int(DbSuffix, [?CTX, sys_db, nologifmissing]) of
+ {ok, Db} ->
+ gen_server:cast(Server, {resume_scan, DbSuffix}),
+ ok = couch_db:close(Db);
+ _Error ->
+ ok
+ end.
is_design_doc({Change}) ->
@@ -336,7 +347,6 @@ couch_multidb_changes_test_() ->
t_handle_call_resume_scan_no_chfeed_ets_entry(),
t_start_link(),
t_start_link_no_ddocs(),
- t_scanner_finds_shard(),
t_misc_gen_server_callbacks()
]
}.
@@ -345,7 +355,15 @@ setup() ->
mock_logs(),
mock_callback_mod(),
meck:expect(couch_event, register_all, 1, ok),
- meck:expect(config, get, fun("couchdb", "database_dir", _) -> ?TEMPDIR end),
+ meck:expect(config, get, ["mem3", "shards_db", '_'], "_dbs"),
+ meck:expect(mem3_util, ensure_exists, 1, {ok, dbs}),
+ ChangesFun = meck:val(fun(_) -> ok end),
+ meck:expect(couch_changes, handle_changes, 4, ChangesFun),
+ meck:expect(couch_db, open_int,
+ fun(?DBNAME, [?CTX, sys_db]) -> {ok, db};
+ (_, _) -> {not_found, no_db_file}
+ end),
+ meck:expect(couch_db, close, 1, ok),
mock_changes_reader(),
% create process to stand in for couch_ever_server
% mocking erlang:monitor doesn't, so give it real process to monitor
@@ -357,8 +375,7 @@ setup() ->
teardown(EvtPid) ->
unlink(EvtPid),
exit(EvtPid, kill),
- meck:unload(),
- delete_shard_file(?DBNAME).
+ meck:unload().
t_handle_call_change() ->
@@ -648,15 +665,6 @@ t_start_link_no_ddocs() ->
exit(Pid, kill)
end).
-t_scanner_finds_shard() ->
- ?_test(begin
- ok = create_shard_file(?DBNAME),
- {ok, Pid} = start_link(?SUFFIX, ?MOD, zig, []),
- ok = meck:wait(?MOD, db_found, [?DBNAME, zig], 2000),
- unlink(Pid),
- exit(Pid, kill)
- end).
-
t_misc_gen_server_callbacks() ->
?_test(begin
?assertEqual(ok, terminate(reason, state)),
@@ -664,6 +672,70 @@ t_misc_gen_server_callbacks() ->
end).
+scan_dbs_test_() ->
+{
+ foreach,
+ fun() -> test_util:start_couch([mem3, fabric]) end,
+ fun(Ctx) -> test_util:stop_couch(Ctx) end,
+ [
+ t_pass_shard(),
+ t_fail_shard(),
+ t_pass_local(),
+ t_fail_local()
+ ]
+}.
+
+
+t_pass_shard() ->
+ ?_test(begin
+ DbName0 = ?tempdb(),
+ DbSuffix = <<"_replicator">>,
+ DbName = <<DbName0/binary, "/", DbSuffix/binary>>,
+ ok = fabric:create_db(DbName, [?CTX]),
+ ?assertEqual(8, length(filter_shards(DbName, DbSuffix))),
+ fabric:delete_db(DbName, [?CTX])
+ end).
+
+
+t_fail_shard() ->
+ ?_test(begin
+ DbName = ?tempdb(),
+ ok = fabric:create_db(DbName, [?CTX]),
+ ?assertEqual([], filter_shards(DbName, <<"_replicator">>)),
+ fabric:delete_db(DbName, [?CTX])
+ end).
+
+
+t_pass_local() ->
+ ?_test(begin
+ LocalDb = ?tempdb(),
+ {ok, Db} = couch_db:create(LocalDb, [?CTX]),
+ ok = couch_db:close(Db),
+ scan_local_db(self(), LocalDb),
+ receive
+ {'$gen_cast', Msg} ->
+ ?assertEqual(Msg, {resume_scan, LocalDb})
+ after 0 ->
+ ?assert(false)
+ end
+ end).
+
+
+t_fail_local() ->
+ ?_test(begin
+ LocalDb = ?tempdb(),
+ {ok, Db} = couch_db:create(LocalDb, [?CTX]),
+ ok = couch_db:close(Db),
+ scan_local_db(self(), <<"some_other_db">>),
+ receive
+ {'$gen_cast', Msg} ->
+ ?assertNotEqual(Msg, {resume_scan, LocalDb})
+ after 0 ->
+ ?assert(true)
+ end
+ end).
+
+
% Test helper functions
mock_logs() ->
@@ -699,7 +771,6 @@ kill_mock_change_reader_and_get_its_args(Pid) ->
end.
mock_changes_reader() ->
- meck:expect(couch_db, open_int, 2, {ok, db}),
meck:expect(couch_changes, handle_db_changes,
fun(_ChArgs, _Req, db) ->
fun mock_changes_reader_loop/1
@@ -734,24 +805,6 @@ change_row(Id) when is_binary(Id) ->
{doc, {[{<<"_id">>, Id}, {<<"_rev">>, <<"1-f00">>}]}}
]}.
-shard_fname(DbName) ->
- filename:join([?TEMPDIR, binary_to_list(DbName) ++ ".couch"]).
-
-delete_shard_file(DbName) ->
- Path = shard_fname(DbName),
- case filelib:is_file(Path) of
- true ->
- ok = file:delete(Path);
- false ->
- ok
- end.
-
-create_shard_file(DbName) ->
- Path = shard_fname(DbName),
- ok = filelib:ensure_dir(Path),
- ok = file:write_file(Path, <<>>).
-
-
handle_call_ok(Msg, State) ->
?assertMatch({reply, ok, _}, handle_call(Msg, from, State)).