You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ch...@apache.org on 2024/02/12 23:36:05 UTC

(couchdb) 04/28: Embed worker usage deltas in rexi:ping

This is an automated email from the ASF dual-hosted git repository.

chewbranca pushed a commit to branch couch-stats-resource-tracker-rebase
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit d8f475fb9138007182ce3eda252c06774b629636
Author: Russell Branca <ch...@apache.org>
AuthorDate: Tue Oct 31 15:13:45 2023 -0700

    Embed worker usage deltas in rexi:ping
---
 src/rexi/src/rexi.erl       | 6 ++++++
 src/rexi/src/rexi_utils.erl | 6 ++++--
 2 files changed, 10 insertions(+), 2 deletions(-)

diff --git a/src/rexi/src/rexi.erl b/src/rexi/src/rexi.erl
index 754dee444..0db6ac5de 100644
--- a/src/rexi/src/rexi.erl
+++ b/src/rexi/src/rexi.erl
@@ -265,16 +265,22 @@ stream_last(Msg, Timeout) ->
 
 %% @equiv stream_ack(Client, 1)
 stream_ack(Client) ->
+    %% stream_ack is coordinator side only, no need to send worker deltas
     erlang:send(Client, {rexi_ack, 1}).
 
 %% @doc Ack streamed messages
 stream_ack(Client, N) ->
+    %% stream_ack is coordinator side only, no need to send worker deltas
     erlang:send(Client, {rexi_ack, N}).
 
 %% Sends a ping message to the coordinator. This is for long running
 %% operations on a node that could exceed the rexi timeout
 ping() ->
     {Caller, _} = get(rexi_from),
+    %% It is essential that ping/0 includes deltas; otherwise long-running
+    %% filtered queries report no usage until they finally return a row or
+    %% no results, a delay proportional to the database size. Pings keep live
+    %% stats flowing. NOTE(review): the send below carries no delta yet.
     erlang:send(Caller, {rexi, '$rexi_ping'}).
 
 %% internal functions %%
diff --git a/src/rexi/src/rexi_utils.erl b/src/rexi/src/rexi_utils.erl
index f7127ecd1..3125f75db 100644
--- a/src/rexi/src/rexi_utils.erl
+++ b/src/rexi/src/rexi_utils.erl
@@ -67,6 +67,10 @@ process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) ->
     receive
         {timeout, TimeoutRef} ->
             {timeout, Acc0};
+        {rexi, '$rexi_ping', {delta, Delta}} ->
+            io:format("[~p]GOT PING DELTA: ~p~n", [self(), Delta]),
+            couch_stats_resource_tracker:accumulate_delta(Delta),
+            {ok, Acc0};
         {rexi, Ref, Msg} ->
             case lists:keyfind(Ref, Keypos, RefList) of
                 false ->
@@ -81,8 +85,6 @@ process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) ->
                 Worker ->
                     Fun(Msg, {Worker, From}, Acc0)
             end;
-        {rexi, '$rexi_ping'} ->
-            {ok, Acc0};
         {Ref, Msg, {delta, Delta}} ->
             io:format("[~p]GOT DELTA: ~p -- ~p~n", [self(), Delta, Msg]),
             couch_stats_resource_tracker:accumulate_delta(Delta),