You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ch...@apache.org on 2021/08/24 22:00:40 UTC

[couchdb] branch chewbranca-ioq-experiments created (now 511cf2d)

This is an automated email from the ASF dual-hosted git repository.

chewbranca pushed a change to branch chewbranca-ioq-experiments
in repository https://gitbox.apache.org/repos/asf/couchdb.git.


      at 511cf2d  Fix some tests

This branch includes the following new commits:

     new 2cdfb8f  Add couch_file cache
     new 3dc261d  Rework cache and IOQ2 pid per couch_file
     new 511cf2d  Fix some tests

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[couchdb] 02/03: Rework cache and IOQ2 pid per couch_file

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chewbranca pushed a commit to branch chewbranca-ioq-experiments
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 3dc261da0e0ab0c1428be01e0520aff777f5f5bd
Author: Russell Branca <ch...@apache.org>
AuthorDate: Tue Aug 17 15:09:28 2021 -0700

    Rework cache and IOQ2 pid per couch_file
---
 src/couch/src/couch_bt_engine.erl |  11 ++--
 src/couch/src/couch_db.erl        |   2 +
 src/couch/src/couch_file.erl      | 105 +++++++++++++++++++++-----------------
 3 files changed, 65 insertions(+), 53 deletions(-)

diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl
index 60f79c3..dc5bbfc 100644
--- a/src/couch/src/couch_bt_engine.erl
+++ b/src/couch/src/couch_bt_engine.erl
@@ -117,6 +117,7 @@
 -include_lib("kernel/include/file.hrl").
 -include_lib("couch/include/couch_db.hrl").
 -include("couch_bt_engine.hrl").
+-include_lib("ioq/include/ioq.hrl").
 
 
 exists(FilePath) ->
@@ -192,8 +193,8 @@ handle_db_updater_info({'DOWN', Ref, _, _, _}, #st{fd_monitor=Ref} = St) ->
     {stop, normal, St#st{fd=undefined, fd_monitor=closed}}.
 
 
-incref(St) ->
-    {ok, St#st{fd_monitor = erlang:monitor(process, St#st.fd)}}.
+incref(#st{fd=#ioq_file{fd=Fd}}=St) ->
+    {ok, St#st{fd_monitor = erlang:monitor(process, Fd)}}.
 
 
 decref(St) ->
@@ -201,8 +202,8 @@ decref(St) ->
     ok.
 
 
-monitored_by(St) ->
-    case erlang:process_info(St#st.fd, monitored_by) of
+monitored_by(#st{fd=#ioq_file{fd=Fd}}=St) ->
+    case erlang:process_info(Fd, monitored_by) of
         {monitored_by, Pids} ->
             lists:filter(fun is_pid/1, Pids);
         _ ->
@@ -920,7 +921,7 @@ init_state(FilePath, Fd, Header0, Options) ->
     St = #st{
         filepath = FilePath,
         fd = Fd,
-        fd_monitor = erlang:monitor(process, Fd),
+        fd_monitor = erlang:monitor(process, Fd#ioq_file.fd),
         header = Header,
         needs_commit = false,
         id_tree = IdTree,
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index 8837101..aec0f54 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -757,6 +757,8 @@ set_security(_, _) ->
     throw(bad_request).
 
 set_user_ctx(#db{} = Db, UserCtx) ->
+    %% TODO:
+    %% couch_db_engine:set_user_ctx(Db, UserCtx),
     {ok, Db#db{user_ctx = UserCtx}}.
 
 validate_security_object(SecProps) ->
diff --git a/src/couch/src/couch_file.erl b/src/couch/src/couch_file.erl
index b29c54d..675e6d6 100644
--- a/src/couch/src/couch_file.erl
+++ b/src/couch/src/couch_file.erl
@@ -15,6 +15,7 @@
 -vsn(2).
 
 -include_lib("couch/include/couch_db.hrl").
+-include_lib("ioq/include/ioq.hrl").
 
 
 -define(INITIAL_WAIT, 60000).
@@ -33,12 +34,13 @@
     is_sys,
     eof = 0,
     db_monitor,
-    pread_limit = 0
+    pread_limit = 0,
+    tab
 }).
 
 % public API
 -export([open/1, open/2, close/1, bytes/1, sync/1, truncate/2, set_db_pid/2]).
--export([pread_term/2, pread_term/3, pread_iolist/2, pread_binary/2]).
+-export([pread_term/2, pread_iolist/2, pread_binary/2]).
 -export([append_binary/2, append_binary_md5/2]).
 -export([append_raw_chunk/2, assemble_file_chunk/1, assemble_file_chunk/2]).
 -export([append_term/2, append_term/3, append_term_md5/2, append_term_md5/3]).
@@ -69,7 +71,9 @@ open(Filepath, Options) ->
     case gen_server:start_link(couch_file,
             {Filepath, Options, self(), Ref = make_ref()}, []) of
     {ok, Fd} ->
-        {ok, Fd};
+        {ok, IOQPid} = ioq_server2:start_link({by_shard, Filepath, Fd}),
+        Tab = gen_server:call(Fd, get_cache_ref),
+        {ok, #ioq_file{fd=Fd, ioq=IOQPid, tab=Tab}};
     ignore ->
         % get the error
         receive
@@ -94,7 +98,7 @@ open(Filepath, Options) ->
     end.
 
 
-set_db_pid(Fd, Pid) ->
+set_db_pid(#ioq_file{fd=Fd}, Pid) ->
     gen_server:call(Fd, {set_db_pid, Pid}).
 
 
@@ -156,36 +160,20 @@ assemble_file_chunk(Bin, Md5) ->
 %%----------------------------------------------------------------------
 
 pread_term(Fd, Pos) ->
-    UseCache = config:get_boolean("couchdb", "use_couch_file_cache", true),
-    pread_term(Fd, Pos, UseCache).
-
-
-pread_term(Fd, Pos, true) ->
-    case erlang:get(couch_file_hash) of
-        undefined ->
-            pread_term(Fd, Pos, false);
-        _ ->
-            load_from_cache(Fd, Pos)
-    end;
-pread_term(Fd, Pos, false) ->
     {ok, Bin} = pread_binary(Fd, Pos),
     {ok, couch_compress:decompress(Bin)}.
 
 
 %% TODO: add purpose docs
-load_from_cache(Fd, Pos) ->
-    Hash = erlang:get(couch_file_hash),
-    case ets:lookup(Hash, Pos) of
-        [{Pos, {ok, Res}}] ->
-            {ok, Res};
+load_from_cache(#ioq_file{tab=undefined}, _Pos) ->
+    missing;
+load_from_cache(#ioq_file{tab=Tab}, Pos) ->
+    case ets:lookup(Tab, Pos) of
+        [{Pos, Res}] ->
+            %% io:format("CACHE HIT: ~p{~p}~n", [Tab, Pos]),
+            Res;
         [] ->
-            %% TODO: don't repeat this, but avoid circular recursion
-            %% pread_term(Fd, Pos, false),
-            {ok, Bin} = pread_binary(Fd, Pos),
-            Val = {ok, couch_compress:decompress(Bin)},
-            %% TODO: should probably be inserted directly by the gen_server
-            gen_server:cast(Fd, {cache, Pos, Val}),
-            Val
+            missing
     end.
 
 
@@ -202,11 +190,16 @@ pread_binary(Fd, Pos) ->
 
 
 pread_iolist(Fd, Pos) ->
-    case ioq:call(Fd, {pread_iolist, Pos}, erlang:get(io_priority)) of
+    case load_from_cache(Fd, Pos) of
         {ok, IoList, Md5} ->
             {ok, verify_md5(Fd, Pos, IoList, Md5)};
-        Error ->
-            Error
+        missing ->
+            case ioq:call(Fd, {pread_iolist, Pos}, erlang:get(io_priority)) of
+                {ok, IoList, Md5} ->
+                    {ok, verify_md5(Fd, Pos, IoList, Md5)};
+                Error ->
+                    Error
+            end
     end.
 
 
@@ -259,7 +252,7 @@ append_binaries(Fd, Bins) ->
 %%----------------------------------------------------------------------
 
 % length in bytes
-bytes(Fd) ->
+bytes(#ioq_file{fd=Fd}) ->
     gen_server:call(Fd, bytes, infinity).
 
 %%----------------------------------------------------------------------
@@ -268,7 +261,7 @@ bytes(Fd) ->
 %%  or {error, Reason}.
 %%----------------------------------------------------------------------
 
-truncate(Fd, Pos) ->
+truncate(#ioq_file{fd=Fd}, Pos) ->
     gen_server:call(Fd, {truncate, Pos}, infinity).
 
 %%----------------------------------------------------------------------
@@ -293,7 +286,7 @@ sync(Filepath) when is_list(Filepath) ->
         {error, Error} ->
             erlang:error(Error)
     end;
-sync(Fd) ->
+sync(#ioq_file{fd=Fd}) ->
     case gen_server:call(Fd, sync, infinity) of
         ok ->
             ok;
@@ -305,8 +298,10 @@ sync(Fd) ->
 %% Purpose: Close the file.
 %% Returns: ok
 %%----------------------------------------------------------------------
-close(Fd) ->
-    gen_server:call(Fd, close, infinity).
+close(#ioq_file{fd=Fd, ioq=IOP}) ->
+    Res = gen_server:call(Fd, close, infinity),
+    gen_server:call(IOP, close, infinity),
+    Res.
 
 
 delete(RootDir, Filepath) ->
@@ -423,7 +418,7 @@ init_status_error(ReturnPid, Ref, Error) ->
     ignore.
 
 
-last_read(Fd) when is_pid(Fd) ->
+last_read(#ioq_file{fd=Fd}) when is_pid(Fd) ->
     Now = os:timestamp(),
     couch_util:process_dict_get(Fd, read_timestamp, Now).
 
@@ -435,9 +430,13 @@ init({Filepath, Options, ReturnPid, Ref}) ->
     Limit = get_pread_limit(),
     IsSys = lists:member(sys_db, Options),
     update_read_timestamp(),
-    Tab = list_to_atom(integer_to_list(mem3_hash:crc32(Filepath))),
-    erlang:put(couch_file_cache, Tab),
-    ets:new(Tab, [set, protected, named_table, {read_concurrency, true}]),
+    ShouldCache = config:get_boolean("couchdb", "couch_file_cache", true),
+    Tab = case ShouldCache of
+        true ->
+            ets:new(?MODULE, [set, protected, {read_concurrency, true}]);
+        false ->
+            undefined
+    end,
     case lists:member(create, Options) of
     true ->
         filelib:ensure_dir(Filepath),
@@ -458,7 +457,7 @@ init({Filepath, Options, ReturnPid, Ref}) ->
                     ok = file:sync(Fd),
                     maybe_track_open_os_files(Options),
                     erlang:send_after(?INITIAL_WAIT, self(), maybe_close),
-                    {ok, #file{fd=Fd, is_sys=IsSys, pread_limit=Limit}};
+                    {ok, #file{fd=Fd, is_sys=IsSys, pread_limit=Limit, tab=Tab}};
                 false ->
                     ok = file:close(Fd),
                     init_status_error(ReturnPid, Ref, {error, eexist})
@@ -466,7 +465,7 @@ init({Filepath, Options, ReturnPid, Ref}) ->
             false ->
                 maybe_track_open_os_files(Options),
                 erlang:send_after(?INITIAL_WAIT, self(), maybe_close),
-                {ok, #file{fd=Fd, is_sys=IsSys, pread_limit=Limit}}
+                {ok, #file{fd=Fd, is_sys=IsSys, pread_limit=Limit, tab=Tab}}
             end;
         Error ->
             init_status_error(ReturnPid, Ref, Error)
@@ -483,7 +482,7 @@ init({Filepath, Options, ReturnPid, Ref}) ->
                      maybe_track_open_os_files(Options),
                      {ok, Eof} = file:position(Fd, eof),
                      erlang:send_after(?INITIAL_WAIT, self(), maybe_close),
-                     {ok, #file{fd=Fd, eof=Eof, is_sys=IsSys, pread_limit=Limit}};
+                     {ok, #file{fd=Fd, eof=Eof, is_sys=IsSys, pread_limit=Limit, tab=Tab}};
                  Error ->
                      init_status_error(ReturnPid, Ref, Error)
             end;
@@ -522,15 +521,17 @@ handle_call(close, _From, #file{fd=Fd}=File) ->
 handle_call({pread_iolist, Pos}, _From, File) ->
     update_read_timestamp(),
     {LenIolist, NextPos} = read_raw_iolist_int(File, Pos, 4),
-    case iolist_to_binary(LenIolist) of
+    Resp = case iolist_to_binary(LenIolist) of
     <<1:1/integer,Len:31/integer>> -> % an MD5-prefixed term
         {Md5AndIoList, _} = read_raw_iolist_int(File, NextPos, Len+16),
         {Md5, IoList} = extract_md5(Md5AndIoList),
-        {reply, {ok, IoList, Md5}, File};
+        {ok, IoList, Md5};
     <<0:1/integer,Len:31/integer>> ->
         {Iolist, _} = read_raw_iolist_int(File, NextPos, Len),
-        {reply, {ok, Iolist, <<>>}, File}
-    end;
+        {ok, Iolist, <<>>}
+    end,
+    maybe_cache(File#file.tab, {Pos, Resp}),
+    {reply, Resp, File};
 
 handle_call({pread_iolists, PosL}, _From, File) ->
     update_read_timestamp(),
@@ -629,7 +630,10 @@ handle_call({write_header, Bin}, _From, #file{fd = Fd, eof = Pos} = File) ->
     end;
 
 handle_call(find_header, _From, #file{fd = Fd, eof = Pos} = File) ->
-    {reply, find_header(Fd, Pos div ?SIZE_BLOCK), File}.
+    {reply, find_header(Fd, Pos div ?SIZE_BLOCK), File};
+
+handle_call(get_cache_ref, _From, #file{tab=Tab} = File) ->
+    {reply, Tab, File}.
 
 
 handle_cast({cache, Key, Val}, Fd) ->
@@ -909,6 +913,11 @@ reset_eof(#file{} = File) ->
     {ok, Eof} = file:position(File#file.fd, eof),
     File#file{eof = Eof}.
 
+maybe_cache(undefined, _Obj) ->
+    ok;
+maybe_cache(Tab, Obj) ->
+    ets:insert(Tab, Obj).
+
 -ifdef(TEST).
 -include_lib("couch/include/couch_eunit.hrl").
 

[couchdb] 01/03: Add couch_file cache

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chewbranca pushed a commit to branch chewbranca-ioq-experiments
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 2cdfb8f967c0860ea2f04554757925731b78106e
Author: Russell Branca <ch...@apache.org>
AuthorDate: Thu Aug 12 14:32:56 2021 -0700

    Add couch_file cache
---
 src/couch/src/couch_bt_engine.erl |  2 ++
 src/couch/src/couch_db_engine.erl |  7 +++++++
 src/couch/src/couch_file.erl      | 41 +++++++++++++++++++++++++++++++++++++--
 3 files changed, 48 insertions(+), 2 deletions(-)

diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl
index 48e751a..60f79c3 100644
--- a/src/couch/src/couch_bt_engine.erl
+++ b/src/couch/src/couch_bt_engine.erl
@@ -846,6 +846,8 @@ copy_props(#st{header = Header} = St, Props) ->
 
 
 open_db_file(FilePath, Options) ->
+    Hash = list_to_atom(integer_to_list(mem3_hash:crc32(FilePath))),
+    erlang:put(couch_file_hash, Hash),
     case couch_file:open(FilePath, Options) of
         {ok, Fd} ->
             {ok, Fd};
diff --git a/src/couch/src/couch_db_engine.erl b/src/couch/src/couch_db_engine.erl
index 918dabc..f1ba81c 100644
--- a/src/couch/src/couch_db_engine.erl
+++ b/src/couch/src/couch_db_engine.erl
@@ -944,6 +944,13 @@ set_update_seq(#db{} = Db, UpdateSeq) ->
 
 
 open_docs(#db{} = Db, DocIds) ->
+    case erlang:get(couch_file_hash) of
+        undefined ->
+            Hash = list_to_atom(integer_to_list(mem3_hash:crc32(Db#db.filepath))),
+            erlang:put(couch_file_hash, Hash);
+        _ ->
+            ok
+    end,
     #db{engine = {Engine, EngineState}} = Db,
     Engine:open_docs(EngineState, DocIds).
 
diff --git a/src/couch/src/couch_file.erl b/src/couch/src/couch_file.erl
index b1e3555..b29c54d 100644
--- a/src/couch/src/couch_file.erl
+++ b/src/couch/src/couch_file.erl
@@ -38,7 +38,7 @@
 
 % public API
 -export([open/1, open/2, close/1, bytes/1, sync/1, truncate/2, set_db_pid/2]).
--export([pread_term/2, pread_iolist/2, pread_binary/2]).
+-export([pread_term/2, pread_term/3, pread_iolist/2, pread_binary/2]).
 -export([append_binary/2, append_binary_md5/2]).
 -export([append_raw_chunk/2, assemble_file_chunk/1, assemble_file_chunk/2]).
 -export([append_term/2, append_term/3, append_term_md5/2, append_term_md5/3]).
@@ -155,12 +155,40 @@ assemble_file_chunk(Bin, Md5) ->
 %%  or {error, Reason}.
 %%----------------------------------------------------------------------
 
-
 pread_term(Fd, Pos) ->
+    UseCache = config:get_boolean("couchdb", "use_couch_file_cache", true),
+    pread_term(Fd, Pos, UseCache).
+
+
+pread_term(Fd, Pos, true) ->
+    case erlang:get(couch_file_hash) of
+        undefined ->
+            pread_term(Fd, Pos, false);
+        _ ->
+            load_from_cache(Fd, Pos)
+    end;
+pread_term(Fd, Pos, false) ->
     {ok, Bin} = pread_binary(Fd, Pos),
     {ok, couch_compress:decompress(Bin)}.
 
 
+%% TODO: add purpose docs
+load_from_cache(Fd, Pos) ->
+    Hash = erlang:get(couch_file_hash),
+    case ets:lookup(Hash, Pos) of
+        [{Pos, {ok, Res}}] ->
+            {ok, Res};
+        [] ->
+            %% TODO: don't repeat this, but avoid circular recursion
+            %% pread_term(Fd, Pos, false),
+            {ok, Bin} = pread_binary(Fd, Pos),
+            Val = {ok, couch_compress:decompress(Bin)},
+            %% TODO: should probably be inserted directly by the gen_server
+            gen_server:cast(Fd, {cache, Pos, Val}),
+            Val
+    end.
+
+
 %%----------------------------------------------------------------------
 %% Purpose: Reads a binrary from a file that was written with append_binary
 %% Args:    Pos, the offset into the file where the term is serialized.
@@ -407,6 +435,9 @@ init({Filepath, Options, ReturnPid, Ref}) ->
     Limit = get_pread_limit(),
     IsSys = lists:member(sys_db, Options),
     update_read_timestamp(),
+    Tab = list_to_atom(integer_to_list(mem3_hash:crc32(Filepath))),
+    erlang:put(couch_file_cache, Tab),
+    ets:new(Tab, [set, protected, named_table, {read_concurrency, true}]),
     case lists:member(create, Options) of
     true ->
         filelib:ensure_dir(Filepath),
@@ -600,6 +631,12 @@ handle_call({write_header, Bin}, _From, #file{fd = Fd, eof = Pos} = File) ->
 handle_call(find_header, _From, #file{fd = Fd, eof = Pos} = File) ->
     {reply, find_header(Fd, Pos div ?SIZE_BLOCK), File}.
 
+
+handle_cast({cache, Key, Val}, Fd) ->
+    %% TODO: should we skip if value exists?
+    Tab = erlang:get(couch_file_cache),
+    ets:insert(Tab, {Key, Val}),
+    {noreply, Fd};
 handle_cast(close, Fd) ->
     {stop,normal,Fd}.
 

[couchdb] 03/03: Fix some tests

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chewbranca pushed a commit to branch chewbranca-ioq-experiments
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 511cf2d15616366aab854cb4d1b798364bf14bf1
Author: Russell Branca <ch...@apache.org>
AuthorDate: Mon Aug 23 18:19:50 2021 -0700

    Fix some tests
---
 src/couch/src/couch_bt_engine_compactor.erl     |  5 +-
 src/couch/src/couch_bt_engine_stream.erl        |  8 +--
 src/couch/src/couch_file.erl                    |  4 +-
 src/couch/test/eunit/couch_btree_tests.erl      | 11 +++++
 src/couch/test/eunit/couch_file_tests.erl       | 66 +++++++++++++++++++++----
 src/couch_mrview/src/couch_mrview_compactor.erl | 12 ++---
 src/couch_mrview/src/couch_mrview_util.erl      |  4 +-
 7 files changed, 86 insertions(+), 24 deletions(-)

diff --git a/src/couch/src/couch_bt_engine_compactor.erl b/src/couch/src/couch_bt_engine_compactor.erl
index 3e356e2..297e70c 100644
--- a/src/couch/src/couch_bt_engine_compactor.erl
+++ b/src/couch/src/couch_bt_engine_compactor.erl
@@ -145,8 +145,9 @@ open_compaction_files(DbName, OldSt, Options) ->
                 retry = nil
             }
     end,
-    unlink(DataFd),
-    erlang:monitor(process, MetaFd),
+    unlink(ioq:fd_pid(DataFd)),
+    unlink(ioq:ioq_pid(DataFd)),
+    erlang:monitor(process, ioq:fd_pid(MetaFd)),
     {ok, CompSt}.
 
 
diff --git a/src/couch/src/couch_bt_engine_stream.erl b/src/couch/src/couch_bt_engine_stream.erl
index 431894a..e57e9a0 100644
--- a/src/couch/src/couch_bt_engine_stream.erl
+++ b/src/couch/src/couch_bt_engine_stream.erl
@@ -20,6 +20,8 @@
     to_disk_term/1
 ]).
 
+-include_lib("ioq/include/ioq.hrl").
+
 
 foldl({_Fd, []}, _Fun, Acc) ->
     Acc;
@@ -56,9 +58,9 @@ seek({Fd, [Pos | Rest]}, Offset) when is_integer(Pos) ->
     end.
 
 
-write({Fd, Written}, Data) when is_pid(Fd) ->
-    {ok, Pos, _} = couch_file:append_binary(Fd, Data),
-    {ok, {Fd, [{Pos, iolist_size(Data)} | Written]}}.
+write({#ioq_file{fd=Fd}=IOF, Written}, Data) when is_pid(Fd) ->
+    {ok, Pos, _} = couch_file:append_binary(IOF, Data),
+    {ok, {IOF, [{Pos, iolist_size(Data)} | Written]}}.
 
 
 finalize({Fd, Written}) ->
diff --git a/src/couch/src/couch_file.erl b/src/couch/src/couch_file.erl
index 675e6d6..a6eeef5 100644
--- a/src/couch/src/couch_file.erl
+++ b/src/couch/src/couch_file.erl
@@ -889,7 +889,9 @@ is_idle(#file{is_sys=false}) ->
 -spec process_info(CouchFilePid :: pid()) ->
     {Fd :: pid() | tuple(), FilePath :: string()} | undefined.
 
-process_info(Pid) ->
+process_info(Pid) when is_pid(Pid) ->
+    couch_util:process_dict_get(Pid, couch_file_fd);
+process_info(#ioq_file{fd=Pid}) ->
     couch_util:process_dict_get(Pid, couch_file_fd).
 
 update_read_timestamp() ->
diff --git a/src/couch/test/eunit/couch_btree_tests.erl b/src/couch/test/eunit/couch_btree_tests.erl
index c9b791d..9e7d3d0 100644
--- a/src/couch/test/eunit/couch_btree_tests.erl
+++ b/src/couch/test/eunit/couch_btree_tests.erl
@@ -14,6 +14,7 @@
 
 -include_lib("couch/include/couch_eunit.hrl").
 -include_lib("couch/include/couch_db.hrl").
+-include_lib("ioq/include/ioq.hrl").
 
 -define(ROWS, 1000).
 -define(TIMEOUT, 60). % seconds
@@ -42,6 +43,8 @@ setup_red() ->
 setup_red(_) ->
     setup_red().
 
+teardown(#ioq_file{}=Fd) ->
+    ok = couch_file:close(Fd);
 teardown(Fd) when is_pid(Fd) ->
     ok = couch_file:close(Fd);
 teardown({Fd, _}) ->
@@ -74,6 +77,14 @@ red_test_funs() ->
 
 
 btree_open_test_() ->
+    % Instantiator (arity 1) so eunit runs the test set it returns.
+    {
+        setup,
+        fun() -> test_util:start(?MODULE, [ioq]) end, fun test_util:stop/1,
+        fun(_) -> btree_should_open() end
+    }.
+
+btree_should_open() ->
     {ok, Fd} = couch_file:open(?tempfile(), [create, overwrite]),
     {ok, Btree} = couch_btree:open(nil, Fd, [{compression, none}]),
     {
diff --git a/src/couch/test/eunit/couch_file_tests.erl b/src/couch/test/eunit/couch_file_tests.erl
index 606f4bb..99bb8c0 100644
--- a/src/couch/test/eunit/couch_file_tests.erl
+++ b/src/couch/test/eunit/couch_file_tests.erl
@@ -13,6 +13,7 @@
 -module(couch_file_tests).
 
 -include_lib("couch/include/couch_eunit.hrl").
+-include_lib("ioq/include/ioq.hrl").
 
 -define(BLOCK_SIZE, 4096).
 -define(setup(F), {setup, fun setup/0, fun teardown/1, F}).
@@ -24,22 +25,52 @@ setup() ->
     Fd.
 
 teardown(Fd) ->
-    case is_process_alive(Fd) of
+    case is_process_alive(ioq:fd_pid(Fd)) of
         true -> ok = couch_file:close(Fd);
         false -> ok
     end.
 
+%% Stub config so every lookup falls back to its default, whatever the
+%% section (couch_file reads "couchdb" keys too, not only "ioq2").
+%% NOTE(review): meck:new/1 is disabled; assumes config is mocked already.
+mock_server() ->
+    %%meck:new(config),
+    meck:expect(config, get, fun(_Group) ->
+        []
+    end),
+    meck:expect(config, get, fun(_,_) ->
+        undefined
+    end),
+    meck:expect(config, get, fun(_, _, Default) ->
+        Default
+    end),
+    %%  meck:expect(config, get, fun(_, _, _) ->
+    %%      undefined
+    %%  end),
+    meck:expect(config, get_integer, fun(_, _, Default) ->
+        Default
+    end),
+    meck:expect(config, get_boolean, fun(_, _, Default) ->
+        Default
+    end).
+
+
+unmock_server(_) ->
+    true = meck:validate(config),
+    ok = meck:unload(config).
+
 open_close_test_() ->
     {
         "Test for proper file open and close",
         {
             setup,
+            %%fun() -> mock_server(), test_util:start(?MODULE, [ioq]) end, fun(A) -> unmock_server(A), test_util:stop(A) end,
             fun() -> test_util:start(?MODULE, [ioq]) end, fun test_util:stop/1,
             [
                 should_return_enoent_if_missed(),
                 should_ignore_invalid_flags_with_open(),
                 ?setup(fun should_return_pid_on_file_open/1),
-                should_close_file_properly(),
+                ?setup(fun should_close_file_properly/0),
                 ?setup(fun should_create_empty_new_files/1)
             ]
         }
@@ -53,7 +84,7 @@ should_ignore_invalid_flags_with_open() ->
                   couch_file:open(?tempfile(), [create, invalid_option])).
 
 should_return_pid_on_file_open(Fd) ->
-    ?_assert(is_pid(Fd)).
+    ?_assert(is_pid(ioq:fd_pid(Fd))).
 
 should_close_file_properly() ->
     {ok, Fd} = couch_file:open(?tempfile(), [create, overwrite]),
@@ -138,10 +169,15 @@ should_not_read_beyond_eof(Fd) ->
     {ok, Io} = file:open(Filepath, [read, write, binary]),
     ok = file:pwrite(Io, Pos, <<0:1/integer, DoubleBin:31/integer>>),
     file:close(Io),
-    unlink(Fd),
-    ExpectedError = {badmatch, {'EXIT', {bad_return_value,
-        {read_beyond_eof, Filepath}}}},
-    ?_assertError(ExpectedError, couch_file:pread_binary(Fd, Pos)).
+    unlink(ioq:fd_pid(Fd)),
+    unlink(ioq:ioq_pid(Fd)),
+    %% ExpectedError = {badmatch, {'EXIT', {bad_return_value,
+    %%     {read_beyond_eof, Filepath}}}},
+    %%ExpectedError = {exit, {{bad_return_value, {read_beyond_eof Filepath,}}, _}, _},
+
+    ?_assertExit(
+        {{bad_return_value, {read_beyond_eof, Filepath}}, _},
+        couch_file:pread_binary(Fd, Pos)).
 
 should_truncate(Fd) ->
     {ok, 0, _} = couch_file:append_term(Fd, foo),
@@ -179,10 +215,19 @@ should_not_read_more_than_pread_limit(Fd) ->
     {_, Filepath} = couch_file:process_info(Fd),
     BigBin = list_to_binary(lists:duplicate(100000, 0)),
     {ok, Pos, _Size} = couch_file:append_binary(Fd, BigBin),
-    unlink(Fd),
+    unlink(ioq:fd_pid(Fd)),
+    unlink(ioq:ioq_pid(Fd)),
     ExpectedError = {badmatch, {'EXIT', {bad_return_value,
         {exceed_pread_limit, Filepath, 50000}}}},
-    ?_assertError(ExpectedError, couch_file:pread_binary(Fd, Pos)).
+    ?debugFmt("EXPECTED ERROR IS: ~p~n", [ExpectedError]),
+    %%?_assert(couch_file:pread_binary(Fd, Pos)).
+    %%try couch_file:pread_binary(Fd, Pos)
+    %%catch E:R:S -> ?debugFmt("GOT ERROR: ~p || ~p~n~p~n", [E,R,S])
+    %%end,
+    %%?_assertError(ExpectedError, couch_file:pread_binary(Fd, Pos)).
+    ?_assertExit(
+        {{bad_return_value, {exceed_pread_limit, Filepath, 50000}}, _},
+        couch_file:pread_binary(Fd, Pos)).
 
 
 header_test_() ->
@@ -537,7 +582,8 @@ fsync_error_test_() ->
 
 
 fsync_raises_errors() ->
-    Fd = spawn(fun() -> fake_fsync_fd() end),
+    FdPid = spawn(fun() -> fake_fsync_fd() end),
+    Fd = #ioq_file{fd=FdPid},
     ?assertError({fsync_error, eio}, couch_file:sync(Fd)).
 
 
diff --git a/src/couch_mrview/src/couch_mrview_compactor.erl b/src/couch_mrview/src/couch_mrview_compactor.erl
index d42edc0..82d0629 100644
--- a/src/couch_mrview/src/couch_mrview_compactor.erl
+++ b/src/couch_mrview/src/couch_mrview_compactor.erl
@@ -115,7 +115,7 @@ compact(State) ->
         compact_view(View, EmptyView, BufferSize, Acc)
     end, FinalAcc2, lists:zip(Views, EmptyViews)),
 
-    unlink(EmptyState#mrst.fd),
+    unlink(ioq:fd_pid(EmptyState#mrst.fd)),
     {ok, EmptyState#mrst{
         id_btree=NewIdBtree,
         views=NewViews,
@@ -132,7 +132,7 @@ recompact(#mrst{db_name=DbName, idx_name=IdxName}, 0) ->
 
 recompact(State, RetryCount) ->
     Self = self(),
-    link(State#mrst.fd),
+    link(ioq:fd_pid(State#mrst.fd)),
     {Pid, Ref} = erlang:spawn_monitor(fun() ->
         couch_index_updater:update(Self, couch_mrview_index, State)
     end),
@@ -144,10 +144,10 @@ recompact_loop(Pid, Ref, State, RetryCount) ->
             % We've made progress so reset RetryCount
             recompact_loop(Pid, Ref, State2, recompact_retry_count());
         {'DOWN', Ref, _, _, {updated, Pid, State2}} ->
-            unlink(State#mrst.fd),
+            unlink(ioq:fd_pid(State#mrst.fd)),
             {ok, State2};
         {'DOWN', Ref, _, _, Reason} ->
-            unlink(State#mrst.fd),
+            unlink(ioq:fd_pid(State#mrst.fd)),
             couch_log:warning("Error during recompaction: ~r", [Reason]),
             recompact(State, RetryCount - 1)
     end.
@@ -218,7 +218,7 @@ swap_compacted(OldState, NewState) ->
         fd=NewFd
     } = NewState,
 
-    link(NewState#mrst.fd),
-    Ref = erlang:monitor(process, NewState#mrst.fd),
+    link(ioq:fd_pid(NewState#mrst.fd)),
+    Ref = erlang:monitor(process, ioq:fd_pid(NewState#mrst.fd)),
 
     RootDir = couch_index_util:root_dir(),
@@ -232,7 +232,7 @@ swap_compacted(OldState, NewState) ->
     ok = couch_file:delete(RootDir, IndexFName),
     ok = file:rename(CompactFName, IndexFName),
 
-    unlink(OldState#mrst.fd),
+    unlink(ioq:fd_pid(OldState#mrst.fd)),
     erlang:demonitor(OldState#mrst.fd_monitor, [flush]),
 
     {ok, NewState#mrst{fd_monitor=Ref}}.
diff --git a/src/couch_mrview/src/couch_mrview_util.erl b/src/couch_mrview/src/couch_mrview_util.erl
index d318a3f..6486c93 100644
--- a/src/couch_mrview/src/couch_mrview_util.erl
+++ b/src/couch_mrview/src/couch_mrview_util.erl
@@ -81,7 +81,7 @@ get_signature_from_filename(FileName) ->
 get_view(Db, DDoc, ViewName, Args0) ->
     case get_view_index_state(Db, DDoc, ViewName, Args0) of
         {ok, State, Args2} ->
-            Ref = erlang:monitor(process, State#mrst.fd),
+            Ref = erlang:monitor(process, ioq:fd_pid(State#mrst.fd)),
             #mrst{language=Lang, views=Views} = State,
             {Type, View, Args3} = extract_view(Lang, Args2, ViewName, Views),
             check_range(Args3, view_cmp(View)),
@@ -298,7 +298,7 @@ init_state(Db, Fd, State, Header) ->
 
     State#mrst{
         fd=Fd,
-        fd_monitor=erlang:monitor(process, Fd),
+        fd_monitor=erlang:monitor(process, ioq:fd_pid(Fd)),
         update_seq=Seq,
         purge_seq=PurgeSeq,
         id_btree=IdBtree,