You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ch...@apache.org on 2024/02/12 23:36:15 UTC
(couchdb) 14/28: Make CSRT toggle-able and rework delta sending
This is an automated email from the ASF dual-hosted git repository.
chewbranca pushed a commit to branch couch-stats-resource-tracker-rebase
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 4bce276f8cf6060d3f91a71f2c301c3cf0389d84
Author: Russell Branca <ch...@apache.org>
AuthorDate: Mon Nov 20 14:25:32 2023 -0800
Make CSRT toggle-able and rework delta sending
---
.../src/couch_stats_resource_tracker.erl | 163 +++++++++++++--------
src/rexi/src/rexi.erl | 27 +++-
src/rexi/src/rexi_utils.erl | 27 +++-
3 files changed, 143 insertions(+), 74 deletions(-)
diff --git a/src/couch_stats/src/couch_stats_resource_tracker.erl b/src/couch_stats/src/couch_stats_resource_tracker.erl
index 5097f2866..fb0d39cd3 100644
--- a/src/couch_stats/src/couch_stats_resource_tracker.erl
+++ b/src/couch_stats/src/couch_stats_resource_tracker.erl
@@ -34,6 +34,7 @@
-export([
create_context/0, create_context/1, create_context/3,
create_coordinator_context/1, create_coordinator_context/2,
+ is_enabled/0,
get_resource/0,
get_resource/1,
set_context_dbname/1,
@@ -175,11 +176,15 @@
get_kp_node = 0
}).
+is_enabled() ->
+ config:get_boolean("couch_stats_resource_tracker", "enabled", true).
+
db_opened() -> inc(db_opened).
doc_read() -> inc(docs_read).
row_read() -> inc(rows_read).
btree_fold() -> inc(?COUCH_BT_FOLDS).
-ioq_called() -> inc(ioq_calls).
+%% TODO: do we need ioq_called and this access pattern?
+ioq_called() -> is_enabled() andalso inc(ioq_calls).
js_evaled() -> inc(js_evals).
js_filtered() -> inc(js_filter).
js_filtered_error() -> inc(js_filter_error).
@@ -290,29 +295,29 @@ maybe_inc(_Metric, _Val) ->
%% TODO: update stats_descriptions.cfg for relevant apps
should_track([fabric_rpc, all_docs, spawned]) ->
- true;
+ is_enabled();
should_track([fabric_rpc, changes, spawned]) ->
- true;
+ is_enabled();
should_track([fabric_rpc, changes, processed]) ->
- true;
+ is_enabled();
should_track([fabric_rpc, changes, returned]) ->
- true;
+ is_enabled();
should_track([fabric_rpc, map_view, spawned]) ->
- true;
+ is_enabled();
should_track([fabric_rpc, reduce_view, spawned]) ->
- true;
+ is_enabled();
should_track([fabric_rpc, get_all_security, spawned]) ->
- true;
+ is_enabled();
should_track([fabric_rpc, open_doc, spawned]) ->
- true;
+ is_enabled();
should_track([fabric_rpc, update_docs, spawned]) ->
- true;
+ is_enabled();
should_track([fabric_rpc, open_shard, spawned]) ->
- true;
+ is_enabled();
should_track([mango_cursor, view, all_docs]) ->
- true;
+ is_enabled();
should_track([mango_cursor, view, idx]) ->
- true;
+ is_enabled();
should_track(_Metric) ->
%%io:format("SKIPPING METRIC: ~p~n", [Metric]),
false.
@@ -320,14 +325,14 @@ should_track(_Metric) ->
accumulate_delta(Delta) ->
%% TODO: switch to creating a batch of updates to invoke a single
%% update_counter rather than sequentially invoking it for each field
- maps:foreach(fun inc/2, Delta).
+ is_enabled() andalso maps:foreach(fun inc/2, Delta).
update_counter(Field, Count) ->
- update_counter(get_pid_ref(), Field, Count).
+ is_enabled() andalso update_counter(get_pid_ref(), Field, Count).
update_counter({_Pid,_Ref}=PidRef, Field, Count) ->
- ets:update_counter(?MODULE, PidRef, {Field, Count}, #rctx{pid_ref=PidRef}).
+ is_enabled() andalso ets:update_counter(?MODULE, PidRef, {Field, Count}, #rctx{pid_ref=PidRef}).
active() -> active_int(all).
@@ -622,73 +627,98 @@ get_pid_ref() ->
create_context() ->
- create_context(self()).
+ is_enabled() andalso create_context(self()).
create_context(Pid) ->
- Ref = make_ref(),
- Rctx = make_record(Pid, Ref),
- track(Rctx),
- ets:insert(?MODULE, Rctx),
- Rctx.
+ case is_enabled() of
+ false ->
+ ok;
+ true ->
+ Ref = make_ref(),
+ Rctx = make_record(Pid, Ref),
+ track(Rctx),
+ ets:insert(?MODULE, Rctx),
+ Rctx
+ end.
%% add type to distinguish coordinator vs rpc_worker
create_context(From, {M,F,_A} = MFA, Nonce) ->
- PidRef = get_pid_ref(), %% this will instantiate a new PidRef
- %% TODO: extract user_ctx and db/shard from
- Rctx = #rctx{
- pid_ref = PidRef,
- from = From,
- mfa = MFA,
- type = {worker, M, F},
- nonce = Nonce
- },
- track(Rctx),
- erlang:put(?DELTA_TZ, Rctx),
- true = ets:insert(?MODULE, Rctx),
- Rctx.
+ case is_enabled() of
+ false ->
+ ok;
+ true ->
+ PidRef = get_pid_ref(), %% this will instantiate a new PidRef
+ %% TODO: extract user_ctx and db/shard from
+ Rctx = #rctx{
+ pid_ref = PidRef,
+ from = From,
+ mfa = MFA,
+ type = {worker, M, F},
+ nonce = Nonce
+ },
+ track(Rctx),
+ erlang:put(?DELTA_TZ, Rctx),
+ true = ets:insert(?MODULE, Rctx),
+ Rctx
+ end.
create_coordinator_context(#httpd{path_parts=Parts} = Req) ->
- create_coordinator_context(Req, io_lib:format("~p", [Parts])).
+ is_enabled() andalso create_coordinator_context(Req, io_lib:format("~p", [Parts])).
create_coordinator_context(#httpd{} = Req, Path) ->
- #httpd{
- method = Verb,
- nonce = Nonce
- } = Req,
- PidRef = get_pid_ref(), %% this will instantiate a new PidRef
- Rctx = #rctx{
- pid_ref = PidRef,
- type = {coordinator, Verb, [$/ | Path]},
- nonce = Nonce
- },
- track(Rctx),
- erlang:put(?DELTA_TZ, Rctx),
- true = ets:insert(?MODULE, Rctx),
- Rctx.
+ case is_enabled() of
+ false ->
+ ok;
+ true ->
+ #httpd{
+ method = Verb,
+ nonce = Nonce
+ } = Req,
+ PidRef = get_pid_ref(), %% this will instantiate a new PidRef
+ Rctx = #rctx{
+ pid_ref = PidRef,
+ type = {coordinator, Verb, [$/ | Path]},
+ nonce = Nonce
+ },
+ track(Rctx),
+ erlang:put(?DELTA_TZ, Rctx),
+ true = ets:insert(?MODULE, Rctx),
+ Rctx
+ end.
set_context_dbname(DbName) ->
- case ets:update_element(?MODULE, get_pid_ref(), [{#rctx.dbname, DbName}]) of
+ case is_enabled() of
false ->
- Stk = try throw(42) catch _:_:Stk0 -> Stk0 end,
- io:format("UPDATING DBNAME[~p] FAILURE WITH CONTEXT: ~p AND STACK:~n~pFOO:: ~p~n~n", [DbName, get_resource(), Stk, process_info(self(), current_stacktrace)]),
- timer:sleep(1000),
- erlang:halt(kaboomz);
+ ok;
true ->
- true
+ case ets:update_element(?MODULE, get_pid_ref(), [{#rctx.dbname, DbName}]) of
+ false ->
+ Stk = try throw(42) catch _:_:Stk0 -> Stk0 end,
+ io:format("UPDATING DBNAME[~p] FAILURE WITH CONTEXT: ~p AND STACK:~n~pFOO:: ~p~n~n", [DbName, get_resource(), Stk, process_info(self(), current_stacktrace)]),
+ timer:sleep(1000),
+ erlang:halt(kaboomz);
+ true ->
+ true
+ end
end.
set_context_username(null) ->
ok;
set_context_username(UserName) ->
- case ets:update_element(?MODULE, get_pid_ref(), [{#rctx.username, UserName}]) of
+ case is_enabled() of
false ->
- Stk = try throw(42) catch _:_:Stk0 -> Stk0 end,
- io:format("UPDATING USERNAME[~p] FAILURE WITH CONTEXT: ~p AND STACK:~n~pFOO:: ~p~n~n", [UserName, get_resource(), Stk, process_info(self(), current_stacktrace)]),
- timer:sleep(1000),
- erlang:halt(kaboomz);
+ ok;
true ->
- true
+ case ets:update_element(?MODULE, get_pid_ref(), [{#rctx.username, UserName}]) of
+ false ->
+ Stk = try throw(42) catch _:_:Stk0 -> Stk0 end,
+ io:format("UPDATING USERNAME[~p] FAILURE WITH CONTEXT: ~p AND STACK:~n~pFOO:: ~p~n~n", [UserName, get_resource(), Stk, process_info(self(), current_stacktrace)]),
+ timer:sleep(1000),
+ erlang:halt(kaboomz);
+ true ->
+ true
+ end
end.
track(#rctx{}=Rctx) ->
@@ -830,7 +860,12 @@ init([]) ->
{keypos, #rctx.pid_ref}
]),
St = #st{},
- _TimerRef = erlang:send_after(St#st.scan_interval, self(), scan),
+ case is_enabled() of
+ false ->
+ ok;
+ true ->
+ _TimerRef = erlang:send_after(St#st.scan_interval, self(), scan)
+ end,
{ok, St}.
handle_call(fetch, _from, #st{} = St) ->
@@ -852,7 +887,7 @@ handle_info(scan, #st{tracking=AT0} = St0) ->
AT = maybe_track(Unmonitored, AT0),
_TimerRef = erlang:send_after(St0#st.scan_interval, self(), scan),
{noreply, St0#st{tracking=AT}};
-handle_info({'DOWN', MonRef, Type, DPid, Reason}, #st{tracking=AT0} = St0) ->
+handle_info({'DOWN', MonRef, _Type, DPid, Reason}, #st{tracking=AT0} = St0) ->
%% io:format("CSRT:HI(~p)~n", [{'DOWN', MonRef, Type, DPid, Reason}]),
St = case maps:get(MonRef, AT0, undefined) of
undefined ->
diff --git a/src/rexi/src/rexi.erl b/src/rexi/src/rexi.erl
index 0b005dace..4bb5e9f69 100644
--- a/src/rexi/src/rexi.erl
+++ b/src/rexi/src/rexi.erl
@@ -129,7 +129,8 @@ async_server_call(Server, Caller, Request) ->
-spec reply(any()) -> any().
reply(Reply) ->
{Caller, Ref} = get(rexi_from),
- erlang:send(Caller, {Ref, Reply, get_delta()}).
+ %%erlang:send(Caller, {Ref, Reply, get_delta()}).
+ erlang:send(Caller, maybe_add_delta({Ref, Reply})).
%% @equiv sync_reply(Reply, 300000)
sync_reply(Reply) ->
@@ -214,7 +215,8 @@ stream(Msg, Limit, Timeout) ->
{ok, Count} ->
put(rexi_unacked, Count + 1),
{Caller, Ref} = get(rexi_from),
- erlang:send(Caller, {Ref, self(), Msg, get_delta()}),
+ %%erlang:send(Caller, {Ref, self(), Msg, get_delta()}),
+ erlang:send(Caller, maybe_add_delta({Ref, self(), Msg})),
ok
catch
throw:timeout ->
@@ -243,7 +245,7 @@ stream2(Msg, Limit, Timeout) ->
{ok, Count} ->
put(rexi_unacked, Count + 1),
{Caller, Ref} = get(rexi_from),
- erlang:send(Caller, {Ref, self(), Msg, get_delta()}),
+ erlang:send(Caller, maybe_add_delta({Ref, self(), Msg})),
ok
catch
throw:timeout ->
@@ -281,7 +283,7 @@ ping() ->
%% filtered queries will be silent on usage until they finally return
%% a row or no results. This delay is proportional to the database size,
%% so instead we make sure ping/0 keeps live stats flowing.
- erlang:send(Caller, {rexi, '$rexi_ping'}, get_delta()).
+ erlang:send(Caller, maybe_add_delta({rexi, '$rexi_ping'})).
%% internal functions %%
@@ -337,3 +339,20 @@ drain_acks(Count) ->
get_delta() ->
{delta, couch_stats_resource_tracker:make_delta()}.
+
+maybe_add_delta(T) ->
+ case couch_stats_resource_tracker:is_enabled() of
+ false ->
+ T;
+ true ->
+ add_delta(T, get_delta())
+ end.
+
+add_delta({A}, Delta) -> {A, Delta};
+add_delta({A, B}, Delta) -> {A, B, Delta};
+add_delta({A, B, C}, Delta) -> {A, B, C, Delta};
+add_delta({A, B, C, D}, Delta) -> {A, B, C, D, Delta};
+add_delta({A, B, C, D, E}, Delta) -> {A, B, C, D, E, Delta};
+add_delta({A, B, C, D, E, F}, Delta) -> {A, B, C, D, E, F, Delta};
+add_delta({A, B, C, D, E, F, G}, Delta) -> {A, B, C, D, E, F, G, Delta};
+add_delta(T, _Delta) -> T.
diff --git a/src/rexi/src/rexi_utils.erl b/src/rexi/src/rexi_utils.erl
index f18ca179f..4924bd11e 100644
--- a/src/rexi/src/rexi_utils.erl
+++ b/src/rexi/src/rexi_utils.erl
@@ -67,6 +67,8 @@ process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) ->
receive
{timeout, TimeoutRef} ->
{timeout, Acc0};
+ {rexi, '$rexi_ping'} ->
+ {ok, Acc0};
{rexi, '$rexi_ping', {delta, Delta}} ->
couch_stats_resource_tracker:accumulate_delta(Delta),
{ok, Acc0};
@@ -120,9 +122,16 @@ process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) ->
%% {#Ref<0.1961702128.3491758082.26965>, {rexi_EXIT,{{query_parse_error,<<78,111,32,114,111,119,115,32,99,97,110,32,109,97
{Ref, Msg} ->
%% TODO: add stack trace to log entry
- couch_log:debug("rexi_utils:process_message no delta: {Ref, Msg} => {~p, ~p}~n", [Ref, Msg]),
- timer:sleep(100),
- erlang:halt(binary_to_list(iolist_to_binary(io_lib:format("{enodelta} rexi_utils:process_message no delta: {Ref, Msg} => {~w, ~w}~n", [Ref, Msg]))));
+ %% couch_log:debug("rexi_utils:process_message no delta: {Ref, Msg} => {~p, ~p}~n", [Ref, Msg]),
+ %% timer:sleep(100),
+ %% erlang:halt(binary_to_list(iolist_to_binary(io_lib:format("{enodelta} rexi_utils:process_message no delta: {Ref, Msg} => {~w, ~w}~n", [Ref, Msg]))));
+ case lists:keyfind(Ref, Keypos, RefList) of
+ false ->
+ % this was some non-matching message which we will ignore
+ {ok, Acc0};
+ Worker ->
+ Fun(Msg, Worker, Acc0)
+ end;
{Ref, From, rexi_STREAM_INIT = Msg} ->
case lists:keyfind(Ref, Keypos, RefList) of
false ->
@@ -132,9 +141,15 @@ process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) ->
end;
{Ref, From, Msg} ->
%% TODO: add stack trace to log entry
- couch_log:debug("rexi_utils:process_message no delta: {Ref, From, Msg} => {~p, ~p, ~p}~n", [Ref, From, Msg]),
- timer:sleep(100),
- erlang:halt(binary_to_list(iolist_to_binary(io_lib:format("{enodelta} rexi_utils:process_message no delta: {Ref, From, Msg} => {~w, ~w, ~w}~n", [Ref, From, Msg]))));
+ %% couch_log:debug("rexi_utils:process_message no delta: {Ref, From, Msg} => {~p, ~p, ~p}~n", [Ref, From, Msg]),
+ %% timer:sleep(100),
+ %% erlang:halt(binary_to_list(iolist_to_binary(io_lib:format("{enodelta} rexi_utils:process_message no delta: {Ref, From, Msg} => {~w, ~w, ~w}~n", [Ref, From, Msg]))));
+ case lists:keyfind(Ref, Keypos, RefList) of
+ false ->
+ {ok, Acc0};
+ Worker ->
+ Fun(Msg, {Worker, From}, Acc0)
+ end;
{rexi_DOWN, _, _, _} = Msg ->
Fun(Msg, nil, Acc0)
after PerMsgTO ->