You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2017/04/10 15:00:05 UTC
[couchdb] branch 63012-scheduler updated: [fixup] more style
formatting
This is an automated email from the ASF dual-hosted git repository.
vatamane pushed a commit to branch 63012-scheduler
in repository https://gitbox.apache.org/repos/asf/couchdb.git
The following commit(s) were added to refs/heads/63012-scheduler by this push:
new e612172 [fixup] more style formatting
e612172 is described below
commit e6121724f60cd5643341f0d97d9565a64a668dc5
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Mon Apr 10 10:59:59 2017 -0400
[fixup] more style formatting
---
src/couch_replicator/src/couch_multidb_changes.erl | 41 ++-
src/couch_replicator/src/couch_replicator.erl | 7 +-
.../src/couch_replicator_clustering.erl | 3 -
.../src/couch_replicator_connection.erl | 4 -
.../src/couch_replicator_db_changes.erl | 1 -
.../src/couch_replicator_doc_processor.erl | 10 +-
src/couch_replicator/src/couch_replicator_docs.erl | 5 +-
.../src/couch_replicator_filters.erl | 2 +-
src/couch_replicator/src/couch_replicator_ids.erl | 3 +-
.../src/couch_replicator_manager.erl | 5 +-
.../src/couch_replicator_rate_limiter.erl | 9 +-
.../src/couch_replicator_rate_limiter_tables.erl | 1 -
.../src/couch_replicator_scheduler.erl | 21 +-
.../src/couch_replicator_scheduler_job.erl | 274 +++++++++++----------
.../src/couch_replicator_scheduler_sup.erl | 10 +-
15 files changed, 204 insertions(+), 192 deletions(-)
diff --git a/src/couch_replicator/src/couch_multidb_changes.erl b/src/couch_replicator/src/couch_multidb_changes.erl
index d8e4e05..ba624a4 100644
--- a/src/couch_replicator/src/couch_multidb_changes.erl
+++ b/src/couch_replicator/src/couch_multidb_changes.erl
@@ -14,7 +14,6 @@
-behaviour(gen_server).
-
-export([
start_link/4
]).
@@ -187,13 +186,10 @@ register_with_event_server(Server) ->
-spec db_callback(created | deleted | updated, binary(), #state{}) -> #state{}.
db_callback(created, DbName, #state{mod = Mod, ctx = Ctx} = State) ->
State#state{ctx = Mod:db_created(DbName, Ctx)};
-
db_callback(deleted, DbName, #state{mod = Mod, ctx = Ctx} = State) ->
State#state{ctx = Mod:db_deleted(DbName, Ctx)};
-
db_callback(updated, DbName, State) ->
resume_scan(DbName, State);
-
db_callback(_Other, _DbName, State) ->
State.
@@ -249,11 +245,9 @@ changes_reader(Server, DbName, Since) ->
changes_reader_cb({change, Change, _}, _, {Server, DbName}) ->
ok = gen_server:call(Server, {change, DbName, Change}, infinity),
{Server, DbName};
-
changes_reader_cb({stop, EndSeq}, _, {Server, DbName}) ->
ok = gen_server:call(Server, {checkpoint, DbName, EndSeq}, infinity),
{Server, DbName};
-
changes_reader_cb(_, _, Acc) ->
Acc.
@@ -318,7 +312,6 @@ is_design_doc({Change}) ->
is_design_doc_id(<<?DESIGN_DOC_PREFIX, _/binary>>) ->
true;
-
is_design_doc_id(_) ->
false.
@@ -367,6 +360,7 @@ couch_multidb_changes_test_() ->
]
}.
+
setup() ->
mock_logs(),
mock_callback_mod(),
@@ -403,6 +397,7 @@ t_handle_call_change() ->
?assert(meck:called(?MOD, db_change, [?DBNAME, Change, zig]))
end).
+
t_handle_call_change_filter_design_docs() ->
?_test(begin
State0 = mock_state(),
@@ -413,6 +408,7 @@ t_handle_call_change_filter_design_docs() ->
?assertNot(meck:called(?MOD, db_change, [?DBNAME, Change, zig]))
end).
+
t_handle_call_checkpoint_new() ->
?_test(begin
Tid = mock_ets(),
@@ -422,6 +418,7 @@ t_handle_call_checkpoint_new() ->
ets:delete(Tid)
end).
+
t_handle_call_checkpoint_existing() ->
?_test(begin
Tid = mock_ets(),
@@ -432,6 +429,7 @@ t_handle_call_checkpoint_existing() ->
ets:delete(Tid)
end).
+
t_handle_info_created() ->
?_test(begin
State = mock_state(),
@@ -440,6 +438,7 @@ t_handle_info_created() ->
?assert(meck:called(?MOD, db_created, [?DBNAME, zig]))
end).
+
t_handle_info_deleted() ->
?_test(begin
State = mock_state(),
@@ -448,6 +447,7 @@ t_handle_info_deleted() ->
?assert(meck:called(?MOD, db_deleted, [?DBNAME, zig]))
end).
+
t_handle_info_updated() ->
?_test(begin
Tid = mock_ets(),
@@ -457,12 +457,14 @@ t_handle_info_updated() ->
?assert(meck:called(?MOD, db_found, [?DBNAME, zig]))
end).
+
t_handle_info_other_event() ->
?_test(begin
State = mock_state(),
handle_info_check({'$couch_event', ?DBNAME, somethingelse}, State)
end).
+
t_handle_info_created_other_db() ->
?_test(begin
State = mock_state(),
@@ -470,6 +472,7 @@ t_handle_info_created_other_db() ->
?assertNot(meck:called(?MOD, db_created, [?DBNAME, zig]))
end).
+
t_handle_info_scanner_exit_normal() ->
?_test(begin
Res = handle_info({'EXIT', spid, normal}, mock_state()),
@@ -478,18 +481,21 @@ t_handle_info_scanner_exit_normal() ->
?assertEqual(nil, RState#state.scanner)
end).
+
t_handle_info_scanner_crashed() ->
?_test(begin
Res = handle_info({'EXIT', spid, oops}, mock_state()),
?assertMatch({stop, {scanner_died, oops}, _State}, Res)
end).
+
t_handle_info_event_server_exited() ->
?_test(begin
Res = handle_info({'DOWN', esref, type, espid, reason}, mock_state()),
?assertMatch({stop, {couch_event_server_died, reason}, _}, Res)
end).
+
t_handle_info_unknown_pid_exited() ->
?_test(begin
State0 = mock_state(),
@@ -500,6 +506,7 @@ t_handle_info_unknown_pid_exited() ->
?assertMatch({stop, {unexpected_exit, somepid, oops}, State1}, Res1)
end).
+
t_handle_info_change_feed_exited() ->
?_test(begin
Tid0 = mock_ets(),
@@ -518,6 +525,7 @@ t_handle_info_change_feed_exited() ->
ets:delete(Tid1)
end).
+
t_handle_info_change_feed_exited_and_need_rescan() ->
?_test(begin
Tid = mock_ets(),
@@ -536,6 +544,7 @@ t_handle_info_change_feed_exited_and_need_rescan() ->
ets:delete(Tid)
end).
+
t_spawn_changes_reader() ->
?_test(begin
Pid = start_changes_reader(?DBNAME, 3),
@@ -554,6 +563,7 @@ t_spawn_changes_reader() ->
}, {json_req, null}, db]))
end).
+
t_changes_reader_cb_change() ->
?_test(begin
{ok, Pid} = start_link(?SUFFIX, ?MOD, zig, []),
@@ -565,6 +575,7 @@ t_changes_reader_cb_change() ->
exit(Pid, kill)
end).
+
t_changes_reader_cb_stop() ->
?_test(begin
{ok, Pid} = start_link(?SUFFIX, ?MOD, zig, []),
@@ -577,9 +588,11 @@ t_changes_reader_cb_stop() ->
exit(Pid, kill)
end).
+
t_changes_reader_cb_other() ->
?_assertEqual(acc, changes_reader_cb(other, chtype, acc)).
+
t_handle_call_resume_scan_no_chfeed_no_ets_entry() ->
?_test(begin
Tid = mock_ets(),
@@ -603,6 +616,7 @@ t_handle_call_resume_scan_no_chfeed_no_ets_entry() ->
ets:delete(Tid)
end).
+
t_handle_call_resume_scan_chfeed_no_ets_entry() ->
?_test(begin
Tid = mock_ets(),
@@ -615,6 +629,7 @@ t_handle_call_resume_scan_chfeed_no_ets_entry() ->
kill_mock_change_reader_and_get_its_args(Pid)
end).
+
t_handle_call_resume_scan_chfeed_ets_entry() ->
?_test(begin
Tid = mock_ets(),
@@ -628,6 +643,7 @@ t_handle_call_resume_scan_chfeed_ets_entry() ->
kill_mock_change_reader_and_get_its_args(Pid)
end).
+
t_handle_call_resume_scan_no_chfeed_ets_entry() ->
?_test(begin
Tid = mock_ets(),
@@ -650,6 +666,7 @@ t_handle_call_resume_scan_no_chfeed_ets_entry() ->
ets:delete(Tid)
end).
+
t_start_link() ->
?_test(begin
{ok, Pid} = start_link(?SUFFIX, ?MOD, nil, []),
@@ -666,6 +683,7 @@ t_start_link() ->
?assert(meck:called(couch_event, register_all, [Pid]))
end).
+
t_start_link_no_ddocs() ->
?_test(begin
{ok, Pid} = start_link(?SUFFIX, ?MOD, nil, [skip_ddocs]),
@@ -681,6 +699,7 @@ t_start_link_no_ddocs() ->
exit(Pid, kill)
end).
+
t_misc_gen_server_callbacks() ->
?_test(begin
?assertEqual(ok, terminate(reason, state)),
@@ -786,15 +805,18 @@ kill_mock_change_reader_and_get_its_args(Pid) ->
erlang:error(spawn_change_reader_timeout)
end.
+
mock_changes_reader() ->
meck:expect(couch_changes, handle_db_changes,
fun(_ChArgs, _Req, db) ->
fun mock_changes_reader_loop/1
end).
+
mock_ets() ->
ets:new(multidb_test_ets, [set, public]).
+
mock_state() ->
#state{
mod = ?MOD,
@@ -804,10 +826,12 @@ mock_state() ->
scanner = spid,
pids = []}.
+
mock_state(Ets) ->
State = mock_state(),
State#state{tid = Ets}.
+
mock_state(Ets, Pid) ->
State = mock_state(Ets),
State#state{pids = [{?DBNAME, Pid}]}.
@@ -821,10 +845,13 @@ change_row(Id) when is_binary(Id) ->
{doc, {[{<<"_id">>, Id}, {<<"_rev">>, <<"1-f00">>}]}}
]}.
+
handle_call_ok(Msg, State) ->
?assertMatch({reply, ok, _}, handle_call(Msg, from, State)).
+
handle_info_check(Msg, State) ->
?assertMatch({noreply, _}, handle_info(Msg, State)).
+
-endif.
diff --git a/src/couch_replicator/src/couch_replicator.erl b/src/couch_replicator/src/couch_replicator.erl
index df5a347..0b66331 100644
--- a/src/couch_replicator/src/couch_replicator.erl
+++ b/src/couch_replicator/src/couch_replicator.erl
@@ -182,7 +182,6 @@ stream_active_docs_info(Cb, UserAcc, States) ->
any().
stream_active_docs_info([], _Cb, UserAcc, _States) ->
UserAcc;
-
stream_active_docs_info([Node | Nodes], Cb, UserAcc, States) ->
case rpc:call(Node, couch_replicator_doc_processor, docs, [States]) of
{badrpc, Reason} ->
@@ -201,10 +200,10 @@ stream_active_docs_info([Node | Nodes], Cb, UserAcc, States) ->
({meta, any()}, query_acc()) -> {ok, query_acc()};
(complete, query_acc()) -> {ok, query_acc()}.
handle_replicator_doc_query({row, Props}, {Db, Cb, UserAcc, States}) ->
- DocId = couch_util:get_value(id, Props),
- DocStateBin = couch_util:get_value(key, Props),
+ DocId = get_value(id, Props),
+ DocStateBin = get_value(key, Props),
DocState = erlang:binary_to_existing_atom(DocStateBin, utf8),
- MapValue = couch_util:get_value(value, Props),
+ MapValue = get_value(value, Props),
{Source, Target, StartTime, StateTime, StateInfo} = case MapValue of
[Src, Tgt, StartT, StateT, Info] ->
{Src, Tgt, StartT, StateT, Info};
diff --git a/src/couch_replicator/src/couch_replicator_clustering.erl b/src/couch_replicator/src/couch_replicator_clustering.erl
index 585916a..7231ece 100644
--- a/src/couch_replicator/src/couch_replicator_clustering.erl
+++ b/src/couch_replicator/src/couch_replicator_clustering.erl
@@ -27,10 +27,8 @@
-module(couch_replicator_clustering).
-behaviour(gen_server).
-
-behaviour(config_listener).
-
-export([
start_link/0
]).
@@ -225,7 +223,6 @@ handle_config_change(_, _, _, _, S) ->
handle_config_terminate(_, stop, _) -> ok;
-
handle_config_terminate(_S, _R, _St) ->
Pid = whereis(?MODULE),
erlang:send_after(?RELISTEN_DELAY, Pid, restart_config_listener).
diff --git a/src/couch_replicator/src/couch_replicator_connection.erl b/src/couch_replicator/src/couch_replicator_connection.erl
index c1f1048..627f95b 100644
--- a/src/couch_replicator/src/couch_replicator_connection.erl
+++ b/src/couch_replicator/src/couch_replicator_connection.erl
@@ -13,10 +13,8 @@
-module(couch_replicator_connection).
-behavior(gen_server).
-
-behavior(config_listener).
-
-export([
start_link/0
]).
@@ -40,10 +38,8 @@
handle_config_terminate/3
]).
-
-include_lib("ibrowse/include/ibrowse.hrl").
-
-define(DEFAULT_CLOSE_INTERVAL, 90000).
-define(RELISTEN_DELAY, 5000).
diff --git a/src/couch_replicator/src/couch_replicator_db_changes.erl b/src/couch_replicator/src/couch_replicator_db_changes.erl
index f58eac8..92b0222 100644
--- a/src/couch_replicator/src/couch_replicator_db_changes.erl
+++ b/src/couch_replicator/src/couch_replicator_db_changes.erl
@@ -101,7 +101,6 @@ restart_mdb_changes(#state{mdb_changes = _Pid} = State) ->
-spec stop_mdb_changes(#state{}) -> #state{}.
stop_mdb_changes(#state{mdb_changes = nil} = State) ->
State;
-
stop_mdb_changes(#state{mdb_changes = Pid} = State) ->
couch_log:notice("Stopping replicator db changes listener ~p", [Pid]),
unlink(Pid),
diff --git a/src/couch_replicator/src/couch_replicator_doc_processor.erl b/src/couch_replicator/src/couch_replicator_doc_processor.erl
index 4ef7c95..8b19836 100644
--- a/src/couch_replicator/src/couch_replicator_doc_processor.erl
+++ b/src/couch_replicator/src/couch_replicator_doc_processor.erl
@@ -13,7 +13,6 @@
-module(couch_replicator_doc_processor).
-behaviour(gen_server).
-
-behaviour(couch_multidb_changes).
-export([
@@ -44,7 +43,6 @@
notify_cluster_event/2
]).
-
-include_lib("couch/include/couch_db.hrl").
-include("couch_replicator.hrl").
@@ -396,6 +394,7 @@ worker_returned(Ref, Id, {permanent_failure, _Reason}) ->
end,
ok.
+
-spec maybe_update_doc_error(#rep{}, any()) -> ok.
maybe_update_doc_error(Rep, Reason) ->
case update_docs() of
@@ -405,6 +404,7 @@ maybe_update_doc_error(Rep, Reason) ->
ok
end.
+
-spec maybe_update_doc_triggered(#rep{}, rep_id()) -> ok.
maybe_update_doc_triggered(Rep, RepId) ->
case update_docs() of
@@ -467,13 +467,12 @@ maybe_start_worker(Id) ->
-spec get_worker_wait(#rdoc{}) -> seconds().
get_worker_wait(#rdoc{state = scheduled, filter = user}) ->
filter_backoff();
-
get_worker_wait(#rdoc{state = error, errcnt = ErrCnt}) ->
error_backoff(ErrCnt);
-
get_worker_wait(#rdoc{state = initializing}) ->
0.
+
-spec update_docs() -> boolean().
update_docs() ->
config:get_boolean("replicator", "update_docs", ?DEFAULT_UPDATE_DOCS).
@@ -575,11 +574,9 @@ ejson_doc(#rdoc{state = RepState} = RDoc, _HealthThreshold) ->
]}.
-
-spec ejson_doc_state_filter(atom(), [atom()]) -> boolean().
ejson_doc_state_filter(_DocState, []) ->
true;
-
ejson_doc_state_filter(State, States) when is_list(States), is_atom(State) ->
lists:member(State, States).
@@ -946,5 +943,4 @@ bad_change() ->
]}}
]}.
-
-endif.
diff --git a/src/couch_replicator/src/couch_replicator_docs.erl b/src/couch_replicator/src/couch_replicator_docs.erl
index f0d3157..6859d5b 100644
--- a/src/couch_replicator/src/couch_replicator_docs.erl
+++ b/src/couch_replicator/src/couch_replicator_docs.erl
@@ -451,7 +451,6 @@ parse_rep_db(undefined, _Proxy, _Options) ->
-spec maybe_add_trailing_slash(binary() | list()) -> list().
maybe_add_trailing_slash(Url) when is_binary(Url) ->
maybe_add_trailing_slash(?b2l(Url));
-
maybe_add_trailing_slash(Url) ->
case lists:last(Url) of
$/ ->
@@ -561,6 +560,7 @@ check_options(Options) ->
"`doc_ids`,`filter`,`selector` are mutually exclusive"})
end.
+
-spec parse_proxy_params(binary() | [_]) -> [_].
parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) ->
parse_proxy_params(?b2l(ProxyUrl));
@@ -716,6 +716,7 @@ check_options_pass_values_test() ->
?assertEqual(check_options([{filter, x}]), [{filter, x}]),
?assertEqual(check_options([{selector, x}]), [{selector, x}]).
+
check_options_fail_values_test() ->
?assertThrow({bad_request, _},
check_options([{doc_ids, x}, {filter, y}])),
@@ -726,6 +727,7 @@ check_options_fail_values_test() ->
?assertThrow({bad_request, _},
check_options([{doc_ids, x}, {selector, y}, {filter, z}])).
+
check_convert_options_pass_test() ->
?assertEqual([], convert_options([])),
?assertEqual([], convert_options([{<<"random">>, 42}])),
@@ -740,6 +742,7 @@ check_convert_options_pass_test() ->
?assertEqual([{selector, {key, value}}],
convert_options([{<<"selector">>, {key, value}}])).
+
check_convert_options_fail_test() ->
?assertThrow({bad_request, _},
convert_options([{<<"cancel">>, <<"true">>}])),
diff --git a/src/couch_replicator/src/couch_replicator_filters.erl b/src/couch_replicator/src/couch_replicator_filters.erl
index 96a6ea1..5668820 100644
--- a/src/couch_replicator/src/couch_replicator_filters.erl
+++ b/src/couch_replicator/src/couch_replicator_filters.erl
@@ -152,7 +152,6 @@ fetch_internal(DDocName, FilterName, Source, UserCtx) ->
end.
-
-spec query_params([_]) -> {[_]}.
query_params(Options)->
couch_util:get_value(query_params, Options, {[]}).
@@ -202,6 +201,7 @@ ejsort_basic_values_test() ->
?assertEqual(ejsort([]), []),
?assertEqual(ejsort({[]}), {[]}).
+
ejsort_compound_values_test() ->
?assertEqual(ejsort([2, 1, 3, <<"a">>]), [2, 1, 3, <<"a">>]),
Ej1 = {[{<<"a">>, 0}, {<<"c">>, 0}, {<<"b">>, 0}]},
diff --git a/src/couch_replicator/src/couch_replicator_ids.erl b/src/couch_replicator/src/couch_replicator_ids.erl
index 175b3e5..7f26db7 100644
--- a/src/couch_replicator/src/couch_replicator_ids.erl
+++ b/src/couch_replicator/src/couch_replicator_ids.erl
@@ -32,6 +32,7 @@ replication_id(#rep{options = Options} = Rep) ->
BaseId = replication_id(Rep, ?REP_ID_VERSION),
{BaseId, maybe_append_options([continuous, create_target], Options)}.
+
% Versioned clauses for generating replication IDs.
% If a change is made to how replications are identified,
% please add a new clause and increase ?REP_ID_VERSION.
@@ -69,10 +70,8 @@ replication_id(#rep{user_ctx = UserCtx} = Rep, 1) ->
-spec convert([_] | binary() | {string(), string()}) -> {string(), string()}.
convert(Id) when is_list(Id) ->
convert(?l2b(Id));
-
convert(Id) when is_binary(Id) ->
lists:splitwith(fun(Char) -> Char =/= $+ end, ?b2l(Id));
-
convert({BaseId, Ext} = Id) when is_list(BaseId), is_list(Ext) ->
Id.
diff --git a/src/couch_replicator/src/couch_replicator_manager.erl b/src/couch_replicator/src/couch_replicator_manager.erl
index 6a79ce6..afccc0b 100644
--- a/src/couch_replicator/src/couch_replicator_manager.erl
+++ b/src/couch_replicator/src/couch_replicator_manager.erl
@@ -12,8 +12,9 @@
-module(couch_replicator_manager).
-% TODO: This is a temporary proxy module to external calls (outside replicator) to other
-% replicator modules. This is done to avoid juggling multiple repos during development.
+% TODO: This is a temporary proxy module to external calls (outside replicator)
+% to other replicator modules. This is done to avoid juggling multiple repos
+% during development.
% NV: TODO: These functions were moved to couch_replicator_docs
% but it is still called from fabric_doc_update. Keep it here for now
diff --git a/src/couch_replicator/src/couch_replicator_rate_limiter.erl b/src/couch_replicator/src/couch_replicator_rate_limiter.erl
index 7d5f4b7..e84ae99 100644
--- a/src/couch_replicator/src/couch_replicator_rate_limiter.erl
+++ b/src/couch_replicator/src/couch_replicator_rate_limiter.erl
@@ -41,7 +41,6 @@
-behaviour(gen_server).
-
-export([
start_link/0
]).
@@ -62,9 +61,7 @@
success/1
]).
-
% Types
-
-type key() :: any().
-type interval() :: non_neg_integer().
-type msec() :: non_neg_integer().
@@ -72,7 +69,6 @@
% Definitions
-
% Main parameters of the algorithm. The factor is the multiplicative part and
% base interval is the additive.
-define(BASE_INTERVAL, 20).
@@ -241,13 +237,10 @@ time_decay(_Dt, Interval) ->
-spec additive_factor(interval()) -> interval().
additive_factor(Interval) when Interval > 10000 ->
?BASE_INTERVAL * 50;
-
- additive_factor(Interval) when Interval > 1000 ->
+additive_factor(Interval) when Interval > 1000 ->
?BASE_INTERVAL * 5;
-
additive_factor(Interval) when Interval > 100 ->
?BASE_INTERVAL * 2;
-
additive_factor(_Interval) ->
?BASE_INTERVAL.
diff --git a/src/couch_replicator/src/couch_replicator_rate_limiter_tables.erl b/src/couch_replicator/src/couch_replicator_rate_limiter_tables.erl
index bad3264..dd5cdaa 100644
--- a/src/couch_replicator/src/couch_replicator_rate_limiter_tables.erl
+++ b/src/couch_replicator/src/couch_replicator_rate_limiter_tables.erl
@@ -31,7 +31,6 @@
term_to_table/1
]).
-
-define(SHARDS_N, 16).
diff --git a/src/couch_replicator/src/couch_replicator_scheduler.erl b/src/couch_replicator/src/couch_replicator_scheduler.erl
index 233f31b..18118e1 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler.erl
@@ -13,10 +13,8 @@
-module(couch_replicator_scheduler).
-behaviour(gen_server).
-
-behaviour(config_listener).
-
-export([
start_link/0
]).
@@ -50,13 +48,11 @@
handle_config_terminate/3
]).
-
-include("couch_replicator_scheduler.hrl").
-include("couch_replicator.hrl").
-include("couch_replicator_api_wrap.hrl").
-include_lib("couch/include/couch_db.hrl").
-
%% types
-type event_type() :: added | started | stopped | {crashed, any()}.
-type event() :: {Type:: event_type(), When :: erlang:timestamp()}.
@@ -163,7 +159,6 @@ job_summary(JobId, HealthThreshold) ->
job_proxy_url(#httpdb{proxy_url = ProxyUrl}) when is_list(ProxyUrl) ->
list_to_binary(couch_util:url_strip_password(ProxyUrl));
-
job_proxy_url(_Endpoint) ->
null.
@@ -188,8 +183,6 @@ find_jobs_by_doc(DbName, DocId) ->
[RepId || [RepId] <- ets:match(?MODULE, MatchSpec)].
-
-
%% gen_server functions
init(_) ->
@@ -331,8 +324,7 @@ handle_config_terminate(_, _, _) ->
erlang:send_after(?RELISTEN_DELAY, Pid, restart_config_listener).
-%% private functions
-
+%% Private functions
% Handle crashed jobs. Handling differs between transient and permanent jobs.
% Transient jobs are those posted to the _replicate endpoint. They don't have a
@@ -389,6 +381,7 @@ maybe_start_newly_added_job(Job, State) ->
ok
end.
+
% Return up to a given number of oldest, not recently crashed jobs. Try to be
% memory efficient and use ets:foldl to accumulate jobs.
-spec pending_jobs(non_neg_integer()) -> [#job{}].
@@ -621,6 +614,7 @@ job_by_pid(Pid) when is_pid(Pid) ->
{ok, Job}
end.
+
-spec job_by_id(job_id()) -> {ok, #job{}} | {error, not_found}.
job_by_id(Id) ->
case ets:lookup(?MODULE, Id) of
@@ -705,6 +699,7 @@ start_pending_jobs(State, Running, Pending) ->
ok
end.
+
-spec rotate_jobs(#state{}, non_neg_integer(), non_neg_integer()) -> ok.
rotate_jobs(State, Running, Pending) ->
#state{max_jobs=MaxJobs, max_churn=MaxChurn} = State,
@@ -764,24 +759,20 @@ stats_fold(#job{pid = undefined, history = [{added, T}]}, Acc) ->
#stats_acc{now = Now, pending_t = SumT, pending_n = Cnt} = Acc,
Dt = round(timer:now_diff(Now, T) / 1000000),
Acc#stats_acc{pending_t = SumT + Dt, pending_n = Cnt + 1};
-
stats_fold(#job{pid = undefined, history = [{stopped, T} | _]}, Acc) ->
#stats_acc{now = Now, pending_t = SumT, pending_n = Cnt} = Acc,
Dt = round(timer:now_diff(Now, T) / 1000000),
Acc#stats_acc{pending_t = SumT + Dt, pending_n = Cnt + 1};
-
stats_fold(#job{pid = undefined, history = [{{crashed, _}, T} | _]}, Acc) ->
#stats_acc{now = Now, crashed_t = SumT, crashed_n = Cnt} = Acc,
Dt = round(timer:now_diff(Now, T) / 1000000),
Acc#stats_acc{crashed_t = SumT + Dt, crashed_n = Cnt + 1};
-
stats_fold(#job{pid = P, history = [{started, T} | _]}, Acc) when is_pid(P) ->
#stats_acc{now = Now, running_t = SumT, running_n = Cnt} = Acc,
Dt = round(timer:now_diff(Now, T) / 1000000),
Acc#stats_acc{running_t = SumT + Dt, running_n = Cnt + 1}.
-
-spec avg(Sum :: non_neg_integer(), N :: non_neg_integer()) ->
non_neg_integer().
avg(_Sum, 0) ->
@@ -881,7 +872,6 @@ maybe_optimize_job_for_rate_limiting(Job = #job{history =
],
Rep = lists:foldl(fun optimize_int_option/2, Job#job.rep, Opts),
Job#job{rep = Rep};
-
maybe_optimize_job_for_rate_limiting(Job) ->
Job.
@@ -900,6 +890,7 @@ optimize_int_option({Key, Val}, #rep{options = Options} = Rep) ->
-ifdef(TEST).
+
-include_lib("eunit/include/eunit.hrl").
@@ -1414,8 +1405,8 @@ stopped() ->
stopped(WhenSec) ->
{stopped, {0, WhenSec, 0}}.
+
added() ->
{added, {0, 0, 0}}.
-endif.
-
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index 4d80af0..4534f24 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -11,24 +11,28 @@
% the License.
-module(couch_replicator_scheduler_job).
+
-behaviour(gen_server).
--vsn(1).
+
+-export([
+ start_link/1
+]).
+
+-export([
+ init/1,
+ terminate/2,
+ handle_call/3,
+ handle_info/2,
+ handle_cast/2,
+ code_change/3,
+ format_status/2
+]).
-include_lib("couch/include/couch_db.hrl").
-include("couch_replicator_api_wrap.hrl").
-include("couch_replicator_scheduler.hrl").
-include("couch_replicator.hrl").
-%% public api
--export([start_link/1]).
-
-%% gen_server callbacks
--export([init/1, terminate/2, code_change/3]).
--export([handle_call/3, handle_cast/2, handle_info/2]).
--export([format_status/2]).
-
-
-%% imports
-import(couch_util, [
get_value/2,
get_value/3,
@@ -42,9 +46,10 @@
]).
-%% definitions
-define(LOWEST_SEQ, 0).
-define(DEFAULT_CHECKPOINT_INTERVAL, 30000).
+
+
-record(rep_state, {
rep_details,
source_name,
@@ -81,6 +86,7 @@
view = nil
}).
+
start_link(#rep{id = {BaseId, Ext}, source = Src, target = Tgt} = Rep) ->
RepChildId = BaseId ++ Ext,
Source = couch_replicator_api_wrap:db_uri(Src),
@@ -98,9 +104,11 @@ start_link(#rep{id = {BaseId, Ext}, source = Src, target = Tgt} = Rep) ->
{error, Reason}
end.
+
init(InitArgs) ->
{ok, InitArgs, 0}.
+
do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
process_flag(trap_exit, true),
@@ -203,13 +211,75 @@ do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
}
}.
-adjust_maxconn(Src = #httpdb{http_connections = 1}, RepId) ->
- Msg = "Adjusting minimum number of HTTP source connections to 2 for ~p",
- couch_log:notice(Msg, [RepId]),
- Src#httpdb{http_connections = 2};
-adjust_maxconn(Src, _RepId) ->
- Src.
+handle_call(get_details, _From, #rep_state{rep_details = Rep} = State) ->
+ {reply, {ok, Rep}, State};
+
+handle_call({add_stats, Stats}, From, State) ->
+ gen_server:reply(From, ok),
+ NewStats = couch_replicator_utils:sum_stats(State#rep_state.stats, Stats),
+ {noreply, State#rep_state{stats = NewStats}};
+
+handle_call({report_seq_done, Seq, StatsInc}, From,
+ #rep_state{seqs_in_progress = SeqsInProgress, highest_seq_done = HighestDone,
+ current_through_seq = ThroughSeq, stats = Stats} = State) ->
+ gen_server:reply(From, ok),
+ {NewThroughSeq0, NewSeqsInProgress} = case SeqsInProgress of
+ [] ->
+ {Seq, []};
+ [Seq | Rest] ->
+ {Seq, Rest};
+ [_ | _] ->
+ {ThroughSeq, ordsets:del_element(Seq, SeqsInProgress)}
+ end,
+ NewHighestDone = lists:max([HighestDone, Seq]),
+ NewThroughSeq = case NewSeqsInProgress of
+ [] ->
+ lists:max([NewThroughSeq0, NewHighestDone]);
+ _ ->
+ NewThroughSeq0
+ end,
+ couch_log:debug("Worker reported seq ~p, through seq was ~p, "
+ "new through seq is ~p, highest seq done was ~p, "
+ "new highest seq done is ~p~n"
+ "Seqs in progress were: ~p~nSeqs in progress are now: ~p",
+ [Seq, ThroughSeq, NewThroughSeq, HighestDone,
+ NewHighestDone, SeqsInProgress, NewSeqsInProgress]),
+ NewState = State#rep_state{
+ stats = couch_replicator_utils:sum_stats(Stats, StatsInc),
+ current_through_seq = NewThroughSeq,
+ seqs_in_progress = NewSeqsInProgress,
+ highest_seq_done = NewHighestDone
+ },
+ update_task(NewState),
+ {noreply, NewState}.
+
+
+handle_cast({db_compacted, DbName},
+ #rep_state{source = #db{name = DbName} = Source} = State) ->
+ {ok, NewSource} = couch_db:reopen(Source),
+ {noreply, State#rep_state{source = NewSource}};
+
+handle_cast({db_compacted, DbName},
+ #rep_state{target = #db{name = DbName} = Target} = State) ->
+ {ok, NewTarget} = couch_db:reopen(Target),
+ {noreply, State#rep_state{target = NewTarget}};
+
+handle_cast(checkpoint, State) ->
+ case do_checkpoint(State) of
+ {ok, NewState} ->
+ couch_stats:increment_counter([couch_replicator, checkpoints, success]),
+ {noreply, NewState#rep_state{timer = start_timer(State)}};
+ Error ->
+ couch_stats:increment_counter([couch_replicator, checkpoints, failure]),
+ {stop, Error, State}
+ end;
+
+handle_cast({report_seq, Seq},
+ #rep_state{seqs_in_progress = SeqsInProgress} = State) ->
+ NewSeqsInProgress = ordsets:add_element(Seq, SeqsInProgress),
+ {noreply, State#rep_state{seqs_in_progress = NewSeqsInProgress}}.
+
handle_info(shutdown, St) ->
{stop, shutdown, St};
@@ -295,116 +365,6 @@ handle_info(timeout, InitArgs) ->
{stop, {shutdown, ShutdownReason}, ShutdownState}
end.
-handle_call(get_details, _From, #rep_state{rep_details = Rep} = State) ->
- {reply, {ok, Rep}, State};
-
-handle_call({add_stats, Stats}, From, State) ->
- gen_server:reply(From, ok),
- NewStats = couch_replicator_utils:sum_stats(State#rep_state.stats, Stats),
- {noreply, State#rep_state{stats = NewStats}};
-
-handle_call({report_seq_done, Seq, StatsInc}, From,
- #rep_state{seqs_in_progress = SeqsInProgress, highest_seq_done = HighestDone,
- current_through_seq = ThroughSeq, stats = Stats} = State) ->
- gen_server:reply(From, ok),
- {NewThroughSeq0, NewSeqsInProgress} = case SeqsInProgress of
- [] ->
- {Seq, []};
- [Seq | Rest] ->
- {Seq, Rest};
- [_ | _] ->
- {ThroughSeq, ordsets:del_element(Seq, SeqsInProgress)}
- end,
- NewHighestDone = lists:max([HighestDone, Seq]),
- NewThroughSeq = case NewSeqsInProgress of
- [] ->
- lists:max([NewThroughSeq0, NewHighestDone]);
- _ ->
- NewThroughSeq0
- end,
- couch_log:debug("Worker reported seq ~p, through seq was ~p, "
- "new through seq is ~p, highest seq done was ~p, "
- "new highest seq done is ~p~n"
- "Seqs in progress were: ~p~nSeqs in progress are now: ~p",
- [Seq, ThroughSeq, NewThroughSeq, HighestDone,
- NewHighestDone, SeqsInProgress, NewSeqsInProgress]),
- NewState = State#rep_state{
- stats = couch_replicator_utils:sum_stats(Stats, StatsInc),
- current_through_seq = NewThroughSeq,
- seqs_in_progress = NewSeqsInProgress,
- highest_seq_done = NewHighestDone
- },
- update_task(NewState),
- {noreply, NewState}.
-
-
-handle_cast({db_compacted, DbName},
- #rep_state{source = #db{name = DbName} = Source} = State) ->
- {ok, NewSource} = couch_db:reopen(Source),
- {noreply, State#rep_state{source = NewSource}};
-
-handle_cast({db_compacted, DbName},
- #rep_state{target = #db{name = DbName} = Target} = State) ->
- {ok, NewTarget} = couch_db:reopen(Target),
- {noreply, State#rep_state{target = NewTarget}};
-
-handle_cast(checkpoint, State) ->
- case do_checkpoint(State) of
- {ok, NewState} ->
- couch_stats:increment_counter([couch_replicator, checkpoints, success]),
- {noreply, NewState#rep_state{timer = start_timer(State)}};
- Error ->
- couch_stats:increment_counter([couch_replicator, checkpoints, failure]),
- {stop, Error, State}
- end;
-
-handle_cast({report_seq, Seq},
- #rep_state{seqs_in_progress = SeqsInProgress} = State) ->
- NewSeqsInProgress = ordsets:add_element(Seq, SeqsInProgress),
- {noreply, State#rep_state{seqs_in_progress = NewSeqsInProgress}}.
-
-
-code_change(_OldVsn, #rep_state{}=State, _Extra) ->
- {ok, State}.
-
-
-headers_strip_creds([], Acc) ->
- lists:reverse(Acc);
-headers_strip_creds([{Key, Value0} | Rest], Acc) ->
- Value = case string:to_lower(Key) of
- "authorization" ->
- "****";
- _ ->
- Value0
- end,
- headers_strip_creds(Rest, [{Key, Value} | Acc]).
-
-
-httpdb_strip_creds(#httpdb{url = Url, headers = Headers} = HttpDb) ->
- HttpDb#httpdb{
- url = couch_util:url_strip_password(Url),
- headers = headers_strip_creds(Headers, [])
- };
-httpdb_strip_creds(LocalDb) ->
- LocalDb.
-
-
-rep_strip_creds(#rep{source = Source, target = Target} = Rep) ->
- Rep#rep{
- source = httpdb_strip_creds(Source),
- target = httpdb_strip_creds(Target)
- }.
-
-
-state_strip_creds(#rep_state{rep_details = Rep, source = Source, target = Target} = State) ->
- % #rep_state contains the source and target at the top level and also
- % in the nested #rep_details record
- State#rep_state{
- rep_details = rep_strip_creds(Rep),
- source = httpdb_strip_creds(Source),
- target = httpdb_strip_creds(Target)
- }.
-
terminate(normal, #rep_state{rep_details = #rep{id = RepId} = Rep,
checkpoint_history = CheckpointHistory} = State) ->
@@ -471,10 +431,60 @@ terminate_cleanup(State) ->
couch_replicator_api_wrap:db_close(State#rep_state.target).
+code_change(_OldVsn, #rep_state{}=State, _Extra) ->
+ {ok, State}.
+
+
format_status(_Opt, [_PDict, State]) ->
[{data, [{"State", state_strip_creds(State)}]}].
+headers_strip_creds([], Acc) ->
+ lists:reverse(Acc);
+headers_strip_creds([{Key, Value0} | Rest], Acc) ->
+ Value = case string:to_lower(Key) of
+ "authorization" ->
+ "****";
+ _ ->
+ Value0
+ end,
+ headers_strip_creds(Rest, [{Key, Value} | Acc]).
+
+
+httpdb_strip_creds(#httpdb{url = Url, headers = Headers} = HttpDb) ->
+ HttpDb#httpdb{
+ url = couch_util:url_strip_password(Url),
+ headers = headers_strip_creds(Headers, [])
+ };
+httpdb_strip_creds(LocalDb) ->
+ LocalDb.
+
+
+rep_strip_creds(#rep{source = Source, target = Target} = Rep) ->
+ Rep#rep{
+ source = httpdb_strip_creds(Source),
+ target = httpdb_strip_creds(Target)
+ }.
+
+
+state_strip_creds(#rep_state{rep_details = Rep, source = Source, target = Target} = State) ->
+ % #rep_state contains the source and target at the top level and also
+ % in the nested #rep_details record
+ State#rep_state{
+ rep_details = rep_strip_creds(Rep),
+ source = httpdb_strip_creds(Source),
+ target = httpdb_strip_creds(Target)
+ }.
+
+
+adjust_maxconn(Src = #httpdb{http_connections = 1}, RepId) ->
+ Msg = "Adjusting minimum number of HTTP source connections to 2 for ~p",
+ couch_log:notice(Msg, [RepId]),
+ Src#httpdb{http_connections = 2};
+adjust_maxconn(Src, _RepId) ->
+ Src.
+
+
-spec doc_update_triggered(#rep{}) -> ok.
doc_update_triggered(#rep{db_name = null}) ->
ok;
@@ -489,6 +499,7 @@ doc_update_triggered(#rep{id = RepId, doc_id = DocId} = Rep) ->
[DocId, pp_rep_id(RepId)]),
ok.
+
-spec doc_update_completed(#rep{}, list()) -> ok.
doc_update_completed(#rep{db_name = null}, _Stats) ->
ok;
@@ -500,6 +511,7 @@ doc_update_completed(#rep{id = RepId, doc_id = DocId, db_name = DbName,
[pp_rep_id(RepId), DocId]),
ok.
+
do_last_checkpoint(#rep_state{seqs_in_progress = [],
highest_seq_done = {_Ts, ?LOWEST_SEQ}} = State) ->
{stop, normal, cancel_timer(State)};
@@ -624,6 +636,7 @@ spawn_changes_manager(Parent, ChangesQueue, BatchSize) ->
changes_manager_loop_open(Parent, ChangesQueue, BatchSize, 1)
end).
+
changes_manager_loop_open(Parent, ChangesQueue, BatchSize, Ts) ->
receive
{get_changes, From} ->
@@ -752,6 +765,7 @@ update_checkpoint(Db, Doc, DbType) ->
" checkpoint document: ", (to_binary(Reason))/binary>>})
end.
+
update_checkpoint(Db, #doc{id = LogId, body = LogBody} = Doc) ->
try
case couch_replicator_api_wrap:update_doc(Db, Doc, [delay_commit]) of
@@ -831,6 +845,7 @@ compare_replication_logs(SrcDoc, TgtDoc) ->
compare_rep_history(SourceHistory, TargetHistory)
end.
+
compare_rep_history(S, T) when S =:= [] orelse T =:= [] ->
couch_log:notice("no common ancestry -- performing full replication", []),
{?LOWEST_SEQ, []};
@@ -872,6 +887,7 @@ db_monitor(#db{} = Db) ->
db_monitor(_HttpDb) ->
nil.
+
get_pending_count(St) ->
Rep = St#rep_state.rep_details,
Timeout = get_value(connection_timeout, Rep#rep.options),
@@ -938,9 +954,7 @@ rep_stats(State) ->
replication_start_error({unauthorized, DbUri}) ->
{unauthorized, <<"unauthorized to access or create database ", DbUri/binary>>};
-
replication_start_error({db_not_found, DbUri}) ->
{db_not_found, <<"could not open ", DbUri/binary>>};
-
replication_start_error(Error) ->
Error.
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_sup.erl b/src/couch_replicator/src/couch_replicator_scheduler_sup.erl
index 17ae55c..8ab55f8 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_sup.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_sup.erl
@@ -14,12 +14,6 @@
-behaviour(supervisor).
--vsn(1).
-
-
-%% includes
--include("couch_replicator.hrl").
-
%% public api
-export([
start_link/0,
@@ -33,6 +27,10 @@
]).
+%% includes
+-include("couch_replicator.hrl").
+
+
%% public functions
start_link() ->
--
To stop receiving notification emails like this one, please contact
['"commits@couchdb.apache.org" <co...@couchdb.apache.org>'].