You are viewing a plain-text version of this content; the canonical link to it (a hyperlink in the original page) was not preserved in this rendering.
Posted to commits@couchdb.apache.org by ch...@apache.org on 2021/08/24 22:00:42 UTC

[couchdb] 02/03: Rework cache and IOQ2 pid per couch_file

This is an automated email from the ASF dual-hosted git repository.

chewbranca pushed a commit to branch chewbranca-ioq-experiments
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 3dc261da0e0ab0c1428be01e0520aff777f5f5bd
Author: Russell Branca <ch...@apache.org>
AuthorDate: Tue Aug 17 15:09:28 2021 -0700

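    Rework cache and IOQ2 pid per couch_file

    couch_file:open/2 now starts a dedicated ioq_server2 process for each
    file and returns an #ioq_file{fd, ioq, tab} record instead of a bare
    pid. The read cache moves from a named ETS table to an anonymous
    protected table owned by the couch_file process: it is filled when the
    server handles {pread_iolist, Pos} and consulted by pread_iolist/2
    before going through ioq. Note that the controlling config key changes
    from "couchdb"/"use_couch_file_cache" to "couchdb"/"couch_file_cache".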
---
 src/couch/src/couch_bt_engine.erl |  11 ++--
 src/couch/src/couch_db.erl        |   2 +
 src/couch/src/couch_file.erl      | 105 +++++++++++++++++++++-----------------
 3 files changed, 65 insertions(+), 53 deletions(-)

diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl
index 60f79c3..dc5bbfc 100644
--- a/src/couch/src/couch_bt_engine.erl
+++ b/src/couch/src/couch_bt_engine.erl
@@ -117,6 +117,7 @@
 -include_lib("kernel/include/file.hrl").
 -include_lib("couch/include/couch_db.hrl").
 -include("couch_bt_engine.hrl").
+-include_lib("ioq/include/ioq.hrl").
 
 
 exists(FilePath) ->
@@ -192,8 +193,8 @@ handle_db_updater_info({'DOWN', Ref, _, _, _}, #st{fd_monitor=Ref} = St) ->
     {stop, normal, St#st{fd=undefined, fd_monitor=closed}}.
 
 
-incref(St) ->
-    {ok, St#st{fd_monitor = erlang:monitor(process, St#st.fd)}}.
+incref(#st{fd=#ioq_file{fd=Fd}}=St) ->
+    {ok, St#st{fd_monitor = erlang:monitor(process, Fd)}}.
 
 
 decref(St) ->
@@ -201,8 +202,8 @@ decref(St) ->
     ok.
 
 
-monitored_by(St) ->
-    case erlang:process_info(St#st.fd, monitored_by) of
+monitored_by(#st{fd=#ioq_file{fd=Fd}}=St) ->
+    case erlang:process_info(Fd, monitored_by) of
         {monitored_by, Pids} ->
             lists:filter(fun is_pid/1, Pids);
         _ ->
@@ -920,7 +921,7 @@ init_state(FilePath, Fd, Header0, Options) ->
     St = #st{
         filepath = FilePath,
         fd = Fd,
-        fd_monitor = erlang:monitor(process, Fd),
+        fd_monitor = erlang:monitor(process, Fd#ioq_file.fd),
         header = Header,
         needs_commit = false,
         id_tree = IdTree,
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index 8837101..aec0f54 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -757,6 +757,8 @@ set_security(_, _) ->
     throw(bad_request).
 
 set_user_ctx(#db{} = Db, UserCtx) ->
+    %% TODO:
+    %% couch_db_engine:set_user_ctx(Db, UserCtx),
     {ok, Db#db{user_ctx = UserCtx}}.
 
 validate_security_object(SecProps) ->
diff --git a/src/couch/src/couch_file.erl b/src/couch/src/couch_file.erl
index b29c54d..675e6d6 100644
--- a/src/couch/src/couch_file.erl
+++ b/src/couch/src/couch_file.erl
@@ -15,6 +15,7 @@
 -vsn(2).
 
 -include_lib("couch/include/couch_db.hrl").
+-include_lib("ioq/include/ioq.hrl").
 
 
 -define(INITIAL_WAIT, 60000).
@@ -33,12 +34,13 @@
     is_sys,
     eof = 0,
     db_monitor,
-    pread_limit = 0
+    pread_limit = 0,
+    tab
 }).
 
 % public API
 -export([open/1, open/2, close/1, bytes/1, sync/1, truncate/2, set_db_pid/2]).
--export([pread_term/2, pread_term/3, pread_iolist/2, pread_binary/2]).
+-export([pread_term/2, pread_iolist/2, pread_binary/2]).
 -export([append_binary/2, append_binary_md5/2]).
 -export([append_raw_chunk/2, assemble_file_chunk/1, assemble_file_chunk/2]).
 -export([append_term/2, append_term/3, append_term_md5/2, append_term_md5/3]).
@@ -69,7 +71,9 @@ open(Filepath, Options) ->
     case gen_server:start_link(couch_file,
             {Filepath, Options, self(), Ref = make_ref()}, []) of
     {ok, Fd} ->
-        {ok, Fd};
+        {ok, IOQPid} = ioq_server2:start_link({by_shard, Filepath, Fd}),
+        Tab = gen_server:call(Fd, get_cache_ref),
+        {ok, #ioq_file{fd=Fd, ioq=IOQPid, tab=Tab}};
     ignore ->
         % get the error
         receive
@@ -94,7 +98,7 @@ open(Filepath, Options) ->
     end.
 
 
-set_db_pid(Fd, Pid) ->
+set_db_pid(#ioq_file{fd=Fd}, Pid) ->
     gen_server:call(Fd, {set_db_pid, Pid}).
 
 
@@ -156,36 +160,20 @@ assemble_file_chunk(Bin, Md5) ->
 %%----------------------------------------------------------------------
 
 pread_term(Fd, Pos) ->
-    UseCache = config:get_boolean("couchdb", "use_couch_file_cache", true),
-    pread_term(Fd, Pos, UseCache).
-
-
-pread_term(Fd, Pos, true) ->
-    case erlang:get(couch_file_hash) of
-        undefined ->
-            pread_term(Fd, Pos, false);
-        _ ->
-            load_from_cache(Fd, Pos)
-    end;
-pread_term(Fd, Pos, false) ->
     {ok, Bin} = pread_binary(Fd, Pos),
     {ok, couch_compress:decompress(Bin)}.
 
 
 %% TODO: add purpose docs
-load_from_cache(Fd, Pos) ->
-    Hash = erlang:get(couch_file_hash),
-    case ets:lookup(Hash, Pos) of
-        [{Pos, {ok, Res}}] ->
-            {ok, Res};
+load_from_cache(#ioq_file{tab=undefined}, _Pos) ->
+    missing;
+load_from_cache(#ioq_file{tab=Tab}, Pos) ->
+    case ets:lookup(Tab, Pos) of
+        [{Pos, Res}] ->
+            %% io:format("CACHE HIT: ~p{~p}~n", [Tab, Pos]),
+            Res;
         [] ->
-            %% TODO: don't repeat this, but avoid circular recursion
-            %% pread_term(Fd, Pos, false),
-            {ok, Bin} = pread_binary(Fd, Pos),
-            Val = {ok, couch_compress:decompress(Bin)},
-            %% TODO: should probably be inserted directly by the gen_server
-            gen_server:cast(Fd, {cache, Pos, Val}),
-            Val
+            missing
     end.
 
 
@@ -202,11 +190,16 @@ pread_binary(Fd, Pos) ->
 
 
 pread_iolist(Fd, Pos) ->
-    case ioq:call(Fd, {pread_iolist, Pos}, erlang:get(io_priority)) of
+    case load_from_cache(Fd, Pos) of
         {ok, IoList, Md5} ->
             {ok, verify_md5(Fd, Pos, IoList, Md5)};
-        Error ->
-            Error
+        missing ->
+            case ioq:call(Fd, {pread_iolist, Pos}, erlang:get(io_priority)) of
+                {ok, IoList, Md5} ->
+                    {ok, verify_md5(Fd, Pos, IoList, Md5)};
+                Error ->
+                    Error
+            end
     end.
 
 
@@ -259,7 +252,7 @@ append_binaries(Fd, Bins) ->
 %%----------------------------------------------------------------------
 
 % length in bytes
-bytes(Fd) ->
+bytes(#ioq_file{fd=Fd}) ->
     gen_server:call(Fd, bytes, infinity).
 
 %%----------------------------------------------------------------------
@@ -268,7 +261,7 @@ bytes(Fd) ->
 %%  or {error, Reason}.
 %%----------------------------------------------------------------------
 
-truncate(Fd, Pos) ->
+truncate(#ioq_file{fd=Fd}, Pos) ->
     gen_server:call(Fd, {truncate, Pos}, infinity).
 
 %%----------------------------------------------------------------------
@@ -293,7 +286,7 @@ sync(Filepath) when is_list(Filepath) ->
         {error, Error} ->
             erlang:error(Error)
     end;
-sync(Fd) ->
+sync(#ioq_file{fd=Fd}) ->
     case gen_server:call(Fd, sync, infinity) of
         ok ->
             ok;
@@ -305,8 +298,10 @@ sync(Fd) ->
 %% Purpose: Close the file.
 %% Returns: ok
 %%----------------------------------------------------------------------
-close(Fd) ->
-    gen_server:call(Fd, close, infinity).
+close(#ioq_file{fd=Fd, ioq=IOP}) ->
+    Res = gen_server:call(Fd, close, infinity),
+    gen_server:call(IOP, close, infinity),
+    Res.
 
 
 delete(RootDir, Filepath) ->
@@ -423,7 +418,7 @@ init_status_error(ReturnPid, Ref, Error) ->
     ignore.
 
 
-last_read(Fd) when is_pid(Fd) ->
+last_read(#ioq_file{fd=Fd}) when is_pid(Fd) ->
     Now = os:timestamp(),
     couch_util:process_dict_get(Fd, read_timestamp, Now).
 
@@ -435,9 +430,13 @@ init({Filepath, Options, ReturnPid, Ref}) ->
     Limit = get_pread_limit(),
     IsSys = lists:member(sys_db, Options),
     update_read_timestamp(),
-    Tab = list_to_atom(integer_to_list(mem3_hash:crc32(Filepath))),
-    erlang:put(couch_file_cache, Tab),
-    ets:new(Tab, [set, protected, named_table, {read_concurrency, true}]),
+    ShouldCache = config:get_boolean("couchdb", "couch_file_cache", true),
+    Tab = case ShouldCache of
+        true ->
+            ets:new(?MODULE, [set, protected, {read_concurrency, true}]);
+        false ->
+            undefined
+    end,
     case lists:member(create, Options) of
     true ->
         filelib:ensure_dir(Filepath),
@@ -458,7 +457,7 @@ init({Filepath, Options, ReturnPid, Ref}) ->
                     ok = file:sync(Fd),
                     maybe_track_open_os_files(Options),
                     erlang:send_after(?INITIAL_WAIT, self(), maybe_close),
-                    {ok, #file{fd=Fd, is_sys=IsSys, pread_limit=Limit}};
+                    {ok, #file{fd=Fd, is_sys=IsSys, pread_limit=Limit, tab=Tab}};
                 false ->
                     ok = file:close(Fd),
                     init_status_error(ReturnPid, Ref, {error, eexist})
@@ -466,7 +465,7 @@ init({Filepath, Options, ReturnPid, Ref}) ->
             false ->
                 maybe_track_open_os_files(Options),
                 erlang:send_after(?INITIAL_WAIT, self(), maybe_close),
-                {ok, #file{fd=Fd, is_sys=IsSys, pread_limit=Limit}}
+                {ok, #file{fd=Fd, is_sys=IsSys, pread_limit=Limit, tab=Tab}}
             end;
         Error ->
             init_status_error(ReturnPid, Ref, Error)
@@ -483,7 +482,7 @@ init({Filepath, Options, ReturnPid, Ref}) ->
                      maybe_track_open_os_files(Options),
                      {ok, Eof} = file:position(Fd, eof),
                      erlang:send_after(?INITIAL_WAIT, self(), maybe_close),
-                     {ok, #file{fd=Fd, eof=Eof, is_sys=IsSys, pread_limit=Limit}};
+                     {ok, #file{fd=Fd, eof=Eof, is_sys=IsSys, pread_limit=Limit, tab=Tab}};
                  Error ->
                      init_status_error(ReturnPid, Ref, Error)
             end;
@@ -522,15 +521,17 @@ handle_call(close, _From, #file{fd=Fd}=File) ->
 handle_call({pread_iolist, Pos}, _From, File) ->
     update_read_timestamp(),
     {LenIolist, NextPos} = read_raw_iolist_int(File, Pos, 4),
-    case iolist_to_binary(LenIolist) of
+    Resp = case iolist_to_binary(LenIolist) of
     <<1:1/integer,Len:31/integer>> -> % an MD5-prefixed term
         {Md5AndIoList, _} = read_raw_iolist_int(File, NextPos, Len+16),
         {Md5, IoList} = extract_md5(Md5AndIoList),
-        {reply, {ok, IoList, Md5}, File};
+        {ok, IoList, Md5};
     <<0:1/integer,Len:31/integer>> ->
         {Iolist, _} = read_raw_iolist_int(File, NextPos, Len),
-        {reply, {ok, Iolist, <<>>}, File}
-    end;
+        {ok, Iolist, <<>>}
+    end,
+    maybe_cache(File#file.tab, {Pos, Resp}),
+    {reply, Resp, File};
 
 handle_call({pread_iolists, PosL}, _From, File) ->
     update_read_timestamp(),
@@ -629,7 +630,10 @@ handle_call({write_header, Bin}, _From, #file{fd = Fd, eof = Pos} = File) ->
     end;
 
 handle_call(find_header, _From, #file{fd = Fd, eof = Pos} = File) ->
-    {reply, find_header(Fd, Pos div ?SIZE_BLOCK), File}.
+    {reply, find_header(Fd, Pos div ?SIZE_BLOCK), File};
+
+handle_call(get_cache_ref, _From, #file{tab=Tab} = File) ->
+    {reply, Tab, File}.
 
 
 handle_cast({cache, Key, Val}, Fd) ->
@@ -909,6 +913,11 @@ reset_eof(#file{} = File) ->
     {ok, Eof} = file:position(File#file.fd, eof),
     File#file{eof = Eof}.
 
+maybe_cache(undefined, _Obj) ->
+    ok;
+maybe_cache(Tab, Obj) ->
+    ets:insert(Tab, Obj).
+
 -ifdef(TEST).
 -include_lib("couch/include/couch_eunit.hrl").