You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by to...@apache.org on 2016/05/05 06:08:27 UTC
couch-replicator commit: updated refs/heads/3010-add-rate-limiter to
b7b2ac3
Repository: couchdb-couch-replicator
Updated Branches:
refs/heads/3010-add-rate-limiter [created] b7b2ac366
Add rate limiter for replication
For a remote replication request, we add in support for rate limiting.
We add in 4 new replication options:
src_rate_limit
src_rate_period
target_rate_limit
target_rate_period
These define the maximum number of requests per period (in seconds),
e.g. limit = 10, period = 5 would be a rate of 2 requests per second.
COUCHDB-3010
Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/b7b2ac36
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/b7b2ac36
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/b7b2ac36
Branch: refs/heads/3010-add-rate-limiter
Commit: b7b2ac3663fc653ece65ab601d48333d07aac878
Parents: ab0afce
Author: Tony Sun <to...@cloudant.com>
Authored: Wed May 4 21:25:30 2016 -0700
Committer: Tony Sun <to...@cloudant.com>
Committed: Wed May 4 23:09:50 2016 -0700
----------------------------------------------------------------------
src/couch_replicator.erl | 12 ++
src/couch_replicator_api_wrap.erl | 6 +-
src/couch_replicator_httpc.erl | 3 +-
src/couch_replicator_httpc_pool.erl | 1 +
src/couch_replicator_rate_limiter.erl | 219 +++++++++++++++++++++++++++++
src/couch_replicator_sup.erl | 6 +
src/couch_replicator_utils.erl | 18 ++-
7 files changed, 261 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/b7b2ac36/src/couch_replicator.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator.erl b/src/couch_replicator.erl
index 4e25e14..ea0655b 100644
--- a/src/couch_replicator.erl
+++ b/src/couch_replicator.erl
@@ -256,6 +256,17 @@ init(InitArgs) ->
do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
process_flag(trap_exit, true),
+ SrcLimit = get_value(src_rate_limit, Options),
+ SrcPeriod = get_value(src_rate_period, Options),
+ TargetLimit = get_value(target_rate_limit, Options),
+ TargetPeriod = get_value(target_rate_period, Options),
+ % We possibly start our rate limiter before init_state because
+ % db_open also uses a send_req
+ couch_replicator_rate_limiter:maybe_start_rate_limiter(Rep#rep.source,
+ SrcLimit, SrcPeriod),
+ couch_replicator_rate_limiter:maybe_start_rate_limiter(Rep#rep.target,
+ TargetLimit, TargetPeriod),
+
#rep_state{
source = Source,
target = Target,
@@ -269,6 +280,7 @@ do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
NumWorkers = get_value(worker_processes, Options),
BatchSize = get_value(worker_batch_size, Options),
+
{ok, ChangesQueue} = couch_work_queue:new([
{max_items, BatchSize * NumWorkers * 2},
{max_size, 100 * 1024 * NumWorkers}
http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/b7b2ac36/src/couch_replicator_api_wrap.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_api_wrap.erl b/src/couch_replicator_api_wrap.erl
index ff6b00c..7977300 100644
--- a/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator_api_wrap.erl
@@ -136,9 +136,11 @@ db_open(DbName, Options, Create) ->
throw({unauthorized, DbName})
end.
-db_close(#httpdb{httpc_pool = Pool}) ->
+db_close(#httpdb{httpc_pool = Pool, url = Url}) ->
unlink(Pool),
- ok = couch_replicator_httpc_pool:stop(Pool);
+ ok = couch_replicator_httpc_pool:stop(Pool),
+ couch_replicator_rate_limiter:maybe_decrement_rep_count(Url),
+ ok;
db_close(DbName) ->
catch couch_db:close(DbName).
http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/b7b2ac36/src/couch_replicator_httpc.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_httpc.erl b/src/couch_replicator_httpc.erl
index 668b218..6807426 100644
--- a/src/couch_replicator_httpc.erl
+++ b/src/couch_replicator_httpc.erl
@@ -45,13 +45,14 @@ setup(#httpdb{httpc_pool = nil, url = Url, http_connections = MaxConns} = Db) ->
{ok, Db#httpdb{httpc_pool = Pid}}.
-send_req(HttpDb, Params1, Callback) ->
+send_req(#httpdb{url = Url} = HttpDb, Params1, Callback) ->
put(?STREAM_STATUS, init),
couch_stats:increment_counter([couch_replicator, requests]),
Params2 = ?replace(Params1, qs,
[{K, ?b2l(iolist_to_binary(V))} || {K, V} <- get_value(qs, Params1, [])]),
Params = ?replace(Params2, ibrowse_options,
lists:keysort(1, get_value(ibrowse_options, Params2, []))),
+ couch_replicator_rate_limiter:maybe_delay_request(Url),
{Worker, Response} = send_ibrowse_req(HttpDb, Params),
Ret = try
process_response(Response, Worker, HttpDb, Params, Callback)
http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/b7b2ac36/src/couch_replicator_httpc_pool.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_httpc_pool.erl b/src/couch_replicator_httpc_pool.erl
index 09e3b23..2102c0c 100644
--- a/src/couch_replicator_httpc_pool.erl
+++ b/src/couch_replicator_httpc_pool.erl
@@ -23,6 +23,7 @@
-export([code_change/3, terminate/2]).
-include_lib("couch/include/couch_db.hrl").
+-include_lib("ibrowse/include/ibrowse.hrl").
-import(couch_util, [
get_value/2
http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/b7b2ac36/src/couch_replicator_rate_limiter.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_rate_limiter.erl b/src/couch_replicator_rate_limiter.erl
new file mode 100644
index 0000000..275c14b
--- /dev/null
+++ b/src/couch_replicator_rate_limiter.erl
@@ -0,0 +1,219 @@
+-module(couch_replicator_rate_limiter).
+-behaviour(gen_server).
+
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("ibrowse/include/ibrowse.hrl").
+-include_lib("couch_replicator_api_wrap.hrl").
+
+
+-export([
+ start_link/0
+]).
+
+-export([
+ init/1,
+ terminate/2,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ code_change/3
+]).
+
+-export ([
+ maybe_start_rate_limiter/3,
+ maybe_decrement_rep_count/1,
+ maybe_delay_request/1
+ ]).
+
+%% Per-host rate limiting state.
+%% The ets helpers in this module address values by tuple position
+%% (2..7); those positions follow the field order below, position 1
+%% of a record tuple being the record tag.
+-record(rep_limiter, {
+ requestCounter = 0, % position 2: requests counted since the last reset
+ lastUpdateTimestamp = 0, % position 3: os:timestamp() of the last reset
+ pid, % position 4: pid of the resetter process
+ replications = 0, % position 5: number of replications using the host
+ limit, % position 6: max requests per period
+ period % position 7: length of the interval, in seconds
+}).
+
+
+%% For each unique host, an entry is a rep_limiter record
+-define(HOSTS, couch_replicator_limit_hosts).
+
+
+%% Starts the rate limiter gen_server, registered locally under the
+%% module name.
+start_link() ->
+    ServerName = {local, ?MODULE},
+    gen_server:start_link(ServerName, ?MODULE, [], []).
+
+
+%% Synchronously sends an update_host request for Host with the given
+%% Limit and Period to the server; waits for the reply without a timeout.
+update_host(Host, Limit, Period) ->
+    Request = {update_host, Host, Limit, Period},
+    gen_server:call(?MODULE, Request, infinity).
+
+
+%% Synchronously sends a decrement_replications request for Host to the
+%% server; waits for the reply without a timeout.
+decrement_replications(Host) ->
+    Request = {decrement_replications, Host},
+    gen_server:call(?MODULE, Request, infinity).
+
+
+% gen_server functions.
+%% Traps exits (so 'EXIT' signals of linked processes arrive as
+%% messages) and creates the public, named ets table of hosts.
+%% The server state is the atom nil.
+init([]) ->
+    process_flag(trap_exit, true),
+    TableOptions = [set, public, named_table],
+    ets:new(?HOSTS, TableOptions),
+    {ok, nil}.
+
+
+%% Serves the two synchronous requests of the rate limiter.
+%%
+%% {update_host, Host, Limit, Period}
+%%     Registers one more replication for Host and replies with the
+%%     number of replications now registered for it.
+%% {decrement_replications, Host}
+%%     Unregisters one replication and replies with the number that
+%%     remain (0 for an unknown host). When none remain, the resetter
+%%     process is stopped and the host's row is deleted.
+%%
+%% Rows are stored as flat tuples so that the positional ets calls used
+%% throughout this module (positions 2..7) address the intended values.
+%% The layout mirrors the field order of #rep_limiter{}, with the host
+%% key in position 1 instead of the record tag:
+%%     {Host, RequestCounter, LastResetTimestamp, ResetterPid,
+%%      Replications, Limit, Period}
+handle_call({update_host, Host, Limit, Period}, _From, State) ->
+    % The timestamp is initialised to a real os:timestamp() so a reader
+    % never sees a non-timestamp value before the resetter's first write.
+    Row = {Host, 0, os:timestamp(), undefined, 1, Limit, Period},
+    Reps = case ets:insert_new(?HOSTS, Row) of
+        true ->
+            % Brand new host for replication: spawn its resetter
+            % process and remember the pid so it can be stopped later.
+            % NOTE: the resetter keeps the Period it is started with.
+            LoopPid = spawn_link(fun() ->
+                reset_requests_loop(Host, Period)
+            end),
+            true = modify_host(Host, [{4, LoopPid}]),
+            1;
+        false ->
+            % Known host: this allows the rate limit to be changed
+            % during replication.
+            true = modify_host(Host, [{6, Limit}, {7, Period}]),
+            update_rep_count(Host, 1)
+    end,
+    % A gen_server call must be answered with {reply, Reply, NewState}.
+    {reply, Reps, State};
+
+handle_call({decrement_replications, Host}, _From, State) ->
+    Remaining = case ets:member(?HOSTS, Host) of
+        false ->
+            % Nothing registered for this host; do not crash the
+            % process that owns the table.
+            0;
+        true ->
+            case update_rep_count(Host, -1) of
+                Count when Count =< 0 ->
+                    % No more replications at this moment: stop the
+                    % resetter process and remove the host.
+                    ResetPid = ets:lookup_element(?HOSTS, Host, 4),
+                    exit(ResetPid, stop_resetter),
+                    true = remove_host(Host),
+                    0;
+                Count ->
+                    Count
+            end
+    end,
+    {reply, Remaining, State}.
+
+
+%% No asynchronous requests are defined: every cast is ignored and the
+%% state is returned unchanged.
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+
+%% Handles raw messages sent to the server.
+%%
+%% The server traps exits, so the termination of a linked resetter
+%% process arrives here as an 'EXIT' message:
+%%   - reason stop_resetter is the deliberate stop requested when the
+%%     last replication of a host ends; it is ignored;
+%%   - any other reason is logged as an error.
+%% Any other message is ignored so that a stray message cannot crash
+%% the process that owns the hosts table.
+handle_info({'EXIT', _FromPid, stop_resetter}, State) ->
+    {noreply, State};
+
+handle_info({'EXIT', _FromPid, Reason}, State) ->
+    % Adjacent string literals are concatenated at compile time, which
+    % keeps the message on one line of log output.
+    couch_log:error("couch_replicator_rate_limiter reset process"
+        " died abnormally due to: ~p", [Reason]),
+    {noreply, State};
+
+handle_info(_Msg, State) ->
+    {noreply, State}.
+
+
+%% Performs no cleanup; always returns ok.
+terminate(_Reason, _State) -> ok.
+
+
+%% Overwrites the given {Position, Value} pairs in the row stored for
+%% Host and returns the result of ets:update_element/3.
+modify_host(Host, Updates) ->
+    ets:update_element(?HOSTS, Host, Updates).
+
+
+%% Deletes the row stored for Host from the hosts table.
+remove_host(HostKey) ->
+    ets:delete(?HOSTS, HostKey).
+
+
+%% Atomically adds Delta to the counter in position 5 of Host's row
+%% and returns the new value.
+update_rep_count(Host, Delta) ->
+    CounterOp = {5, Delta},
+    ets:update_counter(?HOSTS, Host, CounterOp).
+
+
+%% Atomically adds one to the counter in position 2 of Host's row and
+%% returns the new value.
+increment_req_count(Host) ->
+    CounterOp = {2, 1},
+    ets:update_counter(?HOSTS, Host, CounterOp).
+
+
+%% Code upgrade hook: the state (the atom nil) is carried over as is.
+code_change(_OldVsn, nil = State, _Extra) ->
+    {ok, State}.
+
+%% Body of the resetter process of a host. On every iteration it sets
+%% the value in position 2 of the host's row to 0 and the value in
+%% position 3 to the current os:timestamp(), then sleeps for one period.
+%% It never returns; it runs until the process is killed.
+%%
+%% Period is expressed in seconds, whereas timer:sleep/1 expects
+%% milliseconds, hence the conversion.
+reset_requests_loop(Host, Period) ->
+    modify_host(Host, [{2, 0}, {3, os:timestamp()}]),
+    timer:sleep(Period * 1000),
+    reset_requests_loop(Host, Period).
+
+% Returns the time, as a non-negative integer number of milliseconds,
+% that a request to Host should sleep before being sent.
+% RequestCounter is the value of the host's request counter after it
+% has been incremented for this request.
+%
+% Requests within the limit are not delayed. A request over the limit
+% waits for the remainder of the current period and is then spaced by
+% one interval (Period / Limit) for each request over the limit.
+% When no usable rate is stored for Host (limit or period not a
+% positive integer) no delay is applied.
+calculate_sleep_time(Host, RequestCounter) ->
+    case get_host_rate(Host) of
+        {Limit, Period} when is_integer(Limit), Limit > 0,
+                is_integer(Period), Period > 0 ->
+            over_limit_sleep(Host, RequestCounter - Limit, Limit, Period);
+        _NoRate ->
+            0
+    end.
+
+
+% Sleep time for a request that is Excess requests over the limit.
+over_limit_sleep(_Host, Excess, _Limit, _Period) when Excess =< 0 ->
+    0;
+over_limit_sleep(Host, Excess, Limit, Period) ->
+    PeriodMs = Period * 1000,
+    IntervalMs = PeriodMs / Limit,
+    LastReset = ets:lookup_element(?HOSTS, Host, 3),
+    ElapsedMs = ts_to_millisec(os:timestamp()) - ts_to_millisec(LastReset),
+    WaitMs = case ElapsedMs > 0 of
+        true ->
+            % time left until the next reset of the counter
+            max(0, PeriodMs - ElapsedMs);
+        false ->
+            % This can occur if the request comes in really close to
+            % the reset time and we read the reset value too late.
+            abs(ElapsedMs)
+    end,
+    % timer:sleep/1 only accepts integers
+    round(WaitMs + (Excess * IntervalMs)).
+
+
+% Converts an os:timestamp() style {MegaSecs, Secs, MicroSecs} tuple to
+% whole milliseconds.
+ts_to_millisec({Mega, Sec, Micro}) ->
+    (((Mega * 1000000) + Sec) * 1000) + (Micro div 1000).
+
+
+%% Reads positions 6 (limit) and 7 (period) of Host's row and returns
+%% them as {Limit, Period}. A lookup that fails with badarg yields -1
+%% for that element.
+get_host_rate(Host) ->
+    Fetch = fun(Position) ->
+        try
+            ets:lookup_element(?HOSTS, Host, Position)
+        catch error:badarg ->
+            -1
+        end
+    end,
+    Limit = Fetch(6),
+    Period = Fetch(7),
+    {Limit, Period}.
+
+
+
+%% Registers a replication endpoint with the rate limiter.
+%%
+%% Rate limiting is only started for an #httpdb{} endpoint when both
+%% Limit and Period are positive integers; the parsed url is then used
+%% as the key and the result of update_host/3 is returned.
+%% In every other case (local database, -1 "disabled" defaults, zero or
+%% otherwise unusable values) nothing is started and false is returned.
+%% Rejecting non-positive values here keeps a zero limit from reaching
+%% the Period / Limit division done when computing sleep times.
+%%
+%% NOTE(review): the key is whatever ibrowse_lib:parse_url/1 returns for
+%% the url, presumably the whole parsed url rather than only the host
+%% name - confirm this is the intended granularity.
+maybe_start_rate_limiter(#httpdb{url = Url}, Limit, Period)
+        when is_integer(Limit), Limit > 0,
+             is_integer(Period), Period > 0 ->
+    Host = ibrowse_lib:parse_url(Url),
+    update_host(Host, Limit, Period);
+% disabled for local databases and for unset / invalid rates
+maybe_start_rate_limiter(_, _, _) ->
+    false.
+
+
+%% Unregisters one replication for the endpoint at Url.
+%% Returns false when get_host_rate/1 reports -1 for the limit or the
+%% period, otherwise the result of decrement_replications/1.
+maybe_decrement_rep_count(Url) ->
+    Host = ibrowse_lib:parse_url(Url),
+    case get_host_rate(Host) of
+        {Limit, Period} when Limit =:= -1; Period =:= -1 ->
+            false;
+        _Rate ->
+            decrement_replications(Host)
+    end.
+
+
+%% Counts a request to the endpoint at Url and sleeps for the time
+%% computed by calculate_sleep_time/2.
+%% Returns false, without counting or sleeping, when get_host_rate/1
+%% reports -1 for the limit or the period; otherwise returns the result
+%% of timer:sleep/1.
+maybe_delay_request(Url) ->
+    Host = ibrowse_lib:parse_url(Url),
+    case get_host_rate(Host) of
+        {Limit, Period} when Limit =:= -1; Period =:= -1 ->
+            false;
+        _Rate ->
+            RequestCount = increment_req_count(Host),
+            timer:sleep(calculate_sleep_time(Host, RequestCount))
+    end.
http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/b7b2ac36/src/couch_replicator_sup.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_sup.erl b/src/couch_replicator_sup.erl
index 57ad63b..340db18 100644
--- a/src/couch_replicator_sup.erl
+++ b/src/couch_replicator_sup.erl
@@ -32,6 +32,12 @@ init(_Args) ->
brutal_kill,
worker,
[couch_replicator_manager]},
+ {couch_replicator_rate_limiter,
+ {couch_replicator_rate_limiter, start_link, []},
+ permanent,
+ brutal_kill, % Need to think about if this should be brutal_kill
+ worker,
+ [couch_replicator_rate_limiter]},
{couch_replicator_job_sup,
{couch_replicator_job_sup, start_link, []},
permanent,
http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/b7b2ac36/src/couch_replicator_utils.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_utils.erl b/src/couch_replicator_utils.erl
index 76bc8e1..48bfa71 100644
--- a/src/couch_replicator_utils.erl
+++ b/src/couch_replicator_utils.erl
@@ -261,6 +261,10 @@ make_options(Props) ->
DefRetries = config:get("replicator", "retries_per_request", "10"),
UseCheckpoints = config:get("replicator", "use_checkpoints", "true"),
DefCheckpointInterval = config:get("replicator", "checkpoint_interval", "30000"),
+ DefSrcLimit = config:get("replicator", "src_rate_limit", "-1"),
+ DefSrcPeriod = config:get("replicator", "src_rate_period", "-1"),
+ DefTrgLimit = config:get("replicator", "target_rate_limit", "-1"),
+ DefTrgPeriod = config:get("replicator", "target_rate_period", "-1"),
{ok, DefSocketOptions} = couch_util:parse_term(
config:get("replicator", "socket_options",
"[{keepalive, true}, {nodelay, false}]")),
@@ -272,7 +276,11 @@ make_options(Props) ->
{worker_batch_size, list_to_integer(DefBatchSize)},
{worker_processes, list_to_integer(DefWorkers)},
{use_checkpoints, list_to_existing_atom(UseCheckpoints)},
- {checkpoint_interval, list_to_integer(DefCheckpointInterval)}
+ {checkpoint_interval, list_to_integer(DefCheckpointInterval)},
+ {src_rate_limit, list_to_integer(DefSrcLimit)},
+ {src_rate_period, list_to_integer(DefSrcPeriod)},
+ {target_rate_limit, list_to_integer(DefTrgLimit)},
+ {target_rate_period, list_to_integer(DefTrgPeriod)}
])).
@@ -324,6 +332,14 @@ convert_options([{<<"use_checkpoints">>, V} | R]) ->
[{use_checkpoints, V} | convert_options(R)];
convert_options([{<<"checkpoint_interval">>, V} | R]) ->
[{checkpoint_interval, couch_util:to_integer(V)} | convert_options(R)];
+convert_options([{<<"src_rate_limit">>, V} | R]) ->
+ [{src_rate_limit, couch_util:to_integer(V)} | convert_options(R)];
+convert_options([{<<"src_rate_period">>, V} | R]) ->
+ [{src_rate_period, couch_util:to_integer(V)} | convert_options(R)];
+convert_options([{<<"target_rate_limit">>, V} | R]) ->
+ [{target_rate_limit, couch_util:to_integer(V)} | convert_options(R)];
+convert_options([{<<"target_rate_period">>, V} | R]) ->
+ [{target_rate_period, couch_util:to_integer(V)} | convert_options(R)];
convert_options([_ | R]) -> % skip unknown option
convert_options(R).