You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ch...@apache.org on 2023/05/10 20:05:08 UTC

[couchdb] branch main updated: mango: extend execution statistics with keys examined (#4569)

This is an automated email from the ASF dual-hosted git repository.

chewbranca pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/main by this push:
     new 8600e46f2 mango: extend execution statistics with keys examined (#4569)
8600e46f2 is described below

commit 8600e46f2c8ca7ced5190a710fca4acd54426d2b
Author: PÁLI Gábor János <ga...@ibm.com>
AuthorDate: Wed May 10 22:05:00 2023 +0200

    mango: extend execution statistics with keys examined (#4569)
    
    Add another field to the shard-level Mango execution statistics
    to keep track of the count of keys that were examined for the
    query.  Note that this requires to change the way how stats are
    stored -- an approach similar to that of the view callback
    arguments was chosen, which features a map.
    
    This current version supports both the old and new formats.  The
    coordinator may request getting the results in the new one by
    adding `execution_stats_map` for the arguments of the view
    callback.  Otherwise the old format is used (without the extra
    field), which makes it possible to work with older coordinators.
    Old workers will automatically ignore this argument and answer in
    the old format.
---
 src/docs/src/api/database/find.rst        |   3 +-
 src/mango/src/mango.hrl                   |  10 +-
 src/mango/src/mango_cursor_view.erl       | 175 ++++++++++++++++++++++++++----
 src/mango/src/mango_execution_stats.erl   |  42 ++++++-
 src/mango/test/15-execution-stats-test.py |  17 ++-
 5 files changed, 219 insertions(+), 28 deletions(-)

diff --git a/src/docs/src/api/database/find.rst b/src/docs/src/api/database/find.rst
index 027ddf8ee..ede5598c9 100644
--- a/src/docs/src/api/database/find.rst
+++ b/src/docs/src/api/database/find.rst
@@ -145,7 +145,7 @@ Example response when finding documents using an index:
                 }
             ],
             "execution_stats": {
-                "total_keys_examined": 0,
+                "total_keys_examined": 200,
                 "total_docs_examined": 200,
                 "total_quorum_docs_examined": 0,
                 "results_returned": 2,
@@ -925,7 +925,6 @@ The execution statistics currently include:
 | Field                          | Description                                |
 +================================+============================================+
 | ``total_keys_examined``        | Number of index keys examined.             |
-|                                | Currently always 0.                        |
 +--------------------------------+--------------------------------------------+
 | ``total_docs_examined``        | Number of documents fetched from the       |
 |                                | database / index, equivalent to using      |
diff --git a/src/mango/src/mango.hrl b/src/mango/src/mango.hrl
index 2ff07aa4b..d8fa095bf 100644
--- a/src/mango/src/mango.hrl
+++ b/src/mango/src/mango.hrl
@@ -30,6 +30,14 @@
 -type selector() :: any().
 -type ejson() :: {[{atom(), any()}]}.
 
--type shard_stats() :: {docs_examined, non_neg_integer()}.
+-type shard_stats() :: shard_stats_v1() | shard_stats_v2().
+
+-type shard_stats_v1() :: {docs_examined, non_neg_integer()}.
+-type shard_stats_v2() ::
+    #{
+         docs_examined => non_neg_integer(),
+         keys_examined => non_neg_integer()
+    }.
+
 -type row_property_key() :: id | key | value | doc.
 -type row_properties() :: [{row_property_key(), any()}].
diff --git a/src/mango/src/mango_cursor_view.erl b/src/mango/src/mango_cursor_view.erl
index d5cffbc5c..e044c56fc 100644
--- a/src/mango/src/mango_cursor_view.erl
+++ b/src/mango/src/mango_cursor_view.erl
@@ -79,6 +79,15 @@ viewcbargs_get(fields, Args) when is_map(Args) ->
 viewcbargs_get(covering_index, Args) when is_map(Args) ->
     maps:get(covering_index, Args, undefined).
 
+-spec shard_stats_get(Key, Args) -> Stat when
+    Key :: docs_examined | keys_examined,
+    Args :: shard_stats_v2(),
+    Stat :: non_neg_integer().
+shard_stats_get(docs_examined, Args) when is_map(Args) ->
+    maps:get(docs_examined, Args, 0);
+shard_stats_get(keys_examined, Args) when is_map(Args) ->
+    maps:get(keys_examined, Args, 0).
+
 -spec create(Db, Indexes, Selector, Options) -> {ok, #cursor{}} when
     Db :: database(),
     Indexes :: [#idx{}],
@@ -187,7 +196,12 @@ base_args(#cursor{index = Idx, selector = Selector, fields = Fields} = Cursor) -
             {selector, Selector},
             {callback_args, viewcbargs_new(Selector, Fields, undefined)},
 
-            {ignore_partition_query_limit, true}
+            {ignore_partition_query_limit, true},
+
+            % Request execution statistics in a map.  The purpose of this option is
+            % to maintain interoperability on version upgrades.
+            % TODO remove this option in a later version.
+            {execution_stats_map, true}
         ]
     }.
 
@@ -326,11 +340,13 @@ choose_best_index(IndexRanges) ->
     (ok, ddoc_updated) -> any().
 view_cb({meta, Meta}, Acc) ->
     % Map function starting
-    put(mango_docs_examined, 0),
+    mango_execution_stats:shard_init(),
     set_mango_msg_timestamp(),
     ok = rexi:stream2({meta, Meta}),
     {ok, Acc};
 view_cb({row, Row}, #mrargs{extra = Options} = Acc) ->
+    mango_execution_stats:shard_incr_keys_examined(),
+    couch_stats:increment_counter([mango, keys_examined]),
     ViewRow = #view_row{
         id = couch_util:get_value(id, Row),
         key = couch_util:get_value(key, Row),
@@ -379,14 +395,23 @@ view_cb({row, Row}, #mrargs{extra = Options} = Acc) ->
             ok = rexi:stream2(ViewRow),
             set_mango_msg_timestamp();
         {Doc, _} ->
-            put(mango_docs_examined, get(mango_docs_examined) + 1),
+            mango_execution_stats:shard_incr_docs_examined(),
             couch_stats:increment_counter([mango, docs_examined]),
             Process(Doc)
     end,
     {ok, Acc};
-view_cb(complete, Acc) ->
+view_cb(complete, #mrargs{extra = Options} = Acc) ->
+    ShardStats = mango_execution_stats:shard_get_stats(),
+    Stats =
+        case couch_util:get_value(execution_stats_map, Options, false) of
+            true ->
+                ShardStats;
+            false ->
+                DocsExamined = maps:get(docs_examined, ShardStats),
+                {docs_examined, DocsExamined}
+        end,
     % Send shard-level execution stats
-    ok = rexi:stream2({execution_stats, {docs_examined, get(mango_docs_examined)}}),
+    ok = rexi:stream2({execution_stats, Stats}),
     % Finish view output
     ok = rexi:stream_last(complete),
     {ok, Acc};
@@ -459,12 +484,20 @@ handle_message({row, Props}, Cursor) ->
             couch_log:error("~s :: Error loading doc: ~p", [?MODULE, Error]),
             {ok, Cursor}
     end;
-handle_message({execution_stats, ShardStats}, #cursor{execution_stats = Stats} = Cursor) ->
-    {docs_examined, DocsExamined} = ShardStats,
-    Cursor1 = Cursor#cursor{
+handle_message({execution_stats, {docs_examined, DocsExamined}}, Cursor0) ->
+    #cursor{execution_stats = Stats} = Cursor0,
+    Cursor = Cursor0#cursor{
         execution_stats = mango_execution_stats:incr_docs_examined(Stats, DocsExamined)
     },
-    {ok, Cursor1};
+    {ok, Cursor};
+handle_message({execution_stats, #{} = ShardStats}, Cursor0) ->
+    DocsExamined = shard_stats_get(docs_examined, ShardStats),
+    KeysExamined = shard_stats_get(keys_examined, ShardStats),
+    #cursor{execution_stats = Stats0} = Cursor0,
+    Stats1 = mango_execution_stats:incr_docs_examined(Stats0, DocsExamined),
+    Stats = mango_execution_stats:incr_keys_examined(Stats1, KeysExamined),
+    Cursor = Cursor0#cursor{execution_stats = Stats},
+    {ok, Cursor};
 handle_message(complete, Cursor) ->
     {ok, Cursor};
 handle_message({error, Reason}, _Cursor) ->
@@ -702,7 +735,8 @@ base_opts_test() ->
                 fields => Fields,
                 covering_index => undefined
             }},
-            {ignore_partition_query_limit, true}
+            {ignore_partition_query_limit, true},
+            {execution_stats_map, true}
         ],
     MRArgs =
         #mrargs{
@@ -945,6 +979,7 @@ execute_test_() ->
         [
             ?TDEF_FE(t_execute_empty),
             ?TDEF_FE(t_execute_ok_all_docs),
+            ?TDEF_FE(t_execute_ok_all_docs_with_execution_stats),
             ?TDEF_FE(t_execute_ok_query_view),
             ?TDEF_FE(t_execute_error)
         ]
@@ -997,7 +1032,8 @@ t_execute_ok_all_docs(_) ->
                 fields => Fields,
                 covering_index => undefined
             }},
-            {ignore_partition_query_limit, true}
+            {ignore_partition_query_limit, true},
+            {execution_stats_map, true}
         ],
     Args =
         #mrargs{
@@ -1060,7 +1096,8 @@ t_execute_ok_query_view(_) ->
                 fields => Fields,
                 covering_index => undefined
             }},
-            {ignore_partition_query_limit, true}
+            {ignore_partition_query_limit, true},
+            {execution_stats_map, true}
         ],
     Args =
         #mrargs{
@@ -1084,6 +1121,79 @@ t_execute_ok_query_view(_) ->
     ?assertEqual({ok, updated_accumulator}, execute(Cursor, fun foo:bar/2, accumulator)),
     ?assert(meck:called(fabric, query_view, '_')).
 
+t_execute_ok_all_docs_with_execution_stats(_) ->
+    Bookmark = bookmark,
+    Stats =
+        {[
+            {total_keys_examined, 0},
+            {total_docs_examined, 0},
+            {total_quorum_docs_examined, 0},
+            {results_returned, 0},
+            {execution_time_ms, '_'}
+        ]},
+    UserFnDefinition =
+        [
+            {[{add_key, bookmark, Bookmark}, accumulator], {undefined, updated_accumulator1}},
+            {
+                [{add_key, execution_stats, Stats}, updated_accumulator1],
+                {undefined, updated_accumulator2}
+            }
+        ],
+    meck:expect(foo, bar, UserFnDefinition),
+    Index = #idx{type = <<"json">>, def = all_docs},
+    Selector = {[]},
+    Fields = all_fields,
+    Cursor =
+        #cursor{
+            index = Index,
+            db = db,
+            selector = Selector,
+            fields = Fields,
+            ranges = [{'$gte', start_key, '$lte', end_key}],
+            opts = [{user_ctx, user_ctx}, {execution_stats, true}],
+            bookmark = nil
+        },
+    Cursor1 =
+        Cursor#cursor{
+            user_acc = accumulator,
+            user_fun = fun foo:bar/2,
+            execution_stats = '_'
+        },
+    Cursor2 =
+        Cursor1#cursor{
+            bookmark = Bookmark,
+            bookmark_docid = undefined,
+            bookmark_key = undefined,
+            execution_stats = #execution_stats{executionStartTime = {0, 0, 0}}
+        },
+    Extra =
+        [
+            {callback, {mango_cursor_view, view_cb}},
+            {selector, Selector},
+            {callback_args, #{
+                selector => Selector,
+                fields => Fields,
+                covering_index => undefined
+            }},
+            {ignore_partition_query_limit, true},
+            {execution_stats_map, true}
+        ],
+    Args =
+        #mrargs{
+            view_type = map,
+            reduce = false,
+            start_key = [start_key],
+            end_key = [end_key, ?MAX_JSON_OBJ],
+            include_docs = true,
+            extra = Extra
+        },
+    Parameters = [
+        db, [{user_ctx, user_ctx}], fun mango_cursor_view:handle_all_docs_message/2, Cursor1, Args
+    ],
+    meck:expect(fabric, all_docs, Parameters, meck:val({ok, Cursor2})),
+    ?assertEqual({ok, updated_accumulator2}, execute(Cursor, fun foo:bar/2, accumulator)),
+    ?assert(meck:called(fabric, all_docs, '_')).
+
 t_execute_error(_) ->
     Cursor =
         #cursor{
@@ -1119,7 +1229,8 @@ view_cb_test_() ->
             ?TDEF_FE(t_view_cb_row_matching_covered_doc),
             ?TDEF_FE(t_view_cb_row_non_matching_covered_doc),
             ?TDEF_FE(t_view_cb_row_backwards_compatible),
-            ?TDEF_FE(t_view_cb_complete),
+            ?TDEF_FE(t_view_cb_complete_shard_stats_v1),
+            ?TDEF_FE(t_view_cb_complete_shard_stats_v2),
             ?TDEF_FE(t_view_cb_ok)
         ]
     }.
@@ -1143,7 +1254,7 @@ t_view_cb_row_matching_regular_doc(_) ->
                 }}
             ]
         },
-    put(mango_docs_examined, 0),
+    mango_execution_stats:shard_init(),
     ?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)),
     ?assert(meck:called(rexi, stream2, '_')).
 
@@ -1161,7 +1272,7 @@ t_view_cb_row_non_matching_regular_doc(_) ->
                 }}
             ]
         },
-    put(mango_docs_examined, 0),
+    mango_execution_stats:shard_init(),
     put(mango_last_msg_timestamp, os:timestamp()),
     ?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)),
     ?assertNot(meck:called(rexi, stream2, '_')).
@@ -1179,6 +1290,7 @@ t_view_cb_row_null_doc(_) ->
                 }}
             ]
         },
+    mango_execution_stats:shard_init(),
     put(mango_last_msg_timestamp, os:timestamp()),
     ?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)),
     ?assertNot(meck:called(rexi, stream2, '_')).
@@ -1197,6 +1309,7 @@ t_view_cb_row_missing_doc_triggers_quorum_fetch(_) ->
                 }}
             ]
         },
+    mango_execution_stats:shard_init(),
     ?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)),
     ?assert(meck:called(rexi, stream2, '_')).
 
@@ -1222,6 +1335,7 @@ t_view_cb_row_matching_covered_doc(_) ->
                 }}
             ]
         },
+    mango_execution_stats:shard_init(),
     ?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)),
     ?assert(meck:called(rexi, stream2, '_')).
 
@@ -1244,6 +1358,7 @@ t_view_cb_row_non_matching_covered_doc(_) ->
                 }}
             ]
         },
+    mango_execution_stats:shard_init(),
     put(mango_last_msg_timestamp, os:timestamp()),
     ?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)),
     ?assertNot(meck:called(rexi, stream2, '_')).
@@ -1252,14 +1367,27 @@ t_view_cb_row_backwards_compatible(_) ->
     Row = [{id, id}, {key, key}, {doc, null}],
     meck:expect(rexi, stream2, ['_'], undefined),
     Accumulator = #mrargs{extra = [{selector, {[]}}]},
+    mango_execution_stats:shard_init(),
     put(mango_last_msg_timestamp, os:timestamp()),
     ?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)),
     ?assertNot(meck:called(rexi, stream2, '_')).
 
-t_view_cb_complete(_) ->
+t_view_cb_complete_shard_stats_v1(_) ->
     meck:expect(rexi, stream2, [{execution_stats, {docs_examined, '_'}}], meck:val(ok)),
     meck:expect(rexi, stream_last, [complete], meck:val(ok)),
-    ?assertEqual({ok, accumulator}, view_cb(complete, accumulator)),
+    Accumulator = #mrargs{},
+    mango_execution_stats:shard_init(),
+    ?assertEqual({ok, Accumulator}, view_cb(complete, Accumulator)),
+    ?assert(meck:called(rexi, stream2, '_')),
+    ?assert(meck:called(rexi, stream_last, '_')).
+
+t_view_cb_complete_shard_stats_v2(_) ->
+    ShardStats = #{docs_examined => '_', keys_examined => '_'},
+    meck:expect(rexi, stream2, [{execution_stats, ShardStats}], meck:val(ok)),
+    meck:expect(rexi, stream_last, [complete], meck:val(ok)),
+    Accumulator = #mrargs{extra = [{execution_stats_map, true}]},
+    mango_execution_stats:shard_init(),
+    ?assertEqual({ok, Accumulator}, view_cb(complete, Accumulator)),
     ?assert(meck:called(rexi, stream2, '_')),
     ?assert(meck:called(rexi, stream_last, '_')).
 
@@ -1323,7 +1451,8 @@ handle_message_test_() ->
             ?TDEF_FE(t_handle_message_row_ok_triggers_quorum_fetch_no_match),
             ?TDEF_FE(t_handle_message_row_no_match),
             ?TDEF_FE(t_handle_message_row_error),
-            ?TDEF_FE(t_handle_message_execution_stats),
+            ?TDEF_FE(t_handle_message_execution_stats_v1),
+            ?TDEF_FE(t_handle_message_execution_stats_v2),
             ?TDEF_FE(t_handle_message_complete),
             ?TDEF_FE(t_handle_message_error)
         ]
@@ -1455,7 +1584,7 @@ t_handle_message_row_error(_) ->
     meck:delete(mango_util, defer, 3),
     meck:delete(couch_log, error, 2).
 
-t_handle_message_execution_stats(_) ->
+t_handle_message_execution_stats_v1(_) ->
     ShardStats = {docs_examined, 42},
     ExecutionStats = #execution_stats{totalDocsExamined = 11},
     ExecutionStats1 = #execution_stats{totalDocsExamined = 53},
@@ -1463,6 +1592,14 @@ t_handle_message_execution_stats(_) ->
     Cursor1 = #cursor{execution_stats = ExecutionStats1},
     ?assertEqual({ok, Cursor1}, handle_message({execution_stats, ShardStats}, Cursor)).
 
+t_handle_message_execution_stats_v2(_) ->
+    ShardStats = #{docs_examined => 42, keys_examined => 53},
+    ExecutionStats = #execution_stats{totalDocsExamined = 11, totalKeysExamined = 22},
+    ExecutionStats1 = #execution_stats{totalDocsExamined = 53, totalKeysExamined = 75},
+    Cursor = #cursor{execution_stats = ExecutionStats},
+    Cursor1 = #cursor{execution_stats = ExecutionStats1},
+    ?assertEqual({ok, Cursor1}, handle_message({execution_stats, ShardStats}, Cursor)).
+
 t_handle_message_complete(_) ->
     ?assertEqual({ok, cursor}, handle_message(complete, cursor)).
 
diff --git a/src/mango/src/mango_execution_stats.erl b/src/mango/src/mango_execution_stats.erl
index 66104e89e..350b58bda 100644
--- a/src/mango/src/mango_execution_stats.erl
+++ b/src/mango/src/mango_execution_stats.erl
@@ -15,7 +15,7 @@
 -export([
     to_json/1,
     to_map/1,
-    incr_keys_examined/1,
+    incr_keys_examined/2,
     incr_docs_examined/1,
     incr_docs_examined/2,
     incr_quorum_docs_examined/1,
@@ -23,11 +23,18 @@
     log_start/1,
     log_end/1,
     log_stats/1,
-    maybe_add_stats/4
+    maybe_add_stats/4,
+    shard_init/0,
+    shard_incr_keys_examined/0,
+    shard_incr_docs_examined/0,
+    shard_get_stats/0
 ]).
 
+-include("mango.hrl").
 -include("mango_cursor.hrl").
 
+-define(SHARD_STATS_KEY, mango_shard_execution_stats).
+
 to_json(Stats) ->
     {[
         {total_keys_examined, Stats#execution_stats.totalKeysExamined},
@@ -46,9 +53,9 @@ to_map(Stats) ->
         execution_time_ms => Stats#execution_stats.executionTimeMs
     }.
 
-incr_keys_examined(Stats) ->
+incr_keys_examined(Stats, N) ->
     Stats#execution_stats{
-        totalKeysExamined = Stats#execution_stats.totalKeysExamined + 1
+        totalKeysExamined = Stats#execution_stats.totalKeysExamined + N
     }.
 
 incr_docs_examined(Stats) ->
@@ -106,3 +113,30 @@ log_stats(Stats) ->
     Nonce = list_to_binary(couch_log_util:get_msg_id()),
     MStats1 = MStats0#{nonce => Nonce},
     couch_log:report("mango-stats", MStats1).
+
+-spec shard_init() -> any().
+shard_init() ->
+    InitialState = #{docs_examined => 0, keys_examined => 0},
+    put(?SHARD_STATS_KEY, InitialState).
+
+-spec shard_incr_keys_examined() -> any().
+shard_incr_keys_examined() ->
+    incr(keys_examined).
+
+-spec shard_incr_docs_examined() -> any().
+shard_incr_docs_examined() ->
+    incr(docs_examined).
+
+-spec incr(atom()) -> any().
+incr(Key) ->
+    case get(?SHARD_STATS_KEY) of
+        #{} = Stats0 ->
+            Stats = maps:update_with(Key, fun(X) -> X + 1 end, Stats0),
+            put(?SHARD_STATS_KEY, Stats);
+        _ ->
+            ok
+    end.
+
+-spec shard_get_stats() -> shard_stats_v2().
+shard_get_stats() ->
+    get(?SHARD_STATS_KEY).
diff --git a/src/mango/test/15-execution-stats-test.py b/src/mango/test/15-execution-stats-test.py
index 537a19add..a8f996136 100644
--- a/src/mango/test/15-execution-stats-test.py
+++ b/src/mango/test/15-execution-stats-test.py
@@ -20,7 +20,7 @@ class ExecutionStatsTests(mango.UserDocsTests):
     def test_simple_json_index(self):
         resp = self.db.find({"age": {"$lt": 35}}, return_raw=True, executionStats=True)
         self.assertEqual(len(resp["docs"]), 3)
-        self.assertEqual(resp["execution_stats"]["total_keys_examined"], 0)
+        self.assertEqual(resp["execution_stats"]["total_keys_examined"], 3)
         self.assertEqual(resp["execution_stats"]["total_docs_examined"], 3)
         self.assertEqual(resp["execution_stats"]["total_quorum_docs_examined"], 0)
         self.assertEqual(resp["execution_stats"]["results_returned"], 3)
@@ -38,7 +38,7 @@ class ExecutionStatsTests(mango.UserDocsTests):
             {"age": {"$lt": 35}}, return_raw=True, r=3, executionStats=True
         )
         self.assertEqual(len(resp["docs"]), 3)
-        self.assertEqual(resp["execution_stats"]["total_keys_examined"], 0)
+        self.assertEqual(resp["execution_stats"]["total_keys_examined"], 3)
         self.assertEqual(resp["execution_stats"]["total_docs_examined"], 0)
         self.assertEqual(resp["execution_stats"]["total_quorum_docs_examined"], 3)
         self.assertEqual(resp["execution_stats"]["results_returned"], 3)
@@ -60,6 +60,19 @@ class ExecutionStatsTests(mango.UserDocsTests):
         self.assertEqual(resp["execution_stats"]["total_docs_examined"], 3)
         self.assertEqual(resp["execution_stats"]["results_returned"], 0)
 
+    def test_covering_json_index(self):
+        resp = self.db.find(
+            {"age": {"$lt": 35}},
+            fields=["_id", "age"],
+            return_raw=True,
+            executionStats=True,
+        )
+        self.assertEqual(len(resp["docs"]), 3)
+        self.assertEqual(resp["execution_stats"]["total_keys_examined"], 3)
+        self.assertEqual(resp["execution_stats"]["total_docs_examined"], 0)
+        self.assertEqual(resp["execution_stats"]["total_quorum_docs_examined"], 0)
+        self.assertEqual(resp["execution_stats"]["results_returned"], 3)
+
 
 @unittest.skipUnless(mango.has_text_service(), "requires text service")
 class ExecutionStatsTests_Text(mango.UserDocsTextTests):