You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@couchdb.apache.org by ch...@apache.org on 2023/10/17 21:47:51 UTC
[couchdb] 01/01: PoC couch_stats local resource usage tracking
This is an automated email from the ASF dual-hosted git repository.
chewbranca pushed a commit to branch couch-stats-resource-tracker
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit be8427986dc67ec70b5916db186f2fb0dcdcacbf
Author: Russell Branca <ch...@apache.org>
AuthorDate: Mon Jul 18 16:26:00 2022 -0700
PoC couch_stats local resource usage tracking
---
src/chttpd/src/chttpd_httpd_handlers.erl | 1 +
src/chttpd/src/chttpd_misc.erl | 15 +
src/couch/priv/stats_descriptions.cfg | 5 +
src/couch/src/couch_btree.erl | 3 +
src/couch/src/couch_db.erl | 2 +
src/couch/src/couch_query_servers.erl | 8 +
src/couch/src/couch_server.erl | 2 +
src/couch_stats/src/couch_stats.erl | 30 +
.../src/couch_stats_resource_tracker.erl | 607 +++++++++++++++++++++
src/couch_stats/src/couch_stats_sup.erl | 1 +
src/fabric/priv/stats_descriptions.cfg | 26 +
src/fabric/src/fabric_rpc.erl | 6 +
src/fabric/src/fabric_util.erl | 9 +-
src/mango/src/mango_cursor_view.erl | 2 +
src/mango/src/mango_selector.erl | 1 +
src/rexi/src/rexi.erl | 6 +-
src/rexi/src/rexi_server.erl | 5 +-
src/rexi/src/rexi_utils.erl | 21 +
18 files changed, 745 insertions(+), 5 deletions(-)
diff --git a/src/chttpd/src/chttpd_httpd_handlers.erl b/src/chttpd/src/chttpd_httpd_handlers.erl
index 932b52e5f..e1b260222 100644
--- a/src/chttpd/src/chttpd_httpd_handlers.erl
+++ b/src/chttpd/src/chttpd_httpd_handlers.erl
@@ -20,6 +20,7 @@ url_handler(<<"_utils">>) -> fun chttpd_misc:handle_utils_dir_req/1;
url_handler(<<"_all_dbs">>) -> fun chttpd_misc:handle_all_dbs_req/1;
url_handler(<<"_dbs_info">>) -> fun chttpd_misc:handle_dbs_info_req/1;
url_handler(<<"_active_tasks">>) -> fun chttpd_misc:handle_task_status_req/1;
+url_handler(<<"_active_resources">>) -> fun chttpd_misc:handle_resource_status_req/1;
url_handler(<<"_scheduler">>) -> fun couch_replicator_httpd:handle_scheduler_req/1;
url_handler(<<"_node">>) -> fun chttpd_node:handle_node_req/1;
url_handler(<<"_reload_query_servers">>) -> fun chttpd_misc:handle_reload_query_servers_req/1;
diff --git a/src/chttpd/src/chttpd_misc.erl b/src/chttpd/src/chttpd_misc.erl
index 9df3a881f..195c8d828 100644
--- a/src/chttpd/src/chttpd_misc.erl
+++ b/src/chttpd/src/chttpd_misc.erl
@@ -20,6 +20,7 @@
handle_replicate_req/1,
handle_reload_query_servers_req/1,
handle_task_status_req/1,
+ handle_resource_status_req/1,
handle_up_req/1,
handle_utils_dir_req/1,
handle_utils_dir_req/2,
@@ -230,6 +231,20 @@ handle_task_status_req(#httpd{method = 'GET'} = Req) ->
handle_task_status_req(Req) ->
send_method_not_allowed(Req, "GET,HEAD").
+%% GET /_active_resources: collect couch_stats_resource_tracker:active/0
+%% from every connected node and reply with a JSON object keyed by node.
+%% Server admin only. Nodes that could not be reached, or whose call
+%% failed, are logged and left out of the body.
+handle_resource_status_req(#httpd{method = 'GET'} = Req) ->
+    ok = chttpd:verify_is_server_admin(Req),
+    {Resp, BadNodes} = rpc:multicall(erlang, apply, [
+        fun() ->
+            {node(), couch_stats_resource_tracker:active()}
+        end,
+        []
+    ]),
+    %% A call that fails on a reachable node shows up in Resp as
+    %% {badrpc, Reason}. That is not valid EJSON and would make the whole
+    %% response fail to encode, so split such entries out.
+    {Failed, Active} = lists:partition(
+        fun
+            ({badrpc, _}) -> true;
+            (_) -> false
+        end,
+        Resp
+    ),
+    %% TODO: incorporate failed/unreachable nodes into the response body
+    case {Failed, BadNodes} of
+        {[], []} ->
+            ok;
+        _ ->
+            couch_log:warning(
+                "~p: _active_resources incomplete. failed: ~p, unreachable: ~p",
+                [?MODULE, Failed, BadNodes]
+            )
+    end,
+    send_json(Req, {Active});
+handle_resource_status_req(Req) ->
+    send_method_not_allowed(Req, "GET,HEAD").
+
handle_replicate_req(#httpd{method = 'POST', user_ctx = Ctx, req_body = PostBody} = Req) ->
chttpd:validate_ctype(Req, "application/json"),
%% see HACK in chttpd.erl about replication
diff --git a/src/couch/priv/stats_descriptions.cfg b/src/couch/priv/stats_descriptions.cfg
index 2d40518e2..e3f6a585e 100644
--- a/src/couch/priv/stats_descriptions.cfg
+++ b/src/couch/priv/stats_descriptions.cfg
@@ -414,6 +414,11 @@
{type, counter},
{desc, <<"number of legacy checksums found in couch_file instances">>}
]}.
+%% CSRT (couch_stats_resource_tracker) stats
+%% Incremented by couch_stats_resource_tracker:make_delta/0 when a process
+%% has no recorded start snapshot and a baseline has to be fabricated.
+{[couchdb, csrt, delta_missing_t0], [
+    {type, counter},
+    {desc, <<"number of csrt contexts without a proper start time">>}
+]}.
{[pread, exceed_eof], [
{type, counter},
{desc, <<"number of the attempts to read beyond end of db file">>}
diff --git a/src/couch/src/couch_btree.erl b/src/couch/src/couch_btree.erl
index b974a22ee..168171320 100644
--- a/src/couch/src/couch_btree.erl
+++ b/src/couch/src/couch_btree.erl
@@ -472,6 +472,8 @@ reduce_tree_size(kp_node, NodeSize, [{_K, {_P, _Red, Sz}} | NodeList]) ->
get_node(#btree{fd = Fd}, NodePos) ->
{ok, {NodeType, NodeList}} = couch_file:pread_term(Fd, NodePos),
+ %% TODO: wire in csrt tracking
+%% NOTE(review): counts every btree node read. [couchdb, btree, get_node]
+%% is not declared in the stats_descriptions.cfg hunk of this patch. If it
+%% is not declared elsewhere, couch_stats presumably returns
+%% {error, unknown_metric} and the global count is dropped.
+%% couch_stats_resource_tracker:maybe_inc/2 has no clause for this name,
+%% so nothing is recorded per process either. NodeType (presumably
+%% kp_node | kv_node) could drive the tracker's get_kp_node/0 and
+%% get_kv_node/0 counters -- confirm before wiring.
+ couch_stats:increment_counter([couchdb, btree, get_node]),
{NodeType, NodeList}.
write_node(#btree{fd = Fd, compression = Comp} = Bt, NodeType, NodeList) ->
@@ -1163,6 +1165,7 @@ stream_kv_node2(Bt, Reds, PrevKVs, [{K, V} | RestKVs], InRange, Dir, Fun, Acc) -
false ->
{stop, {PrevKVs, Reds}, Acc};
true ->
+ couch_stats:increment_counter([couchdb, btree, changes_processed]),
AssembledKV = assemble(Bt, K, V),
case Fun(visit, AssembledKV, {PrevKVs, Reds}, Acc) of
{ok, Acc2} ->
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index 2ef89ced3..c7afaa4b3 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -297,6 +297,7 @@ open_doc(Db, IdOrDocInfo) ->
open_doc(Db, IdOrDocInfo, []).
open_doc(Db, Id, Options) ->
+ %% TODO: wire in csrt tracking
increment_stat(Db, [couchdb, database_reads]),
case open_doc_int(Db, Id, Options) of
{ok, #doc{deleted = true} = Doc} ->
@@ -1982,6 +1983,7 @@ increment_stat(#db{options = Options}, Stat, Count) when
->
case lists:member(sys_db, Options) of
true ->
+ %% TODO: we shouldn't leak resource usage just because it's a sys_db
ok;
false ->
couch_stats:increment_counter(Stat, Count)
diff --git a/src/couch/src/couch_query_servers.erl b/src/couch/src/couch_query_servers.erl
index 151bdc805..3d759594f 100644
--- a/src/couch/src/couch_query_servers.erl
+++ b/src/couch/src/couch_query_servers.erl
@@ -542,6 +542,8 @@ filter_docs(Req, Db, DDoc, FName, Docs) ->
{ok, filter_docs_int(Db, DDoc, FName, JsonReq, JsonDocs)}
catch
throw:{os_process_error, {exit_status, 1}} ->
+ %% TODO: wire in csrt tracking
+ couch_stats:increment_counter([couchdb, query_server, js_filter_error]),
%% batch used too much memory, retry sequentially.
Fun = fun(JsonDoc) ->
filter_docs_int(Db, DDoc, FName, JsonReq, [JsonDoc])
@@ -550,6 +552,12 @@ filter_docs(Req, Db, DDoc, FName, Docs) ->
end.
filter_docs_int(Db, DDoc, FName, JsonReq, JsonDocs) ->
+ %% Count usage in _int version as this can be repeated for OS error
+ %% Pros & cons... might not have actually processed `length(JsonDocs)` docs
+ %% but it certainly undercounts if we count in `filter_docs/5` above
+ %% TODO: wire in csrt tracking
+ couch_stats:increment_counter([couchdb, query_server, js_filter]),
+ couch_stats:increment_counter([couchdb, query_server, js_filtered_docs], length(JsonDocs)),
[true, Passes] = ddoc_prompt(
Db,
DDoc,
diff --git a/src/couch/src/couch_server.erl b/src/couch/src/couch_server.erl
index 7dbbe4af1..c6c244ad4 100644
--- a/src/couch/src/couch_server.erl
+++ b/src/couch/src/couch_server.erl
@@ -97,6 +97,8 @@ sup_start_link(N) ->
gen_server:start_link({local, couch_server(N)}, couch_server, [N], []).
open(DbName, Options) ->
+ %% TODO: wire in csrt tracking
+ couch_stats:increment_counter([couchdb, couch_server, open]),
try
validate_open_or_create(DbName, Options),
open_int(DbName, Options)
diff --git a/src/couch_stats/src/couch_stats.erl b/src/couch_stats/src/couch_stats.erl
index 29a402449..29190e6b0 100644
--- a/src/couch_stats/src/couch_stats.erl
+++ b/src/couch_stats/src/couch_stats.erl
@@ -24,6 +24,12 @@
update_gauge/2
]).
+%% couch_stats_resource_tracker API
+%% Called from rexi_server:init_p/3 when a rexi worker process starts.
+%% create_context/3 registers the worker's resource context.
+%% maybe_track_rexi_init_p/1 counts the spawn for selected {M, F} pairs.
+-export([
+ create_context/3,
+ maybe_track_rexi_init_p/1
+]).
+
-type response() :: ok | {error, unknown_metric} | {error, invalid_metric}.
-type stat() :: {any(), [{atom(), any()}]}.
@@ -49,6 +55,11 @@ increment_counter(Name) ->
-spec increment_counter(any(), pos_integer()) -> response().
increment_counter(Name, Value) ->
+ %% Should maybe_track_local happen before or after notify?
+ %% If after, only currently tracked metrics declared in the app's
+ %% stats_description.cfg will be trackable locally. Pros/cons.
+ %io:format("NOTIFY_EXISTING_METRIC: ~p || ~p || ~p~n", [Name, Op, Type]),
+ ok = maybe_track_local_counter(Name, Value),
case couch_stats_util:get_counter(Name, stats()) of
{ok, Ctx} -> couch_stats_counter:increment(Ctx, Value);
{error, Error} -> {error, Error}
@@ -100,6 +111,25 @@ stats() ->
+%% Current Erlang monotonic time, in whole seconds.
now_sec() ->
erlang:monotonic_time(second).
+%% Forward a counter increment to the per-process resource tracker.
+%% Only positive integer increments are forwarded; zero, negative or
+%% non-integer values are ignored. Always returns ok.
+-spec maybe_track_local_counter(any(), any()) -> ok.
+maybe_track_local_counter(Name, Val) when is_integer(Val), Val > 0 ->
+    couch_stats_resource_tracker:maybe_inc(Name, Val),
+    ok;
+maybe_track_local_counter(_Name, _Val) ->
+    ok.
+
+%% Register the resource context for a rexi worker.
+%% Delegates to couch_stats_resource_tracker:create_context/3.
+create_context(From, MFA, Nonce) ->
+    couch_stats_resource_tracker:create_context(From, MFA, Nonce).
+
+%% Count a rexi worker spawn as [M, F, spawned], but only when the tracker
+%% reports that metric as tracked. Otherwise do nothing.
+maybe_track_rexi_init_p({Mod, Fun, _Arity}) ->
+    SpawnMetric = [Mod, Fun, spawned],
+    case couch_stats_resource_tracker:should_track(SpawnMetric) of
+        false -> ok;
+        true -> increment_counter(SpawnMetric)
+    end.
+
-ifdef(TEST).
-include_lib("couch/include/couch_eunit.hrl").
diff --git a/src/couch_stats/src/couch_stats_resource_tracker.erl b/src/couch_stats/src/couch_stats_resource_tracker.erl
new file mode 100644
index 000000000..00e375e8c
--- /dev/null
+++ b/src/couch_stats/src/couch_stats_resource_tracker.erl
@@ -0,0 +1,607 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+%% couch_stats_resource_tracker (CSRT): proof-of-concept per-process
+%% resource usage tracking.
+%%
+%% Each tracked process has one #rctx{} row in a public named ETS table
+%% (named after this module, created in init/1) keyed by its {Pid, Ref}
+%% pid_ref. Processes bump their own counters directly in that table
+%% (inc/1,2 and the named helpers below). This gen_server only monitors
+%% the owning processes, marks their rows {down, Reason} when they exit
+%% and evicts the rows after a delay.
+-module(couch_stats_resource_tracker).
+
+-behaviour(gen_server).
+
+%% gen_server callbacks
+-export([
+ start_link/0,
+ init/1,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ code_change/3,
+ terminate/2
+]).
+
+%% Counter updates for the calling process, and worker delta bookkeeping
+-export([
+ inc/1, inc/2,
+ maybe_inc/2,
+ get_pid_ref/0,
+ accumulate_delta/1
+]).
+
+%% Context creation and monitoring
+-export([
+ create_context/0, create_context/1, create_context/3,
+ track/1,
+ should_track/1
+]).
+
+%% Introspection; active/0 backs the _active_resources HTTP endpoint
+-export([
+ active/0
+]).
+
+%% Per-reply resource deltas; called by rexi when replying/streaming
+-export([
+ make_delta/0
+]).
+
+%% Singular increment operations
+-export([
+ db_opened/0,
+ doc_read/0,
+ row_read/0,
+ change_processed/0,
+ ioq_called/0,
+ js_evaled/0,
+ js_filtered/0,
+ js_filtered_error/0,
+ js_filtered_doc/0,
+ mango_match_evaled/0,
+ get_kv_node/0,
+ get_kp_node/0
+]).
+
+%% Plural increment operations
+-export([
+ js_filtered_docs/1,
+ io_bytes_read/1,
+ io_bytes_written/1
+]).
+
+-include_lib("couch/include/couch_db.hrl").
+
+%% Use these for record upgrades over the wire and in ETS tables
+%% TODO: alternatively, just delete these. Currently using a map
+%% for shipping deltas over the wire, avoiding much of the
+%% problem here. We'll likely still need to handle upgrades to
+%% map format over time, so let's decide a course of action here.
+%% NOTE(review): ?RCTX / ?RCTX_V1 are not referenced by any code in this
+%% module yet.
+-define(RCTX_V1, rctx_v1).
+-define(RCTX, ?RCTX_V1).
+
+%% Counter (#rctx{} field) names used by inc/1,2 and by maybe_inc/2 when
+%% mapping global couch_stats metric names onto per-process counters.
+-define(MANGO_EVAL_MATCH, mango_eval_match).
+-define(DB_OPEN_DOC, docs_read).
+
+-define(FRPC_CHANGES_ROW, changes_processed).
+
+%% Module pdict markers
+%% ?DELTA_TA: delta baseline fabricated by make_delta/0 (set_delta_a/1)
+%% ?DELTA_TZ: initial #rctx{} snapshot stored by create_context/3
+%% ?PID_REF: {Pid, Ref} key of this process's row (get_pid_ref/0)
+-define(DELTA_TA, csrt_delta_ta).
+-define(DELTA_TZ, csrt_delta_tz). %% T Zed instead of T0
+-define(PID_REF, csrt_pid_ref). %% track local ID
+
+
+%% Tracker state. `tracking` holds both PidRef => MonitorRef and
+%% MonitorRef => PidRef entries (see handle_cast/2).
+-record(st, {
+ eviction_delay = 10000, %% How many ms dead processes are visible
+ tracking = #{} %% track active processes for eventual eviction
+}).
+
+
+%% One row per tracked process, keyed by pid_ref = {Pid, Ref}.
+%% updated_at: os:timestamp() when the record is built, overwritten when
+%%   the owning process goes down; counter increments do not touch it.
+%% exited_at: not set anywhere in this module.
+%% from/mfa/nonce/type: filled in by create_context/3 for rexi workers.
+%% state: alive, or {down, Reason} once the owning process has exited.
+%% The remaining integer fields are counters bumped via inc/1,2.
+%% TODO: switch to:
+%% -record(?RCTX, {
+-record(rctx, {
+ updated_at = os:timestamp(),
+ exited_at,
+ pid_ref,
+ mfa,
+ nonce,
+ from,
+ type = unknown, %% unknown/background/system/rpc/coordinator/fabric_rpc/etc_rpc/etc
+ db_open = 0,
+ docs_read = 0,
+ rows_read = 0,
+ changes_processed = 0,
+ ioq_calls = 0,
+ io_bytes_read = 0,
+ io_bytes_written = 0,
+ js_evals = 0,
+ js_filter = 0,
+ js_filter_error = 0,
+ js_filtered_docs = 0,
+ mango_eval_match = 0,
+ get_kv_node = 0,
+ get_kp_node = 0,
+ state = alive
+}).
+
+%% Single-step bumps: add 1 to the calling process's counter of that name.
+db_opened() -> inc(db_open, 1).
+doc_read() -> inc(docs_read, 1).
+row_read() -> inc(rows_read, 1).
+change_processed() -> inc(changes_processed, 1).
+ioq_called() -> inc(ioq_calls, 1).
+js_evaled() -> inc(js_evals, 1).
+js_filtered() -> inc(js_filter, 1).
+js_filtered_error() -> inc(js_filter_error, 1).
+js_filtered_doc() -> inc(js_filtered_docs, 1).
+mango_match_evaled() -> inc(mango_eval_match, 1).
+get_kv_node() -> inc(get_kv_node, 1).
+get_kp_node() -> inc(get_kp_node, 1).
+
+%% Multi-step bumps: add Count to the calling process's counter.
+js_filtered_docs(Count) -> inc(js_filtered_docs, Count).
+io_bytes_read(Count) -> inc(io_bytes_read, Count).
+io_bytes_written(Count) -> inc(io_bytes_written, Count).
+
+%% Add 1 to the calling process's counter named Field (see inc/2).
+%% Every name inc/2 knows is accepted. Unknown names are ignored and
+%% return 0.
+inc(Field) ->
+    inc(Field, 1).
+
+
+%% Add N to the calling process's counter named Field and return the new
+%% value. Unknown names are ignored and return 0.
+inc(Field, N) ->
+    case rctx_counter_pos(Field) of
+        undefined ->
+            0;
+        Pos ->
+            update_counter(Pos, N)
+    end.
+
+%% #rctx{} tuple position of each incrementable counter, or undefined.
+rctx_counter_pos(db_open) -> #rctx.db_open;
+rctx_counter_pos(?DB_OPEN_DOC) -> #rctx.?DB_OPEN_DOC;
+rctx_counter_pos(rows_read) -> #rctx.rows_read;
+rctx_counter_pos(?FRPC_CHANGES_ROW) -> #rctx.?FRPC_CHANGES_ROW;
+rctx_counter_pos(ioq_calls) -> #rctx.ioq_calls;
+rctx_counter_pos(io_bytes_read) -> #rctx.io_bytes_read;
+rctx_counter_pos(io_bytes_written) -> #rctx.io_bytes_written;
+rctx_counter_pos(js_evals) -> #rctx.js_evals;
+rctx_counter_pos(js_filter) -> #rctx.js_filter;
+rctx_counter_pos(js_filter_error) -> #rctx.js_filter_error;
+rctx_counter_pos(js_filtered_docs) -> #rctx.js_filtered_docs;
+rctx_counter_pos(?MANGO_EVAL_MATCH) -> #rctx.?MANGO_EVAL_MATCH;
+rctx_counter_pos(get_kv_node) -> #rctx.get_kv_node;
+rctx_counter_pos(get_kp_node) -> #rctx.get_kp_node;
+rctx_counter_pos(_) -> undefined.
+
+%% Map a global couch_stats metric name onto the matching per-process
+%% counter and add Val to it. Unmapped names are ignored and return 0.
+maybe_inc(Metric, Val) ->
+    case Metric of
+        [mango, evaluate_selector] -> inc(?MANGO_EVAL_MATCH, Val);
+        [couchdb, database_reads] -> inc(?DB_OPEN_DOC, Val);
+        [fabric_rpc, changes, rows_read] -> inc(?FRPC_CHANGES_ROW, Val);
+        _ -> 0
+    end.
+
+
+%% True when Metric is one of the metrics selected for tracking (used by
+%% couch_stats:maybe_track_rexi_init_p/1), false otherwise.
+%% TODO: update stats_descriptions.cfg for relevant apps
+should_track(Metric) ->
+    TrackedMetrics = [
+        [fabric_rpc, all_docs, spawned],
+        [fabric_rpc, changes, spawned],
+        [fabric_rpc, map_view, spawned],
+        [fabric_rpc, reduce_view, spawned],
+        [fabric_rpc, get_all_security, spawned],
+        [fabric_rpc, open_doc, spawned],
+        [fabric_rpc, update_docs, spawned],
+        [fabric_rpc, open_shard, spawned],
+        [mango_cursor, view, all_docs],
+        [mango_cursor, view, idx]
+    ],
+    lists:member(Metric, TrackedMetrics).
+
+%% Fold a worker's delta (as produced by make_delta/0) into the caller's
+%% resource context. Currently a no-op that always returns ok.
+%% TODO: update coordinator stats from worker deltas
+accumulate_delta(_WorkerDelta) ->
+    ok.
+
+%% Add Count to tuple position Field of the calling process's #rctx{} row.
+update_counter(Field, Count) ->
+    update_counter(get_pid_ref(), Field, Count).
+
+
+%% Add Count to tuple position Field of the row keyed by {Pid, Ref} and
+%% return the new value.
+%%
+%% A row created here on first use (a process that never went through
+%% create_context/0,1,3) is handed to the tracker. It is then monitored
+%% and evicted when the process exits. Previously such rows were never
+%% tracked and stayed in the table forever.
+%%
+%% Tracking is best-effort. If the tracker's table does not exist (e.g.
+%% while couch_stats_resource_tracker is restarting), return 0 instead of
+%% crashing the caller, which is usually just bumping a couch_stats counter.
+update_counter({_Pid, _Ref} = Key, Field, Count) ->
+    case ets:whereis(?MODULE) of
+        undefined ->
+            0;
+        _Tid ->
+            Default = #rctx{pid_ref = Key},
+            case ets:member(?MODULE, Key) of
+                true ->
+                    ok;
+                false ->
+                    ets:insert(?MODULE, Default),
+                    track(Default)
+            end,
+            %% Keep the default object in case the row vanished in between.
+            ets:update_counter(?MODULE, Key, {Field, Count}, Default)
+    end.
+
+
+%% Every row currently in the tracker table (live processes plus exited
+%% ones still waiting for eviction), rendered with to_json/1.
+active() ->
+    [to_json(Rctx) || Rctx <- ets:tab2list(?MODULE)].
+
+
+%% Render one #rctx{} as an EJSON-compatible map for _active_resources.
+%% Conversions:
+%%   - pids and refs become binaries
+%%   - 3-tuples become lists
+%%   - undefined becomes null
+%% Throws {error, {unexpected, MFA}} for an mfa that is neither an
+%% {M, F, A} tuple nor undefined.
+to_json(#rctx{} = Rctx) ->
+    #rctx{
+        updated_at = TP,
+        pid_ref = {_Pid, _Ref} = PidRef,
+        mfa = MFA0,
+        nonce = Nonce0,
+        from = From0,
+        docs_read = DocsRead,
+        rows_read = RowsRead,
+        state = State0,
+        type = Type,
+        changes_processed = ChangesProcessed
+    } = Rctx,
+    MFA =
+        case MFA0 of
+            {M, F, A} -> [M, F, A];
+            undefined -> null;
+            Other -> throw({error, {unexpected, Other}})
+        end,
+    %% term_to_json/1 turns the {Pid, Ref} reply target into two binaries.
+    %% The previous pid_to_list/ref_to_list charlists were encoded as
+    %% arrays of integers.
+    From =
+        case From0 of
+            {_Parent, _ParentRef} -> term_to_json(From0);
+            undefined -> null
+        end,
+    State =
+        case State0 of
+            alive ->
+                alive;
+            {down, Reason} when is_atom(Reason) ->
+                [down, Reason];
+            Unknown ->
+                %% Binary so the encoder emits a string, not a char list.
+                [unknown, ?l2b(io_lib:format("~w", [Unknown]))]
+        end,
+    Nonce =
+        case Nonce0 of
+            undefined -> null;
+            _ -> list_to_binary(Nonce0)
+        end,
+    #{
+        updated_at => term_to_json(TP),
+        pid_ref => term_to_json(PidRef),
+        mfa => term_to_json(MFA),
+        nonce => term_to_json(Nonce),
+        from => From,
+        docs_read => DocsRead,
+        rows_read => RowsRead,
+        state => State,
+        type => term_to_json(Type),
+        changes_processed => ChangesProcessed
+    }.
+
+%% Make a term JSON-encodable:
+%%   - a {Pid, Ref} pair becomes two binaries
+%%   - any other 3-tuple becomes a 3-element list
+%%   - undefined becomes null
+%%   - anything else is returned unchanged
+term_to_json(Term) ->
+    case Term of
+        {Pid, Ref} when is_pid(Pid), is_reference(Ref) ->
+            [?l2b(pid_to_list(Pid)), ?l2b(ref_to_list(Ref))];
+        {A, B, C} ->
+            [A, B, C];
+        undefined ->
+            null;
+        _ ->
+            Term
+    end.
+
+%% Flat variant: any tuple becomes its ~w rendering as a binary,
+%% undefined becomes null, and anything else is returned unchanged.
+term_to_flat_json(undefined) ->
+    null;
+term_to_flat_json(Term) when is_tuple(Term) ->
+    ?l2b(io_lib:format("~w", [Term]));
+term_to_flat_json(Term) ->
+    Term.
+
+%% Render one #rctx{} as a flat map (every value a scalar or a binary) for
+%% the csrt-pid-usage-lifetime report. Throws {error, {unexpected, MFA}}
+%% for an mfa that is neither an {M, F, A} tuple nor undefined.
+%% (The previous stdout debug print ran on every tracked process exit.)
+to_flat_json(#rctx{} = Rctx) ->
+    #rctx{
+        updated_at = TP,
+        pid_ref = {_Pid, _Ref} = PidRef,
+        mfa = MFA0,
+        nonce = Nonce0,
+        from = From0,
+        docs_read = DocsRead,
+        rows_read = RowsRead,
+        state = State0,
+        type = Type,
+        changes_processed = ChangesProcessed
+    } = Rctx,
+    MFA =
+        case MFA0 of
+            {_M, _F, _A} -> ?l2b(io_lib:format("~w", [MFA0]));
+            undefined -> null;
+            Other -> throw({error, {unexpected, Other}})
+        end,
+    From =
+        case From0 of
+            {_Parent, _ParentRef} -> ?l2b(io_lib:format("~w", [From0]));
+            undefined -> null
+        end,
+    State =
+        case State0 of
+            alive -> alive;
+            _ -> ?l2b(io_lib:format("~w", [State0]))
+        end,
+    Nonce =
+        case Nonce0 of
+            undefined -> null;
+            _ -> list_to_binary(Nonce0)
+        end,
+    #{
+        updated_at => term_to_flat_json(TP),
+        pid_ref => ?l2b(io_lib:format("~w", [PidRef])),
+        mfa => MFA,
+        nonce => Nonce,
+        from => From,
+        docs_read => DocsRead,
+        rows_read => RowsRead,
+        state => State,
+        type => term_to_flat_json(Type),
+        changes_processed => ChangesProcessed
+    }.
+
+%% The {Pid, Ref} key of the calling process's resource row, read from the
+%% process dictionary. On first use a fresh {self(), make_ref()} is stored
+%% and returned. No table row is created here.
+get_pid_ref() ->
+    case get(?PID_REF) of
+        undefined ->
+            set_pid_ref({self(), make_ref()});
+        Existing ->
+            Existing
+    end.
+
+
+%% Create, store and track a fresh resource context for the calling process.
+create_context() ->
+    create_context(self()).
+
+
+%% Create, store and track a fresh resource context owned by Pid.
+%% Returns the new #rctx{}.
+create_context(Pid) ->
+    Ref = make_ref(),
+    Rctx = make_record(Pid, Ref),
+    %% Insert before asking the tracker to monitor Pid, so the row already
+    %% exists by the time a 'DOWN' for Pid can be handled.
+    ets:insert(?MODULE, Rctx),
+    track(Rctx),
+    %% inc/1,2 key their updates on the pid_ref stored in the process
+    %% dictionary. Bind it when the context belongs to the caller, otherwise
+    %% the caller's counters would land in a different, untracked row.
+    case Pid =:= self() of
+        true -> set_pid_ref({Pid, Ref});
+        false -> ok
+    end,
+    Rctx.
+
+%% Create and register the resource context for a rexi worker (the calling
+%% process) running MFA on behalf of From; Nonce is the request nonce.
+%%
+%% The new row becomes the caller's context. Its pid_ref is stored in the
+%% process dictionary so inc/1,2 and make_delta/0 update and read this same
+%% row; previously they minted an unrelated pid_ref and never saw it. A
+%% copy of the initial record is kept under ?DELTA_TZ as the delta baseline.
+%%
+%% TODO: add type to distinguish coordinator vs rpc_worker
+create_context(From, {M, F, _A} = MFA, Nonce) ->
+    PidRef = {self(), make_ref()},
+    %% TODO: extract user_ctx and db/shard from the request
+    Rctx = #rctx{
+        pid_ref = PidRef,
+        from = From,
+        mfa = MFA,
+        type = {worker, M, F},
+        nonce = Nonce
+    },
+    set_pid_ref(PidRef),
+    erlang:put(?DELTA_TZ, Rctx),
+    %% Insert before tracking so the row exists when a 'DOWN' is handled.
+    ets:insert(?MODULE, Rctx),
+    track(Rctx),
+    Rctx.
+
+%% Ask the tracker to monitor the owner of Rctx. This is an asynchronous
+%% cast and always returns ok.
+%% TODO: should this block or not? If no, what cleans up zombies?
+%% (blocking alternative: gen_server:call(?MODULE, {track, PR}))
+track(#rctx{} = Rctx) ->
+    TrackMsg = {track, Rctx},
+    gen_server:cast(?MODULE, TrackMsg).
+
+
+%% Resource delta for the calling process: counter differences between a
+%% baseline snapshot and its current #rctx{} row (see make_delta/2).
+%%
+%% The baseline is chosen in this order:
+%%   1. ?DELTA_TA, if set
+%%   2. the ?DELTA_TZ snapshot stored by create_context/3
+%%   3. a fabricated baseline
+%%
+%% A fabricated baseline cannot simply be "now": the timestamps would be
+%% identical, causing a divide by zero downstream. Ideally every code path
+%% that does database work establishes a T0 at spawn (the couch_db:open
+%% chain, couch_server, couch_file, ...). Until then, make_delta_base/0
+%% back-dates a fabricated T0 slightly.
+%%
+%% Only this genuinely-missing case is counted in
+%% [couchdb, csrt, delta_missing_t0]. Previously the counter was bumped on
+%% every call without ?DELTA_TA, even when the ?DELTA_TZ start existed.
+%% TODO: determine the appropriate course of action here; incrementing a
+%% couch_stats counter from within couch_stats itself is somewhat naughty.
+make_delta() ->
+    TA =
+        case get(?DELTA_TA) of
+            #rctx{} = Prev ->
+                Prev;
+            undefined ->
+                case erlang:get(?DELTA_TZ) of
+                    undefined ->
+                        couch_stats:increment_counter(
+                            [couchdb, csrt, delta_missing_t0]
+                        ),
+                        Base = make_delta_base(),
+                        set_delta_a(Base),
+                        Base;
+                    TZ ->
+                        TZ
+                end
+        end,
+    TB = get_resource(),
+    make_delta(TA, TB).
+
+
+%% Counter differences between two snapshots of a context, plus elapsed
+%% microseconds between their updated_at stamps as dt. Only strictly
+%% positive entries are kept. When only one argument is an #rctx{}, an
+%% error map names the missing side.
+make_delta(#rctx{} = Start, #rctx{} = Finish) ->
+    Diffs = [
+        {docs_read, Finish#rctx.docs_read - Start#rctx.docs_read},
+        {rows_read, Finish#rctx.rows_read - Start#rctx.rows_read},
+        {changes_processed,
+            Finish#rctx.changes_processed - Start#rctx.changes_processed},
+        {dt, timer:now_diff(Finish#rctx.updated_at, Start#rctx.updated_at)}
+    ],
+    %% TODO: reevaluate dropping zero/negative entries
+    maps:from_list([Entry || {_Key, Value} = Entry <- Diffs, Value > 0]);
+make_delta(_, #rctx{}) ->
+    #{error => missing_beg_rctx};
+make_delta(#rctx{}, _) ->
+    #{error => missing_fin_rctx}.
+
+%% Fabricate a baseline #rctx{} for a process that never established a T0.
+%% All counters are zero, and updated_at (os:timestamp(), i.e.
+%% {MegaSecs, Secs, MicroSecs}) is moved slightly into the past:
+%%   - drop the microseconds when they are non-zero;
+%%   - otherwise step back one second;
+%%   - when Secs is also zero, borrow from MegaSecs. This last case
+%%     previously had no clause and crashed with case_clause.
+make_delta_base() ->
+    Ref = make_ref(),
+    %% TODO: extract user_ctx and db/shard from request
+    TA0 = #rctx{pid_ref = {self(), Ref}},
+    case TA0#rctx.updated_at of
+        {Me, S, Mi} when Mi > 0 ->
+            TA0#rctx{updated_at = {Me, S, 0}};
+        {Me, S, Mi} when S > 0 ->
+            TA0#rctx{updated_at = {Me, S - 1, Mi}};
+        {Me, 0, 0} ->
+            TA0#rctx{updated_at = {Me - 1, 999999, 0}}
+    end.
+
+%% Store TA as this process's fabricated delta baseline (?DELTA_TA).
+%% Returns the previous pdict value.
+set_delta_a(TA) ->
+    put(?DELTA_TA, TA).
+
+%% Store PidRef as this process's row key (?PID_REF) and return it.
+set_pid_ref(PidRef) ->
+    put(?PID_REF, PidRef),
+    PidRef.
+
+%% Current #rctx{} row of the calling process, or undefined.
+get_resource() ->
+    OwnKey = get_pid_ref(),
+    get_resource(OwnKey).
+
+%% #rctx{} row stored under PidRef, or undefined when there is none.
+get_resource(PidRef) ->
+    case ets:lookup(?MODULE, PidRef) of
+        [] -> undefined;
+        [#rctx{} = Row] -> Row
+    end.
+
+%% A fresh #rctx{} (all counters zero) keyed by {Pid, Ref}.
+make_record(Pid, Ref) ->
+    #rctx{pid_ref = {Pid, Ref}}.
+
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+%% Create the public named ETS table (keyed on #rctx.pid_ref) that every
+%% process writes its own counters into; start with no monitors.
+init([]) ->
+    TableOpts = [
+        named_table,
+        public,
+        {keypos, #rctx.pid_ref},
+        {read_concurrency, true},
+        {write_concurrency, true},
+        %% TODO: test impact of this
+        {decentralized_counters, true}
+    ],
+    ets:new(?MODULE, TableOpts),
+    {ok, #st{}}.
+
+%% Call handling:
+%%   - fetch returns the tracker state;
+%%   - {track, _} is accepted as a no-op (tracking goes through a cast);
+%%   - any other call stops the server.
+handle_call(Request, _From, St) ->
+    case {Request, St} of
+        {fetch, #st{}} ->
+            {reply, {ok, St}, St};
+        {{track, _}, _} ->
+            {reply, ok, St};
+        _ ->
+            {stop, {unknown_call, Request}, error, St}
+    end.
+
+%% {track, Rctx}: monitor the owner of Rctx once per pid_ref, so its row
+%% can be marked down and evicted when it exits. `tracking` stores both
+%% directions: PidRef => MonitorRef and MonitorRef => PidRef.
+%%
+%% Unknown casts are logged and ignored. This process owns the shared ETS
+%% table, so stopping would drop every tracked context and make all
+%% concurrent counter updates fail until the restart.
+handle_cast({track, #rctx{pid_ref = {Pid, _} = PidRef}}, #st{tracking = AT0} = St0) ->
+    St =
+        case maps:is_key(PidRef, AT0) of
+            true ->
+                %% already tracking this PidRef
+                St0;
+            false ->
+                Mon = erlang:monitor(process, Pid),
+                AT = maps:put(Mon, PidRef, maps:put(PidRef, Mon, AT0)),
+                St0#st{tracking = AT}
+        end,
+    {noreply, St};
+handle_cast(Msg, St) ->
+    couch_log:warning("~p: ignoring unknown cast ~p", [?MODULE, Msg]),
+    {noreply, St}.
+
+%% 'DOWN' for a tracked process:
+%%   - mark its row {down, Reason} and stamp updated_at;
+%%   - emit the lifetime report;
+%%   - schedule {evict, PidRef} after eviction_delay ms, so short-lived
+%%     processes stay visible in _active_resources for a while.
+%% {evict, PidRef}: delete the row.
+%%
+%% Problems are logged instead of crashing this process: it owns the
+%% shared ETS table, so a crash would drop every tracked context.
+handle_info({'DOWN', MonRef, _Type, DPid, Reason}, #st{tracking = AT0} = St0) ->
+    St =
+        case maps:get(MonRef, AT0, undefined) of
+            undefined ->
+                couch_log:error(
+                    "CSRT: unexpected missing monitor in tracking table: {~p, ~p}",
+                    [MonRef, DPid]
+                ),
+                St0;
+            {RPid, _Ref} = PidRef ->
+                case RPid =:= DPid of
+                    true ->
+                        ok;
+                    false ->
+                        %% erlang:halt/1 only accepts a flat string as its
+                        %% slogan; the deep list from io_lib:format/2 would
+                        %% raise badarg instead of halting.
+                        erlang:halt(
+                            lists:flatten(
+                                io_lib:format(
+                                    "CSRT:HI PID MISMATCH ABORT: ~p =/= ~p~n",
+                                    [DPid, RPid]
+                                )
+                            )
+                        )
+                end,
+                %% remove double bookkeeping
+                AT = maps:remove(MonRef, maps:remove(PidRef, AT0)),
+                Updates = [
+                    {#rctx.state, {down, Reason}},
+                    {#rctx.updated_at, os:timestamp()}
+                ],
+                case ets:update_element(?MODULE, PidRef, Updates) of
+                    true ->
+                        log_process_lifetime_report(PidRef);
+                    false ->
+                        couch_log:warning(
+                            "CSRT: no context row for exited process ~p",
+                            [PidRef]
+                        )
+                end,
+                %% Delay eviction to allow human visibility on short lived pids
+                erlang:send_after(St0#st.eviction_delay, self(), {evict, PidRef}),
+                St0#st{tracking = AT}
+        end,
+    {noreply, St};
+handle_info({evict, {_Pid, _Ref} = PidRef}, #st{} = St) ->
+    ets:delete(?MODULE, PidRef),
+    {noreply, St};
+handle_info(Msg, St) ->
+    couch_log:warning("~p: ignoring unknown info ~p", [?MODULE, Msg]),
+    {noreply, St}.
+
+%% Nothing to clean up; the ETS table goes away with this process.
+terminate(_Reason, _State) ->
+    ok.
+
+%% No state migration needed between versions.
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%% Emit a "csrt-pid-usage-lifetime" report for the row keyed by PidRef.
+%% Best-effort: a missing row, or a failure while rendering or reporting
+%% (e.g. to_flat_json/1 throwing on an unexpected mfa), is logged rather
+%% than crashing the caller. The caller is the tracker's handle_info/2,
+%% whose process owns the shared ETS table.
+log_process_lifetime_report(PidRef) ->
+    case get_resource(PidRef) of
+        #rctx{} = Rctx ->
+            try
+                couch_log:report("csrt-pid-usage-lifetime", to_flat_json(Rctx))
+            catch
+                Class:Reason ->
+                    couch_log:error(
+                        "CSRT: lifetime report failed for ~p: ~p:~p",
+                        [PidRef, Class, Reason]
+                    )
+            end;
+        undefined ->
+            ok
+    end.
diff --git a/src/couch_stats/src/couch_stats_sup.erl b/src/couch_stats/src/couch_stats_sup.erl
index 325372c3e..4b4df17e2 100644
--- a/src/couch_stats/src/couch_stats_sup.erl
+++ b/src/couch_stats/src/couch_stats_sup.erl
@@ -29,6 +29,7 @@ init([]) ->
{
{one_for_one, 5, 10}, [
?CHILD(couch_stats_server, worker),
+ ?CHILD(couch_stats_resource_tracker, worker),
?CHILD(couch_stats_process_tracker, worker)
]
}}.
diff --git a/src/fabric/priv/stats_descriptions.cfg b/src/fabric/priv/stats_descriptions.cfg
index d12aa0c84..57ec0b8d2 100644
--- a/src/fabric/priv/stats_descriptions.cfg
+++ b/src/fabric/priv/stats_descriptions.cfg
@@ -26,3 +26,29 @@
{type, counter},
{desc, <<"number of write quorum errors">>}
]}.
+
+
+%% fabric_rpc worker stats
+%% TODO: decide on which naming scheme:
+%% {[fabric_rpc, get_all_security, spawned], [
+%% {[fabric_rpc, spawned, get_all_security], [
+%% Keep in sync with couch_stats_resource_tracker:should_track/1. Spawns
+%% are counted under [fabric_rpc, F, spawned] for every F it returns true
+%% for, and names missing here are not recorded by couch_stats.
+{[fabric_rpc, get_all_security, spawned], [
+    {type, counter},
+    {desc, <<"number of fabric_rpc worker get_all_security spawns">>}
+]}.
+{[fabric_rpc, open_doc, spawned], [
+    {type, counter},
+    {desc, <<"number of fabric_rpc worker open_doc spawns">>}
+]}.
+{[fabric_rpc, all_docs, spawned], [
+    {type, counter},
+    {desc, <<"number of fabric_rpc worker all_docs spawns">>}
+]}.
+{[fabric_rpc, open_shard, spawned], [
+    {type, counter},
+    {desc, <<"number of fabric_rpc worker open_shard spawns">>}
+]}.
+{[fabric_rpc, changes, spawned], [
+    {type, counter},
+    {desc, <<"number of fabric_rpc worker changes spawns">>}
+]}.
+{[fabric_rpc, map_view, spawned], [
+    {type, counter},
+    {desc, <<"number of fabric_rpc worker map_view spawns">>}
+]}.
+{[fabric_rpc, reduce_view, spawned], [
+    {type, counter},
+    {desc, <<"number of fabric_rpc worker reduce_view spawns">>}
+]}.
+{[fabric_rpc, update_docs, spawned], [
+    {type, counter},
+    {desc, <<"number of fabric_rpc worker update_docs spawns">>}
+]}.
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index d01f1f5a7..b5064b5af 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -492,6 +492,11 @@ view_cb({meta, Meta}, Acc) ->
ok = rexi:stream2({meta, Meta}),
{ok, Acc};
view_cb({row, Row}, Acc) ->
+ %% TODO: distinguish between rows and docs
+ %% TODO: wire in csrt tracking
+ %% TODO: distinguish between all_docs vs view call
+ couch_stats:increment_counter([fabric_rpc, view, row_processed]),
+ %%couch_stats_resource_tracker:inc(rows_read),
% Adding another row
ViewRow = #view_row{
id = couch_util:get_value(id, Row),
@@ -535,6 +540,7 @@ changes_enumerator(#full_doc_info{} = FDI, Acc) ->
changes_enumerator(#doc_info{id = <<"_local/", _/binary>>, high_seq = Seq}, Acc) ->
{ok, Acc#fabric_changes_acc{seq = Seq, pending = Acc#fabric_changes_acc.pending - 1}};
changes_enumerator(DocInfo, Acc) ->
+ couch_stats:increment_counter([fabric_rpc, changes, rows_read]),
#fabric_changes_acc{
db = Db,
args = #changes_args{
diff --git a/src/fabric/src/fabric_util.erl b/src/fabric/src/fabric_util.erl
index 4acb65c73..00d6b5e0e 100644
--- a/src/fabric/src/fabric_util.erl
+++ b/src/fabric/src/fabric_util.erl
@@ -137,7 +137,9 @@ get_shard([#shard{node = Node, name = Name} | Rest], Opts, Timeout, Factor) ->
Ref = rexi:cast(Node, self(), MFA, [sync]),
try
receive
- {Ref, {ok, Db}} ->
+ {Ref, {ok, Db}, {delta, Delta}} ->
+ io:format("[~p]GET SHARD GOT DELTA: ~p~n", [self(), Delta]),
+ couch_stats_resource_tracker:accumulate_delta(Delta),
{ok, Db};
{Ref, {'rexi_EXIT', {{unauthorized, _} = Error, _}}} ->
throw(Error);
@@ -145,7 +147,10 @@ get_shard([#shard{node = Node, name = Name} | Rest], Opts, Timeout, Factor) ->
throw(Error);
{Ref, Reason} ->
couch_log:debug("Failed to open shard ~p because: ~p", [Name, Reason]),
- get_shard(Rest, Opts, Timeout, Factor)
+ get_shard(Rest, Opts, Timeout, Factor);
+ Other ->
+ io:format("GOT UNEXPECTED MESSAGE FORMAT: ~p~n", [Other]),
+ erlang:error(Other)
after Timeout ->
couch_log:debug("Failed to open shard ~p after: ~p", [Name, Timeout]),
get_shard(Rest, Opts, Factor * Timeout, Factor)
diff --git a/src/mango/src/mango_cursor_view.erl b/src/mango/src/mango_cursor_view.erl
index 41006ce77..a19d650b4 100644
--- a/src/mango/src/mango_cursor_view.erl
+++ b/src/mango/src/mango_cursor_view.erl
@@ -229,9 +229,11 @@ execute(#cursor{db = Db, index = Idx, execution_stats = Stats} = Cursor0, UserFu
Result =
case mango_idx:def(Idx) of
all_docs ->
+ couch_stats:increment_counter([mango_cursor, view, all_docs]),
CB = fun ?MODULE:handle_all_docs_message/2,
fabric:all_docs(Db, DbOpts, CB, Cursor, Args);
_ ->
+ couch_stats:increment_counter([mango_cursor, view, idx]),
CB = fun ?MODULE:handle_message/2,
% Normal view
DDoc = ddocid(Idx),
diff --git a/src/mango/src/mango_selector.erl b/src/mango/src/mango_selector.erl
index 59be7a6eb..bde297a15 100644
--- a/src/mango/src/mango_selector.erl
+++ b/src/mango/src/mango_selector.erl
@@ -50,6 +50,7 @@ normalize(Selector) ->
% This assumes that the Selector has been normalized.
% Returns true or false.
match(Selector, D) ->
+ %% TODO: wire in csrt tracking
+%% NOTE(review): couch_stats_resource_tracker:maybe_inc/2 already maps
+%% [mango, evaluate_selector] to the per-process mango_eval_match counter.
+%% So, assuming increment_counter/1 delegates to increment_counter/2
+%% (which calls maybe_track_local_counter/2), this is tracked locally too.
couch_stats:increment_counter([mango, evaluate_selector]),
match_int(Selector, D).
diff --git a/src/rexi/src/rexi.erl b/src/rexi/src/rexi.erl
index 77830996e..672871089 100644
--- a/src/rexi/src/rexi.erl
+++ b/src/rexi/src/rexi.erl
@@ -129,7 +129,8 @@ async_server_call(Server, Caller, Request) ->
-spec reply(any()) -> any().
+%% Send Reply to the caller stored in rexi_from as
+%% {Ref, Reply, {delta, Delta}}. Delta is this worker's resource delta
+%% from couch_stats_resource_tracker:make_delta/0.
+%% NOTE(review): this replaces the old {Ref, Reply} message shape, and
+%% receivers that only match {Ref, _} no longer see these replies. For
+%% example, fabric_util:get_shard/4's {Ref, Reason} fallback does not match
+%% a non-{ok, Db} reply in the new shape; that reply reaches its catch-all
+%% and raises. Nodes running older code in a mixed-version cluster are
+%% presumably affected as well. Confirm every receive site and the upgrade
+%% path.
reply(Reply) ->
{Caller, Ref} = get(rexi_from),
- erlang:send(Caller, {Ref, Reply}).
+ Delta = couch_stats_resource_tracker:make_delta(),
+ erlang:send(Caller, {Ref, Reply, {delta, Delta}}).
%% @equiv sync_reply(Reply, 300000)
sync_reply(Reply) ->
@@ -214,7 +215,8 @@ stream(Msg, Limit, Timeout) ->
{ok, Count} ->
put(rexi_unacked, Count + 1),
{Caller, Ref} = get(rexi_from),
- erlang:send(Caller, {Ref, self(), Msg}),
+ Delta = couch_stats_resource_tracker:make_delta(),
+ erlang:send(Caller, {Ref, self(), Msg, {delta, Delta}}),
ok
catch
throw:timeout ->
diff --git a/src/rexi/src/rexi_server.erl b/src/rexi/src/rexi_server.erl
index 52489a9c5..08a9139bd 100644
--- a/src/rexi/src/rexi_server.erl
+++ b/src/rexi/src/rexi_server.erl
@@ -136,9 +136,12 @@ init_p(From, MFA) ->
string() | undefined
) -> any().
init_p(From, {M, F, A}, Nonce) ->
+ MFA = {M, F, length(A)},
put(rexi_from, From),
- put('$initial_call', {M, F, length(A)}),
+ put('$initial_call', MFA),
put(nonce, Nonce),
+ couch_stats:create_context(From, MFA, Nonce),
+ couch_stats:maybe_track_rexi_init_p(MFA),
try
apply(M, F, A)
catch
diff --git a/src/rexi/src/rexi_utils.erl b/src/rexi/src/rexi_utils.erl
index d59c5ea0f..4f4ca5576 100644
--- a/src/rexi/src/rexi_utils.erl
+++ b/src/rexi/src/rexi_utils.erl
@@ -83,7 +83,27 @@ process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) ->
end;
{rexi, '$rexi_ping'} ->
{ok, Acc0};
+ {Ref, Msg, {delta, Delta}} ->
+ io:format("[~p]GOT DELTA: ~p -- ~p~n", [self(), Delta, Msg]),
+ couch_stats_resource_tracker:accumulate_delta(Delta),
+ case lists:keyfind(Ref, Keypos, RefList) of
+ false ->
+ % this was some non-matching message which we will ignore
+ {ok, Acc0};
+ Worker ->
+ Fun(Msg, Worker, Acc0)
+ end;
+ {Ref, From, Msg, {delta, Delta}} ->
+ io:format("[~p]GOT DELTA: ~p -- ~p~n", [self(), Delta, Msg]),
+ couch_stats_resource_tracker:accumulate_delta(Delta),
+ case lists:keyfind(Ref, Keypos, RefList) of
+ false ->
+ {ok, Acc0};
+ Worker ->
+ Fun(Msg, {Worker, From}, Acc0)
+ end;
{Ref, Msg} ->
+ %%io:format("GOT NON DELTA MSG: ~p~n", [Msg]),
case lists:keyfind(Ref, Keypos, RefList) of
false ->
% this was some non-matching message which we will ignore
@@ -92,6 +112,7 @@ process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) ->
Fun(Msg, Worker, Acc0)
end;
{Ref, From, Msg} ->
+ %%io:format("GOT NON DELTA MSG: ~p~n", [Msg]),
case lists:keyfind(Ref, Keypos, RefList) of
false ->
{ok, Acc0};