You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ei...@apache.org on 2020/04/22 15:47:16 UTC

[couchdb] branch aegis_expiring_cache created (now d529bc5)

This is an automated email from the ASF dual-hosted git repository.

eiri pushed a change to branch aegis_expiring_cache
in repository https://gitbox.apache.org/repos/asf/couchdb.git.


      at d529bc5  Convert key cache to be LRU

This branch includes the following new commits:

     new d529bc5  Convert key cache to be LRU

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[couchdb] 01/01: Convert key cache to be LRU

Posted by ei...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eiri pushed a commit to branch aegis_expiring_cache
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit d529bc5015c59485ecef56ea95d7c84fb61d3bde
Author: Eric Avdey <ei...@eiri.ca>
AuthorDate: Wed Apr 22 12:46:45 2020 -0300

    Convert key cache to be LRU
---
 src/aegis/src/aegis_server.erl       | 129 +++++++++++++++++++++++++++++++----
 src/aegis/test/aegis_server_test.erl | 108 ++++++++++++++++++++++++++++-
 2 files changed, 223 insertions(+), 14 deletions(-)

diff --git a/src/aegis/src/aegis_server.erl b/src/aegis/src/aegis_server.erl
index f4973c2..7a5a33f 100644
--- a/src/aegis/src/aegis_server.erl
+++ b/src/aegis/src/aegis_server.erl
@@ -46,12 +46,20 @@
     do_decrypt/5
 ]).

+%% Exported only so tests can mock it; TODO: move to a util module.
+-export([
+    now_sec/0
+]).
+

 -define(INIT_TIMEOUT, 60000).
 -define(TIMEOUT, 10000).
+-define(DEFAULT_CACHE_LIMIT, 100000). % [aegis] cache_limit default
+-define(DEFAULT_CACHE_MAX_AGE_SEC, 1800). % [aegis] cache_max_age_sec default
+-define(LAST_ACCESSED_QUIESCENCE_SEC, 10). % quiet period before re-bumping


--record(entry, {id, key}).
+-record(entry, {id, key, counter, last_accessed, expires_at}).


 start_link() ->
@@ -79,9 +87,13 @@ decrypt(#{} = Db, Key, Value) when is_binary(Key), is_binary(Value) ->
 init([]) ->
     process_flag(sensitive, true),
     Cache = ets:new(?MODULE, [set, private, {keypos, #entry.id}]),
+    ByAccess = ets:new(?MODULE,
+        [ordered_set, private, {keypos, #entry.counter}]), % LRU index

     St = #{
         cache => Cache,
+        by_access => ByAccess,
+        counter => 0, % next access counter to hand out
         openers => dict:new(),
         waiters => dict:new(),
         unwrappers => dict:new()
@@ -129,6 +141,10 @@ handle_call(_Msg, _From, St) ->
     {noreply, St}.


+handle_cast({accessed, UUID}, St) -> % sent by maybe_bump_last_accessed/1
+    NewCounter = bump_last_accessed(St, UUID),
+    {noreply, St#{counter := NewCounter}};
+
 handle_cast(_Msg, St) ->
     {noreply, St}.

@@ -140,7 +156,6 @@ handle_info({'DOWN', Ref, _, _Pid, false}, #{openers := Openers} = St) ->

 handle_info({'DOWN', Ref, _, _Pid, {ok, DbKey, AegisConfig}}, St) ->
     #{
-        cache := Cache,
         openers := Openers,
         waiters := Waiters,
         unwrappers := Unwrappers
@@ -148,12 +163,13 @@ handle_info({'DOWN', Ref, _, _Pid, {ok, DbKey, AegisConfig}}, St) ->

     case dict:take(Ref, Openers) of
         {{UUID, From}, Openers1} ->
-            ok = insert(Cache, UUID, DbKey),
+            NewCounter = insert(St, UUID, DbKey),
             gen_server:reply(From, {ok, AegisConfig}),
-            {noreply, St#{openers := Openers1}, ?TIMEOUT};
+            NewSt = St#{openers := Openers1, counter := NewCounter},
+            {noreply, NewSt, ?TIMEOUT};
         error ->
             {UUID, Unwrappers1} = dict:take(Ref, Unwrappers),
-            ok = insert(Cache, UUID, DbKey),
+            NewCounter = insert(St, UUID, DbKey),
             Unwrappers2 = dict:erase(UUID, Unwrappers1),

             {WaitList, Waiters1} = dict:take(UUID, Waiters),
@@ -165,7 +181,11 @@ handle_info({'DOWN', Ref, _, _Pid, {ok, DbKey, AegisConfig}}, St) ->
                 } = Waiter,
                 erlang:spawn(?MODULE, Action, [From, DbKey | Args])
             end, WaitList),
-            NewSt = St#{waiters := Waiters1, unwrappers := Unwrappers2},
+            NewSt = St#{
+                waiters := Waiters1,
+                unwrappers := Unwrappers2,
+                counter := NewCounter
+            },
             {noreply, NewSt, ?TIMEOUT}
     end;

@@ -286,11 +306,10 @@ do_decrypt(From, DbKey, #{uuid := UUID}, Key, Value) ->

 maybe_spawn_worker(St, From, Action, #{uuid := UUID} = Db, Key, Value) ->
     #{
-        cache := Cache,
         waiters := Waiters
     } = St,

-    case lookup(Cache, UUID) of
+    case lookup(St, UUID) of % a hit may cast {accessed, UUID} to ?MODULE
         {ok, DbKey} ->
             erlang:spawn(?MODULE, Action, [From, DbKey, Db, Key, Value]),
             St;
@@ -324,16 +343,135 @@ maybe_spawn_unwrapper(St, #{uuid := UUID} = Db) ->

 %% cache functions

-insert(Cache, UUID, DbKey) ->
-    Entry = #entry{id = UUID, key = DbKey},
+%% Store DbKey for UUID, stamped with the current access counter, and
+%% return the next counter value. Any previous entry for UUID is first
+%% unlinked from the by_access index so each cached UUID owns exactly
+%% one index row; a stale row could otherwise evict the fresh entry later.
+insert(St, UUID, DbKey) ->
+    #{
+        cache := Cache,
+        by_access := ByAccess,
+        counter := Counter
+    } = St,
+
+    Now = ?MODULE:now_sec(),
+    Entry = #entry{
+        id = UUID,
+        key = DbKey,
+        counter = Counter,
+        last_accessed = Now,
+        %% NOTE(review): expires_at is stored but not checked anywhere yet.
+        expires_at = Now + max_age()
+    },
+
+    ok = unlink_access_index(Cache, ByAccess, UUID),
     true = ets:insert(Cache, Entry),
-    ok.
+    true = ets:insert_new(ByAccess, Entry),
+
+    maybe_evict_old_entries(St),

+    Counter + 1.

-lookup(Cache, UUID) ->
+
+%% Drop the by_access row of UUID's current cache entry, if there is one.
+unlink_access_index(Cache, ByAccess, UUID) ->
+    case ets:lookup(Cache, UUID) of
+        [#entry{counter = OldCounter}] ->
+            true = ets:delete(ByAccess, OldCounter),
+            ok;
+        [] ->
+            ok
+    end.
+
+
+%% Return {ok, DbKey} for a cached UUID or {error, not_found}. A hit may
+%% also cast an access bump to ?MODULE via maybe_bump_last_accessed/1.
+lookup(#{cache := Cache}, UUID) ->
     case ets:lookup(Cache, UUID) of
-        [#entry{id = UUID, key = DbKey}] ->
+        [#entry{id = UUID, key = DbKey} = Entry] ->
+            maybe_bump_last_accessed(Entry),
             {ok, DbKey};
         [] ->
             {error, not_found}
     end.
+
+
+%% Cast {accessed, Id} to ?MODULE when more than
+%% ?LAST_ACCESSED_QUIESCENCE_SEC seconds have passed since the entry's last
+%% recorded access, so repeated hits do not reindex on every lookup.
+maybe_bump_last_accessed(#entry{last_accessed = LastAccessed} = Entry) ->
+    case ?MODULE:now_sec() > LastAccessed + ?LAST_ACCESSED_QUIESCENCE_SEC of
+        true ->
+            gen_server:cast(?MODULE, {accessed, Entry#entry.id});
+        false ->
+            ok
+    end.
+
+
+%% Handle an {accessed, UUID} cast: re-stamp UUID's entry with the current
+%% counter and time, making it the newest by_access row, and return the
+%% next counter value. The cast can arrive after UUID has been evicted; in
+%% that case nothing changes and the counter is returned unchanged.
+bump_last_accessed(St, UUID) ->
+    #{
+        cache := Cache,
+        by_access := ByAccess,
+        counter := Counter
+    } = St,
+
+    case ets:lookup(Cache, UUID) of
+        [#entry{counter = OldCounter} = Entry0] ->
+            Entry = Entry0#entry{
+                last_accessed = ?MODULE:now_sec(),
+                counter = Counter
+            },
+            true = ets:insert(Cache, Entry),
+            true = ets:insert_new(ByAccess, Entry),
+            true = ets:delete(ByAccess, OldCounter),
+            Counter + 1;
+        [] ->
+            Counter
+    end.
+
+
+%% Evict least recently used entries until the cache holds at most
+%% cache_limit() entries.
+maybe_evict_old_entries(#{cache := Cache} = St) ->
+    Excess = ets:info(Cache, size) - cache_limit(),
+    evict_old_entries(St, Excess).
+
+
+%% Remove the N entries with the smallest counters (the first rows of the
+%% ordered by_access table) from both tables. N =< 0 is a no-op.
+evict_old_entries(St, N) when N > 0 ->
+    #{
+        cache := Cache,
+        by_access := ByAccess
+    } = St,
+
+    OldestKey = ets:first(ByAccess),
+    [#entry{id = UUID}] = ets:lookup(ByAccess, OldestKey),
+    true = ets:delete(Cache, UUID),
+    true = ets:delete(ByAccess, OldestKey),
+    evict_old_entries(St, N - 1);
+
+evict_old_entries(_St, _) ->
+    ok.
+
+
+%% Wall-clock time in whole seconds. Exported and always called as
+%% ?MODULE:now_sec() so tests can replace it with meck.
+now_sec() ->
+    {Mega, Sec, _} = os:timestamp(),
+    Mega * 1000000 + Sec.
+
+
+%% Seconds added to the insert time to get expires_at ([aegis] cache_max_age_sec).
+max_age() ->
+    config:get_integer("aegis", "cache_max_age_sec",
+        ?DEFAULT_CACHE_MAX_AGE_SEC).
+
+
+%% Maximum number of cached keys before eviction ([aegis] cache_limit).
+cache_limit() ->
+    config:get_integer("aegis", "cache_limit", ?DEFAULT_CACHE_LIMIT).
diff --git a/src/aegis/test/aegis_server_test.erl b/src/aegis/test/aegis_server_test.erl
index b9b4588..6ef4373 100644
--- a/src/aegis/test/aegis_server_test.erl
+++ b/src/aegis/test/aegis_server_test.erl
@@ -15,7 +15,6 @@
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("couch/include/couch_eunit.hrl").
 
--define(SERVER, aegis_server).
 -define(DB, #{aegis => <<0:320>>, uuid => <<0:64>>}).
 -define(VALUE, <<0:8192>>).
 -define(ENCRYPTED, <<1:8, 0:320, 0:4096>>).
@@ -215,3 +214,110 @@ test_disabled_decrypt() ->

     ?assertEqual(0, meck:num_calls(aegis_key_manager, unwrap_key, 2)),
     ?assertEqual(0, meck:num_calls(aegis_server, do_decrypt, 5)).
+
+
+
+lru_cache_with_expiration_test_() ->
+    {
+        foreach,
+        fun() ->
+            Ctx = setup(),
+            meck:new([config], [passthrough]),
+            ok = meck:expect(config, get_integer, fun
+                ("aegis", "cache_limit", _) -> 5; % tiny limit forces evictions
+                (_, _, Default) -> Default
+            end),
+            ok = meck:expect(aegis_server, now_sec, fun() ->
+                get(time) == undefined andalso put(time, 0), % per-process fake clock: 0, 20, 40, ...
+                Now = get(time),
+                put(time, Now + 20),
+                Now
+            end),
+            Ctx
+        end,
+        fun teardown/1,
+        [
+            {"counter moves forward on access bump",
+            {timeout, ?TIMEOUT, fun test_advance_counter/0}},
+            {"oldest entries evicted",
+            {timeout, ?TIMEOUT, fun test_evict_old_entries/0}},
+            {"access bump preserves entries",
+            {timeout, ?TIMEOUT, fun test_bump_accessed/0}}
+        ]
+    }.
+
+
+test_advance_counter() ->
+    ets:new(?MODULE, [named_table, set, public]), % last {OldCounter, Counter} seen by the server
+
+    ok = meck:expect(aegis_server, handle_cast, fun({accessed, _} = Msg, St) ->
+        #{counter := Counter} = St,
+        get(counter) == undefined andalso put(counter, 0),
+        OldCounter = get(counter),
+        put(counter, Counter),
+        ets:insert(?MODULE, {counter, {OldCounter, Counter}}),
+        meck:passthrough([Msg, St])
+    end),
+
+    lists:foreach(fun(I) ->
+        Db = ?DB#{uuid => <<I:64>>},
+        aegis_server:encrypt(Db, <<I:64>>, ?VALUE),
+        aegis_server:decrypt(Db, <<I:64>>, ?VALUE),
+        [{counter, {OldCounter, Counter}}] = ets:lookup(?MODULE, counter), % NOTE(review): accessed is a cast; it may not be handled yet here
+        ?assert(Counter > OldCounter)
+    end, lists:seq(1, 10)),
+
+    ets:delete(?MODULE).
+
+
+test_evict_old_entries() ->
+    ?assertEqual(0, meck:num_calls(aegis_key_manager, unwrap_key, 2)),
+
+    %% overflow the cache (the fixture limits it to 5 entries)
+    lists:foreach(fun(I) ->
+        Db = ?DB#{uuid => <<I:64>>},
+        aegis_server:encrypt(Db, <<I:64>>, ?VALUE)
+    end, lists:seq(1, 10)),
+
+    ?assertEqual(10, meck:num_calls(aegis_key_manager, unwrap_key, 2)),
+
+    %% the five newest are still cached
+    lists:foreach(fun(I) ->
+        Db = ?DB#{uuid => <<I:64>>},
+        aegis_server:decrypt(Db, <<I:64>>, ?VALUE)
+    end, lists:seq(6, 10)),
+
+    ?assertEqual(10, meck:num_calls(aegis_key_manager, unwrap_key, 2)),
+
+    %% the five oldest were evicted and must be re-fetched
+    lists:foreach(fun(I) ->
+        Db = ?DB#{uuid => <<I:64>>},
+        aegis_server:decrypt(Db, <<I:64>>, ?VALUE)
+    end, lists:seq(1, 5)),
+
+    ?assertEqual(15, meck:num_calls(aegis_key_manager, unwrap_key, 2)).
+
+
+test_bump_accessed() ->
+    ?assertEqual(0, meck:num_calls(aegis_key_manager, unwrap_key, 2)),
+
+    %% fill the cache
+    lists:foreach(fun(I) ->
+        Db = ?DB#{uuid => <<I:64>>},
+        aegis_server:encrypt(Db, <<I:64>>, ?VALUE)
+    end, lists:seq(1, 5)),
+
+    ?assertEqual(5, meck:num_calls(aegis_key_manager, unwrap_key, 2)),
+
+    %% bump the oldest entry, then insert a new one
+    aegis_server:decrypt(?DB#{uuid => <<1:64>>}, <<1:64>>, ?VALUE),
+    aegis_server:encrypt(?DB#{uuid => <<6:64>>}, <<6:64>>, ?VALUE),
+    ?assertEqual(6, meck:num_calls(aegis_key_manager, unwrap_key, 2)),
+
+    %% confirm the former oldest is still cached
+    aegis_server:decrypt(?DB#{uuid => <<1:64>>}, <<1:64>>, ?VALUE),
+    ?assertEqual(6, meck:num_calls(aegis_key_manager, unwrap_key, 2)),
+
+    %% confirm the second oldest was evicted by the new insert
+    aegis_server:decrypt(?DB#{uuid => <<2:64>>}, <<2:64>>, ?VALUE),
+    ?assertEqual(7, meck:num_calls(aegis_key_manager, unwrap_key, 2)).