You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain text copy.
Posted to commits@couchdb.apache.org by ii...@apache.org on 2019/02/14 13:42:22 UTC

[couchdb] branch master updated: Set io_priority for couch_index pids

This is an automated email from the ASF dual-hosted git repository.

iilyak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 13bf0eb  Set io_priority for couch_index pids
     new 9b85da8  Merge pull request #1642 from cloudant/91984-set-io_priority-for-couch-index-pids
13bf0eb is described below

commit 13bf0eb80813477bf3fbe79cffaf0faddffaaca9
Author: ILYA Khlopotov <ii...@apache.org>
AuthorDate: Fri Sep 28 17:06:08 2018 -0700

    Set io_priority for couch_index pids
---
 src/couch/src/couch_debug.erl                 |  20 +--
 src/couch/src/test_util.erl                   |  17 ++
 src/couch/test/couch_index_tests.erl          | 234 ++++++++++++++++++++++++++
 src/couch_index/src/couch_index_compactor.erl |   2 +
 src/couch_index/src/couch_index_updater.erl   |   2 +
 src/couch_mrview/src/couch_mrview_updater.erl |  11 +-
 6 files changed, 271 insertions(+), 15 deletions(-)

diff --git a/src/couch/src/couch_debug.erl b/src/couch/src/couch_debug.erl
index 9506a80..290d095 100644
--- a/src/couch/src/couch_debug.erl
+++ b/src/couch/src/couch_debug.erl
@@ -234,19 +234,17 @@ opened_files_contains(FileNameFragment) ->
         string:str(Path, FileNameFragment) > 0
     end, couch_debug:opened_files()).
 
+
 process_name(Pid) when is_pid(Pid) ->
-    case process_info(Pid, registered_name) of
-        {registered_name, Name} ->
+    Info = process_info(Pid, [registered_name, dictionary, initial_call]),
+    case Info of
+        undefined ->
+            iolist_to_list(io_lib:format("[~p]", [Pid]));
+        [{registered_name, Name} | _] when Name =/= [] ->
             iolist_to_list(io_lib:format("~s[~p]", [Name, Pid]));
-        _ ->
-            {dictionary, Dict} = process_info(Pid, dictionary),
-            case proplists:get_value('$initial_call', Dict) of
-                undefined ->
-                    {initial_call, {M, F, A}} = process_info(Pid, initial_call),
-                    iolist_to_list(io_lib:format("~p:~p/~p[~p]", [M, F, A, Pid]));
-                {M, F, A} ->
-                    iolist_to_list(io_lib:format("~p:~p/~p[~p]", [M, F, A, Pid]))
-            end
+        [_, {dictionary, Dict}, {initial_call, MFA}] ->
+            {M, F, A} = proplists:get_value('$initial_call', Dict, MFA),
+            iolist_to_list(io_lib:format("~p:~p/~p[~p]", [M, F, A, Pid]))
     end;
 process_name(Else) ->
     iolist_to_list(io_lib:format("~p", [Else])).
diff --git a/src/couch/src/test_util.erl b/src/couch/src/test_util.erl
index efb5064..9566e8e 100644
--- a/src/couch/src/test_util.erl
+++ b/src/couch/src/test_util.erl
@@ -32,6 +32,7 @@
 -export([with_process_restart/1, with_process_restart/2, with_process_restart/3]).
 -export([wait_process/1, wait_process/2]).
 -export([wait/1, wait/2, wait/3]).
+-export([wait_value/2, wait_other_value/2]).
 
 -export([start/1, start/2, start/3, stop/1]).
 
@@ -222,6 +223,22 @@ wait(Fun, Timeout, Delay, Started, _Prev) ->
         Else
     end.
 
+wait_value(Fun, Value) ->
+    wait(fun() ->
+        case Fun() of
+            Value -> Value;
+            _ -> wait
+        end
+    end).
+
+wait_other_value(Fun, Value) ->
+    wait(fun() ->
+        case Fun() of
+            Value -> wait;
+            Other -> Other
+        end
+    end).
+
 start(Module) ->
     start(Module, [], []).
 
diff --git a/src/couch/test/couch_index_tests.erl b/src/couch/test/couch_index_tests.erl
new file mode 100644
index 0000000..fab3806
--- /dev/null
+++ b/src/couch/test/couch_index_tests.erl
@@ -0,0 +1,234 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_index_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+-include_lib("stdlib/include/ms_transform.hrl").
+
+-define(TIMEOUT, 1000).
+
+setup() ->
+    DbName = ?tempdb(),
+    {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
+    ok = couch_db:close(Db),
+    create_design_doc(DbName, <<"_design/foo">>, <<"bar">>),
+    tracer_new(),
+    DbName.
+
+teardown(DbName) ->
+    tracer_delete(),
+    couch_server:delete(DbName, [?ADMIN_CTX]).
+
+couch_index_ioq_priority_test_() ->
+    {
+        "Test ioq_priority for views",
+        {
+            setup,
+            fun test_util:start_couch/0, fun test_util:stop_couch/1,
+            {
+                foreach,
+                fun setup/0, fun teardown/1,
+                [
+                    fun check_io_priority_for_updater/1,
+                    fun check_io_priority_for_compactor/1
+                ]
+            }
+        }
+    }.
+
+
+check_io_priority_for_updater(DbName) ->
+    ?_test(begin
+        {ok, IndexerPid} = couch_index_server:get_index(
+            couch_mrview_index, DbName, <<"_design/foo">>),
+        CouchIndexUpdaterPid = updater_pid(IndexerPid),
+        tracer_record(CouchIndexUpdaterPid),
+
+        create_docs(DbName),
+
+        CommittedSeq = couch_util:with_db(DbName, fun(Db) -> couch_db:get_update_seq(Db) end),
+        couch_index:get_state(IndexerPid, CommittedSeq),
+        [UpdaterPid] = wait_spawn_event_for_pid(CouchIndexUpdaterPid),
+
+        [UpdaterMapProcess] = wait_spawn_by_anonymous_fun(
+            UpdaterPid, '-start_update/4-fun-0-'),
+
+        ?assert(wait_set_io_priority(
+            UpdaterMapProcess, {view_update, DbName, <<"_design/foo">>})),
+
+        [UpdaterWriterProcess] = wait_spawn_by_anonymous_fun(
+            UpdaterPid, '-start_update/4-fun-1-'),
+        ?assert(wait_set_io_priority(
+            UpdaterWriterProcess, {view_update, DbName, <<"_design/foo">>})),
+
+        ok
+    end).
+
+check_io_priority_for_compactor(DbName) ->
+    ?_test(begin
+        {ok, IndexerPid} = couch_index_server:get_index(
+            couch_mrview_index, DbName, <<"_design/foo">>),
+        {ok, CompactorPid} = couch_index:get_compactor_pid(IndexerPid),
+        tracer_record(CompactorPid),
+
+        create_docs(DbName),
+
+        couch_index:compact(IndexerPid),
+        wait_spawn_event_for_pid(CompactorPid),
+
+        [CompactorProcess] = wait_spawn_by_anonymous_fun(
+            CompactorPid, '-handle_call/3-fun-0-'),
+        ?assert(wait_set_io_priority(
+            CompactorProcess, {view_compact, DbName, <<"_design/foo">>})),
+        ok
+    end).
+
+create_docs(DbName) ->
+    {ok, Db} = couch_db:open(DbName, [?ADMIN_CTX]),
+    Doc1 = couch_doc:from_json_obj({[
+        {<<"_id">>, <<"doc1">>},
+        {<<"value">>, 1}
+
+    ]}),
+    Doc2 = couch_doc:from_json_obj({[
+        {<<"_id">>, <<"doc2">>},
+        {<<"value">>, 2}
+
+    ]}),
+    Doc3 = couch_doc:from_json_obj({[
+        {<<"_id">>, <<"doc3">>},
+        {<<"value">>, 3}
+
+    ]}),
+    {ok, _} = couch_db:update_docs(Db, [Doc1, Doc2, Doc3]),
+    couch_db:ensure_full_commit(Db),
+    couch_db:close(Db).
+
+create_design_doc(DbName, DDName, ViewName) ->
+    {ok, Db} = couch_db:open(DbName, [?ADMIN_CTX]),
+    DDoc = couch_doc:from_json_obj({[
+        {<<"_id">>, DDName},
+        {<<"language">>, <<"javascript">>},
+        {<<"views">>, {[
+            {ViewName, {[
+                {<<"map">>, <<"function(doc) { emit(doc.value, null); }">>}
+            ]}}
+        ]}}
+    ]}),
+    {ok, Rev} = couch_db:update_doc(Db, DDoc, []),
+    couch_db:ensure_full_commit(Db),
+    couch_db:close(Db),
+    Rev.
+
+wait_set_io_priority(Pid, IOPriority) ->
+    test_util:wait_value(fun() ->
+        does_process_set_io_priority(Pid, IOPriority)
+    end, true).
+
+does_process_set_io_priority(Pid, IOPriority) ->
+    PutCallsArgs = find_calls_to_fun(Pid, {erlang, put, 2}),
+    lists:any(fun([_, Priority]) -> Priority =:= IOPriority end, PutCallsArgs).
+
+wait_events(MatchSpec) ->
+    test_util:wait_other_value(fun() -> select(MatchSpec) end, []).
+
+find_spawned_by_anonymous_fun(ParentPid, Name) ->
+    AnonymousFuns = select(ets:fun2ms(fun
+        ({spawned, Pid, _TS, _Name, _Dict, [PPid, {erlang, apply, [Fun, _]}]})
+            when is_function(Fun) andalso PPid =:= ParentPid -> {Pid, Fun}
+    end)),
+    lists:filtermap(fun({Pid, Fun}) ->
+        case erlang:fun_info(Fun, name) of
+            {name, Name} -> {true, Pid};
+            _ -> false
+        end
+    end, AnonymousFuns).
+
+find_calls_to_fun(Pid, {Module, Function, Arity}) ->
+    select(ets:fun2ms(fun
+        ({call, P, _TS, _Name, _Dict, [{M, F, Args}]})
+            when length(Args) =:= Arity
+                andalso M =:= Module
+                andalso F =:= Function
+                andalso P =:= Pid
+                -> Args
+    end)).
+
+wait_spawn_event_for_pid(ParentPid) ->
+    wait_events(ets:fun2ms(fun
+         ({spawned, Pid, _TS, _Name, _Dict, [P, _]}) when P =:= ParentPid -> Pid
+    end)).
+
+wait_spawn_by_anonymous_fun(ParentPid, Name) ->
+    test_util:wait_other_value(fun() ->
+        find_spawned_by_anonymous_fun(ParentPid, Name)
+    end, []).
+
+updater_pid(IndexerPid) ->
+    {links, Links} = process_info(IndexerPid, links),
+    [Pid] = select_process_by_name_prefix(Links, "couch_index_updater:init/1"),
+    Pid.
+
+select_process_by_name_prefix(Pids, Name) ->
+    lists:filter(fun(Pid) ->
+        Key = couch_debug:process_name(Pid),
+        string:str(Key, Name) =:= 1
+    end, Pids).
+
+select(MatchSpec) ->
+    lists:filtermap(fun(Event) ->
+        case ets:test_ms(Event, MatchSpec) of
+            {ok, false} -> false;
+            {ok, Result} -> {true, Result};
+            _ -> false
+        end
+    end, tracer_events()).
+
+
+%% ========================
+%% Tracer related functions
+%% ------------------------
+tracer_new() ->
+    ets:new(?MODULE, [public, named_table]),
+    {ok, _Tracer} = dbg:tracer(process, {fun tracer_collector/2, 0}),
+    ok.
+
+tracer_delete() ->
+    dbg:stop_clear(),
+    (catch ets:delete(?MODULE)),
+    ok.
+
+tracer_record(Pid) ->
+    {ok, _} = dbg:tp(erlang, put, x),
+    {ok, _} = dbg:p(Pid, [c, p, sos]),
+    ok.
+
+tracer_events() ->
+    Events = [{Idx, E} || [Idx, E] <- ets:match(?MODULE, {{trace, '$1'}, '$2'})],
+    {_, Sorted} = lists:unzip(lists:keysort(1, Events)),
+    Sorted.
+
+tracer_collector(Msg, Seq) ->
+    ets:insert(?MODULE, {{trace, Seq}, normalize_trace_msg(Msg)}),
+    Seq + 1.
+
+normalize_trace_msg(TraceMsg) ->
+    case tuple_to_list(TraceMsg) of
+        [trace_ts, Pid, Type | Info] ->
+            {TraceInfo, [Timestamp]} = lists:split(length(Info)-1, Info),
+            {Type, Pid, Timestamp, couch_debug:process_name(Pid), process_info(Pid), TraceInfo};
+        [trace, Pid, Type | TraceInfo] ->
+            {Type, Pid, os:timestamp(), couch_debug:process_name(Pid), process_info(Pid), TraceInfo}
+    end.
diff --git a/src/couch_index/src/couch_index_compactor.erl b/src/couch_index/src/couch_index_compactor.erl
index 61f406c..8849cf6 100644
--- a/src/couch_index/src/couch_index_compactor.erl
+++ b/src/couch_index/src/couch_index_compactor.erl
@@ -117,6 +117,8 @@ compact(Parent, Mod, IdxState) ->
 
 compact(Idx, Mod, IdxState, Opts) ->
     DbName = Mod:get(db_name, IdxState),
+    IndexName = Mod:get(idx_name, IdxState),
+    erlang:put(io_priority, {view_compact, DbName, IndexName}),
     Args = [DbName, Mod:get(idx_name, IdxState)],
     couch_log:info("Compaction started for db: ~s idx: ~s", Args),
     {ok, NewIdxState} = couch_util:with_db(DbName, fun(Db) ->
diff --git a/src/couch_index/src/couch_index_updater.erl b/src/couch_index/src/couch_index_updater.erl
index 7864bde..fb15db0 100644
--- a/src/couch_index/src/couch_index_updater.erl
+++ b/src/couch_index/src/couch_index_updater.erl
@@ -128,6 +128,8 @@ code_change(_OldVsn, State, _Extra) ->
 
 update(Idx, Mod, IdxState) ->
     DbName = Mod:get(db_name, IdxState),
+    IndexName = Mod:get(idx_name, IdxState),
+    erlang:put(io_priority, {view_update, DbName, IndexName}),
     CurrSeq = Mod:get(update_seq, IdxState),
     UpdateOpts = Mod:get(update_options, IdxState),
     CommittedOnly = lists:member(committed_only, UpdateOpts),
diff --git a/src/couch_mrview/src/couch_mrview_updater.erl b/src/couch_mrview/src/couch_mrview_updater.erl
index 9740e6a..18657b4 100644
--- a/src/couch_mrview/src/couch_mrview_updater.erl
+++ b/src/couch_mrview/src/couch_mrview_updater.erl
@@ -25,7 +25,6 @@ start_update(Partial, State, NumChanges, NumChangesDone) ->
     QueueOpts = [{max_size, MaxSize}, {max_items, MaxItems}],
     {ok, DocQueue} = couch_work_queue:new(QueueOpts),
     {ok, WriteQueue} = couch_work_queue:new(QueueOpts),
-
     InitState = State#mrst{
         first_build=State#mrst.update_seq==0,
         partial_resp_pid=Partial,
@@ -37,6 +36,8 @@ start_update(Partial, State, NumChanges, NumChangesDone) ->
     Self = self(),
 
     MapFun = fun() ->
+        erlang:put(io_priority,
+            {view_update, State#mrst.db_name, State#mrst.idx_name}),
         Progress = case NumChanges of
             0 -> 0;
             _ -> (NumChangesDone * 100) div NumChanges
@@ -53,8 +54,11 @@ start_update(Partial, State, NumChanges, NumChangesDone) ->
         couch_task_status:set_update_frequency(500),
         map_docs(Self, InitState)
     end,
-    WriteFun = fun() -> write_results(Self, InitState) end,
-
+    WriteFun = fun() ->
+        erlang:put(io_priority,
+            {view_update, State#mrst.db_name, State#mrst.idx_name}),
+        write_results(Self, InitState)
+    end,
     spawn_link(MapFun),
     spawn_link(WriteFun),
 
@@ -219,7 +223,6 @@ write_results(Parent, #mrst{db_name = DbName, idx_name = IdxName} = State) ->
         stop ->
             Parent ! {new_state, State};
         {Go, {Seq, ViewKVs, DocIdKeys, Seqs, Log}} ->
-            erlang:put(io_priority, {view_update, DbName, IdxName}),
             NewState = write_kvs(State, Seq, ViewKVs, DocIdKeys, Seqs, Log),
             if Go == stop ->
                 Parent ! {new_state, NewState};