You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by to...@apache.org on 2016/05/05 06:08:27 UTC

couch-replicator commit: updated refs/heads/3010-add-rate-limiter to b7b2ac3

Repository: couchdb-couch-replicator
Updated Branches:
  refs/heads/3010-add-rate-limiter [created] b7b2ac366


Add rate limiter for replication

For a remote replication request, we add in support for rate limiting.
We add in 4 new replication options:

src_rate_limit
src_rate_period
target_rate_limit
target_rate_period

These define the maximum number of requests (limit) allowed per interval
(period, in seconds), e.g. limit = 10, period = 5 allows at most 10 requests
every 5 seconds, an average rate of 2 requests per second.

COUCHDB-3010


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/b7b2ac36
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/b7b2ac36
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/b7b2ac36

Branch: refs/heads/3010-add-rate-limiter
Commit: b7b2ac3663fc653ece65ab601d48333d07aac878
Parents: ab0afce
Author: Tony Sun <to...@cloudant.com>
Authored: Wed May 4 21:25:30 2016 -0700
Committer: Tony Sun <to...@cloudant.com>
Committed: Wed May 4 23:09:50 2016 -0700

----------------------------------------------------------------------
 src/couch_replicator.erl              |  12 ++
 src/couch_replicator_api_wrap.erl     |   6 +-
 src/couch_replicator_httpc.erl        |   3 +-
 src/couch_replicator_httpc_pool.erl   |   1 +
 src/couch_replicator_rate_limiter.erl | 219 +++++++++++++++++++++++++++++
 src/couch_replicator_sup.erl          |   6 +
 src/couch_replicator_utils.erl        |  18 ++-
 7 files changed, 261 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/b7b2ac36/src/couch_replicator.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator.erl b/src/couch_replicator.erl
index 4e25e14..ea0655b 100644
--- a/src/couch_replicator.erl
+++ b/src/couch_replicator.erl
@@ -256,6 +256,17 @@ init(InitArgs) ->
 do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
     process_flag(trap_exit, true),
 
+    SrcLimit = get_value(src_rate_limit, Options),
+    SrcPeriod = get_value(src_rate_period, Options),
+    TargetLimit = get_value(target_rate_limit, Options),
+    TargetPeriod = get_value(target_rate_period, Options),
+    % We possibly start our rate limiter before init_state because
+    % db_open also uses a send_req
+    couch_replicator_rate_limiter:maybe_start_rate_limiter(Rep#rep.source,
+        SrcLimit, SrcPeriod),
+    couch_replicator_rate_limiter:maybe_start_rate_limiter(Rep#rep.target,
+        TargetLimit, TargetPeriod),
+
     #rep_state{
         source = Source,
         target = Target,
@@ -269,6 +280,7 @@ do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
 
     NumWorkers = get_value(worker_processes, Options),
     BatchSize = get_value(worker_batch_size, Options),
+
     {ok, ChangesQueue} = couch_work_queue:new([
         {max_items, BatchSize * NumWorkers * 2},
         {max_size, 100 * 1024 * NumWorkers}

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/b7b2ac36/src/couch_replicator_api_wrap.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_api_wrap.erl b/src/couch_replicator_api_wrap.erl
index ff6b00c..7977300 100644
--- a/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator_api_wrap.erl
@@ -136,9 +136,11 @@ db_open(DbName, Options, Create) ->
         throw({unauthorized, DbName})
     end.
 
-db_close(#httpdb{httpc_pool = Pool}) ->
+db_close(#httpdb{httpc_pool = Pool, url = Url}) ->
     unlink(Pool),
-    ok = couch_replicator_httpc_pool:stop(Pool);
+    ok = couch_replicator_httpc_pool:stop(Pool),
+    couch_replicator_rate_limiter:maybe_decrement_rep_count(Url),
+    ok;
 db_close(DbName) ->
     catch couch_db:close(DbName).
 

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/b7b2ac36/src/couch_replicator_httpc.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_httpc.erl b/src/couch_replicator_httpc.erl
index 668b218..6807426 100644
--- a/src/couch_replicator_httpc.erl
+++ b/src/couch_replicator_httpc.erl
@@ -45,13 +45,14 @@ setup(#httpdb{httpc_pool = nil, url = Url, http_connections = MaxConns} = Db) ->
     {ok, Db#httpdb{httpc_pool = Pid}}.
 
 
-send_req(HttpDb, Params1, Callback) ->
+send_req(#httpdb{url = Url} = HttpDb, Params1, Callback) ->
     put(?STREAM_STATUS, init),
     couch_stats:increment_counter([couch_replicator, requests]),
     Params2 = ?replace(Params1, qs,
         [{K, ?b2l(iolist_to_binary(V))} || {K, V} <- get_value(qs, Params1, [])]),
     Params = ?replace(Params2, ibrowse_options,
         lists:keysort(1, get_value(ibrowse_options, Params2, []))),
+    couch_replicator_rate_limiter:maybe_delay_request(Url),
     {Worker, Response} = send_ibrowse_req(HttpDb, Params),
     Ret = try
         process_response(Response, Worker, HttpDb, Params, Callback)

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/b7b2ac36/src/couch_replicator_httpc_pool.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_httpc_pool.erl b/src/couch_replicator_httpc_pool.erl
index 09e3b23..2102c0c 100644
--- a/src/couch_replicator_httpc_pool.erl
+++ b/src/couch_replicator_httpc_pool.erl
@@ -23,6 +23,7 @@
 -export([code_change/3, terminate/2]).
 
 -include_lib("couch/include/couch_db.hrl").
+-include_lib("ibrowse/include/ibrowse.hrl").
 
 -import(couch_util, [
     get_value/2

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/b7b2ac36/src/couch_replicator_rate_limiter.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_rate_limiter.erl b/src/couch_replicator_rate_limiter.erl
new file mode 100644
index 0000000..275c14b
--- /dev/null
+++ b/src/couch_replicator_rate_limiter.erl
@@ -0,0 +1,285 @@
+-module(couch_replicator_rate_limiter).
+-behaviour(gen_server).
+
+%% Per-host request rate limiter for remote (HTTP) replications.
+%%
+%% Every rate limited host owns one row in the ?HOSTS ETS table holding the
+%% number of requests sent in the current window, the time that window
+%% started and the configured limit/period. A linked "resetter" process per
+%% host opens a new window every `period` seconds. Request processes bump
+%% the counter directly in the (public) table and sleep once the budget of
+%% the current window is used up.
+
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("ibrowse/include/ibrowse.hrl").
+-include("couch_replicator_api_wrap.hrl").
+
+
+-export([
+    start_link/0
+]).
+
+-export([
+    init/1,
+    terminate/2,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    code_change/3
+]).
+
+-export([
+    maybe_start_rate_limiter/3,
+    maybe_decrement_rep_count/1,
+    maybe_delay_request/1
+]).
+
+
+%% One ETS row per rate limited host. The table is keyed on `host` (see
+%% init/1) and fields are always addressed as #rep_limiter.Field, so tuple
+%% positions cannot drift away from the record definition.
+-record(rep_limiter, {
+    host, % {Hostname, Port}, see host_key/1
+    request_counter = 0, % requests counted in the current window
+    last_reset, % os:timestamp() taken when the current window started
+    pid, % resetter process of this host
+    replications = 0, % replications currently registered for this host
+    limit, % max requests per period
+    period % window length in seconds
+}).
+
+
+-define(HOSTS, couch_replicator_limit_hosts).
+
+
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
+%% Registers one more replication for the host of a remote endpoint and
+%% (re)sets that host's rate to Limit requests per Period seconds.
+%% Returns the number of replications now registered for the host, or
+%% `false` when rate limiting does not apply: local databases, the "-1"
+%% defaults, or any other non-positive / non-integer setting (a zero limit
+%% would otherwise lead to a division by zero when computing delays).
+maybe_start_rate_limiter(#httpdb{url = Url}, Limit, Period)
+        when is_integer(Limit), Limit > 0, is_integer(Period), Period > 0 ->
+    case host_key(Url) of
+        {ok, Host} ->
+            update_host(Host, Limit, Period);
+        error ->
+            false
+    end;
+maybe_start_rate_limiter(_, _, _) ->
+    false.
+
+
+%% Unregisters one replication from the host of Url. Returns the number of
+%% replications still registered, or `false` if the host is not limited.
+%% NOTE(review): db_close/1 calls this for every remote endpoint,
+%% including replications that never registered themselves; confirm that
+%% callers cannot decrement a count owned by another replication.
+maybe_decrement_rep_count(Url) ->
+    case limited_host(Url) of
+        {ok, Host} ->
+            decrement_replications(Host);
+        false ->
+            false
+    end.
+
+
+%% Counts one request against the host of Url and blocks the calling
+%% process for as long as needed to stay within the configured rate.
+%% Returns `ok` after counting (and possibly sleeping), `false` if the
+%% host is not rate limited.
+maybe_delay_request(Url) ->
+    case limited_host(Url) of
+        {ok, Host} ->
+            delay_request(Host);
+        false ->
+            false
+    end.
+
+
+update_host(Host, Limit, Period) ->
+    gen_server:call(?MODULE, {update_host, Host, Limit, Period}, infinity).
+
+
+decrement_replications(Host) ->
+    gen_server:call(?MODULE, {decrement_replications, Host}, infinity).
+
+
+% gen_server functions.
+init([]) ->
+    process_flag(trap_exit, true),
+    % public: request processes update the counters without going through
+    % this server
+    ets:new(?HOSTS, [set, public, named_table,
+        {keypos, #rep_limiter.host}]),
+    {ok, nil}.
+
+
+handle_call({update_host, Host, Limit, Period}, _From, State) ->
+    Fresh = #rep_limiter{
+        host = Host,
+        last_reset = os:timestamp(),
+        replications = 1,
+        limit = Limit,
+        period = Period
+    },
+    Reps = case ets:insert_new(?HOSTS, Fresh) of
+        true ->
+            % Brand new host for replication, need to spawn
+            % a new resetter process
+            LoopPid = spawn_link(fun() -> reset_requests_loop(Host) end),
+            true = modify_host(Host, [{#rep_limiter.pid, LoopPid}]),
+            1;
+        false ->
+            % this allows the rate limit to be changed during replication
+            true = modify_host(Host, [
+                {#rep_limiter.limit, Limit},
+                {#rep_limiter.period, Period}
+            ]),
+            update_rep_count(Host, 1)
+    end,
+    {reply, Reps, State};
+
+handle_call({decrement_replications, Host}, _From, State) ->
+    Remaining = case ets:lookup(?HOSTS, Host) of
+        [#rep_limiter{replications = Reps, pid = ResetPid}] when Reps =< 1 ->
+            % no more replications at this moment, stop resetter
+            % process and remove the host from the ets table
+            exit(ResetPid, stop_resetter),
+            true = remove_host(Host),
+            0;
+        [#rep_limiter{}] ->
+            update_rep_count(Host, -1);
+        [] ->
+            % already removed, nothing left to unregister
+            0
+    end,
+    {reply, Remaining, State}.
+
+
+handle_cast(_, State) ->
+    {noreply, State}.
+
+
+handle_info({'EXIT', _FromPid, stop_resetter}, State) ->
+    {noreply, State};
+
+handle_info({'EXIT', _FromPid, normal}, State) ->
+    % a resetter stops normally once its host row is gone
+    {noreply, State};
+
+handle_info({'EXIT', _FromPid, Reason}, State) ->
+    couch_log:error("couch_replicator_rate_limiter reset process"
+        " died abnormally due to: ~p", [Reason]),
+    {noreply, State};
+
+handle_info(_Msg, State) ->
+    {noreply, State}.
+
+
+terminate(_Reason, _State) ->
+    ok.
+
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+
+modify_host(Host, Elements) ->
+    ets:update_element(?HOSTS, Host, Elements).
+
+
+remove_host(Host) ->
+    ets:delete(?HOSTS, Host).
+
+
+update_rep_count(Host, Value) ->
+    ets:update_counter(?HOSTS, Host, {#rep_limiter.replications, Value}).
+
+
+increment_req_count(Host) ->
+    ets:update_counter(?HOSTS, Host, {#rep_limiter.request_counter, 1}).
+
+
+%% Body of the per-host resetter process: opens a new window (counter back
+%% to 0, fresh timestamp), sleeps for one period and repeats. The period is
+%% re-read on every round so that a changed rate takes effect, and the loop
+%% ends normally once the host row has been removed.
+reset_requests_loop(Host) ->
+    case ets:lookup(?HOSTS, Host) of
+        [#rep_limiter{period = Period}] ->
+            modify_host(Host, [
+                {#rep_limiter.request_counter, 0},
+                {#rep_limiter.last_reset, os:timestamp()}
+            ]),
+            % period is configured in seconds, timer:sleep/1 takes ms
+            timer:sleep(Period * 1000),
+            reset_requests_loop(Host);
+        [] ->
+            ok
+    end.
+
+
+%% Normalises a URL to the ETS key of its host. Limits are shared by every
+%% database and user on the same Hostname/Port.
+host_key(Url) ->
+    case ibrowse_lib:parse_url(Url) of
+        #url{host = Hostname, port = Port} ->
+            {ok, {Hostname, Port}};
+        _ ->
+            % not a URL ibrowse understands
+            error
+    end.
+
+
+%% {ok, Host} if the host of Url currently has a limiter row, else false.
+limited_host(Url) ->
+    case host_key(Url) of
+        {ok, Host} ->
+            try ets:member(?HOSTS, Host) of
+                true -> {ok, Host};
+                false -> false
+            catch error:badarg ->
+                % the table does not exist: limiter is not running
+                false
+            end;
+        error ->
+            false
+    end.
+
+
+%% Counts the request and sleeps for the computed delay. Returns ok.
+delay_request(Host) ->
+    Sleep = try
+        calculate_sleep_time(Host, increment_req_count(Host))
+    catch error:badarg ->
+        % the row was removed after limited_host/1 looked it up
+        0
+    end,
+    timer:sleep(Sleep).
+
+
+% Returns sleep time in milliseconds (a non-negative integer).
+% A request within the budget of the current window is not delayed. The
+% N-th request over budget waits for the next window to open plus N
+% request slots of Period / Limit each.
+calculate_sleep_time(Host, RequestCounter) ->
+    case ets:lookup(?HOSTS, Host) of
+        [#rep_limiter{limit = Limit}] when RequestCounter =< Limit ->
+            0;
+        [#rep_limiter{limit = Limit, period = Period, last_reset = Reset}] ->
+            PeriodMs = Period * 1000,
+            ElapsedMs = timer:now_diff(os:timestamp(), Reset) div 1000,
+            % clamp: the resetter may run late or the wall clock may jump
+            UntilReset = min(PeriodMs, max(0, PeriodMs - ElapsedMs)),
+            Overflow = RequestCounter - Limit,
+            % integer arithmetic only, timer:sleep/1 rejects floats
+            UntilReset + (Overflow * PeriodMs) div Limit;
+        [] ->
+            0
+    end.

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/b7b2ac36/src/couch_replicator_sup.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_sup.erl b/src/couch_replicator_sup.erl
index 57ad63b..340db18 100644
--- a/src/couch_replicator_sup.erl
+++ b/src/couch_replicator_sup.erl
@@ -32,6 +32,12 @@ init(_Args) ->
             brutal_kill,
             worker,
             [couch_replicator_manager]},
+        {couch_replicator_rate_limiter,
+            {couch_replicator_rate_limiter, start_link, []},
+            permanent,
+            brutal_kill, % Need to think about if this should be brutal_kill
+            worker,
+            [couch_replicator_rate_limiter]},
         {couch_replicator_job_sup,
             {couch_replicator_job_sup, start_link, []},
             permanent,

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/b7b2ac36/src/couch_replicator_utils.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_utils.erl b/src/couch_replicator_utils.erl
index 76bc8e1..48bfa71 100644
--- a/src/couch_replicator_utils.erl
+++ b/src/couch_replicator_utils.erl
@@ -261,6 +261,10 @@ make_options(Props) ->
     DefRetries = config:get("replicator", "retries_per_request", "10"),
     UseCheckpoints = config:get("replicator", "use_checkpoints", "true"),
     DefCheckpointInterval = config:get("replicator", "checkpoint_interval", "30000"),
+    DefSrcLimit = config:get("replicator", "src_rate_limit", "-1"),
+    DefSrcPeriod = config:get("replicator", "src_rate_period", "-1"),
+    DefTrgLimit = config:get("replicator", "target_rate_limit", "-1"),
+    DefTrgPeriod = config:get("replicator", "target_rate_period", "-1"),
     {ok, DefSocketOptions} = couch_util:parse_term(
         config:get("replicator", "socket_options",
             "[{keepalive, true}, {nodelay, false}]")),
@@ -272,7 +276,11 @@ make_options(Props) ->
         {worker_batch_size, list_to_integer(DefBatchSize)},
         {worker_processes, list_to_integer(DefWorkers)},
         {use_checkpoints, list_to_existing_atom(UseCheckpoints)},
-        {checkpoint_interval, list_to_integer(DefCheckpointInterval)}
+        {checkpoint_interval, list_to_integer(DefCheckpointInterval)},
+        {src_rate_limit, list_to_integer(DefSrcLimit)},
+        {src_rate_period, list_to_integer(DefSrcPeriod)},
+        {target_rate_limit, list_to_integer(DefTrgLimit)},
+        {target_rate_period, list_to_integer(DefTrgPeriod)}
     ])).
 
 
@@ -324,6 +332,14 @@ convert_options([{<<"use_checkpoints">>, V} | R]) ->
     [{use_checkpoints, V} | convert_options(R)];
 convert_options([{<<"checkpoint_interval">>, V} | R]) ->
     [{checkpoint_interval, couch_util:to_integer(V)} | convert_options(R)];
+convert_options([{<<"src_rate_limit">>, V} | R]) ->
+    [{src_rate_limit, couch_util:to_integer(V)} | convert_options(R)];
+convert_options([{<<"src_rate_period">>, V} | R]) ->
+    [{src_rate_period, couch_util:to_integer(V)} | convert_options(R)];
+convert_options([{<<"target_rate_limit">>, V} | R]) ->
+    [{target_rate_limit, couch_util:to_integer(V)} | convert_options(R)];
+convert_options([{<<"target_rate_period">>, V} | R]) ->
+    [{target_rate_period, couch_util:to_integer(V)} | convert_options(R)];
 convert_options([_ | R]) -> % skip unknown option
     convert_options(R).