You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ja...@apache.org on 2023/01/20 17:04:42 UTC

[couchdb] 02/02: Push down field projection in mango to shard

This is an automated email from the ASF dual-hosted git repository.

jan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 00e24b0dfb3435dfb531a755a9a46442936cbaea
Author: Mike Rhodes <mi...@dx13.co.uk>
AuthorDate: Fri Jan 6 11:56:46 2023 +0000

    Push down field projection in mango to shard
    
    This commit aims to improve Mango by reducing the data transferred to
    the coordinator during query execution. It may reduce memory or CPU use
    at the coordinator but that isn't the primary goal.
    
    Currently, when documents are read at the shard level, they are compared
    locally at the shard with the selector to ensure they match before they
    are sent to the coordinator. This ensures we're not sending documents
    across the network that the coordinator immediately discards, saving
    bandwidth and coordinator processing. This commit further executes field
    projection (`fields` in the query) at the shard level. This should
    further save bandwidth, particularly for queries that project few fields
    from large documents.
    
    One item of complexity is that a query may request a quorum read of
    documents, meaning that we need to do the document read at the
    coordinator and not the shard, then perform the `selector` and `fields`
    processing there rather than at the shard. To ensure that documents are
    processed consistently whether at the shard or coordinator,
    match_and_extract_doc/3 is added. There is still one orphan call to
    mango_fields:extract/2 outside match_and_extract_doc/3, which supports
    cluster upgrades and should be removed later.
    
    Shard level processing is already performed in a callback, view_cb/2,
    that's passed to fabric's view processing to run for each row in the
    view result set. It's used for the shard local selector and fields
    processing. To make it clear what arguments are destined for this
    callback, the commit encapsulates the arguments, using viewcbargs_new/2
    and viewcbargs_get/2.
    
    As we push down more functionality to the shard, the context this
    function needs to carry with it will increase, so having a single
    extensible structure (a map) for it will be valuable.
    
    Supporting cluster upgrades:
    
    The commit supports shard pushdown for Mango `fields` processing for
    situations during rolling cluster upgrades.
    
    In the state where a non-upgraded coordinator is speaking to an
    upgraded shard node, view_cb/2 needs to support being passed just the
    `selector` outside of the new viewcbargs map. In this case, the shard
    will not process fields, but the coordinator will.
    
    In the situation where the coordinator is upgraded but the shard is not,
    we need to send the selector to the shard via `selector` and also
    execute the fields projection at the coordinator. Therefore we pass
    arguments to view_cb/2 via both `selector` and `callback_args` and have
    an apparently spurious field projection (mango_fields:extract/2) in the
    code that receives back values from the shard (factored out into
    doc_member_and_extract/2).
    
    Both of these affordances should only need to exist through one minor
    version change and be removed thereafter -- if people are jumping
    several minor versions of CouchDB in one go, hopefully they are prepared
    for a bit of trouble.
    
    Testing upgrade states:
    
    As view_cb is completely separate from the rest of the cursor code,
    we can first try out the branch's code using view_cb from `main`, and
    then the other way -- the branch's view_cb with the rest of the file
    from main. I did both of these tests successfully.
---
 src/mango/src/mango_cursor_view.erl | 169 +++++++++++++++++++++++++++++++-----
 1 file changed, 146 insertions(+), 23 deletions(-)

diff --git a/src/mango/src/mango_cursor_view.erl b/src/mango/src/mango_cursor_view.erl
index 9202ce071..47195341c 100644
--- a/src/mango/src/mango_cursor_view.erl
+++ b/src/mango/src/mango_cursor_view.erl
@@ -35,6 +35,19 @@
 
 -define(HEARTBEAT_INTERVAL_IN_USEC, 4000000).
 
+% viewcbargs wraps up the arguments that view_cb uses into a single
+% entry in the mrargs.extra list. We use a map so that fields can be
+% added later without old messages causing errors or crashes.
+viewcbargs_new(Selector, Fields) ->
+    #{
+        selector => Selector,
+        fields => Fields
+    }.
+viewcbargs_get(selector, Args) when is_map(Args) ->
+    maps:get(selector, Args, undefined);
+viewcbargs_get(fields, Args) when is_map(Args) ->
+    maps:get(fields, Args, undefined).
+
 create(Db, Indexes, Selector, Opts) ->
     FieldRanges = mango_idx_view:field_ranges(Selector),
     Composited = composite_indexes(Indexes, FieldRanges),
@@ -100,7 +113,7 @@ maybe_replace_max_json([H | T] = EndKey) when is_list(EndKey) ->
 maybe_replace_max_json(EndKey) ->
     EndKey.
 
-base_args(#cursor{index = Idx, selector = Selector} = Cursor) ->
+base_args(#cursor{index = Idx, selector = Selector, fields = Fields} = Cursor) ->
     {StartKey, EndKey} =
         case Cursor#cursor.ranges of
             [empty] ->
@@ -118,8 +131,13 @@ base_args(#cursor{index = Idx, selector = Selector} = Cursor) ->
         end_key = EndKey,
         include_docs = true,
         extra = [
+            % view_cb pushes down post hoc matching and field extraction to
+            % the shard.
             {callback, {?MODULE, view_cb}},
+            % TODO remove selector. It supports older nodes during version upgrades.
             {selector, Selector},
+            {callback_args, viewcbargs_new(Selector, Fields)},
+
             {ignore_partition_query_limit, true}
         ]
     }.
@@ -248,6 +266,19 @@ view_cb({row, Row}, #mrargs{extra = Options} = Acc) ->
         key = couch_util:get_value(key, Row),
         doc = couch_util:get_value(doc, Row)
     },
+    % This supports receiving our "arguments" either as just the `selector`
+    % or in the new map in `callback_args`. This is to support mid-upgrade
+    % clusters where the non-upgraded coordinator nodes will send the older style.
+    % TODO remove this in a couple of couchdb versions.
+    {Selector, Fields} =
+        case couch_util:get_value(callback_args, Options) of
+            % old style
+            undefined ->
+                {couch_util:get_value(selector, Options), undefined};
+            % new style - assume a viewcbargs
+            Args = #{} ->
+                {viewcbargs_get(selector, Args), viewcbargs_get(fields, Args)}
+        end,
     case ViewRow#view_row.doc of
         null ->
             maybe_send_mango_ping();
@@ -256,14 +287,18 @@ view_cb({row, Row}, #mrargs{extra = Options} = Acc) ->
             ok = rexi:stream2(ViewRow),
             set_mango_msg_timestamp();
         Doc ->
+            % We slightly abuse the doc field in the view response here,
+            % because we may return something other than the full document:
+            % we may have projected the requested `fields` from the query.
+            % However, this oddness is confined to being visible in this module.
             put(mango_docs_examined, get(mango_docs_examined) + 1),
-            Selector = couch_util:get_value(selector, Options),
             couch_stats:increment_counter([mango, docs_examined]),
-            case mango_selector:match(Selector, Doc) of
-                true ->
-                    ok = rexi:stream2(ViewRow),
+            case match_and_extract_doc(Doc, Selector, Fields) of
+                {match, FinalDoc} ->
+                    FinalViewRow = ViewRow#view_row{doc = FinalDoc},
+                    ok = rexi:stream2(FinalViewRow),
                     set_mango_msg_timestamp();
-                false ->
+                {no_match, undefined} ->
                     maybe_send_mango_ping()
             end
     end,
@@ -277,6 +312,22 @@ view_cb(complete, Acc) ->
 view_cb(ok, ddoc_updated) ->
     rexi:reply({ok, ddoc_updated}).
 
+%% match_and_extract_doc checks whether Doc matches Selector. If it does,
+%% extract Fields and return {match, FinalDoc}; otherwise return {no_match, undefined}.
+-spec match_and_extract_doc(
+    Doc :: term(),
+    Selector :: term(),
+    Fields :: [string()] | undefined | all_fields
+) -> {match | no_match, term() | undefined}.
+match_and_extract_doc(Doc, Selector, Fields) ->
+    case mango_selector:match(Selector, Doc) of
+        true ->
+            FinalDoc = mango_fields:extract(Doc, Fields),
+            {match, FinalDoc};
+        false ->
+            {no_match, undefined}
+    end.
+
 maybe_send_mango_ping() ->
     Current = os:timestamp(),
     LastPing = get(mango_last_msg_timestamp),
@@ -296,14 +347,13 @@ set_mango_msg_timestamp() ->
 handle_message({meta, _}, Cursor) ->
     {ok, Cursor};
 handle_message({row, Props}, Cursor) ->
-    case doc_member(Cursor, Props) of
+    case doc_member_and_extract(Cursor, Props) of
         {ok, Doc, {execution_stats, Stats}} ->
             Cursor1 = Cursor#cursor{
                 execution_stats = Stats
             },
             Cursor2 = update_bookmark_keys(Cursor1, Props),
-            FinalDoc = mango_fields:extract(Doc, Cursor2#cursor.fields),
-            handle_doc(Cursor2, FinalDoc);
+            handle_doc(Cursor2, Doc);
         {no_match, _, {execution_stats, Stats}} ->
             Cursor1 = Cursor#cursor{
                 execution_stats = Stats
@@ -422,16 +472,21 @@ apply_opts([{_, _} | Rest], Args) ->
     % Ignore unknown options
     apply_opts(Rest, Args).
 
-doc_member(Cursor, RowProps) ->
+doc_member_and_extract(Cursor, RowProps) ->
     Db = Cursor#cursor.db,
     Opts = Cursor#cursor.opts,
     ExecutionStats = Cursor#cursor.execution_stats,
     Selector = Cursor#cursor.selector,
     case couch_util:get_value(doc, RowProps) of
         {DocProps} ->
-            % only matching documents are returned; the selector
-            % is evaluated at the shard level in view_cb({row, Row},
-            {ok, {DocProps}, {execution_stats, ExecutionStats}};
+            % If the query doesn't request quorum doc read via r>1,
+            % match_and_extract_doc/3 is executed in view_cb, ie, locally
+            % on the shard. We only receive back the final result for the query.
+            % TODO during upgrade, some nodes will not be processing `fields`
+            % on the shard because they're old, so re-execute here just in case.
+            % Remove this later, same time as the duplicate extract at the coordinator.
+            DocProps2 = mango_fields:extract({DocProps}, Cursor#cursor.fields),
+            {ok, DocProps2, {execution_stats, ExecutionStats}};
         undefined ->
             % an undefined doc was returned, indicating we should
             % perform a quorum fetch
@@ -441,7 +496,12 @@ doc_member(Cursor, RowProps) ->
             case mango_util:defer(fabric, open_doc, [Db, Id, Opts]) of
                 {ok, #doc{} = DocProps} ->
                     Doc = couch_doc:to_json_obj(DocProps, []),
-                    match_doc(Selector, Doc, ExecutionStats1);
+                    case match_and_extract_doc(Doc, Selector, Cursor#cursor.fields) of
+                        {match, FinalDoc} ->
+                            {ok, FinalDoc, {execution_stats, ExecutionStats1}};
+                        {no_match, undefined} ->
+                            {no_match, Doc, {execution_stats, ExecutionStats1}}
+                    end;
                 Else ->
                     Else
             end;
@@ -450,14 +510,6 @@ doc_member(Cursor, RowProps) ->
             {no_match, null, {execution_stats, ExecutionStats}}
     end.
 
-match_doc(Selector, Doc, ExecutionStats) ->
-    case mango_selector:match(Selector, Doc) of
-        true ->
-            {ok, Doc, {execution_stats, ExecutionStats}};
-        false ->
-            {no_match, Doc, {execution_stats, ExecutionStats}}
-    end.
-
 is_design_doc(RowProps) ->
     case couch_util:get_value(id, RowProps) of
         <<"_design/", _/binary>> -> true;
@@ -479,6 +531,8 @@ update_bookmark_keys(Cursor, _Props) ->
 -ifdef(TEST).
 -include_lib("eunit/include/eunit.hrl").
 
+%% Test that doc_member_and_extract bypasses the selector check if it receives
+%% a document in RowProps.doc.
 does_not_refetch_doc_with_value_test() ->
     Cursor = #cursor{
         db = <<"db">>,
@@ -498,7 +552,76 @@ does_not_refetch_doc_with_value_test() ->
                 ]
             }}
     ],
-    {Match, _, _} = doc_member(Cursor, RowProps),
+    {Match, _, _} = doc_member_and_extract(Cursor, RowProps),
     ?assertEqual(Match, ok).
 
+%% Test that field filtering is duplicated in doc_member_and_extract even when
+%% returning a value via RowProps.doc (ie, should have been done on the shard).
+%% This is needed temporarily for mixed version upgrades, as some shards may
+%% not have performed the field extraction. This can be later removed.
+doc_member_and_extract_fields_test() ->
+    Cursor = #cursor{
+        db = <<"db">>,
+        opts = [],
+        execution_stats = #execution_stats{},
+        %% no selector here as we should be bypassing this in the case of
+        %% shard level selector application.
+        fields = [<<"user_id">>, <<"a_non_existent_field">>]
+    },
+    RowProps = [
+        {id, <<"b06aadcf-cd0f-4ca6-9f7e-2c993e48d4c4">>},
+        {key, <<"b06aadcf-cd0f-4ca6-9f7e-2c993e48d4c4">>},
+        {doc,
+            {
+                [
+                    {<<"_id">>, <<"b06aadcf-cd0f-4ca6-9f7e-2c993e48d4c4">>},
+                    {<<"_rev">>, <<"1-a954fe2308f14307756067b0e18c2968">>},
+                    {<<"user_id">>, 11}
+                ]
+            }}
+    ],
+    {Match, Doc, _} = doc_member_and_extract(Cursor, RowProps),
+    ?assertEqual(ok, Match),
+    ?assertEqual({[{<<"user_id">>, 11}]}, Doc).
+
+%% match_and_extract_doc should return full Doc when Doc matches Selector and
+%% Fields is undefined.
+match_and_extract_doc_match_test() ->
+    Doc = {[{<<"_id">>, <<"myid">>}, {<<"_rev">>, <<"myrev">>}, {<<"user_id">>, 11}]},
+    Selector = mango_selector:normalize({[{<<"user_id">>, 11}]}),
+    Fields = undefined,
+    {Match, FinalDoc} = match_and_extract_doc(Doc, Selector, Fields),
+    ?assertEqual(match, Match),
+    ?assertEqual(Doc, FinalDoc).
+
+%% match_and_extract_doc should return projected Doc when Doc matches Selector
+%% and Fields is a list of fields.
+match_and_extract_doc_matchextract_test() ->
+    Doc = {[{<<"_id">>, <<"myid">>}, {<<"_rev">>, <<"myrev">>}, {<<"user_id">>, 11}]},
+    Selector = mango_selector:normalize({[{<<"user_id">>, 11}]}),
+    Fields = [<<"_id">>, <<"user_id">>],
+    {Match, FinalDoc} = match_and_extract_doc(Doc, Selector, Fields),
+    ?assertEqual(match, Match),
+    ?assertEqual({[{<<"_id">>, <<"myid">>}, {<<"user_id">>, 11}]}, FinalDoc).
+
+%% match_and_extract_doc should return no document when Doc does not match
+%% Selector.
+match_and_extract_doc_nomatch_test() ->
+    Doc = {[{<<"_id">>, <<"myid">>}, {<<"_rev">>, <<"myrev">>}, {<<"user_id">>, 11}]},
+    Selector = mango_selector:normalize({[{<<"user_id">>, <<"1234">>}]}),
+    Fields = undefined,
+    {Match, FinalDoc} = match_and_extract_doc(Doc, Selector, Fields),
+    ?assertEqual(no_match, Match),
+    ?assertEqual(undefined, FinalDoc).
+
+%% match_and_extract_doc should return no document when Doc does not match
+%% Selector even if Fields is defined.
+match_and_extract_doc_nomatch_fields_test() ->
+    Doc = {[{<<"_id">>, <<"myid">>}, {<<"_rev">>, <<"myrev">>}, {<<"user_id">>, 11}]},
+    Selector = mango_selector:normalize({[{<<"user_id">>, 1234}]}),
+    Fields = [<<"_id">>, <<"user_id">>],
+    {Match, FinalDoc} = match_and_extract_doc(Doc, Selector, Fields),
+    ?assertEqual(no_match, Match),
+    ?assertEqual(undefined, FinalDoc).
+
 -endif.