You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2019/07/23 21:54:58 UTC

[couchdb] 13/31: Move fdb logic to couch_views_fdb

This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch prototype/views
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 5d6068e004f1976191c9327a97d9b33c452e64f3
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Thu Jul 18 13:58:57 2019 -0500

    Move fdb logic to couch_views_fdb
---
 src/couch_views/src/couch_views.erl        |  45 +-----
 src/couch_views/src/couch_views_fdb.erl    | 147 +++++++++++++++--
 src/couch_views/src/couch_views_reader.erl | 250 ++++++++++++-----------------
 3 files changed, 245 insertions(+), 197 deletions(-)

diff --git a/src/couch_views/src/couch_views.erl b/src/couch_views/src/couch_views.erl
index 65af1bf..7deb54d 100644
--- a/src/couch_views/src/couch_views.erl
+++ b/src/couch_views/src/couch_views.erl
@@ -43,23 +43,22 @@ query(Db, DDoc, ViewName, Callback, Acc0, QueryArgs0) ->
         false -> ok
     end,
 
-    Args = mrargs_to_map(QueryArgs2),
-    ok = maybe_update_view(Db, Mrst, Args),
+    ok = maybe_update_view(Db, Mrst, QueryArgs2),
 
     try
-        couch_views_reader:read(Db, Mrst, ViewName, Callback, Acc0, Args)
+        couch_views_reader:read(Db, Mrst, ViewName, Callback, Acc0, QueryArgs2)
     after
-        UpdateAfter = maps:get(update, Args) == lazy,
+        UpdateAfter = QueryArgs2#mrargs.update == lazy,
         if UpdateAfter == false -> ok; true ->
             couch_views_jobs:build_view_async(Db, Mrst)
         end
     end.
 
 
-maybe_update_view(_Db, _Mrst, #{update := false}) ->
+maybe_update_view(_Db, _Mrst, #mrargs{update = false}) ->  % no build at all
     ok;
 
-maybe_update_view(_Db, _Mrst, #{update := lazy}) ->
+maybe_update_view(_Db, _Mrst, #mrargs{update = lazy}) ->  % async build later
     ok;
 
 maybe_update_view(Db, Mrst, _Args) ->
@@ -83,39 +82,5 @@ is_reduce_view({Reduce, _, _}) ->
     Reduce =:= red.
 
 
-mrargs_to_map(#mrargs{} = Args) ->
-    process_args(#{
-        start_key => Args#mrargs.start_key,
-        start_key_docid => Args#mrargs.start_key_docid,
-        end_key => Args#mrargs.end_key,
-        end_key_docid => Args#mrargs.end_key_docid,
-        keys => Args#mrargs.keys,
-        direction => Args#mrargs.direction,
-        limit => Args#mrargs.limit,
-        skip => Args#mrargs.skip,
-        update => Args#mrargs.update,
-        multi_get => Args#mrargs.multi_get,
-        inclusive_end => Args#mrargs.inclusive_end,
-        include_docs => Args#mrargs.include_docs,
-        doc_options => Args#mrargs.doc_options,
-        update_seq => Args#mrargs.update_seq,
-        conflicts => Args#mrargs.conflicts,
-        sorted => Args#mrargs.sorted
-    }).
-
-
-process_args(#{} = Args) ->
-    Args1 = remove_ununsed_values(Args),
-    Defaults = #{
-            direction => fwd,
-            inclusive_end => true,
-            update => true,
-            skip => 0,
-            limit => ?MAX_VIEW_LIMIT
-        },
-
-    maps:merge(Defaults, Args1).
-
-
 remove_ununsed_values(Args) ->
     maps:filter(fun (_, V) -> V /= undefined end, Args).
diff --git a/src/couch_views/src/couch_views_fdb.erl b/src/couch_views/src/couch_views_fdb.erl
index f47f1b1..57ed5f1 100644
--- a/src/couch_views/src/couch_views_fdb.erl
+++ b/src/couch_views/src/couch_views_fdb.erl
@@ -16,7 +16,9 @@
     get_update_seq/2,
     set_update_seq/3,
 
-    write_rows/4
+    fold_map_idx/6,  % TxDb, Sig, ViewId, Options, Callback, Acc0
+
+    write_doc/4
 ]).
 
 
@@ -54,18 +56,54 @@ set_view_seq(TxDb, Sig, Seq) ->
     ok = erlfdb:set(Tx, SeqKey, Seq).
 
 
+% Fold over the map index rows of one view inside the transaction TxDb.
+%
+% Callback is called as Callback(DocId, Key, Value, Acc) once per view row
+% and must return the next accumulator; the accumulator left after the last
+% row is returned. Options is a proplist: `{dir, fwd | rev}` picks the fold
+% function (default fwd) and the whole list is handed on to fold_range/5.
+fold_map_idx(TxDb, Sig, ViewId, Options, Callback, Acc0) ->
+    #{
+        db_prefix := DbPrefix
+    } = TxDb,
+
+    MapIdxPrefix = map_idx_prefix(DbPrefix, Sig, ViewId),
+
+    % State shared by both directions. Each view row is two FDB entries,
+    % so `next` records which half the fold function expects to see next.
+    Base = #{
+        prefix => MapIdxPrefix,
+        sort_key => undefined,
+        docid => undefined,
+        dupe_id => undefined,
+        callback => Callback,
+        acc => Acc0
+    },
+
+    {Fun, Acc} = case fabric2_util:get_value(dir, Options, fwd) of
+        fwd ->
+            {fun fold_fwd/2, Base#{next => key, key => undefined}};
+        rev ->
+            {fun fold_rev/2, Base#{next => value, value => undefined}}
+    end,
+
+    % NOTE(review): assumes fabric2_fdb:fold_range/5 returns the final fold
+    % state; only the caller's accumulator is handed back.
+    FoldAcc = fabric2_fdb:fold_range(TxDb, MapIdxPrefix, Fun, Acc, Options),
+    maps:get(acc, FoldAcc).
+
+
 write_doc(TxDb, Sig, #{deleted := true} = Doc, ViewIds) ->
     #{
         id := DocId
     } = Doc,
 
-    ViewKeys = get_view_keys(TxDb, Sig, DocId),
+    ExistingViewKeys = get_view_keys(TxDb, Sig, DocId),  % {ViewId, Keys} pairs
 
-    clear_id_idx(TxDb, Sig, DocId),
+    clear_id_idx(TxDb, Sig, DocId),  % id index first, then each view's map index
     lists:foreach(fun({ViewId, ViewKeys}) ->
         clear_map_idx(TxDb, Sig, ViewId, ViewKeys)
-    end, ViewKeys).
-
+    end, ExistingViewKeys);  % the next clause handles non-deleted docs
 
 write_doc(TxDb, Sig, Doc, ViewIds) ->
     #{
@@ -79,7 +117,7 @@ write_doc(TxDb, Sig, Doc, ViewIds) ->
 
     ExistingViewKeys = get_view_keys(TxDb, Sig, DocId),
 
-    ok = clear_id_idx(TxDb, Sig, DocId),
+    clear_id_idx(TxDb, Sig, DocId),
 
     lists:foreach(fun({ViewId, NewRows}) ->
         ExistingKeys = fabric2_util:get_value(ViewId, ExistingViewKeys, []),
@@ -88,6 +126,95 @@ write_doc(TxDb, Sig, Doc, ViewIds) ->
     end, lists:zip(ViewIds, Results)).
 
 
+% Forward fold function used by fold_map_idx. A view row arrives as two
+% consecutive entries: the ?VIEW_ROW_KEY entry (value = encoded original
+% key), then the ?VIEW_ROW_VAL entry (value = encoded row value). Clause one
+% stashes the key half; clause two calls Callback(DocId, Key, Value, Acc).
+fold_fwd({RowKey, EncodedOriginalKey}, #{next := key} = Acc) ->
+    #{prefix := Prefix} = Acc,
+    {{SortKey, DocId}, DupeId, ?VIEW_ROW_KEY} =
+            erlfdb_tuple:unpack(RowKey, Prefix),
+    Acc#{
+        next := val,
+        key := couch_views_encoding:decode(EncodedOriginalKey),
+        sort_key := SortKey,
+        docid := DocId,
+        dupe_id := DupeId
+    };
+
+fold_fwd({RowKey, EncodedValue}, #{next := val} = Acc) ->
+    #{
+        prefix := Prefix,
+        key := Key,
+        sort_key := SortKey,
+        docid := DocId,
+        dupe_id := DupeId,
+        callback := UserCallback,
+        acc := UserAcc0
+    } = Acc,
+
+    % SortKey, DocId and DupeId are already bound, so a mismatch with the
+    % stashed key half fails here with a badmatch instead of mispairing rows.
+    {{SortKey, DocId}, DupeId, ?VIEW_ROW_VAL} =
+            erlfdb_tuple:unpack(RowKey, Prefix),
+
+    Value = couch_views_encoding:decode(EncodedValue),
+    UserAcc1 = UserCallback(DocId, Key, Value, UserAcc0),
+
+    Acc#{
+        next := key,
+        key := undefined,
+        sort_key := undefined,
+        docid := undefined,
+        dupe_id := undefined,
+        acc := UserAcc1
+    }.
+
+
+% Reverse fold function used by fold_map_idx. Entries arrive in the opposite
+% order: the ?VIEW_ROW_VAL entry first, then its ?VIEW_ROW_KEY entry. Clause
+% one stashes the decoded value; clause two decodes the original key and
+% calls Callback(DocId, Key, Value, Acc).
+fold_rev({RowKey, EncodedValue}, #{next := value} = Acc) ->
+    #{prefix := Prefix} = Acc,
+    {{SortKey, DocId}, DupeId, ?VIEW_ROW_VAL} =
+            erlfdb_tuple:unpack(RowKey, Prefix),
+    Acc#{
+        next := key,
+        value := couch_views_encoding:decode(EncodedValue),
+        sort_key := SortKey,
+        docid := DocId,
+        dupe_id := DupeId
+    };
+
+fold_rev({RowKey, EncodedOriginalKey}, #{next := key} = Acc) ->
+    #{
+        prefix := Prefix,
+        value := Value,
+        sort_key := SortKey,
+        docid := DocId,
+        dupe_id := DupeId,
+        callback := UserCallback,
+        acc := UserAcc0
+    } = Acc,
+
+    % SortKey, DocId and DupeId are already bound, so a mismatch with the
+    % stashed value half fails here with a badmatch instead of mispairing.
+    {{SortKey, DocId}, DupeId, ?VIEW_ROW_KEY} =
+            erlfdb_tuple:unpack(RowKey, Prefix),
+
+    Key = couch_views_encoding:decode(EncodedOriginalKey),
+    UserAcc1 = UserCallback(DocId, Key, Value, UserAcc0),
+
+    Acc#{
+        next := value,
+        value := undefined,
+        sort_key := undefined,
+        docid := undefined,
+        dupe_id := undefined,
+        acc := UserAcc1
+    }.
+
 clear_id_idx(TxDb, Sig, DocId) ->
     #{
         tx := Tx,
@@ -141,10 +268,10 @@ update_map_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows) ->
     MapIdxPrefix = map_idx_prefix(DbPrefix, Sig, ViewId),
 
     lists:foreach(fun({DupeId, Key1, Key2, Val}) ->
-        KeyKey = map_idx_key(MapIdxPrefix, Key1, DocId, DupeId, ?VIEW_ROW_KEY),
-        ValKey = map_idx_key(MapIdxPrefix, Key1, DocId, DupeId, ?VIEW_ROW_VAL),
-        ok = erlfdn:store(Tx, KeyKey, Key2),
-        ok = erlfdb:store(Tx, ValKey, Val)
+        KK = map_idx_key(MapIdxPrefix, {Key1, DocId}, DupeId, ?VIEW_ROW_KEY),
+        VK = map_idx_key(MapIdxPrefix, {Key1, DocId}, DupeId, ?VIEW_ROW_VAL),
+        ok = erlfdb:set(Tx, KK, Key2),  % key half of the row
+        ok = erlfdb:set(Tx, VK, Val)  % value half of the row
     end, KVsToAdd).
 
 
diff --git a/src/couch_views/src/couch_views_reader.erl b/src/couch_views/src/couch_views_reader.erl
index f4e768a..56b23f2 100644
--- a/src/couch_views/src/couch_views_reader.erl
+++ b/src/couch_views/src/couch_views_reader.erl
@@ -22,174 +22,130 @@
 -include_lib("fabric/src/fabric2.hrl").
 
 
-read(Db, DDoc, ViewName, Callback, Acc0, Args) ->
-    #{name := DbName} = Db,
-
-    {ok, Mrst} = couch_views_util:ddoc_to_mrst(DbName, DDoc),
+% Stream the rows of the map view ViewName to UserCallback.
+%
+% UserCallback(Event, Acc) is called with {meta, Props} first, then with
+% {row, Row} for each row that survives `skip`, and finally with `complete`.
+% It returns {ok, Acc} to continue or {stop, Acc} to end the read early;
+% either way read/6 returns {ok, FinalAcc}.
+read(Db, Mrst, ViewName, UserCallback, UserAcc0, Args) ->
     #mrst{
         sig = Sig,
         views = Views
     } = Mrst,
 
-    IdxName = get_idx_name(ViewName, Views),
-    State0 = #{
-        acc => Acc0,
-        skip => maps:get(skip, Args, 0),
-        include_docs => maps:get(include_docs, Args, false),
-        db => Db
-    },
+    ViewId = get_view_id(ViewName, Views),
+    Opts = mrargs_to_fdb_options(Args),
+
+    try
+        % Need to add total_rows support
+        Meta = {meta, [{total_rows, null}, {offset, null}]},
+        UserAcc1 = maybe_stop(UserCallback(Meta, UserAcc0)),
+
+        fabric2_fdb:transactional(Db, fun(TxDb) ->
+            Acc0 = #{
+                db => TxDb,
+                skip => Args#mrargs.skip,
+                mrargs => Args,
+                callback => UserCallback,
+                acc => UserAcc1
+            },
+
+            % handle_row/4 threads this map through the fold; the map that
+            % comes back is expected to carry the user's accumulator in `acc`.
+            Acc1 = couch_views_fdb:fold_map_idx(
+                    TxDb, Sig, ViewId, Opts, fun handle_row/4, Acc0),
+            #{acc := UserAcc2} = Acc1,
+
+            {ok, maybe_stop(UserCallback(complete, UserAcc2))}
+        end)
+    catch throw:{done, Out} ->
+        % maybe_stop/1 throws {done, Acc} as soon as any of the callback
+        % calls above answers {stop, Acc}.
+        {ok, Out}
+    end.
 
-    DefaultOpts = [{streaming_mode, want_all}],
-    {Start, End, QueryOpts} = convert_args_to_fdb(Db, Sig, IdxName, Args,
-        DefaultOpts),
-    Opts = QueryOpts ++ DefaultOpts,
 
-    fabric2_fdb:transactional(Db, fun(TxDb) ->
-        Future = couch_views_fdb:get_map_range(TxDb, Start, End, Opts),
+% Row callback given to couch_views_fdb:fold_map_idx. Consumes `skip` rows
+% silently, then passes {row, Row} to the user callback, adding a `doc`
+% member when include_docs is set. Returns the updated accumulator map.
+handle_row(_DocId, _Key, _Value, #{skip := Skip} = Acc) when Skip > 0 ->
+    Acc#{skip := Skip - 1};
 
-        UnPack = get_unpack_fun(TxDb, Opts, Callback),
-        State1 = lists:foldl(UnPack, State0, erlfdb:wait(Future)),
+handle_row(DocId, Key, Value, Acc) ->
+    #{
+        db := TxDb,
+        mrargs := Args,
+        callback := UserCallback,
+        acc := UserAcc0
+    } = Acc,
+
+    BaseRow = [{id, DocId}, {key, Key}, {value, Value}],
+
+    Row = BaseRow ++ if not Args#mrargs.include_docs -> []; true ->
+        DocOpts0 = Args#mrargs.doc_options,
+        % Anything other than `true` (e.g. an unset field) means no conflicts.
+        DocOpts1 = DocOpts0 ++ case Args#mrargs.conflicts of
+            true -> [conflicts];
+            _ -> []
+        end,
+        DocObj = case fabric2_db:open_doc(TxDb, DocId, DocOpts1) of
+            {ok, Doc} -> couch_doc:to_json_obj(Doc, DocOpts1);
+            {not_found, _} -> null
+        end,
+        [{doc, DocObj}]
+    end,
 
-        #{acc := Acc1} = State1,
-        Callback(complete, Acc1)
-    end).
+    UserAcc1 = maybe_stop(UserCallback({row, Row}, UserAcc0)),
+    Acc#{acc := UserAcc1}.
 
 
-get_idx_name(ViewName, Views) ->
-    {value, View} = lists:search(fun (View) ->
+get_view_id(ViewName, Views) ->  % id_num of the view that maps ViewName
+    {value, View} = lists:search(fun(View) ->  % badmatch when none matches
         lists:member(ViewName, View#mrview.map_names)
     end, Views),
     View#mrview.id_num.
 
 
-convert_args_to_fdb(Db, Sig, IdxName, Args, Opts) ->
-    #{
-        direction := Direction
-    } = Args,
-
-    {Start1, End1} = get_range_keys(Db, Sig, IdxName, Args),
-
-    Opts1 = case maps:is_key(limit, Args) of
-        false ->
-            Opts;
-        true ->
-            Skip = maps:get(skip, Args, 0),
-            Limit = maps:get(limit, Args),
-            % Limit is multiplied by two because there are two rows per key
-            % value.
-            % Skip is added because that is done in the fold so we need
-            % to fetch the number of documents
-            % along with the docs we would skip.
-            % Limit = (Doc limit + Skip) * Num of Rows per Map KV
-            [{limit, (Limit + Skip) * 2} | Opts]
-    end,
-
-    Opts2 = case Direction of
-        fwd ->
-            Opts1;
-        rev ->
-            [{reverse, true} | Opts1]
-    end,
-    {Start1, End1, Opts2}.
-
-
-get_range_keys(Db, Sig, IdxName, Args) ->
-    #{
-        inclusive_end := InclusiveEnd,
-        direction := Direction
+% Translate view query arguments into the option list given to
+% couch_views_fdb:fold_map_idx. Start/end keys become {Key} or {Key, DocId}
+% tuples; inclusive_end chooses between end_key and end_key_gt.
+mrargs_to_fdb_options(Args) ->
+    #mrargs{
+        start_key = StartKey,
+        start_key_docid = StartKeyDocId,
+        end_key = EndKey,
+        end_key_docid = EndKeyDocId,
+        direction = Direction,
+        limit = Limit,
+        skip = Skip,
+        inclusive_end = InclusiveEnd
     } = Args,
 
-    {MapStartKey, MapEndKey} = case Direction of
-        fwd -> {start_key, end_key};
-        rev -> {end_key, start_key}
+    % An `undefined` key means that bound was not supplied.
+    StartKeyOpts = case {StartKey, StartKeyDocId} of
+        {undefined, _} -> [];
+        {_, undefined} -> [{start_key, {StartKey}}];
+        {_, _} -> [{start_key, {StartKey, StartKeyDocId}}]
     end,
 
-    {Start0, End0} = couch_views_fdb:get_map_range_keys(Db, Sig, IdxName),
-
-    Start1 = case maps:is_key(MapStartKey, Args) of
-        false ->
-            Start0;
-        true ->
-            StartKey = maps:get(MapStartKey, Args),
-            Start = couch_views_fdb:get_map_index_key(Db, Sig, IdxName,
-                StartKey),
-            erlfdb_key:first_greater_or_equal(Start)
-    end,
-
-    End1 = case maps:is_key(MapEndKey, Args) of
-        false ->
-            End0;
-        true ->
-            EndKey = maps:get(MapEndKey, Args),
-            EndBin = couch_views_fdb:get_map_index_key(Db, Sig, IdxName,
-                EndKey),
-            EndBin1 = case InclusiveEnd of
-                true -> <<EndBin/binary, 16#FF>>;
-                false -> EndBin
-            end,
-            erlfdb_key:first_greater_than(EndBin1)
-    end,
-    {Start1, End1}.
-
-
-get_unpack_fun(TxDb, Opts, Callback) ->
-    UnPackFwd = fun({K, V}, State) ->
-        case couch_views_fdb:unpack_map_row(TxDb, K, V) of
-            {key, _Id, RowKey} ->
-                State#{current_key => RowKey};
-            {value, Id, RowValue} ->
-                #{
-                    current_key := RowKey
-                } = State,
-                process_map_row(Id, RowKey, RowValue, State, Callback)
-        end
+    % A false inclusive_end switches the option name to end_key_gt.
+    EndKeyOpts = case {EndKey, EndKeyDocId} of
+        {undefined, _} -> [];
+        {_, undefined} when InclusiveEnd -> [{end_key, {EndKey}}];
+        {_, undefined} -> [{end_key_gt, {EndKey}}];
+        {_, _} when InclusiveEnd -> [{end_key, {EndKey, EndKeyDocId}}];
+        {_, _} -> [{end_key_gt, {EndKey, EndKeyDocId}}]
     end,
 
-    UnPackRev = fun({K, V}, State) ->
-        case couch_views_fdb:unpack_map_row(TxDb, K, V) of
-            {key, Id, RowKey} ->
-                #{
-                    current_value := RowValue
-                } = State,
-                process_map_row(Id, RowKey, RowValue, State, Callback);
-            {value, _Id, RowValue} ->
-                State#{current_value => RowValue}
-        end
-    end,
-
-    case lists:keyfind(reverse, 1, Opts) of
-        {reverse, true} -> UnPackRev;
-        _ -> UnPackFwd
-    end.
+    % Every view row occupies two FDB entries, and rows consumed by `skip`
+    % are still read by the fold, so both count toward the entry limit.
+    [
+        {dir, Direction},
+        {limit, (Limit + Skip) * 2},
+        {streaming_mode, want_all}
+    ] ++ StartKeyOpts ++ EndKeyOpts.
 
 
-process_map_row(Id, RowKey, RowValue, State, Callback) ->
-    #{
-        acc := Acc,
-        skip := Skip,
-        db := Db
-    } = State,
-
-    case Skip > 0 of
-        true ->
-            State#{skip := Skip -1};
-        false ->
-            Row = [{id, Id}, {key, RowKey}, {value, RowValue}],
-
-            IncludeDoc = maps:get(include_docs, State, false),
-            Row1 = maybe_include_doc(Db, Id, Row, IncludeDoc),
-
-            {ok, AccNext} = Callback({row, Row1}, Acc),
-            State#{acc := AccNext}
-    end.
-
-
-maybe_include_doc(_Db, _Id, Row, false) ->
-    Row;
-
-maybe_include_doc(Db, Id, Row, true) ->
-    Doc1 = case fabric2_db:open_doc(Db, Id) of
-        {ok, Doc} -> couch_doc:to_json_obj(Doc, []);
-        {not_found, _} -> []
-    end,
-    Row ++ [{doc, Doc1}].
+maybe_stop({stop, UserAcc}) -> throw({done, UserAcc});  % caught by read/6
+maybe_stop({ok, UserAcc}) -> UserAcc.  % callback wants the next row