You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ch...@apache.org on 2023/10/17 21:47:51 UTC

[couchdb] 01/01: PoC couch_stats local resource usage tracking

This is an automated email from the ASF dual-hosted git repository.

chewbranca pushed a commit to branch couch-stats-resource-tracker
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit be8427986dc67ec70b5916db186f2fb0dcdcacbf
Author: Russell Branca <ch...@apache.org>
AuthorDate: Mon Jul 18 16:26:00 2022 -0700

    PoC couch_stats local resource usage tracking
---
 src/chttpd/src/chttpd_httpd_handlers.erl           |   1 +
 src/chttpd/src/chttpd_misc.erl                     |  15 +
 src/couch/priv/stats_descriptions.cfg              |   5 +
 src/couch/src/couch_btree.erl                      |   3 +
 src/couch/src/couch_db.erl                         |   2 +
 src/couch/src/couch_query_servers.erl              |   8 +
 src/couch/src/couch_server.erl                     |   2 +
 src/couch_stats/src/couch_stats.erl                |  30 +
 .../src/couch_stats_resource_tracker.erl           | 607 +++++++++++++++++++++
 src/couch_stats/src/couch_stats_sup.erl            |   1 +
 src/fabric/priv/stats_descriptions.cfg             |  26 +
 src/fabric/src/fabric_rpc.erl                      |   6 +
 src/fabric/src/fabric_util.erl                     |   9 +-
 src/mango/src/mango_cursor_view.erl                |   2 +
 src/mango/src/mango_selector.erl                   |   1 +
 src/rexi/src/rexi.erl                              |   6 +-
 src/rexi/src/rexi_server.erl                       |   5 +-
 src/rexi/src/rexi_utils.erl                        |  21 +
 18 files changed, 745 insertions(+), 5 deletions(-)

diff --git a/src/chttpd/src/chttpd_httpd_handlers.erl b/src/chttpd/src/chttpd_httpd_handlers.erl
index 932b52e5f..e1b260222 100644
--- a/src/chttpd/src/chttpd_httpd_handlers.erl
+++ b/src/chttpd/src/chttpd_httpd_handlers.erl
@@ -20,6 +20,7 @@ url_handler(<<"_utils">>) -> fun chttpd_misc:handle_utils_dir_req/1;
 url_handler(<<"_all_dbs">>) -> fun chttpd_misc:handle_all_dbs_req/1;
 url_handler(<<"_dbs_info">>) -> fun chttpd_misc:handle_dbs_info_req/1;
 url_handler(<<"_active_tasks">>) -> fun chttpd_misc:handle_task_status_req/1;
+url_handler(<<"_active_resources">>) -> fun chttpd_misc:handle_resource_status_req/1;
 url_handler(<<"_scheduler">>) -> fun couch_replicator_httpd:handle_scheduler_req/1;
 url_handler(<<"_node">>) -> fun chttpd_node:handle_node_req/1;
 url_handler(<<"_reload_query_servers">>) -> fun chttpd_misc:handle_reload_query_servers_req/1;
diff --git a/src/chttpd/src/chttpd_misc.erl b/src/chttpd/src/chttpd_misc.erl
index 9df3a881f..195c8d828 100644
--- a/src/chttpd/src/chttpd_misc.erl
+++ b/src/chttpd/src/chttpd_misc.erl
@@ -20,6 +20,7 @@
     handle_replicate_req/1,
     handle_reload_query_servers_req/1,
     handle_task_status_req/1,
+    handle_resource_status_req/1,
     handle_up_req/1,
     handle_utils_dir_req/1,
     handle_utils_dir_req/2,
@@ -230,6 +231,20 @@ handle_task_status_req(#httpd{method = 'GET'} = Req) ->
 handle_task_status_req(Req) ->
     send_method_not_allowed(Req, "GET,HEAD").
 
+%% GET /_active_resources: gather the resource usage contexts tracked by
+%% couch_stats_resource_tracker on every connected node and return them as a
+%% JSON object keyed by node name. Server admins only.
+handle_resource_status_req(#httpd{method = 'GET'} = Req) ->
+    ok = chttpd:verify_is_server_admin(Req),
+    {Resp, Bad} = rpc:multicall(erlang, apply, [
+        fun() ->
+            {node(), couch_stats_resource_tracker:active()}
+        end,
+        []
+    ]),
+    %% A node on which the fun raised replies {badrpc, Reason}. Reason is an
+    %% arbitrary term that cannot be encoded as a JSON object member, so only
+    %% well-formed {Node, Active} pairs go into the response body.
+    {Good, Failed} = lists:partition(
+        fun
+            ({badrpc, _}) -> false;
+            ({_Node, _Active}) -> true
+        end,
+        Resp
+    ),
+    %% TODO: incorporate Bad (unreachable) and Failed nodes into the response
+    case {Bad, Failed} of
+        {[], []} ->
+            ok;
+        _ ->
+            couch_log:warning(
+                "~p:handle_resource_status_req bad nodes: ~p, failed rpcs: ~p",
+                [?MODULE, Bad, Failed]
+            )
+    end,
+    send_json(Req, {Good});
+handle_resource_status_req(Req) ->
+    send_method_not_allowed(Req, "GET,HEAD").
+
 handle_replicate_req(#httpd{method = 'POST', user_ctx = Ctx, req_body = PostBody} = Req) ->
     chttpd:validate_ctype(Req, "application/json"),
     %% see HACK in chttpd.erl about replication
diff --git a/src/couch/priv/stats_descriptions.cfg b/src/couch/priv/stats_descriptions.cfg
index 2d40518e2..e3f6a585e 100644
--- a/src/couch/priv/stats_descriptions.cfg
+++ b/src/couch/priv/stats_descriptions.cfg
@@ -414,6 +414,11 @@
     {type, counter},
     {desc, <<"number of legacy checksums found in couch_file instances">>}
 ]}.
+%% CSRT (couch_stats_resource_tracker) stats
+{[couchdb, csrt, delta_missing_t0], [
+    {type, counter},
+    {desc, <<"number of csrt contexts without a proper startime">>}
+]}.
 {[pread, exceed_eof], [
     {type, counter},
     {desc, <<"number of the attempts to read beyond end of db file">>}
diff --git a/src/couch/src/couch_btree.erl b/src/couch/src/couch_btree.erl
index b974a22ee..168171320 100644
--- a/src/couch/src/couch_btree.erl
+++ b/src/couch/src/couch_btree.erl
@@ -472,6 +472,8 @@ reduce_tree_size(kp_node, NodeSize, [{_K, {_P, _Red, Sz}} | NodeList]) ->
 
 get_node(#btree{fd = Fd}, NodePos) ->
     {ok, {NodeType, NodeList}} = couch_file:pread_term(Fd, NodePos),
+    %% TODO: wire in csrt tracking (the tracker exports get_kv_node/0 and get_kp_node/0; pick one by NodeType)
+    couch_stats:increment_counter([couchdb, btree, get_node]),
     {NodeType, NodeList}.
 
 write_node(#btree{fd = Fd, compression = Comp} = Bt, NodeType, NodeList) ->
@@ -1163,6 +1165,7 @@ stream_kv_node2(Bt, Reds, PrevKVs, [{K, V} | RestKVs], InRange, Dir, Fun, Acc) -
         false ->
             {stop, {PrevKVs, Reds}, Acc};
         true ->
+            couch_stats:increment_counter([couchdb, btree, changes_processed]),
             AssembledKV = assemble(Bt, K, V),
             case Fun(visit, AssembledKV, {PrevKVs, Reds}, Acc) of
                 {ok, Acc2} ->
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index 2ef89ced3..c7afaa4b3 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -297,6 +297,7 @@ open_doc(Db, IdOrDocInfo) ->
     open_doc(Db, IdOrDocInfo, []).
 
 open_doc(Db, Id, Options) ->
+    %% TODO: wire in csrt tracking
     increment_stat(Db, [couchdb, database_reads]),
     case open_doc_int(Db, Id, Options) of
         {ok, #doc{deleted = true} = Doc} ->
@@ -1982,6 +1983,7 @@ increment_stat(#db{options = Options}, Stat, Count) when
 ->
     case lists:member(sys_db, Options) of
         true ->
+            %% TODO: we shouldn't leak resource usage just because it's a sys_db
             ok;
         false ->
             couch_stats:increment_counter(Stat, Count)
diff --git a/src/couch/src/couch_query_servers.erl b/src/couch/src/couch_query_servers.erl
index 151bdc805..3d759594f 100644
--- a/src/couch/src/couch_query_servers.erl
+++ b/src/couch/src/couch_query_servers.erl
@@ -542,6 +542,8 @@ filter_docs(Req, Db, DDoc, FName, Docs) ->
         {ok, filter_docs_int(Db, DDoc, FName, JsonReq, JsonDocs)}
     catch
         throw:{os_process_error, {exit_status, 1}} ->
+            %% TODO: wire in csrt tracking
+            couch_stats:increment_counter([couchdb, query_server, js_filter_error]),
             %% batch used too much memory, retry sequentially.
             Fun = fun(JsonDoc) ->
                 filter_docs_int(Db, DDoc, FName, JsonReq, [JsonDoc])
@@ -550,6 +552,12 @@ filter_docs(Req, Db, DDoc, FName, Docs) ->
     end.
 
 filter_docs_int(Db, DDoc, FName, JsonReq, JsonDocs) ->
+    %% Count usage in _int version as this can be repeated for OS error
+    %% Pros & cons... might not have actually processed `length(JsonDocs)` docs
+    %% but it certainly undercounts if we count in `filter_docs/5` above
+    %% TODO: wire in csrt tracking
+    couch_stats:increment_counter([couchdb, query_server, js_filter]),
+    couch_stats:increment_counter([couchdb, query_server, js_filtered_docs], length(JsonDocs)),
     [true, Passes] = ddoc_prompt(
         Db,
         DDoc,
diff --git a/src/couch/src/couch_server.erl b/src/couch/src/couch_server.erl
index 7dbbe4af1..c6c244ad4 100644
--- a/src/couch/src/couch_server.erl
+++ b/src/couch/src/couch_server.erl
@@ -97,6 +97,8 @@ sup_start_link(N) ->
     gen_server:start_link({local, couch_server(N)}, couch_server, [N], []).
 
 open(DbName, Options) ->
+    %% TODO: wire in csrt tracking
+    couch_stats:increment_counter([couchdb, couch_server, open]),
     try
         validate_open_or_create(DbName, Options),
         open_int(DbName, Options)
diff --git a/src/couch_stats/src/couch_stats.erl b/src/couch_stats/src/couch_stats.erl
index 29a402449..29190e6b0 100644
--- a/src/couch_stats/src/couch_stats.erl
+++ b/src/couch_stats/src/couch_stats.erl
@@ -24,6 +24,12 @@
     update_gauge/2
 ]).
 
+%% couch_stats_resource_tracker API
+-export([
+    create_context/3,
+    maybe_track_rexi_init_p/1
+]).
+
 -type response() :: ok | {error, unknown_metric} | {error, invalid_metric}.
 -type stat() :: {any(), [{atom(), any()}]}.
 
@@ -49,6 +55,11 @@ increment_counter(Name) ->
 
 -spec increment_counter(any(), pos_integer()) -> response().
 increment_counter(Name, Value) ->
+    %% Should maybe_track_local happen before or after notify?
+    %% If after, only currently tracked metrics declared in the app's
+    %% stats_description.cfg will be trackable locally. Pros/cons.
+    %io:format("NOTIFY_EXISTING_METRIC: ~p || ~p || ~p~n", [Name, Op, Type]),
+    ok = maybe_track_local_counter(Name, Value),
     case couch_stats_util:get_counter(Name, stats()) of
         {ok, Ctx} -> couch_stats_counter:increment(Ctx, Value);
         {error, Error} -> {error, Error}
@@ -100,6 +111,25 @@ stats() ->
 now_sec() ->
     erlang:monotonic_time(second).
 
+%% Forward a counter increment to the local per-process resource tracker.
+%% Only strictly positive integer increments are forwarded; anything else
+%% (zero, negatives, floats, non-numbers) is ignored. Always returns ok,
+%% whatever the tracker does with the update.
+-spec maybe_track_local_counter(any(), any()) -> ok.
+maybe_track_local_counter(Name, Val) ->
+    case is_integer(Val) andalso Val > 0 of
+        true -> _ = couch_stats_resource_tracker:maybe_inc(Name, Val);
+        false -> ignore
+    end,
+    ok.
+
+%% Delegate so callers outside couch_stats can open a resource tracking
+%% context through the couch_stats API; returns whatever the tracker returns.
+create_context(Caller, WorkerMFA, RequestNonce) ->
+    couch_stats_resource_tracker:create_context(Caller, WorkerMFA, RequestNonce).
+
+%% Count a rexi worker spawn as the metric [Module, Function, spawned], but
+%% only for endpoints couch_stats_resource_tracker:should_track/1 opts into.
+%% Returns ok for untracked endpoints, otherwise the increment_counter/1 result.
+maybe_track_rexi_init_p({Mod, Fun, _Args}) ->
+    SpawnMetric = [Mod, Fun, spawned],
+    ShouldTrack = couch_stats_resource_tracker:should_track(SpawnMetric),
+    if
+        ShouldTrack -> increment_counter(SpawnMetric);
+        not ShouldTrack -> ok
+    end.
+
 -ifdef(TEST).
 
 -include_lib("couch/include/couch_eunit.hrl").
diff --git a/src/couch_stats/src/couch_stats_resource_tracker.erl b/src/couch_stats/src/couch_stats_resource_tracker.erl
new file mode 100644
index 000000000..00e375e8c
--- /dev/null
+++ b/src/couch_stats/src/couch_stats_resource_tracker.erl
@@ -0,0 +1,607 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_stats_resource_tracker).
+
+-behaviour(gen_server).
+
+-export([
+    start_link/0,
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    code_change/3,
+    terminate/2
+]).
+
+-export([
+    inc/1, inc/2,
+    maybe_inc/2,
+    get_pid_ref/0,
+    accumulate_delta/1
+]).
+
+-export([
+    create_context/0, create_context/1, create_context/3,
+    track/1,
+    should_track/1
+]).
+
+-export([
+    active/0
+]).
+
+-export([
+    make_delta/0
+]).
+
+%% Singular increment operations
+-export([
+    db_opened/0,
+    doc_read/0,
+    row_read/0,
+    change_processed/0,
+    ioq_called/0,
+    js_evaled/0,
+    js_filtered/0,
+    js_filtered_error/0,
+    js_filtered_doc/0,
+    mango_match_evaled/0,
+    get_kv_node/0,
+    get_kp_node/0
+]).
+
+%% Plural increment operations
+-export([
+    js_filtered_docs/1,
+    io_bytes_read/1,
+    io_bytes_written/1
+]).
+
+-include_lib("couch/include/couch_db.hrl").
+
+%% Use these for record upgrades over the wire and in ETS tables
+%% TODO: alternatively, just delete these. Currently using a map
+%% for shipping deltas over the wire, avoiding much of the
+%% problem here. We'll likely still need to handle upgrades to
+%% map format over time, so let's decide a course of action here.
+-define(RCTX_V1, rctx_v1). %% not referenced yet: the record below is still the plain `rctx`
+-define(RCTX, ?RCTX_V1).
+
+-define(MANGO_EVAL_MATCH, mango_eval_match). %% #rctx{} field bumped for [mango, evaluate_selector] (maybe_inc/2)
+-define(DB_OPEN_DOC, docs_read). %% #rctx{} field bumped for [couchdb, database_reads] (maybe_inc/2)
+
+-define(FRPC_CHANGES_ROW, changes_processed). %% #rctx{} field bumped for [fabric_rpc, changes, rows_read]
+
+%% Module pdict markers (process dictionary keys used by this module)
+-define(DELTA_TA, csrt_delta_ta). %% delta baseline saved by set_delta_a/1 from make_delta/0
+-define(DELTA_TZ, csrt_delta_tz). %% T Zed instead of T0: the context cached by create_context/3
+-define(PID_REF, csrt_pid_ref). %% track local ID: this process's {Pid, Ref} key (get_pid_ref/0)
+
+
+-record(st, { %% gen_server state of the tracker
+    eviction_delay = 10000, %% How many ms dead processes are visible
+    tracking = #{} %% track active processes for eventual eviction: PidRef => MonRef and MonRef => PidRef
+}).
+
+
+%% TODO: switch to:
+%% -record(?RCTX, {
+-record(rctx, { %% one resource tracking context, stored in the ?MODULE ETS table keyed on pid_ref
+    updated_at = os:timestamp(), %% set when the record is built; rewritten when the pid goes down
+    exited_at, %% not written anywhere in this module yet
+    pid_ref, %% {Pid, Ref} key of the context (see get_pid_ref/0)
+    mfa, %% {M, F, A} the context was created for (create_context/3)
+    nonce, %% request nonce passed to create_context/3
+    from, %% requester passed to create_context/3, rendered as {Pid, Ref}
+    type = unknown, %% unknown/background/system/rpc/coordinator/fabric_rpc/etc_rpc/etc
+    db_open = 0, %% this and the counters below are bumped via inc/1,2 (ets:update_counter)
+    docs_read = 0,
+    rows_read = 0,
+    changes_processed = 0,
+    ioq_calls = 0,
+    io_bytes_read = 0,
+    io_bytes_written = 0,
+    js_evals = 0,
+    js_filter = 0,
+    js_filter_error = 0,
+    js_filtered_docs = 0,
+    mango_eval_match = 0,
+    get_kv_node = 0,
+    get_kp_node = 0,
+    state = alive %% alive, or {down, Reason} once the tracker sees the pid exit
+}).
+
+%% Single-unit increment helpers: each bumps one counter of the calling
+%% process's tracking context by one.
+db_opened() -> inc(db_open, 1).
+doc_read() -> inc(docs_read, 1).
+row_read() -> inc(rows_read, 1).
+change_processed() -> inc(changes_processed, 1).
+ioq_called() -> inc(ioq_calls, 1).
+js_evaled() -> inc(js_evals, 1).
+js_filtered() -> inc(js_filter, 1).
+js_filtered_error() -> inc(js_filter_error, 1).
+js_filtered_doc() -> inc(js_filtered_docs, 1).
+mango_match_evaled() -> inc(mango_eval_match, 1).
+get_kv_node() -> inc(get_kv_node, 1).
+get_kp_node() -> inc(get_kp_node, 1).
+
+%% Multi-unit increment helpers: bump one counter by the given amount.
+js_filtered_docs(Count) -> inc(js_filtered_docs, Count).
+io_bytes_read(Bytes) -> inc(io_bytes_read, Bytes).
+io_bytes_written(Bytes) -> inc(io_bytes_written, Bytes).
+
+%% Increment Field of the calling process's tracking context by one.
+%% Every field known to inc/2 is accepted; anything else is ignored by
+%% inc/2's fallback clause and yields 0.
+inc(Field) ->
+    inc(Field, 1).
+
+
+%% Increment Field of the calling process's tracking context by N and
+%% return update_counter/2's result. Unknown fields are ignored and yield 0.
+inc(Field, N) ->
+    case rctx_field_pos(Field) of
+        undefined -> 0;
+        Pos -> update_counter(Pos, N)
+    end.
+
+%% Map a counter name to its position in the #rctx{} tuple, or undefined.
+rctx_field_pos(db_open) -> #rctx.db_open;
+rctx_field_pos(?DB_OPEN_DOC) -> #rctx.?DB_OPEN_DOC;
+rctx_field_pos(rows_read) -> #rctx.rows_read;
+rctx_field_pos(?FRPC_CHANGES_ROW) -> #rctx.?FRPC_CHANGES_ROW;
+rctx_field_pos(ioq_calls) -> #rctx.ioq_calls;
+rctx_field_pos(io_bytes_read) -> #rctx.io_bytes_read;
+rctx_field_pos(io_bytes_written) -> #rctx.io_bytes_written;
+rctx_field_pos(js_evals) -> #rctx.js_evals;
+rctx_field_pos(js_filter) -> #rctx.js_filter;
+rctx_field_pos(js_filter_error) -> #rctx.js_filter_error;
+rctx_field_pos(js_filtered_docs) -> #rctx.js_filtered_docs;
+rctx_field_pos(?MANGO_EVAL_MATCH) -> #rctx.?MANGO_EVAL_MATCH;
+rctx_field_pos(get_kv_node) -> #rctx.get_kv_node;
+rctx_field_pos(get_kp_node) -> #rctx.get_kp_node;
+rctx_field_pos(_) -> undefined.
+
+%% Translate a couch_stats metric name into its CSRT counter and bump it by
+%% Val. Metrics without a CSRT counterpart are ignored and yield 0.
+maybe_inc(Metric, Val) ->
+    case Metric of
+        [mango, evaluate_selector] -> inc(?MANGO_EVAL_MATCH, Val);
+        [couchdb, database_reads] -> inc(?DB_OPEN_DOC, Val);
+        [fabric_rpc, changes, rows_read] -> inc(?FRPC_CHANGES_ROW, Val);
+        _ -> 0
+    end.
+
+
+%% Whether a metric is opted into tracking: rexi spawn metrics of the form
+%% [fabric_rpc, Endpoint, spawned] for the endpoints listed below, and the
+%% mango cursor metrics [mango_cursor, view, all_docs | idx].
+%% TODO: update stats_descriptions.cfg for relevant apps
+should_track([fabric_rpc, Endpoint, spawned]) ->
+    lists:member(Endpoint, [
+        all_docs,
+        changes,
+        map_view,
+        reduce_view,
+        get_all_security,
+        open_doc,
+        update_docs,
+        open_shard
+    ]);
+should_track([mango_cursor, view, Kind]) ->
+    Kind =:= all_docs orelse Kind =:= idx;
+should_track(_) ->
+    false.
+
+%% Sink for worker deltas received by a coordinator. Currently a stub: the
+%% delta is discarded and ok is returned.
+%% TODO: update coordinator stats from worker deltas
+accumulate_delta(_WorkerDelta) ->
+    ok.
+
+%% Add Count to the counter at record position Field in the calling
+%% process's tracking row and return the new value (0 when not tracked).
+update_counter(Field, Count) ->
+    update_counter(get_pid_ref(), Field, Count).
+
+%% Rows are only created by create_context/*, which also asks the tracker to
+%% monitor the pid so the row is eventually evicted. Rows are deliberately
+%% NOT auto-created here: such rows would never be monitored and hence never
+%% evicted, leaking one ETS entry per untracked process that bumps a counter.
+%% A missing row (untracked process) or a missing table (tracker restarting)
+%% raises badarg; stats tracking must never crash the caller, so treat that
+%% as "not tracked" and return 0, like inc/2 does for unknown fields.
+update_counter({_Pid, _Ref} = Key, Field, Count) ->
+    try
+        ets:update_counter(?MODULE, Key, {Field, Count})
+    catch
+        error:badarg ->
+            0
+    end.
+
+
+%% JSON-ready view (see to_json/1) of every context currently in the table.
+active() ->
+    [to_json(Rctx) || Rctx <- ets:tab2list(?MODULE)].
+
+
+%% Render a tracked context as a JSON-encodable map for /_active_resources.
+%% Pids and refs become binaries, timestamps/MFAs/types become arrays and
+%% undefined becomes null. Only the docs_read, rows_read and
+%% changes_processed counters are reported.
+to_json(#rctx{} = Rctx) ->
+    #rctx{
+        updated_at = TP,
+        pid_ref = {_Pid, _Ref} = PidRef,
+        mfa = MFA0,
+        nonce = Nonce0,
+        from = From0,
+        docs_read = DocsRead,
+        rows_read = RowsRead,
+        state = State0,
+        type = Type,
+        changes_processed = ChangesProcessed
+    } = Rctx,
+    MFA =
+        case MFA0 of
+            {M, F, Args} when is_list(Args) ->
+                %% A may be an argument list rather than an arity (presumably
+                %% so for rexi worker MFAs -- confirm in rexi_server). Arguments
+                %% can be records and other terms JSON cannot encode, so only
+                %% the arity is reported.
+                [M, F, length(Args)];
+            {M, F, A} ->
+                [M, F, A];
+            undefined ->
+                null;
+            Other ->
+                throw({error, {unexpected, Other}})
+        end,
+    %% pid_to_list/ref_to_list return char lists, which JSON encodes as arrays
+    %% of integers; convert to binaries so they render as strings.
+    From =
+        case From0 of
+            {Parent, ParentRef} ->
+                [?l2b(pid_to_list(Parent)), ?l2b(ref_to_list(ParentRef))];
+            undefined ->
+                null
+        end,
+    State =
+        case State0 of
+            alive ->
+                alive;
+            {down, Reason} when is_atom(Reason) ->
+                [down, Reason];
+            Unknown ->
+                %% io_lib:format/2 returns a (deep) char list; make it a string.
+                [unknown, ?l2b(io_lib:format("~w", [Unknown]))]
+        end,
+    Nonce =
+        case Nonce0 of
+            undefined -> null;
+            _ when is_binary(Nonce0) -> Nonce0;
+            _ -> list_to_binary(Nonce0)
+        end,
+    #{
+        updated_at => term_to_json(TP),
+        pid_ref => term_to_json(PidRef),
+        mfa => MFA,
+        nonce => Nonce,
+        from => From,
+        docs_read => DocsRead,
+        rows_read => RowsRead,
+        state => State,
+        type => term_to_json(Type),
+        changes_processed => ChangesProcessed
+    }.
+
+%% Convert tuple-shaped context fields into JSON-friendly terms:
+%% undefined becomes null, any 3-tuple (timestamps, MFAs, types) becomes a
+%% 3-element list, {Pid, Ref} becomes a pair of binaries, and everything
+%% else passes through unchanged.
+term_to_json(undefined) ->
+    null;
+term_to_json({X, Y, Z}) ->
+    [X, Y, Z];
+term_to_json({Pid, Ref}) when is_pid(Pid) andalso is_reference(Ref) ->
+    [?l2b(pid_to_list(Pid)), ?l2b(ref_to_list(Ref))];
+term_to_json(Term) ->
+    Term.
+
+%% Flatten a field for the lifetime log report: undefined becomes null,
+%% tuples are rendered with ~w into a binary, anything else passes through.
+term_to_flat_json(undefined) ->
+    null;
+term_to_flat_json(Term) when is_tuple(Term) ->
+    ?l2b(io_lib:format("~w", [Term]));
+term_to_flat_json(Term) ->
+    Term.
+
+%% Render a context as a flat map (no nested arrays) for the lifetime report
+%% written by log_process_lifetime_report/1. Tuple-valued fields (MFA, from,
+%% pid_ref, non-alive state) are rendered with ~w into binaries. This runs on
+%% every tracked process exit, so it must not print to stdout.
+to_flat_json(#rctx{} = Rctx) ->
+    #rctx{
+        updated_at = TP,
+        pid_ref = {_Pid, _Ref} = PidRef,
+        mfa = MFA0,
+        nonce = Nonce0,
+        from = From0,
+        docs_read = DocsRead,
+        rows_read = RowsRead,
+        state = State0,
+        type = Type,
+        changes_processed = ChangesProcessed
+    } = Rctx,
+    MFA =
+        case MFA0 of
+            {_M, _F, _A} ->
+                ?l2b(io_lib:format("~w", [MFA0]));
+            undefined ->
+                null;
+            Other ->
+                throw({error, {unexpected, Other}})
+        end,
+    From =
+        case From0 of
+            {_Parent, _ParentRef} ->
+                ?l2b(io_lib:format("~w", [From0]));
+            undefined ->
+                null
+        end,
+    State =
+        case State0 of
+            alive ->
+                alive;
+            _ ->
+                ?l2b(io_lib:format("~w", [State0]))
+        end,
+    Nonce =
+        case Nonce0 of
+            undefined ->
+                null;
+            _ ->
+                list_to_binary(Nonce0)
+        end,
+    #{
+        updated_at => term_to_flat_json(TP),
+        pid_ref => ?l2b(io_lib:format("~w", [PidRef])),
+        mfa => MFA,
+        nonce => Nonce,
+        from => From,
+        docs_read => DocsRead,
+        rows_read => RowsRead,
+        state => State,
+        type => term_to_flat_json(Type),
+        changes_processed => ChangesProcessed
+    }.
+
+%% Return this process's {Pid, Ref} tracking key. On first use a fresh key
+%% is minted and cached in the process dictionary.
+get_pid_ref() ->
+    Cached = get(?PID_REF),
+    if
+        Cached =:= undefined -> set_pid_ref({self(), make_ref()});
+        true -> Cached
+    end.
+
+
+%% Create and register a resource tracking context for the calling process.
+create_context() ->
+    create_context(self()).
+
+%% Create a context keyed on {Pid, Ref}, insert it into the tracking table
+%% and ask the tracker to monitor Pid so the row is later marked down and
+%% evicted. When Pid is the caller, the key is also cached as this process's
+%% pid_ref so that its subsequent inc/1,2 calls update this very row.
+create_context(Pid) ->
+    Ref = make_ref(),
+    Rctx = make_record(Pid, Ref),
+    %% Insert before the (asynchronous) track request so the row already
+    %% exists when the tracker handles a 'DOWN' for Pid.
+    ets:insert(?MODULE, Rctx),
+    track(Rctx),
+    case Pid =:= self() of
+        true -> set_pid_ref({Pid, Ref});
+        false -> ok
+    end,
+    Rctx.
+
+%% Create a context for the calling worker, started on behalf of From to run
+%% MFA and tagged with the request Nonce. The key is cached as this process's
+%% pid_ref, and the context itself as the delta baseline (?DELTA_TZ) used by
+%% make_delta/0.
+%% TODO: add type to distinguish coordinator vs rpc_worker
+%% TODO: extract user_ctx and db/shard from the request
+create_context(From, {M, F, _A} = MFA, Nonce) ->
+    PidRef = {self(), make_ref()},
+    Rctx = #rctx{
+        pid_ref = PidRef,
+        from = From,
+        mfa = MFA,
+        type = {worker, M, F},
+        nonce = Nonce
+    },
+    ets:insert(?MODULE, Rctx),
+    track(Rctx),
+    set_pid_ref(PidRef),
+    erlang:put(?DELTA_TZ, Rctx),
+    Rctx.
+
+%% Ask the tracker to monitor the context's pid. This is a cast, so the
+%% caller never blocks on the tracker; it always returns ok.
+%% TODO: should this block (gen_server:call)? If not, what cleans up zombies?
+track(#rctx{} = Context) ->
+    ok = gen_server:cast(?MODULE, {track, Context}).
+
+
+%% Compute the calling process's resource usage delta between a baseline
+%% snapshot (TA) and its current row in the tracking table (TB); see
+%% make_delta/2.
+%%
+%% The baseline is, in order of preference: the baseline stored by an earlier
+%% call (?DELTA_TA), the context cached by create_context/3 (?DELTA_TZ), or,
+%% when the process never established a T0, a synthetic base from
+%% make_delta_base/0 that is stored as ?DELTA_TA for subsequent calls.
+%%
+%% A new T0 taken "now" is not good enough: its timestamp would equal T',
+%% giving dt = 0 (and a divide by zero for any rate derived from it), hence
+%% the backdated synthetic base. Realistically every code path that performs
+%% database operations should set T0 at spawn (couch_db:open chain,
+%% couch_server, couch_file, ...), ideally verified statically (e.g. with
+%% Dialyzer) once stats collection is hoisted into the primary flow of the
+%% database. We are a long way from that, so a missing start time has to be
+%% handled here.
+%%
+%% [couchdb, csrt, delta_missing_t0] tracks how often that synthetic base is
+%% needed. It is only bumped when no T0 exists at all: a worker whose T0 came
+%% from create_context/3 has a proper start time and must not be counted.
+%% TODO: incrementing couch_stats from within couch_stats itself is somewhat
+%% naughty; determine the appropriate course of action.
+make_delta() ->
+    TA =
+        case get(?DELTA_TA) of
+            #rctx{} = Stored ->
+                Stored;
+            undefined ->
+                case get(?DELTA_TZ) of
+                    undefined ->
+                        couch_stats:increment_counter([couchdb, csrt, delta_missing_t0]),
+                        Base = make_delta_base(),
+                        set_delta_a(Base),
+                        Base;
+                    T0 ->
+                        T0
+                end
+        end,
+    TB = get_resource(),
+    make_delta(TA, TB).
+
+
+%% Diff two snapshots of a context. The result holds the docs_read,
+%% rows_read and changes_processed differences plus dt, the microseconds
+%% between their updated_at timestamps; only strictly positive entries are
+%% kept (TODO: reevaluate this decision). If one side is not an #rctx{}, an
+%% error map naming the missing snapshot is returned instead.
+make_delta(#rctx{} = Before, #rctx{} = After) ->
+    Candidates = [
+        {docs_read, After#rctx.docs_read - Before#rctx.docs_read},
+        {rows_read, After#rctx.rows_read - Before#rctx.rows_read},
+        {changes_processed, After#rctx.changes_processed - Before#rctx.changes_processed},
+        {dt, timer:now_diff(After#rctx.updated_at, Before#rctx.updated_at)}
+    ],
+    maps:from_list([{Key, Diff} || {Key, Diff} <- Candidates, Diff > 0]);
+make_delta(_, #rctx{}) ->
+    #{error => missing_beg_rctx};
+make_delta(#rctx{}, _) ->
+    #{error => missing_fin_rctx}.
+
+%% Build a synthetic baseline context for a process that never created one.
+%% Its updated_at is backdated slightly so that a delta taken right away has
+%% a non-zero dt.
+%% TODO: extract user_ctx and db/shard from request
+make_delta_base() ->
+    TA0 = #rctx{pid_ref = {self(), make_ref()}},
+    TA0#rctx{updated_at = backdate(TA0#rctx.updated_at)}.
+
+%% Move an os:timestamp() tuple slightly into the past: zero the microseconds,
+%% or subtract one second when they already are zero. The last clause covers
+%% a timestamp landing exactly on a megasecond boundary, which would
+%% otherwise have no matching clause.
+backdate({Me, S, Mi}) when Mi > 0 ->
+    {Me, S, 0};
+backdate({Me, S, Mi}) when S > 0 ->
+    {Me, S - 1, Mi};
+backdate({Me, 0, 0}) ->
+    {Me - 1, 999999, 0}.
+
+%% Remember TA as this process's delta baseline; returns the previous value.
+set_delta_a(TA) ->
+    put(?DELTA_TA, TA).
+
+%% Cache the process's tracking key and hand it back to the caller.
+set_pid_ref(PidRef) ->
+    _ = put(?PID_REF, PidRef),
+    PidRef.
+
+%% Current tracking row of the calling process, or undefined if none.
+get_resource() ->
+    get_resource(get_pid_ref()).
+
+%% Tracking row stored under PidRef, or undefined when there is none.
+get_resource(PidRef) ->
+    case ets:lookup(?MODULE, PidRef) of
+        [] -> undefined;
+        [#rctx{} = Row] -> Row
+    end.
+
+%% Fresh context for {Pid, Ref} with every other field at its default.
+make_record(Pid, Ref) ->
+    #rctx{pid_ref = {Pid, Ref}}.
+
+%% Start the tracker, registered locally under its module name.
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+%% Create the named, public tracking table, keyed on #rctx.pid_ref, that
+%% callers update directly. The table is owned by this process.
+init([]) ->
+    TableOpts = [
+        named_table,
+        public,
+        {keypos, #rctx.pid_ref},
+        {read_concurrency, true},
+        {write_concurrency, true},
+        %% TODO: test impact of this
+        {decentralized_counters, true}
+    ],
+    ?MODULE = ets:new(?MODULE, TableOpts),
+    {ok, #st{}}.
+
+%% fetch returns the tracker state. {track, _} is accepted for a future
+%% synchronous track/1 and is currently a no-op. Any other call stops the
+%% server. The second argument must be a variable (_From): written as the
+%% atom `_from` it never matched a real {Pid, Tag} caller, so `fetch` fell
+%% through to the stop clause.
+handle_call(fetch, _From, #st{} = St) ->
+    {reply, {ok, St}, St};
+handle_call({track, _}, _From, St) ->
+    {reply, ok, St};
+handle_call(Msg, _From, St) ->
+    {stop, {unknown_call, Msg}, error, St}.
+
+%% Start monitoring the pid of a newly tracked context. Both directions are
+%% recorded in the tracking map (PidRef => MonRef and MonRef => PidRef) so a
+%% later 'DOWN' can be mapped back to its context. A repeated track request
+%% for an already-tracked PidRef is a no-op. Unknown casts stop the server.
+handle_cast({track, #rctx{pid_ref = {Pid, _} = PidRef}}, #st{tracking = Tracking} = St) ->
+    case Tracking of
+        #{PidRef := _} ->
+            {noreply, St};
+        #{} ->
+            MonRef = erlang:monitor(process, Pid),
+            {noreply, St#st{tracking = Tracking#{PidRef => MonRef, MonRef => PidRef}}}
+    end;
+handle_cast(Msg, St) ->
+    {stop, {unknown_cast, Msg}, St}.
+
+%% 'DOWN' for a tracked pid:
+%%   - drop its monitor bookkeeping;
+%%   - mark its row as {down, Reason} and emit the lifetime report;
+%%   - schedule eviction after eviction_delay ms, so short-lived processes
+%%     stay visible for a while.
+%% {evict, PidRef} removes the row. Anything unexpected is logged rather than
+%% fatal: this process owns the public tracking table, so stopping (or
+%% halting the node) would take every context down with it.
+handle_info({'DOWN', MonRef, _Type, DPid, Reason}, #st{tracking = AT0} = St0) ->
+    St =
+        case maps:get(MonRef, AT0, undefined) of
+            undefined ->
+                couch_log:error(
+                    "~p: unexpected missing monitor in tracking table: {~p, ~p}",
+                    [?MODULE, MonRef, DPid]
+                ),
+                St0;
+            {RPid, _Ref} = PidRef ->
+                case RPid =:= DPid of
+                    true ->
+                        mark_down(PidRef, Reason);
+                    false ->
+                        %% Should be impossible: the monitor was set up on RPid.
+                        couch_log:error(
+                            "~p: pid mismatch for monitor ~p: ~p =/= ~p",
+                            [?MODULE, MonRef, DPid, RPid]
+                        )
+                end,
+                %% Delay eviction to allow human visibility on short lived pids
+                erlang:send_after(St0#st.eviction_delay, self(), {evict, PidRef}),
+                %% remove double bookkeeping
+                St0#st{tracking = maps:remove(MonRef, maps:remove(PidRef, AT0))}
+        end,
+    {noreply, St};
+handle_info({evict, {_Pid, _Ref} = PidRef}, #st{} = St) ->
+    ets:delete(?MODULE, PidRef),
+    {noreply, St};
+handle_info(Msg, St) ->
+    couch_log:warning("~p: ignoring unexpected message: ~p", [?MODULE, Msg]),
+    {noreply, St}.
+
+%% Flag PidRef's row as {down, Reason}, refresh updated_at and log the
+%% lifetime report. A row that is already gone is skipped instead of
+%% crashing the tracker.
+mark_down(PidRef, Reason) ->
+    Updates = [{#rctx.state, {down, Reason}}, {#rctx.updated_at, os:timestamp()}],
+    case ets:update_element(?MODULE, PidRef, Updates) of
+        true -> log_process_lifetime_report(PidRef);
+        false -> ok
+    end.
+
+%% Nothing to clean up explicitly; always returns ok.
+terminate(_Why, _State) ->
+    ok.
+
+%% No state migrations yet: the state is carried over unchanged.
+code_change(_PreviousVsn, State, _Extra) ->
+    {ok, State}.
+
+%% Emit the "csrt-pid-usage-lifetime" report for PidRef's row, rendered by
+%% to_flat_json/1. The row must exist: a missing row fails the match below.
+%% TODO: catch error out of here, report crashes on depth>1 json
+log_process_lifetime_report(PidRef) ->
+    #rctx{} = Row = get_resource(PidRef),
+    Report = to_flat_json(Row),
+    couch_log:report("csrt-pid-usage-lifetime", Report).
diff --git a/src/couch_stats/src/couch_stats_sup.erl b/src/couch_stats/src/couch_stats_sup.erl
index 325372c3e..4b4df17e2 100644
--- a/src/couch_stats/src/couch_stats_sup.erl
+++ b/src/couch_stats/src/couch_stats_sup.erl
@@ -29,6 +29,7 @@ init([]) ->
         {
             {one_for_one, 5, 10}, [
                 ?CHILD(couch_stats_server, worker),
+                ?CHILD(couch_stats_resource_tracker, worker),
                 ?CHILD(couch_stats_process_tracker, worker)
             ]
         }}.
diff --git a/src/fabric/priv/stats_descriptions.cfg b/src/fabric/priv/stats_descriptions.cfg
index d12aa0c84..57ec0b8d2 100644
--- a/src/fabric/priv/stats_descriptions.cfg
+++ b/src/fabric/priv/stats_descriptions.cfg
@@ -26,3 +26,29 @@
     {type, counter},
     {desc, <<"number of write quorum errors">>}
 ]}.
+
+
+%% fabric_rpc worker stats
+%% TODO: decide on which naming scheme:
+%% {[fabric_rpc, get_all_security, spawned], [
+%% {[fabric_rpc, spawned, get_all_security], [
+{[fabric_rpc, get_all_security, spawned], [
+    {type, counter},
+    {desc, <<"number of fabric_rpc worker get_all_security spawns">>}
+]}.
+{[fabric_rpc, open_doc, spawned], [
+    {type, counter},
+    {desc, <<"number of fabric_rpc worker open_doc spawns">>}
+]}.
+{[fabric_rpc, all_docs, spawned], [
+    {type, counter},
+    {desc, <<"number of fabric_rpc worker all_docs spawns">>}
+]}.
+{[fabric_rpc, open_shard, spawned], [
+    {type, counter},
+    {desc, <<"number of fabric_rpc worker open_shard spawns">>}
+]}.
+{[fabric_rpc, changes, spawned], [
+    {type, counter},
+    {desc, <<"number of fabric_rpc worker changes spawns">>}
+]}.
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index d01f1f5a7..b5064b5af 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -492,6 +492,11 @@ view_cb({meta, Meta}, Acc) ->
     ok = rexi:stream2({meta, Meta}),
     {ok, Acc};
 view_cb({row, Row}, Acc) ->
+    %% TODO: distinguish between rows and docs
+    %% TODO: wire in csrt tracking
+    %% TODO: distinguish between all_docs vs view call
+    couch_stats:increment_counter([fabric_rpc, view, row_processed]),
+    %%couch_stats_resource_tracker:inc(rows_read),
     % Adding another row
     ViewRow = #view_row{
         id = couch_util:get_value(id, Row),
@@ -535,6 +540,7 @@ changes_enumerator(#full_doc_info{} = FDI, Acc) ->
 changes_enumerator(#doc_info{id = <<"_local/", _/binary>>, high_seq = Seq}, Acc) ->
     {ok, Acc#fabric_changes_acc{seq = Seq, pending = Acc#fabric_changes_acc.pending - 1}};
 changes_enumerator(DocInfo, Acc) ->
+    couch_stats:increment_counter([fabric_rpc, changes, rows_read]),
     #fabric_changes_acc{
         db = Db,
         args = #changes_args{
diff --git a/src/fabric/src/fabric_util.erl b/src/fabric/src/fabric_util.erl
index 4acb65c73..00d6b5e0e 100644
--- a/src/fabric/src/fabric_util.erl
+++ b/src/fabric/src/fabric_util.erl
@@ -137,6 +137,11 @@ get_shard([#shard{node = Node, name = Name} | Rest], Opts, Timeout, Factor) ->
     Ref = rexi:cast(Node, self(), MFA, [sync]),
     try
         receive
+            % rexi:reply/1 tags replies with a resource usage delta; the
+            % untagged form below is kept for senders that do not tag.
+            {Ref, {ok, Db}, {delta, Delta}} ->
+                couch_stats_resource_tracker:accumulate_delta(Delta),
+                {ok, Db};
             {Ref, {ok, Db}} ->
                 {ok, Db};
             {Ref, {'rexi_EXIT', {{unauthorized, _} = Error, _}}} ->
@@ -145,7 +150,12 @@ get_shard([#shard{node = Node, name = Name} | Rest], Opts, Timeout, Factor) ->
                 throw(Error);
             {Ref, Reason} ->
                 couch_log:debug("Failed to open shard ~p because: ~p", [Name, Reason]),
-                get_shard(Rest, Opts, Timeout, Factor)
+                get_shard(Rest, Opts, Timeout, Factor);
+            {Ref, Reason, {delta, Delta}} ->
+                % Tagged non-ok reply: account for it, then try the next shard copy.
+                couch_stats_resource_tracker:accumulate_delta(Delta),
+                couch_log:debug("Failed to open shard ~p because: ~p", [Name, Reason]),
+                get_shard(Rest, Opts, Timeout, Factor)
         after Timeout ->
             couch_log:debug("Failed to open shard ~p after: ~p", [Name, Timeout]),
             get_shard(Rest, Opts, Factor * Timeout, Factor)
diff --git a/src/mango/src/mango_cursor_view.erl b/src/mango/src/mango_cursor_view.erl
index 41006ce77..a19d650b4 100644
--- a/src/mango/src/mango_cursor_view.erl
+++ b/src/mango/src/mango_cursor_view.erl
@@ -229,9 +229,11 @@ execute(#cursor{db = Db, index = Idx, execution_stats = Stats} = Cursor0, UserFu
             Result =
                 case mango_idx:def(Idx) of
                     all_docs ->
                         CB = fun ?MODULE:handle_all_docs_message/2,
+                        couch_stats:increment_counter([mango_cursor, view, all_docs]),
                         fabric:all_docs(Db, DbOpts, CB, Cursor, Args);
                     _ ->
                         CB = fun ?MODULE:handle_message/2,
+                        couch_stats:increment_counter([mango_cursor, view, idx]),
                         % Normal view
                         DDoc = ddocid(Idx),
diff --git a/src/mango/src/mango_selector.erl b/src/mango/src/mango_selector.erl
index 59be7a6eb..bde297a15 100644
--- a/src/mango/src/mango_selector.erl
+++ b/src/mango/src/mango_selector.erl
@@ -50,6 +50,7 @@ normalize(Selector) ->
 % This assumes that the Selector has been normalized.
 % Returns true or false.
 match(Selector, D) ->
+    %% TODO: wire in csrt (couch_stats_resource_tracker) accounting of selector evaluations
     couch_stats:increment_counter([mango, evaluate_selector]),
     match_int(Selector, D).
 
diff --git a/src/rexi/src/rexi.erl b/src/rexi/src/rexi.erl
index 77830996e..672871089 100644
--- a/src/rexi/src/rexi.erl
+++ b/src/rexi/src/rexi.erl
@@ -129,7 +129,8 @@ async_server_call(Server, Caller, Request) ->
 -spec reply(any()) -> any().
 reply(Reply) ->
     {Caller, Ref} = get(rexi_from),
-    erlang:send(Caller, {Ref, Reply}).
+    Tagged = {Ref, Reply, {delta, couch_stats_resource_tracker:make_delta()}},
+    erlang:send(Caller, Tagged).
 
 %% @equiv sync_reply(Reply, 300000)
 sync_reply(Reply) ->
@@ -214,7 +215,8 @@ stream(Msg, Limit, Timeout) ->
         {ok, Count} ->
             put(rexi_unacked, Count + 1),
             {Caller, Ref} = get(rexi_from),
-            erlang:send(Caller, {Ref, self(), Msg}),
+            Tagged = {Ref, self(), Msg, {delta, couch_stats_resource_tracker:make_delta()}},
+            erlang:send(Caller, Tagged),
             ok
     catch
         throw:timeout ->
diff --git a/src/rexi/src/rexi_server.erl b/src/rexi/src/rexi_server.erl
index 52489a9c5..08a9139bd 100644
--- a/src/rexi/src/rexi_server.erl
+++ b/src/rexi/src/rexi_server.erl
@@ -136,9 +136,13 @@ init_p(From, MFA) ->
     string() | undefined
 ) -> any().
 init_p(From, {M, F, A}, Nonce) ->
+    % {Module, Function, Arity} of the RPC, shared by '$initial_call' and stats tracking
+    InitialCall = {M, F, length(A)},
     put(rexi_from, From),
-    put('$initial_call', {M, F, length(A)}),
+    put('$initial_call', InitialCall),
     put(nonce, Nonce),
+    couch_stats:create_context(From, InitialCall, Nonce),
+    couch_stats:maybe_track_rexi_init_p(InitialCall),
     try
         apply(M, F, A)
     catch
diff --git a/src/rexi/src/rexi_utils.erl b/src/rexi/src/rexi_utils.erl
index d59c5ea0f..4f4ca5576 100644
--- a/src/rexi/src/rexi_utils.erl
+++ b/src/rexi/src/rexi_utils.erl
@@ -83,7 +83,27 @@ process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) ->
             end;
         {rexi, '$rexi_ping'} ->
             {ok, Acc0};
+        % Replies and stream messages tagged with a resource usage delta by
+        % rexi:reply/1 and rexi:stream/3: accumulate the delta, then dispatch
+        % the message exactly like its untagged counterpart below.
+        {Ref, Msg, {delta, Delta}} ->
+            couch_stats_resource_tracker:accumulate_delta(Delta),
+            case lists:keyfind(Ref, Keypos, RefList) of
+                false ->
+                    % this was some non-matching message which we will ignore
+                    {ok, Acc0};
+                Worker ->
+                    Fun(Msg, Worker, Acc0)
+            end;
+        {Ref, From, Msg, {delta, Delta}} ->
+            couch_stats_resource_tracker:accumulate_delta(Delta),
+            case lists:keyfind(Ref, Keypos, RefList) of
+                false ->
+                    {ok, Acc0};
+                Worker ->
+                    Fun(Msg, {Worker, From}, Acc0)
+            end;
         {Ref, Msg} ->
             case lists:keyfind(Ref, Keypos, RefList) of
                 false ->
                     % this was some non-matching message which we will ignore