You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ch...@apache.org on 2024/02/12 23:36:05 UTC
(couchdb) 04/28: Embed worker usage deltas in rexi:ping
This is an automated email from the ASF dual-hosted git repository.
chewbranca pushed a commit to branch couch-stats-resource-tracker-rebase
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit d8f475fb9138007182ce3eda252c06774b629636
Author: Russell Branca <ch...@apache.org>
AuthorDate: Tue Oct 31 15:13:45 2023 -0700
Embed worker usage deltas in rexi:ping
---
src/rexi/src/rexi.erl | 6 ++++++
src/rexi/src/rexi_utils.erl | 6 ++++--
2 files changed, 10 insertions(+), 2 deletions(-)
diff --git a/src/rexi/src/rexi.erl b/src/rexi/src/rexi.erl
index 754dee444..0db6ac5de 100644
--- a/src/rexi/src/rexi.erl
+++ b/src/rexi/src/rexi.erl
@@ -265,16 +265,22 @@ stream_last(Msg, Timeout) ->
%% @equiv stream_ack(Client, 1)
stream_ack(Client) ->
+ %% stream_ack is coordinator side only, no need to send worker deltas
erlang:send(Client, {rexi_ack, 1}).
%% @doc Ack streamed messages
stream_ack(Client, N) ->
+ %% stream_ack is coordinator side only, no need to send worker deltas
erlang:send(Client, {rexi_ack, N}).
%% Sends a ping message to the coordinator. This is for long running
%% operations on a node that could exceed the rexi timeout
ping() ->
{Caller, _} = get(rexi_from),
+ %% It is essential ping/0 includes deltas as otherwise long running
+ %% filtered queries will be silent on usage until they finally return
+ %% a row or no results. This delay is proportional to the database size,
+ %% so instead we make sure ping/0 keeps live stats flowing.
erlang:send(Caller, {rexi, '$rexi_ping'}).
%% internal functions %%
diff --git a/src/rexi/src/rexi_utils.erl b/src/rexi/src/rexi_utils.erl
index f7127ecd1..3125f75db 100644
--- a/src/rexi/src/rexi_utils.erl
+++ b/src/rexi/src/rexi_utils.erl
@@ -67,6 +67,10 @@ process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) ->
receive
{timeout, TimeoutRef} ->
{timeout, Acc0};
+ {rexi, '$rexi_ping', {delta, Delta}} ->
+ io:format("[~p]GOT PING DELTA: ~p~n", [self(), Delta]),
+ couch_stats_resource_tracker:accumulate_delta(Delta),
+ {ok, Acc0};
{rexi, Ref, Msg} ->
case lists:keyfind(Ref, Keypos, RefList) of
false ->
@@ -81,8 +85,6 @@ process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) ->
Worker ->
Fun(Msg, {Worker, From}, Acc0)
end;
- {rexi, '$rexi_ping'} ->
- {ok, Acc0};
{Ref, Msg, {delta, Delta}} ->
io:format("[~p]GOT DELTA: ~p -- ~p~n", [self(), Delta, Msg]),
couch_stats_resource_tracker:accumulate_delta(Delta),