You are viewing a plain-text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this text.
Posted to commits@couchdb.apache.org by da...@apache.org on 2017/03/15 19:25:30 UTC

[01/10] mem3 commit: updated refs/heads/COUCHDB-3326-clustered-purge to e4e8928 [Forced Update!]

Repository: couchdb-mem3
Updated Branches:
  refs/heads/COUCHDB-3326-clustered-purge 8b3a6b19a -> e4e892899 (forced update)


Update handle_config_terminate API

COUCHDB-3102


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/432264ae
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/432264ae
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/432264ae

Branch: refs/heads/COUCHDB-3326-clustered-purge
Commit: 432264ae1f66ea7ad960352583899cb4940e2723
Parents: 15615b2
Author: ILYA Khlopotov <ii...@ca.ibm.com>
Authored: Wed Aug 17 11:57:54 2016 -0700
Committer: ILYA Khlopotov <ii...@ca.ibm.com>
Committed: Tue Aug 23 12:28:03 2016 -0700

----------------------------------------------------------------------
 src/mem3_shards.erl              |  16 ++--
 src/mem3_sync_event_listener.erl | 173 +++++++++++++++++++++++++---------
 2 files changed, 138 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/432264ae/src/mem3_shards.erl
----------------------------------------------------------------------
diff --git a/src/mem3_shards.erl b/src/mem3_shards.erl
index 8a1bb54..c7f33c6 100644
--- a/src/mem3_shards.erl
+++ b/src/mem3_shards.erl
@@ -12,7 +12,7 @@
 
 -module(mem3_shards).
 -behaviour(gen_server).
--vsn(2).
+-vsn(3).
 -behaviour(config_listener).
 
 -export([init/1, terminate/2, code_change/3]).
@@ -36,6 +36,7 @@
 -define(DBS, mem3_dbs).
 -define(SHARDS, mem3_shards).
 -define(ATIMES, mem3_atimes).
+-define(RELISTEN_DELAY, 5000).
 
 start_link() ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
@@ -173,12 +174,10 @@ handle_config_change("mem3", "shards_db", _DbName, _, _) ->
 handle_config_change(_, _, _, _, _) ->
     {ok, nil}.
 
-handle_config_terminate(_, stop, _) -> ok;
-handle_config_terminate(_, _, _) ->
-    spawn(fun() ->
-        timer:sleep(5000),
-        config:listen_for_changes(?MODULE, nil)
-    end).
+handle_config_terminate(_, stop, _) ->
+    ok;
+handle_config_terminate(_Server, _Reason, _State) ->
+    erlang:send_after(?RELISTEN_DELAY, whereis(?MODULE), restart_config_listener).
 
 init([]) ->
     ets:new(?SHARDS, [
@@ -235,6 +234,9 @@ handle_info({'DOWN', _, _, Pid, Reason}, #st{changes_pid=Pid}=St) ->
 handle_info({start_listener, Seq}, St) ->
     {NewPid, _} = spawn_monitor(fun() -> listen_for_changes(Seq) end),
     {noreply, St#st{changes_pid=NewPid}};
+handle_info(restart_config_listener, State) ->
+    ok = config:listen_for_changes(?MODULE, nil),
+    {noreply, State};
 handle_info(_Msg, St) ->
     {noreply, St}.
 

http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/432264ae/src/mem3_sync_event_listener.erl
----------------------------------------------------------------------
diff --git a/src/mem3_sync_event_listener.erl b/src/mem3_sync_event_listener.erl
index ca058db..7859c31 100644
--- a/src/mem3_sync_event_listener.erl
+++ b/src/mem3_sync_event_listener.erl
@@ -12,7 +12,7 @@
 
 -module(mem3_sync_event_listener).
 -behavior(couch_event_listener).
--behavior(config_listener).
+-vsn(1).
 
 -export([
     start_link/0
@@ -26,13 +26,14 @@
     handle_info/2
 ]).
 
--export([
-    handle_config_change/5,
-    handle_config_terminate/3
-]).
-
 -include_lib("mem3/include/mem3.hrl").
 
+-ifdef(TEST).
+-define(RELISTEN_DELAY, 500).
+-else.
+-define(RELISTEN_DELAY, 5000).
+-endif.
+
 -record(state, {
     nodes,
     shards,
@@ -59,7 +60,7 @@ start_link() ->
     couch_event_listener:start_link(?MODULE, [], [all_dbs]).
 
 init(_) ->
-    config:listen_for_changes(?MODULE, undefined),
+    ok = subscribe_for_config(),
     Delay = config:get_integer("mem3", "sync_delay", 5000),
     Frequency = config:get_integer("mem3", "sync_frequency", 500),
     Buckets = lists:duplicate(Delay div Frequency + 1, sets:new()),
@@ -110,48 +111,32 @@ handle_cast(Msg, St) ->
 
 handle_info(timeout, St) ->
     maybe_push_shards(St);
+handle_info({config_change, "mem3", "sync_delay", Value, _}, St) ->
+    set_config(set_delay, Value, "ignoring bad value for mem3.sync_delay"),
+    maybe_push_shards(St);
+handle_info({config_change, "mem3", "sync_frequency", Value, _}, St) ->
+    set_config(set_frequency, Value, "ignoring bad value for mem3.sync_frequency"),
+    maybe_push_shards(St);
+handle_info({gen_event_EXIT, _Handler, _Reason}, St) ->
+    erlang:send_after(?RELISTEN_DELAY, self(), restart_config_listener),
+    maybe_push_shards(St);
+handle_info(restart_config_listener, St) ->
+    ok = subscribe_for_config(),
+    maybe_push_shards(St);
+handle_info({get_state, Ref, Caller}, St) ->
+    Caller ! {Ref, St},
+    {ok, St};
 handle_info(Msg, St) ->
     couch_log:notice("unexpected info to mem3_sync_event_listener: ~p", [Msg]),
     maybe_push_shards(St).
 
-handle_config_change("mem3", "sync_delay", Delay0, _, St) ->
-    try list_to_integer(Delay0) of
-        Delay1 ->
-            couch_event_listener:cast(
-                ?MODULE,
-                {set_delay, Delay1}
-            )
-    catch error:badarg ->
-        couch_log:warning(
-            "ignoring bad value for mem3.sync_delay: ~p",
-            [Delay0]
-        )
-    end,
-    {ok, St};
-handle_config_change("mem3", "sync_frequency", Frequency0, _, St) ->
-    try list_to_integer(Frequency0) of
-        Frequency1 ->
-            couch_event_listener:cast(
-                ?MODULE,
-                {set_frequency, Frequency1}
-            )
+set_config(Cmd, Value, Error) ->
+    try list_to_integer(Value) of
+        IntegerValue ->
+            couch_event_listener:cast(self(), {Cmd, IntegerValue})
     catch error:badarg ->
-        couch_log:warning(
-            "ignoring bad value for mem3.sync_frequency: ~p",
-            [Frequency0]
-        )
-    end,
-    {ok, St};
-handle_config_change(_, _, _, _, St) ->
-    {ok, St}.
-
-handle_config_terminate(_, stop, _) -> ok;
-handle_config_terminate(_Server, _Reason, St) ->
-    Fun = fun() ->
-        timer:sleep(5000),
-        config:listen_for_changes(?MODULE, St)
-    end,
-    spawn(Fun).
+        couch_log:warning("~s: ~p", [Error, Value])
+    end.
 
 bucket_shard(ShardName, [B|Bs]=Buckets0) ->
     case waiting(ShardName, Buckets0) of
@@ -222,3 +207,103 @@ push_shard(ShardName) ->
     catch error:database_does_not_exist ->
         ok
     end.
+
+subscribe_for_config() ->
+    config:subscribe_for_changes([
+        {"mem3", "sync_delay"},
+        {"mem3", "sync_frequency"}
+    ]).
+
+-ifdef(TEST).
+-include_lib("couch/include/couch_eunit.hrl").
+
+setup() ->
+    ok = meck:new(couch_event, [passthrough]),
+    ok = meck:expect(couch_event, register_all, ['_'], ok),
+
+    ok = meck:new(config_notifier, [passthrough]),
+    ok = meck:expect(config_notifier, handle_event, [
+        {[{'_', '_', "error", '_'}, '_'], meck:raise(throw, raised_error)},
+        {['_', '_'], meck:passthrough()}
+    ]),
+
+    application:start(config),
+    {ok, Pid} = ?MODULE:start_link(),
+    erlang:unlink(Pid),
+    meck:wait(config_notifier, subscribe, '_', 1000),
+    Pid.
+
+teardown(Pid) ->
+    exit(Pid, shutdown),
+    application:stop(config),
+    (catch meck:unload(couch_event)),
+    (catch meck:unload(config_notifier)),
+    ok.
+
+subscribe_for_config_test_() ->
+    {
+        "Subscrive for configuration changes",
+        {
+            foreach,
+            fun setup/0, fun teardown/1,
+            [
+                fun should_set_sync_delay/1,
+                fun should_set_sync_frequency/1,
+                fun should_restart_listener/1,
+                fun should_terminate/1
+            ]
+        }
+    }.
+
+should_set_sync_delay(Pid) ->
+    ?_test(begin
+        config:set("mem3", "sync_delay", "123", false),
+        ?assertMatch(#state{delay = 123}, capture(Pid)),
+        ok
+    end).
+
+should_set_sync_frequency(Pid) ->
+    ?_test(begin
+        config:set("mem3", "sync_frequency", "456", false),
+        ?assertMatch(#state{frequency = 456}, capture(Pid)),
+        ok
+    end).
+
+should_restart_listener(Pid) ->
+    ?_test(begin
+        meck:reset(config_notifier),
+        config:set("mem3", "sync_frequency", "error", false),
+
+        meck:wait(config_notifier, subscribe, '_', 1000),
+        ok
+    end).
+
+should_terminate(Pid) ->
+    ?_test(begin
+        ?assert(is_process_alive(Pid)),
+
+        EventMgr = whereis(config_event),
+
+        RestartFun = fun() -> exit(EventMgr, kill) end,
+        test_util:with_process_restart(config_event, RestartFun),
+
+        ?assertNot(is_process_alive(EventMgr)),
+        ?assertNot(is_process_alive(Pid)),
+        ?assert(is_process_alive(whereis(config_event))),
+        ok
+    end).
+
+capture(Pid) ->
+    Ref = make_ref(),
+    WaitFun = fun() ->
+        Pid ! {get_state, Ref, self()},
+        receive
+            {Ref, State} -> State
+        after 0 ->
+            wait
+        end
+    end,
+    test_util:wait(WaitFun).
+
+
+-endif.


[06/10] mem3 commit: updated refs/heads/COUCHDB-3326-clustered-purge to e4e8928

Posted by da...@apache.org.
Fix mem3_sync_event_listener unit test

The `should_restart_listener` test was failing because the pattern
supplied to meck was wrong. The argument to
`config_notifier:handle_event/2` has the shape `{config_change,
Section, Key, Value, Persist}`, but the test was matching only a
four-tuple, presumably because the leading `config_change` atom had
been left out of the pattern.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/589caba1
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/589caba1
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/589caba1

Branch: refs/heads/COUCHDB-3326-clustered-purge
Commit: 589caba108cfee784a1c434d2a404bccdbc4d624
Parents: c3c5429
Author: Paul J. Davis <pa...@gmail.com>
Authored: Thu Feb 2 11:23:16 2017 -0600
Committer: Paul J. Davis <pa...@gmail.com>
Committed: Thu Feb 2 11:25:03 2017 -0600

----------------------------------------------------------------------
 src/mem3_sync_event_listener.erl | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/589caba1/src/mem3_sync_event_listener.erl
----------------------------------------------------------------------
diff --git a/src/mem3_sync_event_listener.erl b/src/mem3_sync_event_listener.erl
index 7859c31..cd671e4 100644
--- a/src/mem3_sync_event_listener.erl
+++ b/src/mem3_sync_event_listener.erl
@@ -223,7 +223,7 @@ setup() ->
 
     ok = meck:new(config_notifier, [passthrough]),
     ok = meck:expect(config_notifier, handle_event, [
-        {[{'_', '_', "error", '_'}, '_'], meck:raise(throw, raised_error)},
+        {[{'_', '_', '_', "error", '_'}, '_'], meck:raise(throw, raised_error)},
         {['_', '_'], meck:passthrough()}
     ]),
 


[09/10] mem3 commit: updated refs/heads/COUCHDB-3326-clustered-purge to e4e8928

Posted by da...@apache.org.
Fix stale shards cache

There's a race condition in mem3_shards that can result in having shards
in the cache for a database that's been deleted. This results in a
confused cluster that thinks a database exists until you attempt to open
it.

The fix is to ignore any cache insert request that comes from an older
update sequence of the dbs db (`mem3.shards_db`, `_dbs` by default) than
the one the mem3_shards cache already knows about.

Big thanks to @jdoane for the identification and original patch.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/b1b9b3e0
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/b1b9b3e0
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/b1b9b3e0

Branch: refs/heads/COUCHDB-3326-clustered-purge
Commit: b1b9b3e039993e3248925ba9783119e4bc79062b
Parents: b1ea253
Author: Paul J. Davis <pa...@gmail.com>
Authored: Fri Feb 24 12:55:37 2017 -0600
Committer: Paul J. Davis <pa...@gmail.com>
Committed: Fri Feb 24 13:24:16 2017 -0600

----------------------------------------------------------------------
 src/mem3_shards.erl | 54 ++++++++++++++++++++++++++++++++++++++----------
 1 file changed, 43 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/b1b9b3e0/src/mem3_shards.erl
----------------------------------------------------------------------
diff --git a/src/mem3_shards.erl b/src/mem3_shards.erl
index db5145d..9654488 100644
--- a/src/mem3_shards.erl
+++ b/src/mem3_shards.erl
@@ -27,7 +27,8 @@
 -record(st, {
     max_size = 25000,
     cur_size = 0,
-    changes_pid
+    changes_pid,
+    update_seq
 }).
 
 -include_lib("mem3/include/mem3.hrl").
@@ -191,11 +192,12 @@ init([]) ->
     ets:new(?ATIMES, [ordered_set, protected, named_table]),
     ok = config:listen_for_changes(?MODULE, nil),
     SizeList = config:get("mem3", "shard_cache_size", "25000"),
-    {Pid, _} = spawn_monitor(fun() -> listen_for_changes(get_update_seq()) end),
+    UpdateSeq = get_update_seq(),
     {ok, #st{
         max_size = list_to_integer(SizeList),
         cur_size = 0,
-        changes_pid = Pid
+        changes_pid = start_changes_listener(UpdateSeq),
+        update_seq = UpdateSeq
     }}.
 
 handle_call({set_max_size, Size}, _From, St) ->
@@ -210,12 +212,23 @@ handle_cast({cache_hit, DbName}, St) ->
     couch_stats:increment_counter([mem3, shard_cache, hit]),
     cache_hit(DbName),
     {noreply, St};
-handle_cast({cache_insert, DbName, Shards}, St) ->
+handle_cast({cache_insert, DbName, Shards, UpdateSeq}, St) ->
     couch_stats:increment_counter([mem3, shard_cache, miss]),
-    {noreply, cache_free(cache_insert(St, DbName, Shards))};
+    NewSt = case UpdateSeq < St#st.update_seq of
+        true -> St;
+        false -> cache_free(cache_insert(St, DbName, Shards))
+    end,
+    {noreply, NewSt};
 handle_cast({cache_remove, DbName}, St) ->
     couch_stats:increment_counter([mem3, shard_cache, eviction]),
     {noreply, cache_remove(St, DbName)};
+handle_cast({cache_insert_change, DbName, Shards, UpdateSeq}, St) ->
+    Msg = {cache_insert, DbName, Shards, UpdateSeq},
+    {noreply, NewSt} = handle_cast(Msg, St),
+    {noreply, NewSt#st{update_seq = UpdateSeq}};
+handle_cast({cache_remove_change, DbName, UpdateSeq}, St) ->
+    {noreply, NewSt} = handle_cast({cache_remove, DbName}, St),
+    {noreply, NewSt#st{update_seq = UpdateSeq}};
 handle_cast(_Msg, St) ->
     {noreply, St}.
 
@@ -232,8 +245,9 @@ handle_info({'DOWN', _, _, Pid, Reason}, #st{changes_pid=Pid}=St) ->
     erlang:send_after(5000, self(), {start_listener, Seq}),
     {noreply, NewSt#st{changes_pid=undefined}};
 handle_info({start_listener, Seq}, St) ->
-    {NewPid, _} = spawn_monitor(fun() -> listen_for_changes(Seq) end),
-    {noreply, St#st{changes_pid=NewPid}};
+    {noreply, St#st{
+        changes_pid = start_changes_listener(Seq)
+    }};
 handle_info(restart_config_listener, State) ->
     ok = config:listen_for_changes(?MODULE, nil),
     {noreply, State};
@@ -249,6 +263,21 @@ code_change(_OldVsn, #st{}=St, _Extra) ->
 
 %% internal functions
 
+start_changes_listener(SinceSeq) ->
+    Self = self(),
+    {Pid, _} = erlang:spawn_monitor(fun() ->
+        erlang:spawn_link(fun() ->
+            Ref = erlang:monitor(process, Self),
+            receive
+                {'DOWN', Ref, _, _, _} ->
+                    ok
+            end,
+            exit(shutdown)
+        end),
+        listen_for_changes(SinceSeq)
+    end),
+    Pid.
+
 fold_fun(#full_doc_info{}=FDI, Acc) ->
     DI = couch_doc:to_doc_info(FDI),
     fold_fun(DI, Acc);
@@ -288,10 +317,11 @@ changes_callback({stop, EndSeq}, _) ->
     exit({seq, EndSeq});
 changes_callback({change, {Change}, _}, _) ->
     DbName = couch_util:get_value(<<"id">>, Change),
+    Seq = couch_util:get_value(<<"seq">>, Change),
     case DbName of <<"_design/", _/binary>> -> ok; _Else ->
         case mem3_util:is_deleted(Change) of
         true ->
-            gen_server:cast(?MODULE, {cache_remove, DbName});
+            gen_server:cast(?MODULE, {cache_remove_change, DbName, Seq});
         false ->
             case couch_util:get_value(doc, Change) of
             {error, Reason} ->
@@ -299,13 +329,14 @@ changes_callback({change, {Change}, _}, _) ->
                     [DbName, Reason]);
             {Doc} ->
                 Shards = mem3_util:build_ordered_shards(DbName, Doc),
-                gen_server:cast(?MODULE, {cache_insert, DbName, Shards}),
+                Msg = {cache_insert_change, DbName, Shards, Seq},
+                gen_server:cast(?MODULE, Msg),
                 [create_if_missing(mem3:name(S), mem3:engine(S)) || S
                     <- Shards, mem3:node(S) =:= node()]
             end
         end
     end,
-    {ok, couch_util:get_value(<<"seq">>, Change)};
+    {ok, Seq};
 changes_callback(timeout, _) ->
     ok.
 
@@ -321,8 +352,9 @@ load_shards_from_disk(DbName) when is_binary(DbName) ->
 load_shards_from_db(ShardDb, DbName) ->
     case couch_db:open_doc(ShardDb, DbName, [ejson_body]) of
     {ok, #doc{body = {Props}}} ->
+        Seq = couch_db:get_update_seq(ShardDb),
         Shards = mem3_util:build_ordered_shards(DbName, Props),
-        gen_server:cast(?MODULE, {cache_insert, DbName, Shards}),
+        gen_server:cast(?MODULE, {cache_insert, DbName, Shards, Seq}),
         Shards;
     {not_found, _} ->
         erlang:error(database_does_not_exist, ?b2l(DbName))


[05/10] mem3 commit: updated refs/heads/COUCHDB-3326-clustered-purge to e4e8928

Posted by da...@apache.org.
Remove public db record

COUCHDB-3288


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/3de48d23
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/3de48d23
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/3de48d23

Branch: refs/heads/COUCHDB-3326-clustered-purge
Commit: 3de48d23ba661db4e4f856a1893f7cb69ed45131
Parents: 589caba
Author: Paul J. Davis <pa...@gmail.com>
Authored: Wed Feb 1 17:17:05 2017 -0600
Committer: Paul J. Davis <pa...@gmail.com>
Committed: Thu Feb 2 11:25:03 2017 -0600

----------------------------------------------------------------------
 src/mem3.erl        | 17 +++++++++--------
 src/mem3_httpd.erl  |  4 ++--
 src/mem3_nodes.erl  | 10 +++++-----
 src/mem3_rep.erl    | 12 ++++++------
 src/mem3_rpc.erl    |  4 ++--
 src/mem3_shards.erl |  4 ++--
 6 files changed, 26 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/3de48d23/src/mem3.erl
----------------------------------------------------------------------
diff --git a/src/mem3.erl b/src/mem3.erl
index 405d7e5..e9c1473 100644
--- a/src/mem3.erl
+++ b/src/mem3.erl
@@ -145,13 +145,13 @@ get_shard(DbName, Node, Range) ->
 local_shards(DbName) ->
     mem3_shards:local(DbName).
 
-shard_suffix(#db{name=DbName}) ->
-    shard_suffix(DbName);
-shard_suffix(DbName0) ->
+shard_suffix(DbName0) when is_binary(DbName0) ->
     Shard = hd(shards(DbName0)),
     <<"shards/", _:8/binary, "-", _:8/binary, "/", DbName/binary>> =
         Shard#shard.name,
-    filename:extension(binary_to_list(DbName)).
+    filename:extension(binary_to_list(DbName));
+shard_suffix(Db) ->
+    shard_suffix(couch_db:name(Db)).
 
 fold_shards(Fun, Acc) ->
     mem3_shards:fold(Fun, Acc).
@@ -292,10 +292,11 @@ group_by_range(Shards) ->
 
 % quorum functions
 
-quorum(#db{name=DbName}) ->
-    quorum(DbName);
-quorum(DbName) ->
-    n(DbName) div 2 + 1.
+quorum(DbName) when is_binary(DbName) ->
+    n(DbName) div 2 + 1;
+quorum(Db) ->
+    quorum(couch_db:name(Db)).
+
 
 node(#shard{node=Node}) ->
     Node;

http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/3de48d23/src/mem3_httpd.erl
----------------------------------------------------------------------
diff --git a/src/mem3_httpd.erl b/src/mem3_httpd.erl
index 5358158..571f063 100644
--- a/src/mem3_httpd.erl
+++ b/src/mem3_httpd.erl
@@ -32,7 +32,7 @@ handle_membership_req(#httpd{path_parts=[<<"_membership">>]}=Req) ->
 
 handle_shards_req(#httpd{method='GET',
         path_parts=[_DbName, <<"_shards">>]} = Req, Db) ->
-    DbName = mem3:dbname(Db#db.name),
+    DbName = mem3:dbname(couch_db:name(Db)),
     Shards = mem3:shards(DbName),
     JsonShards = json_shards(Shards, dict:new()),
     couch_httpd:send_json(Req, {[
@@ -40,7 +40,7 @@ handle_shards_req(#httpd{method='GET',
     ]});
 handle_shards_req(#httpd{method='GET',
         path_parts=[_DbName, <<"_shards">>, DocId]} = Req, Db) ->
-    DbName = mem3:dbname(Db#db.name),
+    DbName = mem3:dbname(couch_db:name(Db)),
     Shards = mem3:shards(DbName, DocId),
     {[{Shard, Dbs}]} = json_shards(Shards, dict:new()),
     couch_httpd:send_json(Req, {[

http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/3de48d23/src/mem3_nodes.erl
----------------------------------------------------------------------
diff --git a/src/mem3_nodes.erl b/src/mem3_nodes.erl
index f31891a..555389b 100644
--- a/src/mem3_nodes.erl
+++ b/src/mem3_nodes.erl
@@ -92,7 +92,7 @@ code_change(_OldVsn, #state{}=State, _Extra) ->
 initialize_nodelist() ->
     DbName = config:get("mem3", "nodes_db", "_nodes"),
     {ok, Db} = mem3_util:ensure_exists(DbName),
-    {ok, _, Db} = couch_btree:fold(Db#db.id_tree, fun first_fold/3, Db, []),
+    {ok, _} = couch_db:fold_docs(Db, fun first_fold/2, Db, []),
     % add self if not already present
     case ets:lookup(?MODULE, node()) of
     [_] ->
@@ -103,13 +103,13 @@ initialize_nodelist() ->
         {ok, _} = couch_db:update_doc(Db, Doc, [])
     end,
     couch_db:close(Db),
-    Db#db.update_seq.
+    couch_db:get_update_seq(Db).
 
-first_fold(#full_doc_info{id = <<"_design/", _/binary>>}, _, Acc) ->
+first_fold(#full_doc_info{id = <<"_design/", _/binary>>}, Acc) ->
     {ok, Acc};
-first_fold(#full_doc_info{deleted=true}, _, Acc) ->
+first_fold(#full_doc_info{deleted=true}, Acc) ->
     {ok, Acc};
-first_fold(#full_doc_info{id=Id}=DocInfo, _, Db) ->
+first_fold(#full_doc_info{id=Id}=DocInfo, Db) ->
     {ok, #doc{body={Props}}} = couch_db:open_doc(Db, DocInfo, [ejson_body]),
     ets:insert(?MODULE, {mem3_util:to_atom(Id), Props}),
     {ok, Db}.

http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/3de48d23/src/mem3_rep.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rep.erl b/src/mem3_rep.erl
index ad7ac55..85d46e2 100644
--- a/src/mem3_rep.erl
+++ b/src/mem3_rep.erl
@@ -170,11 +170,11 @@ find_source_seq_int(#doc{body={Props}}, SrcNode0, TgtNode0, TgtUUID, TgtSeq) ->
     end.
 
 
-repl(#db{name=DbName, seq_tree=Bt}=Db, Acc0) ->
-    erlang:put(io_priority, {internal_repl, DbName}),
+repl(Db, Acc0) ->
+    erlang:put(io_priority, {internal_repl, couch_db:name(Db)}),
     #acc{seq=Seq} = Acc1 = calculate_start_seq(Acc0#acc{source = Db}),
     Fun = fun ?MODULE:changes_enumerator/3,
-    {ok, _, Acc2} = couch_btree:fold(Bt, Fun, Acc1, [{start_key, Seq + 1}]),
+    {ok, _, Acc2} = couch_db:enum_docs_since(Db, Seq, Fun, Acc1, []),
     {ok, #acc{seq = LastSeq}} = replicate_batch(Acc2),
     {ok, couch_db:count_changes_since(Db, LastSeq)}.
 
@@ -354,10 +354,10 @@ find_repl_doc(SrcDb, TgtUUIDPrefix) ->
         end
     end,
     Options = [{start_key, DocIdPrefix}],
-    case couch_btree:fold(SrcDb#db.local_tree, FoldFun, not_found, Options) of
-        {ok, _, {TgtUUID, Doc}} ->
+    case couch_db:fold_local_docs(SrcDb, FoldFun, not_found, Options) of
+        {ok, {TgtUUID, Doc}} ->
             {ok, TgtUUID, Doc};
-        {ok, _, not_found} ->
+        {ok, not_found} ->
             {not_found, missing};
         Else ->
             couch_log:error("Error finding replication doc: ~w", [Else]),

http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/3de48d23/src/mem3_rpc.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rpc.erl b/src/mem3_rpc.erl
index 93cb99a..c2bd58f 100644
--- a/src/mem3_rpc.erl
+++ b/src/mem3_rpc.erl
@@ -84,11 +84,11 @@ load_checkpoint_rpc(DbName, SourceNode, SourceUUID) ->
 save_checkpoint_rpc(DbName, Id, SourceSeq, NewEntry0, History0) ->
     erlang:put(io_priority, {internal_repl, DbName}),
     case get_or_create_db(DbName, [?ADMIN_CTX]) of
-        {ok, #db{update_seq = TargetSeq} = Db} ->
+        {ok, Db} ->
             NewEntry = {[
                 {<<"target_node">>, atom_to_binary(node(), utf8)},
                 {<<"target_uuid">>, couch_db:get_uuid(Db)},
-                {<<"target_seq">>, TargetSeq}
+                {<<"target_seq">>, couch_db:get_update_seq(Db)}
             ] ++ NewEntry0},
             Body = {[
                 {<<"seq">>, SourceSeq},

http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/3de48d23/src/mem3_shards.erl
----------------------------------------------------------------------
diff --git a/src/mem3_shards.erl b/src/mem3_shards.erl
index c7f33c6..f998552 100644
--- a/src/mem3_shards.erl
+++ b/src/mem3_shards.erl
@@ -267,7 +267,7 @@ get_update_seq() ->
     DbName = config:get("mem3", "shards_db", "_dbs"),
     {ok, Db} = mem3_util:ensure_exists(DbName),
     couch_db:close(Db),
-    Db#db.update_seq.
+    couch_db:get_update_seq(Db).
 
 listen_for_changes(Since) ->
     DbName = config:get("mem3", "shards_db", "_dbs"),
@@ -317,7 +317,7 @@ load_shards_from_disk(DbName) when is_binary(DbName) ->
         couch_db:close(Db)
     end.
 
-load_shards_from_db(#db{} = ShardDb, DbName) ->
+load_shards_from_db(ShardDb, DbName) ->
     case couch_db:open_doc(ShardDb, DbName, [ejson_body]) of
     {ok, #doc{body = {Props}}} ->
         Shards = mem3_util:build_ordered_shards(DbName, Props),


[03/10] mem3 commit: updated refs/heads/COUCHDB-3326-clustered-purge to e4e8928

Posted by da...@apache.org.
Chunk missing revisions before attempting to save on target

In cases with pathological document revision patterns (e.g., 10000 open
conflicts and a revision tree depth of 300000 on a single document), attempting to
replicate the full revision tree in one batch causes the system to crash because it
tries to send an oversized message. We've observed messages of > 4GB in the
wild.

This patch divides the set of revisions-to-replicate for a single document into
chunks of a configurable size (`mem3.rev_chunk_size`, default 5000), thereby allowing
operators to keep the system stable when attempting to replicate these troublesome documents.

BugzID: 37676


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/c4da61c8
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/c4da61c8
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/c4da61c8

Branch: refs/heads/COUCHDB-3326-clustered-purge
Commit: c4da61c8eb98cedd3cf7a28c293cb1f6d3ec8571
Parents: 252467c
Author: Benjamin Anderson <b...@banjiewen.net>
Authored: Wed Oct 29 12:52:30 2014 -0700
Committer: Eric Avdey <ei...@eiri.ca>
Committed: Thu Nov 24 13:55:18 2016 -0400

----------------------------------------------------------------------
 src/mem3_rep.erl | 32 ++++++++++++++++++++++++++++++--
 1 file changed, 30 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/c4da61c8/src/mem3_rep.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rep.erl b/src/mem3_rep.erl
index 938260d..ad7ac55 100644
--- a/src/mem3_rep.erl
+++ b/src/mem3_rep.erl
@@ -252,8 +252,10 @@ replicate_batch(#acc{target = #shard{node=Node, name=Name}} = Acc) ->
     [] ->
         ok;
     Missing ->
-        Docs = open_docs(Acc, Missing),
-        ok = save_on_target(Node, Name, Docs)
+        lists:map(fun(Chunk) ->
+            Docs = open_docs(Acc, Chunk),
+            ok = save_on_target(Node, Name, Docs)
+        end, chunk_revs(Missing))
     end,
     update_locals(Acc),
     {ok, Acc#acc{revcount=0, infos=[]}}.
@@ -271,6 +273,32 @@ find_missing_revs(Acc) ->
     ]).
 
 
+chunk_revs(Revs) ->
+    Limit = list_to_integer(config:get("mem3", "rev_chunk_size", "5000")),
+    chunk_revs(Revs, Limit).
+
+chunk_revs(Revs, Limit) ->
+    chunk_revs(Revs, {0, []}, [], Limit).
+
+chunk_revs([], {_Count, Chunk}, Chunks, _Limit) ->
+    [Chunk|Chunks];
+chunk_revs([{Id, R, A}|Revs], {Count, Chunk}, Chunks, Limit) when length(R) =< Limit - Count ->
+    chunk_revs(
+        Revs,
+        {Count + length(R), [{Id, R, A}|Chunk]},
+        Chunks,
+        Limit
+    );
+chunk_revs([{Id, R, A}|Revs], {Count, Chunk}, Chunks, Limit) ->
+    {This, Next} = lists:split(Limit - Count, R),
+    chunk_revs(
+        [{Id, Next, A}|Revs],
+        {0, []},
+        [[{Id, This, A}|Chunk]|Chunks],
+        Limit
+    ).
+
+
 open_docs(#acc{source=Source, infos=Infos}, Missing) ->
     lists:flatmap(fun({Id, Revs, _}) ->
         FDI = lists:keyfind(Id, #full_doc_info.id, Infos),


[07/10] mem3 commit: updated refs/heads/COUCHDB-3326-clustered-purge to e4e8928

Posted by da...@apache.org.
Update to use the pluggable storage API

COUCHDB-3287


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/6d00baed
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/6d00baed
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/6d00baed

Branch: refs/heads/COUCHDB-3326-clustered-purge
Commit: 6d00baed15306ad952a93ce6254efab96bd0a730
Parents: 3de48d2
Author: Paul J. Davis <pa...@gmail.com>
Authored: Wed Feb 10 16:56:28 2016 -0600
Committer: Paul J. Davis <pa...@gmail.com>
Committed: Fri Feb 3 12:56:27 2017 -0600

----------------------------------------------------------------------
 src/mem3_nodes.erl  |  3 ++-
 src/mem3_rep.erl    | 13 ++++++-------
 src/mem3_shards.erl | 11 ++++++-----
 3 files changed, 14 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/6d00baed/src/mem3_nodes.erl
----------------------------------------------------------------------
diff --git a/src/mem3_nodes.erl b/src/mem3_nodes.erl
index 555389b..019ceaf 100644
--- a/src/mem3_nodes.erl
+++ b/src/mem3_nodes.erl
@@ -102,8 +102,9 @@ initialize_nodelist() ->
         Doc = #doc{id = couch_util:to_binary(node())},
         {ok, _} = couch_db:update_doc(Db, Doc, [])
     end,
+    Seq = couch_db:get_update_seq(Db),
     couch_db:close(Db),
-    couch_db:get_update_seq(Db).
+    Seq.
 
 first_fold(#full_doc_info{id = <<"_design/", _/binary>>}, Acc) ->
     {ok, Acc};

http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/6d00baed/src/mem3_rep.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rep.erl b/src/mem3_rep.erl
index 85d46e2..44e6c5b 100644
--- a/src/mem3_rep.erl
+++ b/src/mem3_rep.erl
@@ -21,7 +21,7 @@
 ]).
 
 -export([
-    changes_enumerator/3
+    changes_enumerator/2
 ]).
 
 
@@ -173,8 +173,8 @@ find_source_seq_int(#doc{body={Props}}, SrcNode0, TgtNode0, TgtUUID, TgtSeq) ->
 repl(Db, Acc0) ->
     erlang:put(io_priority, {internal_repl, couch_db:name(Db)}),
     #acc{seq=Seq} = Acc1 = calculate_start_seq(Acc0#acc{source = Db}),
-    Fun = fun ?MODULE:changes_enumerator/3,
-    {ok, _, Acc2} = couch_db:enum_docs_since(Db, Seq, Fun, Acc1, []),
+    Fun = fun ?MODULE:changes_enumerator/2,
+    {ok, Acc2} = couch_db:fold_changes(Db, Seq, Fun, Acc1),
     {ok, #acc{seq = LastSeq}} = replicate_batch(Acc2),
     {ok, couch_db:count_changes_since(Db, LastSeq)}.
 
@@ -225,11 +225,10 @@ compare_epochs(Acc) ->
     Seq = mem3_rpc:find_common_seq(Node, Name, UUID, Epochs),
     Acc#acc{seq = Seq, history = {[]}}.
 
-changes_enumerator(#doc_info{id=DocId}, Reds, #acc{db=Db}=Acc) ->
+changes_enumerator(#doc_info{id=DocId}, #acc{db=Db}=Acc) ->
     {ok, FDI} = couch_db:get_full_doc_info(Db, DocId),
-    changes_enumerator(FDI, Reds, Acc);
-changes_enumerator(#full_doc_info{}=FDI, _,
-  #acc{revcount=C, infos=Infos}=Acc0) ->
+    changes_enumerator(FDI, Acc);
+changes_enumerator(#full_doc_info{}=FDI, #acc{revcount=C, infos=Infos}=Acc0) ->
     #doc_info{
         high_seq=Seq,
         revs=Revs

http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/6d00baed/src/mem3_shards.erl
----------------------------------------------------------------------
diff --git a/src/mem3_shards.erl b/src/mem3_shards.erl
index f998552..a638db8 100644
--- a/src/mem3_shards.erl
+++ b/src/mem3_shards.erl
@@ -156,7 +156,7 @@ fold(Fun, Acc) ->
     {ok, Db} = mem3_util:ensure_exists(DbName),
     FAcc = {Db, Fun, Acc},
     try
-        {ok, _, LastAcc} = couch_db:enum_docs(Db, fun fold_fun/3, FAcc, []),
+        {ok, LastAcc} = couch_db:fold_docs(Db, fun fold_fun/2, FAcc),
         {_Db, _UFun, UAcc} = LastAcc,
         UAcc
     after
@@ -249,10 +249,10 @@ code_change(_OldVsn, #st{}=St, _Extra) ->
 
 %% internal functions
 
-fold_fun(#full_doc_info{}=FDI, _, Acc) ->
+fold_fun(#full_doc_info{}=FDI, Acc) ->
     DI = couch_doc:to_doc_info(FDI),
-    fold_fun(DI, nil, Acc);
-fold_fun(#doc_info{}=DI, _, {Db, UFun, UAcc}) ->
+    fold_fun(DI, Acc);
+fold_fun(#doc_info{}=DI, {Db, UFun, UAcc}) ->
     case couch_db:open_doc(Db, DI, [ejson_body, conflicts]) of
         {ok, Doc} ->
             {Props} = Doc#doc.body,
@@ -266,8 +266,9 @@ fold_fun(#doc_info{}=DI, _, {Db, UFun, UAcc}) ->
 get_update_seq() ->
     DbName = config:get("mem3", "shards_db", "_dbs"),
     {ok, Db} = mem3_util:ensure_exists(DbName),
+    Seq = couch_db:get_update_seq(Db),
     couch_db:close(Db),
-    couch_db:get_update_seq(Db).
+    Seq.
 
 listen_for_changes(Since) ->
     DbName = config:get("mem3", "shards_db", "_dbs"),


[08/10] mem3 commit: updated refs/heads/COUCHDB-3326-clustered-purge to e4e8928

Posted by da...@apache.org.
Store and use the storage engine property

This adds an optional key to database documents that lists the
configured storage engine. This allows mem3_shards to create the shard
with the appropriate storage engine when recovering from a network
split.

COUCHDB-3287


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/b1ea2537
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/b1ea2537
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/b1ea2537

Branch: refs/heads/COUCHDB-3326-clustered-purge
Commit: b1ea25378dc85c57a471bd30661e0bd94ee5b983
Parents: 6d00bae
Author: Paul J. Davis <pa...@gmail.com>
Authored: Wed Apr 6 11:18:01 2016 -0500
Committer: Paul J. Davis <pa...@gmail.com>
Committed: Fri Feb 3 12:56:27 2017 -0600

----------------------------------------------------------------------
 include/mem3.hrl        |  6 ++++--
 src/mem3.erl            | 20 +++++++++++++++++---
 src/mem3_shards.erl     | 28 +++++++++++++---------------
 src/mem3_util.erl       | 17 ++++++++++++++---
 test/mem3_util_test.erl | 16 ++++++++--------
 5 files changed, 56 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/b1ea2537/include/mem3.hrl
----------------------------------------------------------------------
diff --git a/include/mem3.hrl b/include/mem3.hrl
index d6ac0be..6579210 100644
--- a/include/mem3.hrl
+++ b/include/mem3.hrl
@@ -16,7 +16,8 @@
     node :: node() | '_',
     dbname :: binary(),
     range :: [non_neg_integer() | '$1' | '$2'] | '_',
-    ref :: reference() | 'undefined' | '_'
+    ref :: reference() | 'undefined' | '_',
+    opts :: list()
 }).
 
 %% Do not reference outside of mem3.
@@ -26,7 +27,8 @@
     dbname :: binary(),
     range :: [non_neg_integer() | '$1' | '$2'] | '_',
     ref :: reference() | 'undefined' | '_',
-    order :: non_neg_integer() | 'undefined' | '_'
+    order :: non_neg_integer() | 'undefined' | '_',
+    opts :: list()
 }).
 
 %% types

http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/b1ea2537/src/mem3.erl
----------------------------------------------------------------------
diff --git a/src/mem3.erl b/src/mem3.erl
index e9c1473..5e218f7 100644
--- a/src/mem3.erl
+++ b/src/mem3.erl
@@ -23,7 +23,7 @@
 -export([get_placement/1]).
 
 %% For mem3 use only.
--export([name/1, node/1, range/1]).
+-export([name/1, node/1, range/1, engine/1]).
 
 -include_lib("mem3/include/mem3.hrl").
 -include_lib("couch/include/couch_db.hrl").
@@ -99,7 +99,8 @@ shards_int(DbName, Options) ->
             name = ShardDbName,
             dbname = ShardDbName,
             range = [0, (2 bsl 31)-1],
-            order = undefined}];
+            order = undefined,
+            opts = []}];
     ShardDbName ->
         %% shard_db is treated as a single sharded db to support calls to db_info
         %% and view_all_docs
@@ -107,7 +108,8 @@ shards_int(DbName, Options) ->
             node = node(),
             name = ShardDbName,
             dbname = ShardDbName,
-            range = [0, (2 bsl 31)-1]}];
+            range = [0, (2 bsl 31)-1],
+            opts = []}];
     _ ->
         mem3_shards:for_db(DbName, Options)
     end.
@@ -307,3 +309,15 @@ name(#shard{name=Name}) ->
     Name;
 name(#ordered_shard{name=Name}) ->
     Name.
+
+engine(#shard{opts=Opts}) ->
+    engine(Opts);
+engine(#ordered_shard{opts=Opts}) ->
+    engine(Opts);
+engine(Opts) when is_list(Opts) ->
+    case couch_util:get_value(engine, Opts) of
+        Engine when is_binary(Engine) ->
+            [{engine, Engine}];
+        _ ->
+            []
+    end.

http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/b1ea2537/src/mem3_shards.erl
----------------------------------------------------------------------
diff --git a/src/mem3_shards.erl b/src/mem3_shards.erl
index a638db8..db5145d 100644
--- a/src/mem3_shards.erl
+++ b/src/mem3_shards.erl
@@ -300,7 +300,7 @@ changes_callback({change, {Change}, _}, _) ->
             {Doc} ->
                 Shards = mem3_util:build_ordered_shards(DbName, Doc),
                 gen_server:cast(?MODULE, {cache_insert, DbName, Shards}),
-                [create_if_missing(mem3:name(S)) || S
+                [create_if_missing(mem3:name(S), mem3:engine(S)) || S
                     <- Shards, mem3:node(S) =:= node()]
             end
         end
@@ -337,20 +337,18 @@ in_range(Shard, HashKey) ->
     [B, E] = mem3:range(Shard),
     B =< HashKey andalso HashKey =< E.
 
-create_if_missing(Name) ->
-    DbDir = config:get("couchdb", "database_dir"),
-    Filename = filename:join(DbDir, ?b2l(Name) ++ ".couch"),
-    case filelib:is_regular(Filename) of
-    true ->
-        ok;
-    false ->
-        case couch_server:create(Name, [?ADMIN_CTX]) of
-        {ok, Db} ->
-            couch_db:close(Db);
-        Error ->
-            couch_log:error("~p tried to create ~s, got ~p",
-                [?MODULE, Name, Error])
-        end
+create_if_missing(Name, Options) ->
+    case couch_server:exists(Name) of
+        true ->
+            ok;
+        false ->
+            case couch_server:create(Name, [?ADMIN_CTX] ++ Options) of
+            {ok, Db} ->
+                couch_db:close(Db);
+            Error ->
+                couch_log:error("~p tried to create ~s, got ~p",
+                    [?MODULE, Name, Error])
+            end
     end.
 
 cache_insert(#st{cur_size=Cur}=St, DbName, Shards) ->

http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/b1ea2537/src/mem3_util.erl
----------------------------------------------------------------------
diff --git a/src/mem3_util.erl b/src/mem3_util.erl
index 2cd444d..4e1b5fd 100644
--- a/src/mem3_util.erl
+++ b/src/mem3_util.erl
@@ -153,6 +153,10 @@ build_ordered_shards(DbName, DocProps) ->
 build_shards_by_node(DbName, DocProps) ->
     {ByNode} = couch_util:get_value(<<"by_node">>, DocProps, {[]}),
     Suffix = couch_util:get_value(<<"shard_suffix">>, DocProps, ""),
+    EngineOpt = case couch_util:get_value(<<"engine">>, DocProps) of
+        Engine when is_binary(Engine) -> [{engine, Engine}];
+        _ -> []
+    end,
     lists:flatmap(fun({Node, Ranges}) ->
         lists:map(fun(Range) ->
             [B,E] = string:tokens(?b2l(Range), "-"),
@@ -161,7 +165,8 @@ build_shards_by_node(DbName, DocProps) ->
             name_shard(#shard{
                 dbname = DbName,
                 node = to_atom(Node),
-                range = [Beg, End]
+                range = [Beg, End],
+                opts = EngineOpt
             }, Suffix)
         end, Ranges)
     end, ByNode).
@@ -169,6 +174,10 @@ build_shards_by_node(DbName, DocProps) ->
 build_shards_by_range(DbName, DocProps) ->
     {ByRange} = couch_util:get_value(<<"by_range">>, DocProps, {[]}),
     Suffix = couch_util:get_value(<<"shard_suffix">>, DocProps, ""),
+    EngineOpt = case couch_util:get_value(<<"engine">>, DocProps) of
+        Engine when is_binary(Engine) -> [{engine, Engine}];
+        _ -> []
+    end,
     lists:flatmap(fun({Range, Nodes}) ->
         lists:map(fun({Node, Order}) ->
             [B,E] = string:tokens(?b2l(Range), "-"),
@@ -178,7 +187,8 @@ build_shards_by_range(DbName, DocProps) ->
                 dbname = DbName,
                 node = to_atom(Node),
                 range = [Beg, End],
-                order = Order
+                order = Order,
+                opts = EngineOpt
             }, Suffix)
         end, lists:zip(Nodes, lists:seq(1, length(Nodes))))
     end, ByRange).
@@ -247,7 +257,8 @@ downcast(#ordered_shard{}=S) ->
        node = S#ordered_shard.node,
        dbname = S#ordered_shard.dbname,
        range = S#ordered_shard.range,
-       ref = S#ordered_shard.ref
+       ref = S#ordered_shard.ref,
+       opts = S#ordered_shard.opts
       };
 downcast(Shards) when is_list(Shards) ->
     [downcast(Shard) || Shard <- Shards].

http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/b1ea2537/test/mem3_util_test.erl
----------------------------------------------------------------------
diff --git a/test/mem3_util_test.erl b/test/mem3_util_test.erl
index 340a58a..42bc5c7 100644
--- a/test/mem3_util_test.erl
+++ b/test/mem3_util_test.erl
@@ -85,35 +85,35 @@ build_shards_test() ->
         [{shard,<<"shards/00000000-1fffffff/testdb1">>,
           'bigcouch@node.local',<<"testdb1">>,
           [0,536870911],
-          undefined},
+          undefined,[]},
          {shard,<<"shards/20000000-3fffffff/testdb1">>,
           'bigcouch@node.local',<<"testdb1">>,
           [536870912,1073741823],
-          undefined},
+          undefined,[]},
          {shard,<<"shards/40000000-5fffffff/testdb1">>,
           'bigcouch@node.local',<<"testdb1">>,
           [1073741824,1610612735],
-          undefined},
+          undefined,[]},
          {shard,<<"shards/60000000-7fffffff/testdb1">>,
           'bigcouch@node.local',<<"testdb1">>,
           [1610612736,2147483647],
-          undefined},
+          undefined,[]},
          {shard,<<"shards/80000000-9fffffff/testdb1">>,
           'bigcouch@node.local',<<"testdb1">>,
           [2147483648,2684354559],
-          undefined},
+          undefined,[]},
          {shard,<<"shards/a0000000-bfffffff/testdb1">>,
           'bigcouch@node.local',<<"testdb1">>,
           [2684354560,3221225471],
-          undefined},
+          undefined,[]},
          {shard,<<"shards/c0000000-dfffffff/testdb1">>,
           'bigcouch@node.local',<<"testdb1">>,
           [3221225472,3758096383],
-          undefined},
+          undefined,[]},
          {shard,<<"shards/e0000000-ffffffff/testdb1">>,
           'bigcouch@node.local',<<"testdb1">>,
           [3758096384,4294967295],
-          undefined}],
+          undefined,[]}],
     ?assertEqual(ExpectedShards1, Shards1),
     ok.
 


[10/10] mem3 commit: updated refs/heads/COUCHDB-3326-clustered-purge to e4e8928

Posted by da...@apache.org.
Add internal replication of purge requests

COUCHDB-3326


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/e4e89289
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/e4e89289
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/e4e89289

Branch: refs/heads/COUCHDB-3326-clustered-purge
Commit: e4e892899ef82e47584b9af59b709e57092018a9
Parents: b1b9b3e
Author: Mayya Sharipova <ma...@ca.ibm.com>
Authored: Mon Oct 17 17:22:16 2016 -0400
Committer: Paul J. Davis <pa...@gmail.com>
Committed: Wed Mar 15 14:22:08 2017 -0500

----------------------------------------------------------------------
 src/mem3_rep.erl | 136 ++++++++++++++++++++++++++++++++++++++++++++++----
 src/mem3_rpc.erl |  94 ++++++++++++++++++++++++++++++++--
 2 files changed, 214 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/e4e89289/src/mem3_rep.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rep.erl b/src/mem3_rep.erl
index 44e6c5b..34d9ac1 100644
--- a/src/mem3_rep.erl
+++ b/src/mem3_rep.erl
@@ -17,7 +17,9 @@
     go/2,
     go/3,
     make_local_id/2,
-    find_source_seq/4
+    make_local_purge_id/2,
+    find_source_seq/4,
+    mem3_sync_purge/1
 ]).
 
 -export([
@@ -39,7 +41,8 @@
     target,
     filter,
     db,
-    history = {[]}
+    history = {[]},
+    purge_seq = 0
 }).
 
 
@@ -119,6 +122,10 @@ make_local_id(SourceThing, TargetThing, Filter) ->
     <<"_local/shard-sync-", S/binary, "-", T/binary, F/binary>>.
 
 
+make_local_purge_id(SourceUUID, TargetUUID) ->
+   <<"_local/purge-mem3-", SourceUUID/binary, "-", TargetUUID/binary>>.
+
+
 %% @doc Find and return the largest update_seq in SourceDb
 %% that the client has seen from TargetNode.
 %%
@@ -172,11 +179,55 @@ find_source_seq_int(#doc{body={Props}}, SrcNode0, TgtNode0, TgtUUID, TgtSeq) ->
 
 repl(Db, Acc0) ->
     erlang:put(io_priority, {internal_repl, couch_db:name(Db)}),
-    #acc{seq=Seq} = Acc1 = calculate_start_seq(Acc0#acc{source = Db}),
-    Fun = fun ?MODULE:changes_enumerator/2,
-    {ok, Acc2} = couch_db:fold_changes(Db, Seq, Fun, Acc1),
-    {ok, #acc{seq = LastSeq}} = replicate_batch(Acc2),
-    {ok, couch_db:count_changes_since(Db, LastSeq)}.
+    #acc{source = Db2} = Acc1 = pull_purges_from_target(Db, Acc0),
+    #acc{seq=Seq} = Acc2 = calculate_start_seq(Acc1),
+    try
+        % this throws an exception: {invalid_start_purge_seq, PurgeSeq0},
+        % when oldest_purge_seq on source > the last source purge_seq known to target
+        Acc3 = replicate_purged_docs(Acc2),
+        Fun = fun ?MODULE:changes_enumerator/2,
+        {ok, Acc4} = couch_db:fold_changes(Db2, Seq, Fun, Acc3),
+        {ok, #acc{seq = LastSeq}} = replicate_batch(Acc4),
+        {ok, couch_db:count_changes_since(Db2, LastSeq)}
+    catch
+        throw:{invalid_start_purge_seq, PurgeSeq} ->
+            couch_log:error(
+                "oldest_purge_seq on source passed purge_seq: ~p known to target for db: ~p",
+                [PurgeSeq, DbName]
+            )
+    end.
+
+
+pull_purges_from_target(Db, #acc{target=#shard{node=TNode, name=DbName}}=Acc) ->
+    SourceUUID = couch_db:get_uuid(Db),
+    {TUUIDsIdsRevs, TargetPDocID, TargetPSeq} =
+            mem3_rpc:load_purges(TNode, DbName, SourceUUID),
+    Acc2 = case TUUIDsIdsRevs of
+        [] -> Acc#acc{source = Db};
+        _ ->
+            % check which Target UUIDs have not been applied to Source
+            UUIDs = [UUID || {UUID, _Id, _Revs} <- TUUIDsIdsRevs],
+            PurgedDocs = couch_db:open_purged_docs(Db, UUIDs),
+            Results = lists:zip(TUUIDsIdsRevs, PurgedDocs),
+            Unapplied = lists:filtermap(fun
+                ({UUIDIdRevs, not_found}) -> {true, UUIDIdRevs};
+                (_) -> false
+            end, Results),
+            Acc1 = case Unapplied of
+                [] -> Acc#acc{source = Db};
+                _ ->
+                    % purge Db on Source and reopen it
+                    couch_db:purge_docs(Db, Unapplied),
+                    couch_db:close(Db),
+                    {ok, Db2} = couch_db:open(DbName, [?ADMIN_CTX]),
+                    Acc#acc{source = Db2}
+            end,
+            % update on Target target_purge_seq known to Source
+            mem3_rpc:save_purge_checkpoint(TNode, DbName, TargetPDocID,
+                    TargetPSeq, node()),
+            Acc1
+    end,
+    Acc2.
 
 
 calculate_start_seq(Acc) ->
@@ -210,7 +261,31 @@ calculate_start_seq(Acc) ->
                     Seq = TargetSeq,
                     History = couch_util:get_value(<<"history">>, TProps, {[]})
             end,
-            Acc1#acc{seq = Seq, history = History};
+            SourcePurgeSeq0 = couch_util:get_value(<<"purge_seq">>, SProps),
+            TargetPurgeSeq0 = couch_util:get_value(<<"purge_seq">>, TProps),
+            % before purge upgrade, purge_seq was not saved in checkpoint file,
+            % thus get purge_seq directly from dbs
+            SourcePurgeSeq = case is_integer(SourcePurgeSeq0) of
+                true ->
+                    SourcePurgeSeq0;
+                false ->
+                    {ok, SPS} = couch_db:get_purge_seq(Db),
+                    SPS
+            end,
+            TargetPurgeSeq = case is_integer(TargetPurgeSeq0) of
+                true ->
+                    TargetPurgeSeq0;
+                false ->
+                    {ok, TPS} = mem3_rpc:get_purge_seq(Node, Name),
+                    TPS
+            end,
+            case SourcePurgeSeq =< TargetPurgeSeq of
+                true ->
+                    PurgeSeq = SourcePurgeSeq;
+                false ->
+                    PurgeSeq = TargetPurgeSeq
+            end,
+            Acc1#acc{seq = Seq, history = History, purge_seq = PurgeSeq};
         {not_found, _} ->
             compare_epochs(Acc1)
     end.
@@ -246,6 +321,27 @@ changes_enumerator(#full_doc_info{}=FDI, #acc{revcount=C, infos=Infos}=Acc0) ->
     {Go, Acc1}.
 
 
+replicate_purged_docs(Acc0) ->
+    #acc{
+        source = Db,
+        target = #shard{node=Node, name=Name},
+        purge_seq = PurgeSeq0
+    } = Acc0,
+    PFoldFun = fun({_PSeq, UUID, Id, Revs}, Acc) ->
+        [{UUID, Id, Revs} | Acc]
+    end,
+
+    {ok, UUIDsIdsRevs} = couch_db:fold_purged_docs(Db, PurgeSeq0, PFoldFun, [], []),
+    case UUIDsIdsRevs of
+        [] ->
+            Acc0;
+        _ ->
+            ok = purge_on_target(Node, Name, UUIDsIdsRevs),
+            {ok, PurgeSeq} = couch_db:get_purge_seq(Db),
+            Acc0#acc{purge_seq = PurgeSeq}
+    end.
+
+
 replicate_batch(#acc{target = #shard{node=Node, name=Name}} = Acc) ->
     case find_missing_revs(Acc) of
     [] ->
@@ -319,8 +415,19 @@ save_on_target(Node, Name, Docs) ->
     ok.
 
 
+purge_on_target(Node, Name, UUIdsIdsRevs) ->
+    mem3_rpc:purge_docs(Node, Name, UUIdsIdsRevs,[
+        replicated_changes,
+        full_commit,
+        ?ADMIN_CTX,
+        {io_priority, {internal_repl, Name}}
+    ]),
+    ok.
+
+
 update_locals(Acc) ->
-    #acc{seq=Seq, source=Db, target=Target, localid=Id, history=History} = Acc,
+    #acc{seq=Seq, purge_seq = PurgeSeq, source=Db, target=Target,
+            localid=Id, history=History} = Acc,
     #shard{name=Name, node=Node} = Target,
     NewEntry = [
         {<<"source_node">>, atom_to_binary(node(), utf8)},
@@ -328,8 +435,9 @@ update_locals(Acc) ->
         {<<"source_seq">>, Seq},
         {<<"timestamp">>, list_to_binary(iso8601_timestamp())}
     ],
-    NewBody = mem3_rpc:save_checkpoint(Node, Name, Id, Seq, NewEntry, History),
-    {ok, _} = couch_db:update_doc(Db, #doc{id = Id, body = NewBody}, []).
+    NewBody = mem3_rpc:save_checkpoint(Node, Name, Id, Seq, PurgeSeq,
+            NewEntry, History),
+   {ok, _} = couch_db:update_doc(Db, #doc{id = Id, body = NewBody}, []).
 
 
 find_repl_doc(SrcDb, TgtUUIDPrefix) ->
@@ -364,6 +472,12 @@ find_repl_doc(SrcDb, TgtUUIDPrefix) ->
     end.
 
 
+% used during compaction to check if _local/purge doc is current
+mem3_sync_purge(Opts)->
+    Node = couch_util:get_value(<<"node">>, Opts),
+    lists:member(mem3:nodes(), Node).
+
+
 is_prefix(Prefix, Subject) ->
     binary:longest_common_prefix([Prefix, Subject]) == size(Prefix).
 

http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/e4e89289/src/mem3_rpc.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rpc.erl b/src/mem3_rpc.erl
index c2bd58f..a0394ce 100644
--- a/src/mem3_rpc.erl
+++ b/src/mem3_rpc.erl
@@ -19,15 +19,21 @@
     find_common_seq/4,
     get_missing_revs/4,
     update_docs/4,
+    get_purge_seq/2,
+    purge_docs/4,
     load_checkpoint/4,
-    save_checkpoint/6
+    save_checkpoint/7,
+    load_purges/3,
+    save_purge_checkpoint/5
 ]).
 
 % Private RPC callbacks
 -export([
     find_common_seq_rpc/3,
     load_checkpoint_rpc/3,
-    save_checkpoint_rpc/5
+    save_checkpoint_rpc/6,
+    load_purges_rpc/2,
+    save_purge_checkpoint_rpc/4
 ]).
 
 
@@ -43,16 +49,34 @@ update_docs(Node, DbName, Docs, Options) ->
     rexi_call(Node, {fabric_rpc, update_docs, [DbName, Docs, Options]}).
 
 
+get_purge_seq(Node, DbName) ->
+    rexi_call(Node, {fabric_rpc, get_purge_seq, [DbName]}).
+
+
+purge_docs(Node, DbName, PUUIdsIdsRevs, Options) ->
+    rexi_call(Node, {fabric_rpc, purge_docs, [DbName, PUUIdsIdsRevs, Options]}).
+
+
 load_checkpoint(Node, DbName, SourceNode, SourceUUID) ->
     Args = [DbName, SourceNode, SourceUUID],
     rexi_call(Node, {mem3_rpc, load_checkpoint_rpc, Args}).
 
 
-save_checkpoint(Node, DbName, DocId, Seq, Entry, History) ->
-    Args = [DbName, DocId, Seq, Entry, History],
+save_checkpoint(Node, DbName, DocId, Seq, PurgeSeq, Entry, History) ->
+    Args = [DbName, DocId, Seq, PurgeSeq, Entry, History],
     rexi_call(Node, {mem3_rpc, save_checkpoint_rpc, Args}).
 
 
+load_purges(Node, DbName, SourceUUID) ->
+    Args = [DbName, SourceUUID],
+    rexi_call(Node, {mem3_rpc, load_purges_rpc, Args}).
+
+
+save_purge_checkpoint(Node, DbName, DocId, PurgeSeq, SourceNode) ->
+    Args = [DbName, DocId, PurgeSeq, SourceNode],
+    rexi_call(Node, {mem3_rpc, save_purge_checkpoint_rpc, Args}).
+
+
 find_common_seq(Node, DbName, SourceUUID, SourceEpochs) ->
     Args = [DbName, SourceUUID, SourceEpochs],
     rexi_call(Node, {mem3_rpc, find_common_seq_rpc, Args}).
@@ -81,7 +105,8 @@ load_checkpoint_rpc(DbName, SourceNode, SourceUUID) ->
     end.
 
 
-save_checkpoint_rpc(DbName, Id, SourceSeq, NewEntry0, History0) ->
+save_checkpoint_rpc(DbName, Id, SourceSeq, SourcePurgeSeq,
+        NewEntry0, History0) ->
     erlang:put(io_priority, {internal_repl, DbName}),
     case get_or_create_db(DbName, [?ADMIN_CTX]) of
         {ok, Db} ->
@@ -92,6 +117,7 @@ save_checkpoint_rpc(DbName, Id, SourceSeq, NewEntry0, History0) ->
             ] ++ NewEntry0},
             Body = {[
                 {<<"seq">>, SourceSeq},
+                {<<"purge_seq">>, SourcePurgeSeq},
                 {<<"target_uuid">>, couch_db:get_uuid(Db)},
                 {<<"history">>, add_checkpoint(NewEntry, History0)}
             ]},
@@ -128,6 +154,64 @@ find_common_seq_rpc(DbName, SourceUUID, SourceEpochs) ->
     end.
 
 
+load_purges_rpc(DbName, SourceUUID) ->
+    erlang:put(io_priority, {internal_repl, DbName}),
+    case get_or_create_db(DbName, [?ADMIN_CTX]) of
+    {ok, Db} ->
+        TargetUUID = couch_db:get_uuid(Db),
+        DocId = mem3_rep:make_local_purge_id(SourceUUID, TargetUUID),
+        LastPSeq = case couch_db:open_doc(Db, DocId, []) of
+            {ok, #doc{body={Props}} } ->
+                couch_util:get_value(<<"purge_seq">>, Props);
+            {not_found, _} ->
+                0
+        end,
+        {ok, CurPSeq} = couch_db:get_purge_seq(Db),
+        UUIDsIdsRevs = if (LastPSeq == CurPSeq) -> []; true ->
+            FoldFun = fun({_PSeq, UUID, Id, Revs}, Acc) ->
+                [{UUID, Id, Revs} | Acc]
+            end,
+            {ok, UUIDsIdsRevs0} = couch_db:fold_purged_docs(
+                Db, LastPSeq, FoldFun, [], []
+            ),
+            UUIDsIdsRevs0
+        end,
+        rexi:reply({ok, {UUIDsIdsRevs, DocId, CurPSeq}});
+    Error ->
+        rexi:reply(Error)
+    end.
+
+
+save_purge_checkpoint_rpc(DbName, Id, PurgeSeq, Node) ->
+    erlang:put(io_priority, {internal_repl, DbName}),
+    case get_or_create_db(DbName, [?ADMIN_CTX]) of
+        {ok, Db} ->
+            Timestamp = couch_util:utc_string(),
+            Body = {[
+                {<<"purge_seq">>, PurgeSeq},
+                {<<"timestamp_utc">>, Timestamp},
+                {<<"verify_module">>, <<"mem3_rep">>},
+                {<<"verify_function">>, <<"mem3_sync_purge">>},
+                {<<"verify_options">>, {[{<<"node">>, Node}]}},
+                {<<"type">>, <<"internal_replication">>}
+            ]},
+            Doc = #doc{id = Id, body = Body},
+            rexi:reply(try couch_db:update_doc(Db, Doc, []) of
+                {ok, _} ->
+                    {ok, Body};
+                Else ->
+                    {error, Else}
+            catch
+                Exception ->
+                    Exception;
+                error:Reason ->
+                    {error, Reason}
+            end);
+        Error ->
+            rexi:reply(Error)
+    end.
+
+
 %% @doc Return the sequence where two files with the same UUID diverged.
 compare_epochs(SourceEpochs, TargetEpochs) ->
     compare_rev_epochs(


[02/10] mem3 commit: updated refs/heads/COUCHDB-3326-clustered-purge to e4e8928

Posted by da...@apache.org.
Merge remote branch 'cloudant:3102-fix-config_subscription'

This closes #25

Signed-off-by: ILYA Khlopotov <ii...@ca.ibm.com>


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/252467cb
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/252467cb
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/252467cb

Branch: refs/heads/COUCHDB-3326-clustered-purge
Commit: 252467cb4a27637090b5f9006483f5b7ab551699
Parents: 15615b2 432264a
Author: ILYA Khlopotov <ii...@ca.ibm.com>
Authored: Tue Aug 23 14:59:49 2016 -0700
Committer: ILYA Khlopotov <ii...@ca.ibm.com>
Committed: Tue Aug 23 14:59:49 2016 -0700

----------------------------------------------------------------------
 src/mem3_shards.erl              |  16 ++--
 src/mem3_sync_event_listener.erl | 173 +++++++++++++++++++++++++---------
 2 files changed, 138 insertions(+), 51 deletions(-)
----------------------------------------------------------------------



[04/10] mem3 commit: updated refs/heads/COUCHDB-3326-clustered-purge to e4e8928

Posted by da...@apache.org.
Merge remote branch 'cloudant:79066-port-chunkified-replicate_batch'

This closes #26

Signed-off-by: Eric Avdey <ei...@eiri.ca>


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/c3c54291
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/c3c54291
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/c3c54291

Branch: refs/heads/COUCHDB-3326-clustered-purge
Commit: c3c5429180de14a2b139f7741c934143ef73988c
Parents: 252467c c4da61c
Author: Eric Avdey <ei...@eiri.ca>
Authored: Thu Nov 24 14:00:17 2016 -0400
Committer: Eric Avdey <ei...@eiri.ca>
Committed: Thu Nov 24 14:00:17 2016 -0400

----------------------------------------------------------------------
 src/mem3_rep.erl | 32 ++++++++++++++++++++++++++++++--
 1 file changed, 30 insertions(+), 2 deletions(-)
----------------------------------------------------------------------