You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2017/02/28 19:19:08 UTC

couch-replicator commit: updated refs/heads/master to 45d739a

Repository: couchdb-couch-replicator
Updated Branches:
  refs/heads/master 648e465f5 -> 45d739af3


Restore adding some jitter-ed sleep to shard scanning code.

Otherwise a large cluster will flood replicator manager with potentially
hundreds of thousands of `{resume, Shard}` messages. For each one, it
would try to open a changes feed which can add significant load and has
been seen in production to hit varios system limits.

This brings back the change from before the switch to using mem3 shards
for replicator db scans.

Also adds a few tests.

COUCHDB-3311


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/45d739af
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/45d739af
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/45d739af

Branch: refs/heads/master
Commit: 45d739af3fcf8b4f8e3ccca152cb3c2d781dc2fc
Parents: 648e465
Author: Nick Vatamaniuc <va...@apache.org>
Authored: Tue Feb 28 14:00:22 2017 -0500
Committer: Nick Vatamaniuc <va...@apache.org>
Committed: Tue Feb 28 14:00:22 2017 -0500

----------------------------------------------------------------------
 src/couch_replicator_manager.erl | 74 ++++++++++++++++++++++++++++-------
 1 file changed, 60 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/45d739af/src/couch_replicator_manager.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_manager.erl b/src/couch_replicator_manager.erl
index bdc3b8f..4e5073e 100644
--- a/src/couch_replicator_manager.erl
+++ b/src/couch_replicator_manager.erl
@@ -934,22 +934,30 @@ scan_all_dbs(Server) when is_pid(Server) ->
     {ok, Db} = mem3_util:ensure_exists(
         config:get("mem3", "shards_db", "_dbs")),
     ChangesFun = couch_changes:handle_changes(#changes_args{}, nil, Db, nil),
-    ChangesFun(fun({change, {Change}, _}, _) ->
-        DbName = couch_util:get_value(<<"id">>, Change),
-        case DbName of <<"_design/", _/binary>> -> ok; _Else ->
-            case couch_replicator_utils:is_deleted(Change) of
-            true ->
-                ok;
-            false ->
-                [gen_server:cast(Server, {resume_scan, ShardName})
-                    || ShardName <- replicator_shards(DbName)],
-                ok
-            end
-        end;
-        (_, _) -> ok
-    end),
+    ChangesFun({fun scan_changes_cb/3, {Server, 1}}),
     couch_db:close(Db).
 
+scan_changes_cb({change, {Change}, _}, _, {Server, AccCount}) ->
+    DbName = couch_util:get_value(<<"id">>, Change),
+    case DbName of <<"_design/", _/binary>> -> {Server, AccCount}; _Else ->
+        case couch_replicator_utils:is_deleted(Change) of
+        true ->
+            {Server, AccCount};
+        false ->
+            UpdatedCount = lists:foldl(fun(ShardName, Count) ->
+                spawn_link(fun() ->
+                    timer:sleep(jitter(Count)),
+                    gen_server:cast(Server, {resume_scan, ShardName})
+                end),
+                Count + 1
+           end, AccCount, replicator_shards(DbName)),
+           {Server, UpdatedCount}
+        end
+    end;
+
+scan_changes_cb(_, _, {Server, AccCount}) ->
+    {Server, AccCount}.
+
 
 replicator_shards(DbName) ->
     case is_replicator_db(DbName) of
@@ -1027,4 +1035,42 @@ t_fail_non_replicator_shard() ->
     end).
 
 
+scan_dbs_test_() ->
+{
+      foreach,
+      fun() -> test_util:start_couch([mem3, fabric]) end,
+      fun(Ctx) -> test_util:stop_couch(Ctx) end,
+      [
+          t_resume_db_shard(),
+          t_sleep_based_on_count()
+     ]
+}.
+
+
+t_resume_db_shard() ->
+    ?_test(begin
+        DbName0 = ?tempdb(),
+        DbName = <<DbName0/binary, "/_replicator">>,
+        ok = fabric:create_db(DbName, [?CTX]),
+        Change = {[{<<"id">>, DbName}]},
+        scan_changes_cb({change, Change, req}, type, {self(), 1}),
+        ResumeMsg = receive Msg -> Msg after 1000 -> timeout end,
+        ?assertMatch({'$gen_cast', {resume_scan, <<"shards/", _/binary>>}}, ResumeMsg),
+        fabric:delete_db(DbName, [?CTX])
+    end).
+
+
+t_sleep_based_on_count() ->
+    ?_test(begin
+        DbName0 = ?tempdb(),
+        DbName = <<DbName0/binary, "/_replicator">>,
+        ok = fabric:create_db(DbName, [?CTX]),
+        Change = {[{<<"id">>, DbName}]},
+        scan_changes_cb({change, Change, req}, type, {self(), 1000}),
+        Timeout = receive Msg -> Msg after 100 -> timeout end,
+        ?assertEqual(timeout, Timeout),
+        fabric:delete_db(DbName, [?CTX])
+    end).
+
+
 -endif.