You are viewing a plain-text version of this content. The canonical link to the original message was not preserved in this version.
Posted to commits@couchdb.apache.org by ch...@apache.org on 2021/08/25 19:50:39 UTC

[couchdb] branch chewbranca-ioq-experiments-rebase created (now 803f71d)

This is an automated email from the ASF dual-hosted git repository.

chewbranca pushed a change to branch chewbranca-ioq-experiments-rebase
in repository https://gitbox.apache.org/repos/asf/couchdb.git.


      at 803f71d  Fix some tests

This branch includes the following new commits:

     new b5bc429  Add couch_file cache
     new 05f6b06  Rework cache and IOQ2 pid per couch_file
     new 803f71d  Fix some tests

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[couchdb] 03/03: Fix some tests

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chewbranca pushed a commit to branch chewbranca-ioq-experiments-rebase
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 803f71d2910a19a5fa40f21222edcd60dbadd0f8
Author: Russell Branca <ch...@apache.org>
AuthorDate: Mon Aug 23 18:19:50 2021 -0700

    Fix some tests
---
 src/couch/src/couch_bt_engine_compactor.erl     |  5 +-
 src/couch/src/couch_bt_engine_stream.erl        |  8 +--
 src/couch/src/couch_file.erl                    |  4 +-
 src/couch/test/eunit/couch_btree_tests.erl      | 11 +++++
 src/couch/test/eunit/couch_file_tests.erl       | 66 +++++++++++++++++++++----
 src/couch_mrview/src/couch_mrview_compactor.erl | 12 ++---
 src/couch_mrview/src/couch_mrview_util.erl      |  4 +-
 7 files changed, 86 insertions(+), 24 deletions(-)

diff --git a/src/couch/src/couch_bt_engine_compactor.erl b/src/couch/src/couch_bt_engine_compactor.erl
index 3e356e2..297e70c 100644
--- a/src/couch/src/couch_bt_engine_compactor.erl
+++ b/src/couch/src/couch_bt_engine_compactor.erl
@@ -145,8 +145,9 @@ open_compaction_files(DbName, OldSt, Options) ->
                 retry = nil
             }
     end,
-    unlink(DataFd),
-    erlang:monitor(process, MetaFd),
+    unlink(ioq:fd_pid(DataFd)),
+    unlink(ioq:ioq_pid(DataFd)),
+    erlang:monitor(process, ioq:fd_pid(MetaFd)),
     {ok, CompSt}.
 
 
diff --git a/src/couch/src/couch_bt_engine_stream.erl b/src/couch/src/couch_bt_engine_stream.erl
index 431894a..e57e9a0 100644
--- a/src/couch/src/couch_bt_engine_stream.erl
+++ b/src/couch/src/couch_bt_engine_stream.erl
@@ -20,6 +20,8 @@
     to_disk_term/1
 ]).
 
+-include_lib("ioq/include/ioq.hrl").
+
 
 foldl({_Fd, []}, _Fun, Acc) ->
     Acc;
@@ -56,9 +58,9 @@ seek({Fd, [Pos | Rest]}, Offset) when is_integer(Pos) ->
     end.
 
 
-write({Fd, Written}, Data) when is_pid(Fd) ->
-    {ok, Pos, _} = couch_file:append_binary(Fd, Data),
-    {ok, {Fd, [{Pos, iolist_size(Data)} | Written]}}.
+write({#ioq_file{fd=Fd}=IOF, Written}, Data) when is_pid(Fd) ->
+    {ok, Pos, _} = couch_file:append_binary(IOF, Data),
+    {ok, {IOF, [{Pos, iolist_size(Data)} | Written]}}.
 
 
 finalize({Fd, Written}) ->
diff --git a/src/couch/src/couch_file.erl b/src/couch/src/couch_file.erl
index 675e6d6..a6eeef5 100644
--- a/src/couch/src/couch_file.erl
+++ b/src/couch/src/couch_file.erl
@@ -889,7 +889,9 @@ is_idle(#file{is_sys=false}) ->
 -spec process_info(CouchFilePid :: pid()) ->
     {Fd :: pid() | tuple(), FilePath :: string()} | undefined.
 
-process_info(Pid) ->
+process_info(Pid) when is_pid(Pid) ->
+    couch_util:process_dict_get(Pid, couch_file_fd);
+process_info(#ioq_file{fd=Pid}) ->
     couch_util:process_dict_get(Pid, couch_file_fd).
 
 update_read_timestamp() ->
diff --git a/src/couch/test/eunit/couch_btree_tests.erl b/src/couch/test/eunit/couch_btree_tests.erl
index c9b791d..9e7d3d0 100644
--- a/src/couch/test/eunit/couch_btree_tests.erl
+++ b/src/couch/test/eunit/couch_btree_tests.erl
@@ -14,6 +14,7 @@
 
 -include_lib("couch/include/couch_eunit.hrl").
 -include_lib("couch/include/couch_db.hrl").
+-include_lib("ioq/include/ioq.hrl").
 
 -define(ROWS, 1000).
 -define(TIMEOUT, 60). % seconds
@@ -42,6 +43,8 @@ setup_red() ->
 setup_red(_) ->
     setup_red().
 
+teardown(#ioq_file{}=Fd) ->
+    ok = couch_file:close(Fd);
 teardown(Fd) when is_pid(Fd) ->
     ok = couch_file:close(Fd);
 teardown({Fd, _}) ->
@@ -74,6 +77,14 @@ red_test_funs() ->
 
 
 btree_open_test_() ->
+    {
+        setup,
+        fun() -> test_util:start(?MODULE, [ioq]) end,
+        fun test_util:stop/1,
+        fun btree_should_open/0
+    }.
+
+btree_should_open() ->
     {ok, Fd} = couch_file:open(?tempfile(), [create, overwrite]),
     {ok, Btree} = couch_btree:open(nil, Fd, [{compression, none}]),
     {
diff --git a/src/couch/test/eunit/couch_file_tests.erl b/src/couch/test/eunit/couch_file_tests.erl
index 606f4bb..99bb8c0 100644
--- a/src/couch/test/eunit/couch_file_tests.erl
+++ b/src/couch/test/eunit/couch_file_tests.erl
@@ -13,6 +13,7 @@
 -module(couch_file_tests).
 
 -include_lib("couch/include/couch_eunit.hrl").
+-include_lib("ioq/include/ioq.hrl").
 
 -define(BLOCK_SIZE, 4096).
 -define(setup(F), {setup, fun setup/0, fun teardown/1, F}).
@@ -24,22 +25,52 @@ setup() ->
     Fd.
 
 teardown(Fd) ->
-    case is_process_alive(Fd) of
+    case is_process_alive(ioq:fd_pid(Fd)) of
         true -> ok = couch_file:close(Fd);
         false -> ok
     end.
 
+mock_server() ->
+    %%meck:new(config),
+    meck:expect(config, get, fun(Group) ->
+        []
+    end),
+    meck:expect(config, get, fun(_,_) ->
+        undefined
+    end),
+    meck:expect(config, get, fun("ioq2", _, Default) ->
+        Default
+    end),
+    meck:expect(config, get, fun(_, _, Default) ->
+        Default
+    end),
+    %%  meck:expect(config, get, fun(_, _, _) ->
+    %%      undefined
+    %%  end),
+    meck:expect(config, get_integer, fun("ioq2", _, Default) ->
+        Default
+    end),
+    meck:expect(config, get_boolean, fun("ioq2", _, Default) ->
+        Default
+    end).
+
+
+unmock_server(_) ->
+    true = meck:validate(config),
+    ok = meck:unload(config).
+
 open_close_test_() ->
     {
         "Test for proper file open and close",
         {
             setup,
+            %%fun() -> mock_server(), test_util:start(?MODULE, [ioq]) end, fun(A) -> unmock_server(A), test_util:stop(A) end,
             fun() -> test_util:start(?MODULE, [ioq]) end, fun test_util:stop/1,
             [
                 should_return_enoent_if_missed(),
                 should_ignore_invalid_flags_with_open(),
                 ?setup(fun should_return_pid_on_file_open/1),
-                should_close_file_properly(),
+                ?setup(fun should_close_file_properly/0),
                 ?setup(fun should_create_empty_new_files/1)
             ]
         }
@@ -53,7 +84,7 @@ should_ignore_invalid_flags_with_open() ->
                   couch_file:open(?tempfile(), [create, invalid_option])).
 
 should_return_pid_on_file_open(Fd) ->
-    ?_assert(is_pid(Fd)).
+    ?_assert(is_pid(ioq:fd_pid(Fd))).
 
 should_close_file_properly() ->
     {ok, Fd} = couch_file:open(?tempfile(), [create, overwrite]),
@@ -138,10 +169,15 @@ should_not_read_beyond_eof(Fd) ->
     {ok, Io} = file:open(Filepath, [read, write, binary]),
     ok = file:pwrite(Io, Pos, <<0:1/integer, DoubleBin:31/integer>>),
     file:close(Io),
-    unlink(Fd),
-    ExpectedError = {badmatch, {'EXIT', {bad_return_value,
-        {read_beyond_eof, Filepath}}}},
-    ?_assertError(ExpectedError, couch_file:pread_binary(Fd, Pos)).
+    unlink(ioq:fd_pid(Fd)),
+    unlink(ioq:ioq_pid(Fd)),
+    %% ExpectedError = {badmatch, {'EXIT', {bad_return_value,
+    %%     {read_beyond_eof, Filepath}}}},
+    %%ExpectedError = {exit, {{bad_return_value, {read_beyond_eof Filepath,}}, _}, _},
+
+    ?_assertExit(
+        {{bad_return_value, {read_beyond_eof, Filepath}}, _},
+        couch_file:pread_binary(Fd, Pos)).
 
 should_truncate(Fd) ->
     {ok, 0, _} = couch_file:append_term(Fd, foo),
@@ -179,10 +215,19 @@ should_not_read_more_than_pread_limit(Fd) ->
     {_, Filepath} = couch_file:process_info(Fd),
     BigBin = list_to_binary(lists:duplicate(100000, 0)),
     {ok, Pos, _Size} = couch_file:append_binary(Fd, BigBin),
-    unlink(Fd),
+    unlink(ioq:fd_pid(Fd)),
+    unlink(ioq:ioq_pid(Fd)),
     ExpectedError = {badmatch, {'EXIT', {bad_return_value,
         {exceed_pread_limit, Filepath, 50000}}}},
-    ?_assertError(ExpectedError, couch_file:pread_binary(Fd, Pos)).
+    ?debugFmt("EXPECTED ERROR IS: ~p~n", [ExpectedError]),
+    %%?_assert(couch_file:pread_binary(Fd, Pos)).
+    %%try couch_file:pread_binary(Fd, Pos)
+    %%catch E:R:S -> ?debugFmt("GOT ERROR: ~p || ~p~n~p~n", [E,R,S])
+    %%end,
+    %%?_assertError(ExpectedError, couch_file:pread_binary(Fd, Pos)).
+    ?_assertExit(
+        {{bad_return_value, {exceed_pread_limit, Filepath, 50000}}, _},
+        couch_file:pread_binary(Fd, Pos)).
 
 
 header_test_() ->
@@ -537,7 +582,8 @@ fsync_error_test_() ->
 
 
 fsync_raises_errors() ->
-    Fd = spawn(fun() -> fake_fsync_fd() end),
+    FdPid = spawn(fun() -> fake_fsync_fd() end),
+    Fd = #ioq_file{fd=FdPid},
     ?assertError({fsync_error, eio}, couch_file:sync(Fd)).
 
 
diff --git a/src/couch_mrview/src/couch_mrview_compactor.erl b/src/couch_mrview/src/couch_mrview_compactor.erl
index d42edc0..82d0629 100644
--- a/src/couch_mrview/src/couch_mrview_compactor.erl
+++ b/src/couch_mrview/src/couch_mrview_compactor.erl
@@ -115,7 +115,7 @@ compact(State) ->
         compact_view(View, EmptyView, BufferSize, Acc)
     end, FinalAcc2, lists:zip(Views, EmptyViews)),
 
-    unlink(EmptyState#mrst.fd),
+    unlink(ioq:fd_pid(EmptyState#mrst.fd)),
     {ok, EmptyState#mrst{
         id_btree=NewIdBtree,
         views=NewViews,
@@ -132,7 +132,7 @@ recompact(#mrst{db_name=DbName, idx_name=IdxName}, 0) ->
 
 recompact(State, RetryCount) ->
     Self = self(),
-    link(State#mrst.fd),
+    link(ioq:fd_pid(State#mrst.fd)),
     {Pid, Ref} = erlang:spawn_monitor(fun() ->
         couch_index_updater:update(Self, couch_mrview_index, State)
     end),
@@ -144,10 +144,10 @@ recompact_loop(Pid, Ref, State, RetryCount) ->
             % We've made progress so reset RetryCount
             recompact_loop(Pid, Ref, State2, recompact_retry_count());
         {'DOWN', Ref, _, _, {updated, Pid, State2}} ->
-            unlink(State#mrst.fd),
+            unlink(ioq:fd_pid(State#mrst.fd)),
             {ok, State2};
         {'DOWN', Ref, _, _, Reason} ->
-            unlink(State#mrst.fd),
+            unlink(ioq:fd_pid(State#mrst.fd)),
             couch_log:warning("Error during recompaction: ~r", [Reason]),
             recompact(State, RetryCount - 1)
     end.
@@ -218,7 +218,7 @@ swap_compacted(OldState, NewState) ->
         fd=NewFd
     } = NewState,
 
-    link(NewState#mrst.fd),
+    link(ioq:fd_pid(NewState#mrst.fd)),
     Ref = erlang:monitor(process, NewState#mrst.fd),
 
     RootDir = couch_index_util:root_dir(),
@@ -232,7 +232,7 @@ swap_compacted(OldState, NewState) ->
     ok = couch_file:delete(RootDir, IndexFName),
     ok = file:rename(CompactFName, IndexFName),
 
-    unlink(OldState#mrst.fd),
+    unlink(ioq:fd_pid(OldState#mrst.fd)),
     erlang:demonitor(OldState#mrst.fd_monitor, [flush]),
 
     {ok, NewState#mrst{fd_monitor=Ref}}.
diff --git a/src/couch_mrview/src/couch_mrview_util.erl b/src/couch_mrview/src/couch_mrview_util.erl
index d318a3f..6486c93 100644
--- a/src/couch_mrview/src/couch_mrview_util.erl
+++ b/src/couch_mrview/src/couch_mrview_util.erl
@@ -81,7 +81,7 @@ get_signature_from_filename(FileName) ->
 get_view(Db, DDoc, ViewName, Args0) ->
     case get_view_index_state(Db, DDoc, ViewName, Args0) of
         {ok, State, Args2} ->
-            Ref = erlang:monitor(process, State#mrst.fd),
+            Ref = erlang:monitor(process, ioq:fd_pid(State#mrst.fd)),
             #mrst{language=Lang, views=Views} = State,
             {Type, View, Args3} = extract_view(Lang, Args2, ViewName, Views),
             check_range(Args3, view_cmp(View)),
@@ -298,7 +298,7 @@ init_state(Db, Fd, State, Header) ->
 
     State#mrst{
         fd=Fd,
-        fd_monitor=erlang:monitor(process, Fd),
+        fd_monitor=erlang:monitor(process, ioq:fd_pid(Fd)),
         update_seq=Seq,
         purge_seq=PurgeSeq,
         id_btree=IdBtree,

[couchdb] 02/03: Rework cache and IOQ2 pid per couch_file

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chewbranca pushed a commit to branch chewbranca-ioq-experiments-rebase
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 05f6b0672985c469a75cb54a887b2dab94d3a82d
Author: Russell Branca <ch...@apache.org>
AuthorDate: Tue Aug 17 15:09:28 2021 -0700

    Rework cache and IOQ2 pid per couch_file
---
 src/couch/src/couch_bt_engine.erl |  11 ++--
 src/couch/src/couch_db.erl        |   2 +
 src/couch/src/couch_file.erl      | 105 +++++++++++++++++++++-----------------
 3 files changed, 65 insertions(+), 53 deletions(-)

diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl
index 60f79c3..dc5bbfc 100644
--- a/src/couch/src/couch_bt_engine.erl
+++ b/src/couch/src/couch_bt_engine.erl
@@ -117,6 +117,7 @@
 -include_lib("kernel/include/file.hrl").
 -include_lib("couch/include/couch_db.hrl").
 -include("couch_bt_engine.hrl").
+-include_lib("ioq/include/ioq.hrl").
 
 
 exists(FilePath) ->
@@ -192,8 +193,8 @@ handle_db_updater_info({'DOWN', Ref, _, _, _}, #st{fd_monitor=Ref} = St) ->
     {stop, normal, St#st{fd=undefined, fd_monitor=closed}}.
 
 
-incref(St) ->
-    {ok, St#st{fd_monitor = erlang:monitor(process, St#st.fd)}}.
+incref(#st{fd=#ioq_file{fd=Fd}}=St) ->
+    {ok, St#st{fd_monitor = erlang:monitor(process, Fd)}}.
 
 
 decref(St) ->
@@ -201,8 +202,8 @@ decref(St) ->
     ok.
 
 
-monitored_by(St) ->
-    case erlang:process_info(St#st.fd, monitored_by) of
+monitored_by(#st{fd=#ioq_file{fd=Fd}}=St) ->
+    case erlang:process_info(Fd, monitored_by) of
         {monitored_by, Pids} ->
             lists:filter(fun is_pid/1, Pids);
         _ ->
@@ -920,7 +921,7 @@ init_state(FilePath, Fd, Header0, Options) ->
     St = #st{
         filepath = FilePath,
         fd = Fd,
-        fd_monitor = erlang:monitor(process, Fd),
+        fd_monitor = erlang:monitor(process, Fd#ioq_file.fd),
         header = Header,
         needs_commit = false,
         id_tree = IdTree,
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index 8837101..aec0f54 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -757,6 +757,8 @@ set_security(_, _) ->
     throw(bad_request).
 
 set_user_ctx(#db{} = Db, UserCtx) ->
+    %% TODO:
+    %% couch_db_engine:set_user_ctx(Db, UserCtx),
     {ok, Db#db{user_ctx = UserCtx}}.
 
 validate_security_object(SecProps) ->
diff --git a/src/couch/src/couch_file.erl b/src/couch/src/couch_file.erl
index b29c54d..675e6d6 100644
--- a/src/couch/src/couch_file.erl
+++ b/src/couch/src/couch_file.erl
@@ -15,6 +15,7 @@
 -vsn(2).
 
 -include_lib("couch/include/couch_db.hrl").
+-include_lib("ioq/include/ioq.hrl").
 
 
 -define(INITIAL_WAIT, 60000).
@@ -33,12 +34,13 @@
     is_sys,
     eof = 0,
     db_monitor,
-    pread_limit = 0
+    pread_limit = 0,
+    tab
 }).
 
 % public API
 -export([open/1, open/2, close/1, bytes/1, sync/1, truncate/2, set_db_pid/2]).
--export([pread_term/2, pread_term/3, pread_iolist/2, pread_binary/2]).
+-export([pread_term/2, pread_iolist/2, pread_binary/2]).
 -export([append_binary/2, append_binary_md5/2]).
 -export([append_raw_chunk/2, assemble_file_chunk/1, assemble_file_chunk/2]).
 -export([append_term/2, append_term/3, append_term_md5/2, append_term_md5/3]).
@@ -69,7 +71,9 @@ open(Filepath, Options) ->
     case gen_server:start_link(couch_file,
             {Filepath, Options, self(), Ref = make_ref()}, []) of
     {ok, Fd} ->
-        {ok, Fd};
+        {ok, IOQPid} = ioq_server2:start_link({by_shard, Filepath, Fd}),
+        Tab = gen_server:call(Fd, get_cache_ref),
+        {ok, #ioq_file{fd=Fd, ioq=IOQPid, tab=Tab}};
     ignore ->
         % get the error
         receive
@@ -94,7 +98,7 @@ open(Filepath, Options) ->
     end.
 
 
-set_db_pid(Fd, Pid) ->
+set_db_pid(#ioq_file{fd=Fd}, Pid) ->
     gen_server:call(Fd, {set_db_pid, Pid}).
 
 
@@ -156,36 +160,20 @@ assemble_file_chunk(Bin, Md5) ->
 %%----------------------------------------------------------------------
 
 pread_term(Fd, Pos) ->
-    UseCache = config:get_boolean("couchdb", "use_couch_file_cache", true),
-    pread_term(Fd, Pos, UseCache).
-
-
-pread_term(Fd, Pos, true) ->
-    case erlang:get(couch_file_hash) of
-        undefined ->
-            pread_term(Fd, Pos, false);
-        _ ->
-            load_from_cache(Fd, Pos)
-    end;
-pread_term(Fd, Pos, false) ->
     {ok, Bin} = pread_binary(Fd, Pos),
     {ok, couch_compress:decompress(Bin)}.
 
 
 %% TODO: add purpose docs
-load_from_cache(Fd, Pos) ->
-    Hash = erlang:get(couch_file_hash),
-    case ets:lookup(Hash, Pos) of
-        [{Pos, {ok, Res}}] ->
-            {ok, Res};
+load_from_cache(#ioq_file{tab=undefined}, _Pos) ->
+    missing;
+load_from_cache(#ioq_file{tab=Tab}, Pos) ->
+    case ets:lookup(Tab, Pos) of
+        [{Pos, Res}] ->
+            %% io:format("CACHE HIT: ~p{~p}~n", [Tab, Pos]),
+            Res;
         [] ->
-            %% TODO: don't repeat this, but avoid circular recursion
-            %% pread_term(Fd, Pos, false),
-            {ok, Bin} = pread_binary(Fd, Pos),
-            Val = {ok, couch_compress:decompress(Bin)},
-            %% TODO: should probably be inserted directly by the gen_server
-            gen_server:cast(Fd, {cache, Pos, Val}),
-            Val
+            missing
     end.
 
 
@@ -202,11 +190,16 @@ pread_binary(Fd, Pos) ->
 
 
 pread_iolist(Fd, Pos) ->
-    case ioq:call(Fd, {pread_iolist, Pos}, erlang:get(io_priority)) of
+    case load_from_cache(Fd, Pos) of
         {ok, IoList, Md5} ->
             {ok, verify_md5(Fd, Pos, IoList, Md5)};
-        Error ->
-            Error
+        missing ->
+            case ioq:call(Fd, {pread_iolist, Pos}, erlang:get(io_priority)) of
+                {ok, IoList, Md5} ->
+                    {ok, verify_md5(Fd, Pos, IoList, Md5)};
+                Error ->
+                    Error
+            end
     end.
 
 
@@ -259,7 +252,7 @@ append_binaries(Fd, Bins) ->
 %%----------------------------------------------------------------------
 
 % length in bytes
-bytes(Fd) ->
+bytes(#ioq_file{fd=Fd}) ->
     gen_server:call(Fd, bytes, infinity).
 
 %%----------------------------------------------------------------------
@@ -268,7 +261,7 @@ bytes(Fd) ->
 %%  or {error, Reason}.
 %%----------------------------------------------------------------------
 
-truncate(Fd, Pos) ->
+truncate(#ioq_file{fd=Fd}, Pos) ->
     gen_server:call(Fd, {truncate, Pos}, infinity).
 
 %%----------------------------------------------------------------------
@@ -293,7 +286,7 @@ sync(Filepath) when is_list(Filepath) ->
         {error, Error} ->
             erlang:error(Error)
     end;
-sync(Fd) ->
+sync(#ioq_file{fd=Fd}) ->
     case gen_server:call(Fd, sync, infinity) of
         ok ->
             ok;
@@ -305,8 +298,10 @@ sync(Fd) ->
 %% Purpose: Close the file.
 %% Returns: ok
 %%----------------------------------------------------------------------
-close(Fd) ->
-    gen_server:call(Fd, close, infinity).
+close(#ioq_file{fd=Fd, ioq=IOP}) ->
+    Res = gen_server:call(Fd, close, infinity),
+    gen_server:call(IOP, close, infinity),
+    Res.
 
 
 delete(RootDir, Filepath) ->
@@ -423,7 +418,7 @@ init_status_error(ReturnPid, Ref, Error) ->
     ignore.
 
 
-last_read(Fd) when is_pid(Fd) ->
+last_read(#ioq_file{fd=Fd}) when is_pid(Fd) ->
     Now = os:timestamp(),
     couch_util:process_dict_get(Fd, read_timestamp, Now).
 
@@ -435,9 +430,13 @@ init({Filepath, Options, ReturnPid, Ref}) ->
     Limit = get_pread_limit(),
     IsSys = lists:member(sys_db, Options),
     update_read_timestamp(),
-    Tab = list_to_atom(integer_to_list(mem3_hash:crc32(Filepath))),
-    erlang:put(couch_file_cache, Tab),
-    ets:new(Tab, [set, protected, named_table, {read_concurrency, true}]),
+    ShouldCache = config:get_boolean("couchdb", "couch_file_cache", true),
+    Tab = case ShouldCache of
+        true ->
+            ets:new(?MODULE, [set, protected, {read_concurrency, true}]);
+        false ->
+            undefined
+    end,
     case lists:member(create, Options) of
     true ->
         filelib:ensure_dir(Filepath),
@@ -458,7 +457,7 @@ init({Filepath, Options, ReturnPid, Ref}) ->
                     ok = file:sync(Fd),
                     maybe_track_open_os_files(Options),
                     erlang:send_after(?INITIAL_WAIT, self(), maybe_close),
-                    {ok, #file{fd=Fd, is_sys=IsSys, pread_limit=Limit}};
+                    {ok, #file{fd=Fd, is_sys=IsSys, pread_limit=Limit, tab=Tab}};
                 false ->
                     ok = file:close(Fd),
                     init_status_error(ReturnPid, Ref, {error, eexist})
@@ -466,7 +465,7 @@ init({Filepath, Options, ReturnPid, Ref}) ->
             false ->
                 maybe_track_open_os_files(Options),
                 erlang:send_after(?INITIAL_WAIT, self(), maybe_close),
-                {ok, #file{fd=Fd, is_sys=IsSys, pread_limit=Limit}}
+                {ok, #file{fd=Fd, is_sys=IsSys, pread_limit=Limit, tab=Tab}}
             end;
         Error ->
             init_status_error(ReturnPid, Ref, Error)
@@ -483,7 +482,7 @@ init({Filepath, Options, ReturnPid, Ref}) ->
                      maybe_track_open_os_files(Options),
                      {ok, Eof} = file:position(Fd, eof),
                      erlang:send_after(?INITIAL_WAIT, self(), maybe_close),
-                     {ok, #file{fd=Fd, eof=Eof, is_sys=IsSys, pread_limit=Limit}};
+                     {ok, #file{fd=Fd, eof=Eof, is_sys=IsSys, pread_limit=Limit, tab=Tab}};
                  Error ->
                      init_status_error(ReturnPid, Ref, Error)
             end;
@@ -522,15 +521,17 @@ handle_call(close, _From, #file{fd=Fd}=File) ->
 handle_call({pread_iolist, Pos}, _From, File) ->
     update_read_timestamp(),
     {LenIolist, NextPos} = read_raw_iolist_int(File, Pos, 4),
-    case iolist_to_binary(LenIolist) of
+    Resp = case iolist_to_binary(LenIolist) of
     <<1:1/integer,Len:31/integer>> -> % an MD5-prefixed term
         {Md5AndIoList, _} = read_raw_iolist_int(File, NextPos, Len+16),
         {Md5, IoList} = extract_md5(Md5AndIoList),
-        {reply, {ok, IoList, Md5}, File};
+        {ok, IoList, Md5};
     <<0:1/integer,Len:31/integer>> ->
         {Iolist, _} = read_raw_iolist_int(File, NextPos, Len),
-        {reply, {ok, Iolist, <<>>}, File}
-    end;
+        {ok, Iolist, <<>>}
+    end,
+    maybe_cache(File#file.tab, {Pos, Resp}),
+    {reply, Resp, File};
 
 handle_call({pread_iolists, PosL}, _From, File) ->
     update_read_timestamp(),
@@ -629,7 +630,10 @@ handle_call({write_header, Bin}, _From, #file{fd = Fd, eof = Pos} = File) ->
     end;
 
 handle_call(find_header, _From, #file{fd = Fd, eof = Pos} = File) ->
-    {reply, find_header(Fd, Pos div ?SIZE_BLOCK), File}.
+    {reply, find_header(Fd, Pos div ?SIZE_BLOCK), File};
+
+handle_call(get_cache_ref, _From, #file{tab=Tab} = File) ->
+    {reply, Tab, File}.
 
 
 handle_cast({cache, Key, Val}, Fd) ->
@@ -909,6 +913,11 @@ reset_eof(#file{} = File) ->
     {ok, Eof} = file:position(File#file.fd, eof),
     File#file{eof = Eof}.
 
+maybe_cache(undefined, _Obj) ->
+    ok;
+maybe_cache(Tab, Obj) ->
+    ets:insert(Tab, Obj).
+
 -ifdef(TEST).
 -include_lib("couch/include/couch_eunit.hrl").
 

[couchdb] 01/03: Add couch_file cache

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chewbranca pushed a commit to branch chewbranca-ioq-experiments-rebase
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit b5bc42915faea0f6424d5b7d9a99bd9c6691ffb4
Author: Russell Branca <ch...@apache.org>
AuthorDate: Thu Aug 12 14:32:56 2021 -0700

    Add couch_file cache
---
 src/couch/src/couch_bt_engine.erl |  2 ++
 src/couch/src/couch_db_engine.erl |  7 +++++++
 src/couch/src/couch_file.erl      | 41 +++++++++++++++++++++++++++++++++++++--
 3 files changed, 48 insertions(+), 2 deletions(-)

diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl
index 48e751a..60f79c3 100644
--- a/src/couch/src/couch_bt_engine.erl
+++ b/src/couch/src/couch_bt_engine.erl
@@ -846,6 +846,8 @@ copy_props(#st{header = Header} = St, Props) ->
 
 
 open_db_file(FilePath, Options) ->
+    Hash = list_to_atom(integer_to_list(mem3_hash:crc32(FilePath))),
+    erlang:put(couch_file_hash, Hash),
     case couch_file:open(FilePath, Options) of
         {ok, Fd} ->
             {ok, Fd};
diff --git a/src/couch/src/couch_db_engine.erl b/src/couch/src/couch_db_engine.erl
index 918dabc..f1ba81c 100644
--- a/src/couch/src/couch_db_engine.erl
+++ b/src/couch/src/couch_db_engine.erl
@@ -944,6 +944,13 @@ set_update_seq(#db{} = Db, UpdateSeq) ->
 
 
 open_docs(#db{} = Db, DocIds) ->
+    case erlang:get(couch_file_hash) of
+        undefined ->
+            Hash = list_to_atom(integer_to_list(mem3_hash:crc32(Db#db.filepath))),
+            erlang:put(couch_file_hash, Hash);
+        _ ->
+            ok
+    end,
     #db{engine = {Engine, EngineState}} = Db,
     Engine:open_docs(EngineState, DocIds).
 
diff --git a/src/couch/src/couch_file.erl b/src/couch/src/couch_file.erl
index b1e3555..b29c54d 100644
--- a/src/couch/src/couch_file.erl
+++ b/src/couch/src/couch_file.erl
@@ -38,7 +38,7 @@
 
 % public API
 -export([open/1, open/2, close/1, bytes/1, sync/1, truncate/2, set_db_pid/2]).
--export([pread_term/2, pread_iolist/2, pread_binary/2]).
+-export([pread_term/2, pread_term/3, pread_iolist/2, pread_binary/2]).
 -export([append_binary/2, append_binary_md5/2]).
 -export([append_raw_chunk/2, assemble_file_chunk/1, assemble_file_chunk/2]).
 -export([append_term/2, append_term/3, append_term_md5/2, append_term_md5/3]).
@@ -155,12 +155,40 @@ assemble_file_chunk(Bin, Md5) ->
 %%  or {error, Reason}.
 %%----------------------------------------------------------------------
 
-
 pread_term(Fd, Pos) ->
+    UseCache = config:get_boolean("couchdb", "use_couch_file_cache", true),
+    pread_term(Fd, Pos, UseCache).
+
+
+pread_term(Fd, Pos, true) ->
+    case erlang:get(couch_file_hash) of
+        undefined ->
+            pread_term(Fd, Pos, false);
+        _ ->
+            load_from_cache(Fd, Pos)
+    end;
+pread_term(Fd, Pos, false) ->
     {ok, Bin} = pread_binary(Fd, Pos),
     {ok, couch_compress:decompress(Bin)}.
 
 
+%% TODO: add purpose docs
+load_from_cache(Fd, Pos) ->
+    Hash = erlang:get(couch_file_hash),
+    case ets:lookup(Hash, Pos) of
+        [{Pos, {ok, Res}}] ->
+            {ok, Res};
+        [] ->
+            %% TODO: don't repeat this, but avoid circular recursion
+            %% pread_term(Fd, Pos, false),
+            {ok, Bin} = pread_binary(Fd, Pos),
+            Val = {ok, couch_compress:decompress(Bin)},
+            %% TODO: should probably be inserted directly by the gen_server
+            gen_server:cast(Fd, {cache, Pos, Val}),
+            Val
+    end.
+
+
 %%----------------------------------------------------------------------
 %% Purpose: Reads a binrary from a file that was written with append_binary
 %% Args:    Pos, the offset into the file where the term is serialized.
@@ -407,6 +435,9 @@ init({Filepath, Options, ReturnPid, Ref}) ->
     Limit = get_pread_limit(),
     IsSys = lists:member(sys_db, Options),
     update_read_timestamp(),
+    Tab = list_to_atom(integer_to_list(mem3_hash:crc32(Filepath))),
+    erlang:put(couch_file_cache, Tab),
+    ets:new(Tab, [set, protected, named_table, {read_concurrency, true}]),
     case lists:member(create, Options) of
     true ->
         filelib:ensure_dir(Filepath),
@@ -600,6 +631,12 @@ handle_call({write_header, Bin}, _From, #file{fd = Fd, eof = Pos} = File) ->
 handle_call(find_header, _From, #file{fd = Fd, eof = Pos} = File) ->
     {reply, find_header(Fd, Pos div ?SIZE_BLOCK), File}.
 
+
+handle_cast({cache, Key, Val}, Fd) ->
+    %% TODO: should we skip if value exists?
+    Tab = erlang:get(couch_file_cache),
+    ets:insert(Tab, {Key, Val}),
+    {noreply, Fd};
 handle_cast(close, Fd) ->
     {stop,normal,Fd}.