You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ch...@apache.org on 2024/02/12 23:36:15 UTC

(couchdb) 14/28: Make CSRT toggle-able and rework delta sending

This is an automated email from the ASF dual-hosted git repository.

chewbranca pushed a commit to branch couch-stats-resource-tracker-rebase
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 4bce276f8cf6060d3f91a71f2c301c3cf0389d84
Author: Russell Branca <ch...@apache.org>
AuthorDate: Mon Nov 20 14:25:32 2023 -0800

    Make CSRT toggle-able and rework delta sending
---
 .../src/couch_stats_resource_tracker.erl           | 163 +++++++++++++--------
 src/rexi/src/rexi.erl                              |  27 +++-
 src/rexi/src/rexi_utils.erl                        |  27 +++-
 3 files changed, 143 insertions(+), 74 deletions(-)

diff --git a/src/couch_stats/src/couch_stats_resource_tracker.erl b/src/couch_stats/src/couch_stats_resource_tracker.erl
index 5097f2866..fb0d39cd3 100644
--- a/src/couch_stats/src/couch_stats_resource_tracker.erl
+++ b/src/couch_stats/src/couch_stats_resource_tracker.erl
@@ -34,6 +34,7 @@
 -export([
     create_context/0, create_context/1, create_context/3,
     create_coordinator_context/1, create_coordinator_context/2,
+    is_enabled/0,
     get_resource/0,
     get_resource/1,
     set_context_dbname/1,
@@ -175,11 +176,15 @@
     get_kp_node = 0
 }).
 
+is_enabled() ->
+    config:get_boolean("couch_stats_resource_tracker", "enabled", true).
+
 db_opened() -> inc(db_opened).
 doc_read() -> inc(docs_read).
 row_read() -> inc(rows_read).
 btree_fold() -> inc(?COUCH_BT_FOLDS).
-ioq_called() -> inc(ioq_calls).
+%% TODO: do we need ioq_called and this access pattern?
+ioq_called() -> is_enabled() andalso inc(ioq_calls).
 js_evaled() -> inc(js_evals).
 js_filtered() -> inc(js_filter).
 js_filtered_error() -> inc(js_filter_error).
@@ -290,29 +295,29 @@ maybe_inc(_Metric, _Val) ->
 
 %% TODO: update stats_descriptions.cfg for relevant apps
 should_track([fabric_rpc, all_docs, spawned]) ->
-    true;
+    is_enabled();
 should_track([fabric_rpc, changes, spawned]) ->
-    true;
+    is_enabled();
 should_track([fabric_rpc, changes, processed]) ->
-    true;
+    is_enabled();
 should_track([fabric_rpc, changes, returned]) ->
-    true;
+    is_enabled();
 should_track([fabric_rpc, map_view, spawned]) ->
-    true;
+    is_enabled();
 should_track([fabric_rpc, reduce_view, spawned]) ->
-    true;
+    is_enabled();
 should_track([fabric_rpc, get_all_security, spawned]) ->
-    true;
+    is_enabled();
 should_track([fabric_rpc, open_doc, spawned]) ->
-    true;
+    is_enabled();
 should_track([fabric_rpc, update_docs, spawned]) ->
-    true;
+    is_enabled();
 should_track([fabric_rpc, open_shard, spawned]) ->
-    true;
+    is_enabled();
 should_track([mango_cursor, view, all_docs]) ->
-    true;
+    is_enabled();
 should_track([mango_cursor, view, idx]) ->
-    true;
+    is_enabled();
 should_track(_Metric) ->
     %%io:format("SKIPPING METRIC: ~p~n", [Metric]),
     false.
@@ -320,14 +325,14 @@ should_track(_Metric) ->
 accumulate_delta(Delta) ->
     %% TODO: switch to creating a batch of updates to invoke a single
     %% update_counter rather than sequentially invoking it for each field
-    maps:foreach(fun inc/2, Delta).
+    is_enabled() andalso maps:foreach(fun inc/2, Delta).
 
 update_counter(Field, Count) ->
-    update_counter(get_pid_ref(), Field, Count).
+    is_enabled() andalso update_counter(get_pid_ref(), Field, Count).
 
 
 update_counter({_Pid,_Ref}=PidRef, Field, Count) ->
-    ets:update_counter(?MODULE, PidRef, {Field, Count}, #rctx{pid_ref=PidRef}).
+    is_enabled() andalso ets:update_counter(?MODULE, PidRef, {Field, Count}, #rctx{pid_ref=PidRef}).
 
 
 active() -> active_int(all).
@@ -622,73 +627,98 @@ get_pid_ref() ->
 
 
 create_context() ->
-    create_context(self()).
+    create_context(self()). %% create_context/1 checks is_enabled() itself and returns ok when disabled
 
 
 create_context(Pid) ->
-    Ref = make_ref(),
-    Rctx = make_record(Pid, Ref),
-    track(Rctx),
-    ets:insert(?MODULE, Rctx),
-    Rctx.
+    case is_enabled() of
+        false ->
+            ok;
+        true ->
+            Ref = make_ref(),
+            Rctx = make_record(Pid, Ref),
+            track(Rctx),
+            ets:insert(?MODULE, Rctx),
+            Rctx
+    end.
 
 %% add type to disnguish coordinator vs rpc_worker
 create_context(From, {M,F,_A} = MFA, Nonce) ->
-    PidRef = get_pid_ref(), %% this will instantiate a new PidRef
-    %% TODO: extract user_ctx and db/shard from
-    Rctx = #rctx{
-        pid_ref = PidRef,
-        from = From,
-        mfa = MFA,
-        type = {worker, M, F},
-        nonce = Nonce
-    },
-    track(Rctx),
-    erlang:put(?DELTA_TZ, Rctx),
-    true = ets:insert(?MODULE, Rctx),
-    Rctx.
+    case is_enabled() of
+        false ->
+            ok;
+        true ->
+            PidRef = get_pid_ref(), %% this will instantiate a new PidRef
+            %% TODO: extract user_ctx and db/shard from
+            Rctx = #rctx{
+                pid_ref = PidRef,
+                from = From,
+                mfa = MFA,
+                type = {worker, M, F},
+                nonce = Nonce
+            },
+            track(Rctx),
+            erlang:put(?DELTA_TZ, Rctx),
+            true = ets:insert(?MODULE, Rctx),
+            Rctx
+    end.
 
 create_coordinator_context(#httpd{path_parts=Parts} = Req) ->
-    create_coordinator_context(Req, io_lib:format("~p", [Parts])).
+    is_enabled() andalso create_coordinator_context(Req, io_lib:format("~p", [Parts])).
 
 create_coordinator_context(#httpd{} = Req, Path) ->
-    #httpd{
-        method = Verb,
-        nonce = Nonce
-    } = Req,
-    PidRef = get_pid_ref(), %% this will instantiate a new PidRef
-    Rctx = #rctx{
-        pid_ref = PidRef,
-        type = {coordinator, Verb, [$/ | Path]},
-        nonce = Nonce
-    },
-    track(Rctx),
-    erlang:put(?DELTA_TZ, Rctx),
-    true = ets:insert(?MODULE, Rctx),
-    Rctx.
+    case is_enabled() of
+        false ->
+            ok;
+        true ->
+            #httpd{
+                method = Verb,
+                nonce = Nonce
+            } = Req,
+            PidRef = get_pid_ref(), %% this will instantiate a new PidRef
+            Rctx = #rctx{
+                pid_ref = PidRef,
+                type = {coordinator, Verb, [$/ | Path]},
+                nonce = Nonce
+            },
+            track(Rctx),
+            erlang:put(?DELTA_TZ, Rctx),
+            true = ets:insert(?MODULE, Rctx),
+            Rctx
+    end.
 
 set_context_dbname(DbName) ->
-    case ets:update_element(?MODULE, get_pid_ref(), [{#rctx.dbname, DbName}]) of
+    case is_enabled() of
         false ->
-            Stk = try throw(42) catch _:_:Stk0 -> Stk0 end,
-            io:format("UPDATING DBNAME[~p] FAILURE WITH CONTEXT: ~p AND STACK:~n~pFOO:: ~p~n~n", [DbName, get_resource(), Stk, process_info(self(), current_stacktrace)]),
-            timer:sleep(1000),
-            erlang:halt(kaboomz);
+            ok;
         true ->
-            true
+            case ets:update_element(?MODULE, get_pid_ref(), [{#rctx.dbname, DbName}]) of
+                false ->
+                    %% No CSRT context row for this process: report it and carry on rather than
+                    %% stalling/crashing the caller (erlang:halt/1 rejects atoms like kaboomz).
+                    io:format("UPDATING DBNAME[~p] FAILURE WITH CONTEXT: ~p AND STACK:~n~p~n~n", [DbName, get_resource(), process_info(self(), current_stacktrace)]),
+                    false;
+                true ->
+                    true
+            end
     end.
 
 set_context_username(null) ->
     ok;
 set_context_username(UserName) ->
-    case ets:update_element(?MODULE, get_pid_ref(), [{#rctx.username, UserName}]) of
+    case is_enabled() of
         false ->
-            Stk = try throw(42) catch _:_:Stk0 -> Stk0 end,
-            io:format("UPDATING USERNAME[~p] FAILURE WITH CONTEXT: ~p AND STACK:~n~pFOO:: ~p~n~n", [UserName, get_resource(), Stk, process_info(self(), current_stacktrace)]),
-            timer:sleep(1000),
-            erlang:halt(kaboomz);
+            ok;
         true ->
-            true
+            case ets:update_element(?MODULE, get_pid_ref(), [{#rctx.username, UserName}]) of
+                false ->
+                    %% No CSRT context row for this process: report it and carry on rather than
+                    %% stalling/crashing the caller (erlang:halt/1 rejects atoms like kaboomz).
+                    io:format("UPDATING USERNAME[~p] FAILURE WITH CONTEXT: ~p AND STACK:~n~p~n~n", [UserName, get_resource(), process_info(self(), current_stacktrace)]),
+                    false;
+                true ->
+                    true
+            end
     end.
 
 track(#rctx{}=Rctx) ->
@@ -830,7 +860,12 @@ init([]) ->
         {keypos, #rctx.pid_ref}
     ]),
     St = #st{},
-    _TimerRef = erlang:send_after(St#st.scan_interval, self(), scan),
+    case is_enabled() of
+        false ->
+            ok;
+        true ->
+            _TimerRef = erlang:send_after(St#st.scan_interval, self(), scan)
+    end,
     {ok, St}.
 
 handle_call(fetch, _from, #st{} = St) ->
@@ -852,7 +887,7 @@ handle_info(scan, #st{tracking=AT0} = St0) ->
     AT = maybe_track(Unmonitored, AT0),
     _TimerRef = erlang:send_after(St0#st.scan_interval, self(), scan),
     {noreply, St0#st{tracking=AT}};
-handle_info({'DOWN', MonRef, Type, DPid, Reason}, #st{tracking=AT0} = St0) ->
+handle_info({'DOWN', MonRef, _Type, DPid, Reason}, #st{tracking=AT0} = St0) ->
     %% io:format("CSRT:HI(~p)~n", [{'DOWN', MonRef, Type, DPid, Reason}]),
     St = case maps:get(MonRef, AT0, undefined) of
         undefined ->
diff --git a/src/rexi/src/rexi.erl b/src/rexi/src/rexi.erl
index 0b005dace..4bb5e9f69 100644
--- a/src/rexi/src/rexi.erl
+++ b/src/rexi/src/rexi.erl
@@ -129,7 +129,8 @@ async_server_call(Server, Caller, Request) ->
 -spec reply(any()) -> any().
 reply(Reply) ->
     {Caller, Ref} = get(rexi_from),
-    erlang:send(Caller, {Ref, Reply, get_delta()}).
+    %% maybe_add_delta/1 appends the {delta, _} element only when CSRT is enabled.
+    erlang:send(Caller, maybe_add_delta({Ref, Reply})).
 
 %% @equiv sync_reply(Reply, 300000)
 sync_reply(Reply) ->
@@ -214,7 +215,8 @@ stream(Msg, Limit, Timeout) ->
         {ok, Count} ->
             put(rexi_unacked, Count + 1),
             {Caller, Ref} = get(rexi_from),
-            erlang:send(Caller, {Ref, self(), Msg, get_delta()}),
+            %%erlang:send(Caller, {Ref, self(), Msg, get_delta()}),
+            erlang:send(Caller, maybe_add_delta({Ref, self(), Msg})),
             ok
     catch
         throw:timeout ->
@@ -243,7 +245,7 @@ stream2(Msg, Limit, Timeout) ->
         {ok, Count} ->
             put(rexi_unacked, Count + 1),
             {Caller, Ref} = get(rexi_from),
-            erlang:send(Caller, {Ref, self(), Msg, get_delta()}),
+            erlang:send(Caller, maybe_add_delta({Ref, self(), Msg})),
             ok
     catch
         throw:timeout ->
@@ -281,7 +283,7 @@ ping() ->
     %% filtered queries will be silent on usage until they finally return
     %% a row or no results. This delay is proportional to the database size,
     %% so instead we make sure ping/0 keeps live stats flowing.
-    erlang:send(Caller, {rexi, '$rexi_ping'}, get_delta()).
+    erlang:send(Caller, maybe_add_delta({rexi, '$rexi_ping'})).
 
 %% internal functions %%
 
@@ -337,3 +339,20 @@ drain_acks(Count) ->
 
 get_delta() ->
     {delta, couch_stats_resource_tracker:make_delta()}.
+
+maybe_add_delta(T) ->
+    case couch_stats_resource_tracker:is_enabled() of
+        false ->
+            T;
+        true ->
+            add_delta(T, get_delta())
+    end.
+
+%% Append Delta as the last element of the message tuple.
+%% erlang:append_element/2 handles tuples of any arity, so unlike a ladder of
+%% fixed-arity clauses no message silently loses its delta.
+add_delta(T, Delta) when is_tuple(T) ->
+    erlang:append_element(T, Delta);
+%% Anything that is not a tuple is passed through unchanged.
+add_delta(T, _Delta) ->
+    T.
diff --git a/src/rexi/src/rexi_utils.erl b/src/rexi/src/rexi_utils.erl
index f18ca179f..4924bd11e 100644
--- a/src/rexi/src/rexi_utils.erl
+++ b/src/rexi/src/rexi_utils.erl
@@ -67,6 +67,8 @@ process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) ->
     receive
         {timeout, TimeoutRef} ->
             {timeout, Acc0};
+        {rexi, '$rexi_ping'} ->
+            {ok, Acc0};
         {rexi, '$rexi_ping', {delta, Delta}} ->
             couch_stats_resource_tracker:accumulate_delta(Delta),
             {ok, Acc0};
@@ -120,9 +122,16 @@ process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) ->
         %% {#Ref<0.1961702128.3491758082.26965>, {rexi_EXIT,{{query_parse_error,<<78,111,32,114,111,119,115,32,99,97,110,32,109,97
         {Ref, Msg} ->
             %% TODO: add stack trace to log entry
-            couch_log:debug("rexi_utils:process_message no delta: {Ref, Msg} => {~p, ~p}~n", [Ref, Msg]),
-            timer:sleep(100),
-            erlang:halt(binary_to_list(iolist_to_binary(io_lib:format("{enodelta} rexi_utils:process_message no delta: {Ref, Msg} => {~w, ~w}~n", [Ref, Msg]))));
+            %% couch_log:debug("rexi_utils:process_message no delta: {Ref, Msg} => {~p, ~p}~n", [Ref, Msg]),
+            %% timer:sleep(100),
+            %% erlang:halt(binary_to_list(iolist_to_binary(io_lib:format("{enodelta} rexi_utils:process_message no delta: {Ref, Msg} => {~w, ~w}~n", [Ref, Msg]))));
+            case lists:keyfind(Ref, Keypos, RefList) of
+                false ->
+                    % this was some non-matching message which we will ignore
+                    {ok, Acc0};
+                Worker ->
+                    Fun(Msg, Worker, Acc0)
+            end;
         {Ref, From, rexi_STREAM_INIT = Msg} ->
             case lists:keyfind(Ref, Keypos, RefList) of
                 false ->
@@ -132,9 +141,15 @@ process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) ->
             end;
         {Ref, From, Msg} ->
             %% TODO: add stack trace to log entry
-            couch_log:debug("rexi_utils:process_message no delta: {Ref, From, Msg} => {~p, ~p, ~p}~n", [Ref, From, Msg]),
-            timer:sleep(100),
-            erlang:halt(binary_to_list(iolist_to_binary(io_lib:format("{enodelta} rexi_utils:process_message no delta: {Ref, From, Msg} => {~w, ~w, ~w}~n", [Ref, From, Msg]))));
+            %% couch_log:debug("rexi_utils:process_message no delta: {Ref, From, Msg} => {~p, ~p, ~p}~n", [Ref, From, Msg]),
+            %% timer:sleep(100),
+            %% erlang:halt(binary_to_list(iolist_to_binary(io_lib:format("{enodelta} rexi_utils:process_message no delta: {Ref, From, Msg} => {~w, ~w, ~w}~n", [Ref, From, Msg]))));
+            case lists:keyfind(Ref, Keypos, RefList) of
+                false ->
+                    {ok, Acc0};
+                Worker ->
+                    Fun(Msg, {Worker, From}, Acc0)
+            end;
         {rexi_DOWN, _, _, _} = Msg ->
             Fun(Msg, nil, Acc0)
     after PerMsgTO ->