You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ch...@apache.org on 2021/08/25 19:50:39 UTC
[couchdb] branch chewbranca-ioq-experiments-rebase created (now
803f71d)
This is an automated email from the ASF dual-hosted git repository.
chewbranca pushed a change to branch chewbranca-ioq-experiments-rebase
in repository https://gitbox.apache.org/repos/asf/couchdb.git.
at 803f71d Fix some tests
This branch includes the following new commits:
new b5bc429 Add couch_file cache
new 05f6b06 Rework cache and IOQ2 pid per couch_file
new 803f71d Fix some tests
The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
[couchdb] 03/03: Fix some tests
Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
chewbranca pushed a commit to branch chewbranca-ioq-experiments-rebase
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 803f71d2910a19a5fa40f21222edcd60dbadd0f8
Author: Russell Branca <ch...@apache.org>
AuthorDate: Mon Aug 23 18:19:50 2021 -0700
Fix some tests
---
src/couch/src/couch_bt_engine_compactor.erl | 5 +-
src/couch/src/couch_bt_engine_stream.erl | 8 +--
src/couch/src/couch_file.erl | 4 +-
src/couch/test/eunit/couch_btree_tests.erl | 11 +++++
src/couch/test/eunit/couch_file_tests.erl | 66 +++++++++++++++++++++----
src/couch_mrview/src/couch_mrview_compactor.erl | 12 ++---
src/couch_mrview/src/couch_mrview_util.erl | 4 +-
7 files changed, 86 insertions(+), 24 deletions(-)
diff --git a/src/couch/src/couch_bt_engine_compactor.erl b/src/couch/src/couch_bt_engine_compactor.erl
index 3e356e2..297e70c 100644
--- a/src/couch/src/couch_bt_engine_compactor.erl
+++ b/src/couch/src/couch_bt_engine_compactor.erl
@@ -145,8 +145,9 @@ open_compaction_files(DbName, OldSt, Options) ->
retry = nil
}
end,
- unlink(DataFd),
- erlang:monitor(process, MetaFd),
+ unlink(ioq:fd_pid(DataFd)),
+ unlink(ioq:ioq_pid(DataFd)),
+ erlang:monitor(process, ioq:fd_pid(MetaFd)),
{ok, CompSt}.
diff --git a/src/couch/src/couch_bt_engine_stream.erl b/src/couch/src/couch_bt_engine_stream.erl
index 431894a..e57e9a0 100644
--- a/src/couch/src/couch_bt_engine_stream.erl
+++ b/src/couch/src/couch_bt_engine_stream.erl
@@ -20,6 +20,8 @@
to_disk_term/1
]).
+-include_lib("ioq/include/ioq.hrl").
+
foldl({_Fd, []}, _Fun, Acc) ->
Acc;
@@ -56,9 +58,9 @@ seek({Fd, [Pos | Rest]}, Offset) when is_integer(Pos) ->
end.
-write({Fd, Written}, Data) when is_pid(Fd) ->
- {ok, Pos, _} = couch_file:append_binary(Fd, Data),
- {ok, {Fd, [{Pos, iolist_size(Data)} | Written]}}.
+write({#ioq_file{fd=Fd}=IOF, Written}, Data) when is_pid(Fd) ->
+ {ok, Pos, _} = couch_file:append_binary(IOF, Data),
+ {ok, {IOF, [{Pos, iolist_size(Data)} | Written]}}.
finalize({Fd, Written}) ->
diff --git a/src/couch/src/couch_file.erl b/src/couch/src/couch_file.erl
index 675e6d6..a6eeef5 100644
--- a/src/couch/src/couch_file.erl
+++ b/src/couch/src/couch_file.erl
@@ -889,7 +889,9 @@ is_idle(#file{is_sys=false}) ->
-spec process_info(CouchFilePid :: pid()) ->
{Fd :: pid() | tuple(), FilePath :: string()} | undefined.
-process_info(Pid) ->
+process_info(Pid) when is_pid(Pid) ->
+ couch_util:process_dict_get(Pid, couch_file_fd);
+process_info(#ioq_file{fd=Pid}) ->
couch_util:process_dict_get(Pid, couch_file_fd).
update_read_timestamp() ->
diff --git a/src/couch/test/eunit/couch_btree_tests.erl b/src/couch/test/eunit/couch_btree_tests.erl
index c9b791d..9e7d3d0 100644
--- a/src/couch/test/eunit/couch_btree_tests.erl
+++ b/src/couch/test/eunit/couch_btree_tests.erl
@@ -14,6 +14,7 @@
-include_lib("couch/include/couch_eunit.hrl").
-include_lib("couch/include/couch_db.hrl").
+-include_lib("ioq/include/ioq.hrl").
-define(ROWS, 1000).
-define(TIMEOUT, 60). % seconds
@@ -42,6 +43,8 @@ setup_red() ->
setup_red(_) ->
setup_red().
+teardown(#ioq_file{}=Fd) ->
+ ok = couch_file:close(Fd);
teardown(Fd) when is_pid(Fd) ->
ok = couch_file:close(Fd);
teardown({Fd, _}) ->
@@ -74,6 +77,14 @@ red_test_funs() ->
btree_open_test_() ->
+ {
+ setup,
+ fun() -> test_util:start(?MODULE, [ioq]) end,
+ fun test_util:stop/1,
+ fun btree_should_open/0
+ }.
+
+btree_should_open() ->
{ok, Fd} = couch_file:open(?tempfile(), [create, overwrite]),
{ok, Btree} = couch_btree:open(nil, Fd, [{compression, none}]),
{
diff --git a/src/couch/test/eunit/couch_file_tests.erl b/src/couch/test/eunit/couch_file_tests.erl
index 606f4bb..99bb8c0 100644
--- a/src/couch/test/eunit/couch_file_tests.erl
+++ b/src/couch/test/eunit/couch_file_tests.erl
@@ -13,6 +13,7 @@
-module(couch_file_tests).
-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("ioq/include/ioq.hrl").
-define(BLOCK_SIZE, 4096).
-define(setup(F), {setup, fun setup/0, fun teardown/1, F}).
@@ -24,22 +25,52 @@ setup() ->
Fd.
teardown(Fd) ->
- case is_process_alive(Fd) of
+ case is_process_alive(ioq:fd_pid(Fd)) of
true -> ok = couch_file:close(Fd);
false -> ok
end.
+mock_server() ->
+ %%meck:new(config),
+ meck:expect(config, get, fun(Group) ->
+ []
+ end),
+ meck:expect(config, get, fun(_,_) ->
+ undefined
+ end),
+ meck:expect(config, get, fun("ioq2", _, Default) ->
+ Default
+ end),
+ meck:expect(config, get, fun(_, _, Default) ->
+ Default
+ end),
+ %% meck:expect(config, get, fun(_, _, _) ->
+ %% undefined
+ %% end),
+ meck:expect(config, get_integer, fun("ioq2", _, Default) ->
+ Default
+ end),
+ meck:expect(config, get_boolean, fun("ioq2", _, Default) ->
+ Default
+ end).
+
+
+unmock_server(_) ->
+ true = meck:validate(config),
+ ok = meck:unload(config).
+
open_close_test_() ->
{
"Test for proper file open and close",
{
setup,
+ %%fun() -> mock_server(), test_util:start(?MODULE, [ioq]) end, fun(A) -> unmock_server(A), test_util:stop(A) end,
fun() -> test_util:start(?MODULE, [ioq]) end, fun test_util:stop/1,
[
should_return_enoent_if_missed(),
should_ignore_invalid_flags_with_open(),
?setup(fun should_return_pid_on_file_open/1),
- should_close_file_properly(),
+ ?setup(fun should_close_file_properly/0),
?setup(fun should_create_empty_new_files/1)
]
}
@@ -53,7 +84,7 @@ should_ignore_invalid_flags_with_open() ->
couch_file:open(?tempfile(), [create, invalid_option])).
should_return_pid_on_file_open(Fd) ->
- ?_assert(is_pid(Fd)).
+ ?_assert(is_pid(ioq:fd_pid(Fd))).
should_close_file_properly() ->
{ok, Fd} = couch_file:open(?tempfile(), [create, overwrite]),
@@ -138,10 +169,15 @@ should_not_read_beyond_eof(Fd) ->
{ok, Io} = file:open(Filepath, [read, write, binary]),
ok = file:pwrite(Io, Pos, <<0:1/integer, DoubleBin:31/integer>>),
file:close(Io),
- unlink(Fd),
- ExpectedError = {badmatch, {'EXIT', {bad_return_value,
- {read_beyond_eof, Filepath}}}},
- ?_assertError(ExpectedError, couch_file:pread_binary(Fd, Pos)).
+ unlink(ioq:fd_pid(Fd)),
+ unlink(ioq:ioq_pid(Fd)),
+ %% ExpectedError = {badmatch, {'EXIT', {bad_return_value,
+ %% {read_beyond_eof, Filepath}}}},
+ %%ExpectedError = {exit, {{bad_return_value, {read_beyond_eof Filepath,}}, _}, _},
+
+ ?_assertExit(
+ {{bad_return_value, {read_beyond_eof, Filepath}}, _},
+ couch_file:pread_binary(Fd, Pos)).
should_truncate(Fd) ->
{ok, 0, _} = couch_file:append_term(Fd, foo),
@@ -179,10 +215,19 @@ should_not_read_more_than_pread_limit(Fd) ->
{_, Filepath} = couch_file:process_info(Fd),
BigBin = list_to_binary(lists:duplicate(100000, 0)),
{ok, Pos, _Size} = couch_file:append_binary(Fd, BigBin),
- unlink(Fd),
+ unlink(ioq:fd_pid(Fd)),
+ unlink(ioq:ioq_pid(Fd)),
ExpectedError = {badmatch, {'EXIT', {bad_return_value,
{exceed_pread_limit, Filepath, 50000}}}},
- ?_assertError(ExpectedError, couch_file:pread_binary(Fd, Pos)).
+ ?debugFmt("EXPECTED ERROR IS: ~p~n", [ExpectedError]),
+ %%?_assert(couch_file:pread_binary(Fd, Pos)).
+ %%try couch_file:pread_binary(Fd, Pos)
+ %%catch E:R:S -> ?debugFmt("GOT ERROR: ~p || ~p~n~p~n", [E,R,S])
+ %%end,
+ %%?_assertError(ExpectedError, couch_file:pread_binary(Fd, Pos)).
+ ?_assertExit(
+ {{bad_return_value, {exceed_pread_limit, Filepath, 50000}}, _},
+ couch_file:pread_binary(Fd, Pos)).
header_test_() ->
@@ -537,7 +582,8 @@ fsync_error_test_() ->
fsync_raises_errors() ->
- Fd = spawn(fun() -> fake_fsync_fd() end),
+ FdPid = spawn(fun() -> fake_fsync_fd() end),
+ Fd = #ioq_file{fd=FdPid},
?assertError({fsync_error, eio}, couch_file:sync(Fd)).
diff --git a/src/couch_mrview/src/couch_mrview_compactor.erl b/src/couch_mrview/src/couch_mrview_compactor.erl
index d42edc0..82d0629 100644
--- a/src/couch_mrview/src/couch_mrview_compactor.erl
+++ b/src/couch_mrview/src/couch_mrview_compactor.erl
@@ -115,7 +115,7 @@ compact(State) ->
compact_view(View, EmptyView, BufferSize, Acc)
end, FinalAcc2, lists:zip(Views, EmptyViews)),
- unlink(EmptyState#mrst.fd),
+ unlink(ioq:fd_pid(EmptyState#mrst.fd)),
{ok, EmptyState#mrst{
id_btree=NewIdBtree,
views=NewViews,
@@ -132,7 +132,7 @@ recompact(#mrst{db_name=DbName, idx_name=IdxName}, 0) ->
recompact(State, RetryCount) ->
Self = self(),
- link(State#mrst.fd),
+ link(ioq:fd_pid(State#mrst.fd)),
{Pid, Ref} = erlang:spawn_monitor(fun() ->
couch_index_updater:update(Self, couch_mrview_index, State)
end),
@@ -144,10 +144,10 @@ recompact_loop(Pid, Ref, State, RetryCount) ->
% We've made progress so reset RetryCount
recompact_loop(Pid, Ref, State2, recompact_retry_count());
{'DOWN', Ref, _, _, {updated, Pid, State2}} ->
- unlink(State#mrst.fd),
+ unlink(ioq:fd_pid(State#mrst.fd)),
{ok, State2};
{'DOWN', Ref, _, _, Reason} ->
- unlink(State#mrst.fd),
+ unlink(ioq:fd_pid(State#mrst.fd)),
couch_log:warning("Error during recompaction: ~r", [Reason]),
recompact(State, RetryCount - 1)
end.
@@ -218,7 +218,7 @@ swap_compacted(OldState, NewState) ->
fd=NewFd
} = NewState,
- link(NewState#mrst.fd),
+ link(ioq:fd_pid(NewState#mrst.fd)),
Ref = erlang:monitor(process, NewState#mrst.fd),
RootDir = couch_index_util:root_dir(),
@@ -232,7 +232,7 @@ swap_compacted(OldState, NewState) ->
ok = couch_file:delete(RootDir, IndexFName),
ok = file:rename(CompactFName, IndexFName),
- unlink(OldState#mrst.fd),
+ unlink(ioq:fd_pid(OldState#mrst.fd)),
erlang:demonitor(OldState#mrst.fd_monitor, [flush]),
{ok, NewState#mrst{fd_monitor=Ref}}.
diff --git a/src/couch_mrview/src/couch_mrview_util.erl b/src/couch_mrview/src/couch_mrview_util.erl
index d318a3f..6486c93 100644
--- a/src/couch_mrview/src/couch_mrview_util.erl
+++ b/src/couch_mrview/src/couch_mrview_util.erl
@@ -81,7 +81,7 @@ get_signature_from_filename(FileName) ->
get_view(Db, DDoc, ViewName, Args0) ->
case get_view_index_state(Db, DDoc, ViewName, Args0) of
{ok, State, Args2} ->
- Ref = erlang:monitor(process, State#mrst.fd),
+ Ref = erlang:monitor(process, ioq:fd_pid(State#mrst.fd)),
#mrst{language=Lang, views=Views} = State,
{Type, View, Args3} = extract_view(Lang, Args2, ViewName, Views),
check_range(Args3, view_cmp(View)),
@@ -298,7 +298,7 @@ init_state(Db, Fd, State, Header) ->
State#mrst{
fd=Fd,
- fd_monitor=erlang:monitor(process, Fd),
+ fd_monitor=erlang:monitor(process, ioq:fd_pid(Fd)),
update_seq=Seq,
purge_seq=PurgeSeq,
id_btree=IdBtree,
[couchdb] 02/03: Rework cache and IOQ2 pid per couch_file
Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
chewbranca pushed a commit to branch chewbranca-ioq-experiments-rebase
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 05f6b0672985c469a75cb54a887b2dab94d3a82d
Author: Russell Branca <ch...@apache.org>
AuthorDate: Tue Aug 17 15:09:28 2021 -0700
Rework cache and IOQ2 pid per couch_file
---
src/couch/src/couch_bt_engine.erl | 11 ++--
src/couch/src/couch_db.erl | 2 +
src/couch/src/couch_file.erl | 105 +++++++++++++++++++++-----------------
3 files changed, 65 insertions(+), 53 deletions(-)
diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl
index 60f79c3..dc5bbfc 100644
--- a/src/couch/src/couch_bt_engine.erl
+++ b/src/couch/src/couch_bt_engine.erl
@@ -117,6 +117,7 @@
-include_lib("kernel/include/file.hrl").
-include_lib("couch/include/couch_db.hrl").
-include("couch_bt_engine.hrl").
+-include_lib("ioq/include/ioq.hrl").
exists(FilePath) ->
@@ -192,8 +193,8 @@ handle_db_updater_info({'DOWN', Ref, _, _, _}, #st{fd_monitor=Ref} = St) ->
{stop, normal, St#st{fd=undefined, fd_monitor=closed}}.
-incref(St) ->
- {ok, St#st{fd_monitor = erlang:monitor(process, St#st.fd)}}.
+incref(#st{fd=#ioq_file{fd=Fd}}=St) ->
+ {ok, St#st{fd_monitor = erlang:monitor(process, Fd)}}.
decref(St) ->
@@ -201,8 +202,8 @@ decref(St) ->
ok.
-monitored_by(St) ->
- case erlang:process_info(St#st.fd, monitored_by) of
+monitored_by(#st{fd=#ioq_file{fd=Fd}}=St) ->
+ case erlang:process_info(Fd, monitored_by) of
{monitored_by, Pids} ->
lists:filter(fun is_pid/1, Pids);
_ ->
@@ -920,7 +921,7 @@ init_state(FilePath, Fd, Header0, Options) ->
St = #st{
filepath = FilePath,
fd = Fd,
- fd_monitor = erlang:monitor(process, Fd),
+ fd_monitor = erlang:monitor(process, Fd#ioq_file.fd),
header = Header,
needs_commit = false,
id_tree = IdTree,
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index 8837101..aec0f54 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -757,6 +757,8 @@ set_security(_, _) ->
throw(bad_request).
set_user_ctx(#db{} = Db, UserCtx) ->
+ %% TODO:
+ %% couch_db_engine:set_user_ctx(Db, UserCtx),
{ok, Db#db{user_ctx = UserCtx}}.
validate_security_object(SecProps) ->
diff --git a/src/couch/src/couch_file.erl b/src/couch/src/couch_file.erl
index b29c54d..675e6d6 100644
--- a/src/couch/src/couch_file.erl
+++ b/src/couch/src/couch_file.erl
@@ -15,6 +15,7 @@
-vsn(2).
-include_lib("couch/include/couch_db.hrl").
+-include_lib("ioq/include/ioq.hrl").
-define(INITIAL_WAIT, 60000).
@@ -33,12 +34,13 @@
is_sys,
eof = 0,
db_monitor,
- pread_limit = 0
+ pread_limit = 0,
+ tab
}).
% public API
-export([open/1, open/2, close/1, bytes/1, sync/1, truncate/2, set_db_pid/2]).
--export([pread_term/2, pread_term/3, pread_iolist/2, pread_binary/2]).
+-export([pread_term/2, pread_iolist/2, pread_binary/2]).
-export([append_binary/2, append_binary_md5/2]).
-export([append_raw_chunk/2, assemble_file_chunk/1, assemble_file_chunk/2]).
-export([append_term/2, append_term/3, append_term_md5/2, append_term_md5/3]).
@@ -69,7 +71,9 @@ open(Filepath, Options) ->
case gen_server:start_link(couch_file,
{Filepath, Options, self(), Ref = make_ref()}, []) of
{ok, Fd} ->
- {ok, Fd};
+ {ok, IOQPid} = ioq_server2:start_link({by_shard, Filepath, Fd}),
+ Tab = gen_server:call(Fd, get_cache_ref),
+ {ok, #ioq_file{fd=Fd, ioq=IOQPid, tab=Tab}};
ignore ->
% get the error
receive
@@ -94,7 +98,7 @@ open(Filepath, Options) ->
end.
-set_db_pid(Fd, Pid) ->
+set_db_pid(#ioq_file{fd=Fd}, Pid) ->
gen_server:call(Fd, {set_db_pid, Pid}).
@@ -156,36 +160,20 @@ assemble_file_chunk(Bin, Md5) ->
%%----------------------------------------------------------------------
pread_term(Fd, Pos) ->
- UseCache = config:get_boolean("couchdb", "use_couch_file_cache", true),
- pread_term(Fd, Pos, UseCache).
-
-
-pread_term(Fd, Pos, true) ->
- case erlang:get(couch_file_hash) of
- undefined ->
- pread_term(Fd, Pos, false);
- _ ->
- load_from_cache(Fd, Pos)
- end;
-pread_term(Fd, Pos, false) ->
{ok, Bin} = pread_binary(Fd, Pos),
{ok, couch_compress:decompress(Bin)}.
%% TODO: add purpose docs
-load_from_cache(Fd, Pos) ->
- Hash = erlang:get(couch_file_hash),
- case ets:lookup(Hash, Pos) of
- [{Pos, {ok, Res}}] ->
- {ok, Res};
+load_from_cache(#ioq_file{tab=undefined}, _Pos) ->
+ missing;
+load_from_cache(#ioq_file{tab=Tab}, Pos) ->
+ case ets:lookup(Tab, Pos) of
+ [{Pos, Res}] ->
+ %% io:format("CACHE HIT: ~p{~p}~n", [Tab, Pos]),
+ Res;
[] ->
- %% TODO: don't repeat this, but avoid circular recursion
- %% pread_term(Fd, Pos, false),
- {ok, Bin} = pread_binary(Fd, Pos),
- Val = {ok, couch_compress:decompress(Bin)},
- %% TODO: should probably be inserted directly by the gen_server
- gen_server:cast(Fd, {cache, Pos, Val}),
- Val
+ missing
end.
@@ -202,11 +190,16 @@ pread_binary(Fd, Pos) ->
pread_iolist(Fd, Pos) ->
- case ioq:call(Fd, {pread_iolist, Pos}, erlang:get(io_priority)) of
+ case load_from_cache(Fd, Pos) of
{ok, IoList, Md5} ->
{ok, verify_md5(Fd, Pos, IoList, Md5)};
- Error ->
- Error
+ missing ->
+ case ioq:call(Fd, {pread_iolist, Pos}, erlang:get(io_priority)) of
+ {ok, IoList, Md5} ->
+ {ok, verify_md5(Fd, Pos, IoList, Md5)};
+ Error ->
+ Error
+ end
end.
@@ -259,7 +252,7 @@ append_binaries(Fd, Bins) ->
%%----------------------------------------------------------------------
% length in bytes
-bytes(Fd) ->
+bytes(#ioq_file{fd=Fd}) ->
gen_server:call(Fd, bytes, infinity).
%%----------------------------------------------------------------------
@@ -268,7 +261,7 @@ bytes(Fd) ->
%% or {error, Reason}.
%%----------------------------------------------------------------------
-truncate(Fd, Pos) ->
+truncate(#ioq_file{fd=Fd}, Pos) ->
gen_server:call(Fd, {truncate, Pos}, infinity).
%%----------------------------------------------------------------------
@@ -293,7 +286,7 @@ sync(Filepath) when is_list(Filepath) ->
{error, Error} ->
erlang:error(Error)
end;
-sync(Fd) ->
+sync(#ioq_file{fd=Fd}) ->
case gen_server:call(Fd, sync, infinity) of
ok ->
ok;
@@ -305,8 +298,10 @@ sync(Fd) ->
%% Purpose: Close the file.
%% Returns: ok
%%----------------------------------------------------------------------
-close(Fd) ->
- gen_server:call(Fd, close, infinity).
+close(#ioq_file{fd=Fd, ioq=IOP}) ->
+ Res = gen_server:call(Fd, close, infinity),
+ gen_server:call(IOP, close, infinity),
+ Res.
delete(RootDir, Filepath) ->
@@ -423,7 +418,7 @@ init_status_error(ReturnPid, Ref, Error) ->
ignore.
-last_read(Fd) when is_pid(Fd) ->
+last_read(#ioq_file{fd=Fd}) when is_pid(Fd) ->
Now = os:timestamp(),
couch_util:process_dict_get(Fd, read_timestamp, Now).
@@ -435,9 +430,13 @@ init({Filepath, Options, ReturnPid, Ref}) ->
Limit = get_pread_limit(),
IsSys = lists:member(sys_db, Options),
update_read_timestamp(),
- Tab = list_to_atom(integer_to_list(mem3_hash:crc32(Filepath))),
- erlang:put(couch_file_cache, Tab),
- ets:new(Tab, [set, protected, named_table, {read_concurrency, true}]),
+ ShouldCache = config:get_boolean("couchdb", "couch_file_cache", true),
+ Tab = case ShouldCache of
+ true ->
+ ets:new(?MODULE, [set, protected, {read_concurrency, true}]);
+ false ->
+ undefined
+ end,
case lists:member(create, Options) of
true ->
filelib:ensure_dir(Filepath),
@@ -458,7 +457,7 @@ init({Filepath, Options, ReturnPid, Ref}) ->
ok = file:sync(Fd),
maybe_track_open_os_files(Options),
erlang:send_after(?INITIAL_WAIT, self(), maybe_close),
- {ok, #file{fd=Fd, is_sys=IsSys, pread_limit=Limit}};
+ {ok, #file{fd=Fd, is_sys=IsSys, pread_limit=Limit, tab=Tab}};
false ->
ok = file:close(Fd),
init_status_error(ReturnPid, Ref, {error, eexist})
@@ -466,7 +465,7 @@ init({Filepath, Options, ReturnPid, Ref}) ->
false ->
maybe_track_open_os_files(Options),
erlang:send_after(?INITIAL_WAIT, self(), maybe_close),
- {ok, #file{fd=Fd, is_sys=IsSys, pread_limit=Limit}}
+ {ok, #file{fd=Fd, is_sys=IsSys, pread_limit=Limit, tab=Tab}}
end;
Error ->
init_status_error(ReturnPid, Ref, Error)
@@ -483,7 +482,7 @@ init({Filepath, Options, ReturnPid, Ref}) ->
maybe_track_open_os_files(Options),
{ok, Eof} = file:position(Fd, eof),
erlang:send_after(?INITIAL_WAIT, self(), maybe_close),
- {ok, #file{fd=Fd, eof=Eof, is_sys=IsSys, pread_limit=Limit}};
+ {ok, #file{fd=Fd, eof=Eof, is_sys=IsSys, pread_limit=Limit, tab=Tab}};
Error ->
init_status_error(ReturnPid, Ref, Error)
end;
@@ -522,15 +521,17 @@ handle_call(close, _From, #file{fd=Fd}=File) ->
handle_call({pread_iolist, Pos}, _From, File) ->
update_read_timestamp(),
{LenIolist, NextPos} = read_raw_iolist_int(File, Pos, 4),
- case iolist_to_binary(LenIolist) of
+ Resp = case iolist_to_binary(LenIolist) of
<<1:1/integer,Len:31/integer>> -> % an MD5-prefixed term
{Md5AndIoList, _} = read_raw_iolist_int(File, NextPos, Len+16),
{Md5, IoList} = extract_md5(Md5AndIoList),
- {reply, {ok, IoList, Md5}, File};
+ {ok, IoList, Md5};
<<0:1/integer,Len:31/integer>> ->
{Iolist, _} = read_raw_iolist_int(File, NextPos, Len),
- {reply, {ok, Iolist, <<>>}, File}
- end;
+ {ok, Iolist, <<>>}
+ end,
+ maybe_cache(File#file.tab, {Pos, Resp}),
+ {reply, Resp, File};
handle_call({pread_iolists, PosL}, _From, File) ->
update_read_timestamp(),
@@ -629,7 +630,10 @@ handle_call({write_header, Bin}, _From, #file{fd = Fd, eof = Pos} = File) ->
end;
handle_call(find_header, _From, #file{fd = Fd, eof = Pos} = File) ->
- {reply, find_header(Fd, Pos div ?SIZE_BLOCK), File}.
+ {reply, find_header(Fd, Pos div ?SIZE_BLOCK), File};
+
+handle_call(get_cache_ref, _From, #file{tab=Tab} = File) ->
+ {reply, Tab, File}.
handle_cast({cache, Key, Val}, Fd) ->
@@ -909,6 +913,11 @@ reset_eof(#file{} = File) ->
{ok, Eof} = file:position(File#file.fd, eof),
File#file{eof = Eof}.
+maybe_cache(undefined, _Obj) ->
+ ok;
+maybe_cache(Tab, Obj) ->
+ ets:insert(Tab, Obj).
+
-ifdef(TEST).
-include_lib("couch/include/couch_eunit.hrl").
[couchdb] 01/03: Add couch_file cache
Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
chewbranca pushed a commit to branch chewbranca-ioq-experiments-rebase
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit b5bc42915faea0f6424d5b7d9a99bd9c6691ffb4
Author: Russell Branca <ch...@apache.org>
AuthorDate: Thu Aug 12 14:32:56 2021 -0700
Add couch_file cache
---
src/couch/src/couch_bt_engine.erl | 2 ++
src/couch/src/couch_db_engine.erl | 7 +++++++
src/couch/src/couch_file.erl | 41 +++++++++++++++++++++++++++++++++++++--
3 files changed, 48 insertions(+), 2 deletions(-)
diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl
index 48e751a..60f79c3 100644
--- a/src/couch/src/couch_bt_engine.erl
+++ b/src/couch/src/couch_bt_engine.erl
@@ -846,6 +846,8 @@ copy_props(#st{header = Header} = St, Props) ->
open_db_file(FilePath, Options) ->
+ Hash = list_to_atom(integer_to_list(mem3_hash:crc32(FilePath))),
+ erlang:put(couch_file_hash, Hash),
case couch_file:open(FilePath, Options) of
{ok, Fd} ->
{ok, Fd};
diff --git a/src/couch/src/couch_db_engine.erl b/src/couch/src/couch_db_engine.erl
index 918dabc..f1ba81c 100644
--- a/src/couch/src/couch_db_engine.erl
+++ b/src/couch/src/couch_db_engine.erl
@@ -944,6 +944,13 @@ set_update_seq(#db{} = Db, UpdateSeq) ->
open_docs(#db{} = Db, DocIds) ->
+ case erlang:get(couch_file_hash) of
+ undefined ->
+ Hash = list_to_atom(integer_to_list(mem3_hash:crc32(Db#db.filepath))),
+ erlang:put(couch_file_hash, Hash);
+ _ ->
+ ok
+ end,
#db{engine = {Engine, EngineState}} = Db,
Engine:open_docs(EngineState, DocIds).
diff --git a/src/couch/src/couch_file.erl b/src/couch/src/couch_file.erl
index b1e3555..b29c54d 100644
--- a/src/couch/src/couch_file.erl
+++ b/src/couch/src/couch_file.erl
@@ -38,7 +38,7 @@
% public API
-export([open/1, open/2, close/1, bytes/1, sync/1, truncate/2, set_db_pid/2]).
--export([pread_term/2, pread_iolist/2, pread_binary/2]).
+-export([pread_term/2, pread_term/3, pread_iolist/2, pread_binary/2]).
-export([append_binary/2, append_binary_md5/2]).
-export([append_raw_chunk/2, assemble_file_chunk/1, assemble_file_chunk/2]).
-export([append_term/2, append_term/3, append_term_md5/2, append_term_md5/3]).
@@ -155,12 +155,40 @@ assemble_file_chunk(Bin, Md5) ->
%% or {error, Reason}.
%%----------------------------------------------------------------------
-
pread_term(Fd, Pos) ->
+ UseCache = config:get_boolean("couchdb", "use_couch_file_cache", true),
+ pread_term(Fd, Pos, UseCache).
+
+
+pread_term(Fd, Pos, true) ->
+ case erlang:get(couch_file_hash) of
+ undefined ->
+ pread_term(Fd, Pos, false);
+ _ ->
+ load_from_cache(Fd, Pos)
+ end;
+pread_term(Fd, Pos, false) ->
{ok, Bin} = pread_binary(Fd, Pos),
{ok, couch_compress:decompress(Bin)}.
+%% TODO: add purpose docs
+load_from_cache(Fd, Pos) ->
+ Hash = erlang:get(couch_file_hash),
+ case ets:lookup(Hash, Pos) of
+ [{Pos, {ok, Res}}] ->
+ {ok, Res};
+ [] ->
+ %% TODO: don't repeat this, but avoid circular recursion
+ %% pread_term(Fd, Pos, false),
+ {ok, Bin} = pread_binary(Fd, Pos),
+ Val = {ok, couch_compress:decompress(Bin)},
+ %% TODO: should probably be inserted directly by the gen_server
+ gen_server:cast(Fd, {cache, Pos, Val}),
+ Val
+ end.
+
+
%%----------------------------------------------------------------------
%% Purpose: Reads a binrary from a file that was written with append_binary
%% Args: Pos, the offset into the file where the term is serialized.
@@ -407,6 +435,9 @@ init({Filepath, Options, ReturnPid, Ref}) ->
Limit = get_pread_limit(),
IsSys = lists:member(sys_db, Options),
update_read_timestamp(),
+ Tab = list_to_atom(integer_to_list(mem3_hash:crc32(Filepath))),
+ erlang:put(couch_file_cache, Tab),
+ ets:new(Tab, [set, protected, named_table, {read_concurrency, true}]),
case lists:member(create, Options) of
true ->
filelib:ensure_dir(Filepath),
@@ -600,6 +631,12 @@ handle_call({write_header, Bin}, _From, #file{fd = Fd, eof = Pos} = File) ->
handle_call(find_header, _From, #file{fd = Fd, eof = Pos} = File) ->
{reply, find_header(Fd, Pos div ?SIZE_BLOCK), File}.
+
+handle_cast({cache, Key, Val}, Fd) ->
+ %% TODO: should we skip if value exists?
+ Tab = erlang:get(couch_file_cache),
+ ets:insert(Tab, {Key, Val}),
+ {noreply, Fd};
handle_cast(close, Fd) ->
{stop,normal,Fd}.