You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2020/09/16 17:06:40 UTC

[couchdb] 08/12: Update to use new ebtree multi functions

This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch feature-ebtree-views
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 786614d1c1f87d8b61d77096a246eec8d3c62ae3
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Fri Aug 21 13:30:31 2020 -0500

    Update to use new ebtree multi functions
---
 src/couch_views/src/couch_views_fdb.erl     | 183 +++++++++++++++++++---------
 src/couch_views/src/couch_views_indexer.erl |  23 ++--
 2 files changed, 139 insertions(+), 67 deletions(-)

diff --git a/src/couch_views/src/couch_views_fdb.erl b/src/couch_views/src/couch_views_fdb.erl
index a66f138..83b6df5 100644
--- a/src/couch_views/src/couch_views_fdb.erl
+++ b/src/couch_views/src/couch_views_fdb.erl
@@ -30,7 +30,7 @@
     fold_map_idx/5,
     fold_red_idx/6,
 
-    write_doc/3,
+    update_views/3,
 
     list_signatures/1,
     clear_index/2
@@ -285,62 +285,67 @@ fold_red_idx(TxDb, View, Idx, Options, Callback, Acc0) ->
     end.
 
 
-write_doc(TxDb, Mrst, #{deleted := true} = Doc) ->
+update_views(TxDb, Mrst, Docs) ->
     #{
         tx := Tx
     } = TxDb,
-    #{
-        id := DocId
-    } = Doc,
-
-    ExistingViewKeys = get_view_keys(TxDb, Mrst, DocId),
 
-    ebtree:delete(Tx, Mrst#mrst.id_btree, DocId),
-    lists:foreach(fun(#mrview{id_num = ViewId, btree = Btree}) ->
-        ViewKeys = case lists:keyfind(ViewId, 1, ExistingViewKeys) of
-            {ViewId, Keys} -> Keys;
-            false -> []
-        end,
-        lists:foreach(fun(Key) ->
-            ebtree:delete(Tx, Btree, {Key, DocId})
-        end, ViewKeys)
-    end, Mrst#mrst.views);
+    % Collect update information
 
-write_doc(TxDb, Mrst, Doc) ->
     #{
-        tx := Tx
-    } = TxDb,
-    #{
-        id := DocId,
-        results := Results
-    } = Doc,
+        id_keys_to_remove := IdKeysToRemove,
+        id_rows_to_insert := IdRowsToInsert,
+        view_keys_to_remove := ViewKeysToRemove,
+        view_rows_to_insert := ViewRowsToInsert
+    } = gather_update_info(Tx, Mrst, Docs),
+
+    % Remove any Key from Keys that appears in Rows as a Key
+    DeleteKeys = fun(Keys, Rows) ->
+        lists:foldl(fun({Key, _Val}, KeyAcc) ->
+            lists:delete(Key, KeyAcc)
+        end, Keys, Rows)
+    end,
 
-    ExistingViewKeys = get_view_keys(TxDb, Mrst, DocId),
+    %% couch_log:error("XKCD: IdKeysToRemove: ~p", [IdKeysToRemove]),
+    %% couch_log:error("XKCD: Removing id rows: ~p", [DeleteKeys(IdKeysToRemove, IdRowsToInsert)]),
 
-    NewIdKeys = lists:foldl(fun({View, RawNewRows}, IdKeyAcc) ->
+    % Delete any id rows that won't be overwritten
+    lists:foreach(fun(DocId) ->
+        ebtree:delete(Tx, Mrst#mrst.id_btree, DocId)
+    end, DeleteKeys(IdKeysToRemove, IdRowsToInsert)),
+
+    % Delete all view rows that won't be overwritten
+    lists:foreach(fun(View) ->
         #mrview{
-            id_num = ViewId
+            id_num = ViewId,
+            btree = BTree
         } = View,
 
-        % Remove old keys in the view
-        ExistingKeys = case lists:keyfind(ViewId, 1, ExistingViewKeys) of
-            {ViewId, Keys} -> Keys;
-            false -> []
-        end,
-        lists:foreach(fun(K) ->
-            ebtree:delete(Tx, View#mrview.btree, {K, DocId})
-        end, ExistingKeys),
+        Keys = maps:get(ViewId, ViewKeysToRemove, []),
+        Rows = maps:get(ViewId, ViewRowsToInsert, []),
+
+        %% couch_log:error("XKCD: ViewKeysToRemove: ~p -> ~p", [ViewId, Keys]),
+        %% couch_log:error("XKCD: Removing view rows: ~p -> ~p", [ViewId, DeleteKeys(Keys, Rows)]),
 
-        % Insert new rows
-        NewRows = dedupe_rows(View, RawNewRows),
-        lists:foreach(fun({K, V}) ->
-            ebtree:insert(Tx, View#mrview.btree, {K, DocId}, V)
-        end, NewRows),
-        ViewKeys = {View#mrview.id_num, lists:usort([K || {K, _V} <- NewRows])},
-        [ViewKeys | IdKeyAcc]
-    end, [], lists:zip(Mrst#mrst.views, Results)),
+        lists:foreach(fun(IdKey) ->
+            ebtree:delete(Tx, BTree, IdKey)
+        end, DeleteKeys(Keys, Rows))
+    end, Mrst#mrst.views),
 
-    ebtree:insert(Tx, Mrst#mrst.id_btree, DocId, NewIdKeys).
+    % Insert our new id_btree rows
+    %% couch_log:error("XKCD: Inserting id rows: ~p", [IdRowsToInsert]),
+    ebtree:insert_multi(Tx, Mrst#mrst.id_btree, IdRowsToInsert),
+
+    % Update each view
+    lists:foreach(fun(View) ->
+        #mrview{
+            id_num = ViewId,
+            btree = BTree
+        } = View,
+        Rows = maps:get(ViewId, ViewRowsToInsert, []),
+        %% couch_log:error("XKCD: Inserting view rows: ~p -> ~p", [ViewId, Rows]),
+        ebtree:insert_multi(Tx, BTree, Rows)
+    end, Mrst#mrst.views).
 
 
 list_signatures(Db) ->
@@ -383,19 +388,6 @@ clear_index(Db, Signature) ->
     erlfdb:clear_range_startswith(Tx, TreePrefix).
 
 
-get_view_keys(TxDb, Mrst, DocId) ->
-    #{
-        tx := Tx
-    } = TxDb,
-    #mrst{
-        id_btree = IdTree
-    } = Mrst,
-    case ebtree:lookup(Tx, IdTree, DocId) of
-        {DocId, ViewKeys} -> ViewKeys;
-        false -> []
-    end.
-
-
 open_id_tree(TxDb, Sig) ->
     #{
         tx := Tx,
@@ -470,11 +462,18 @@ make_reduce_fun(Lang, #mrview{} = View) ->
 
 persist_chunks(Tx, set, [Key, Value]) ->
     Chunks = fabric2_fdb:chunkify_binary(Value),
-    lists:foldl(fun(Chunk, Id) ->
+    LastId = lists:foldl(fun(Chunk, Id) ->
         ChunkKey = erlfdb_tuple:pack({Id}, Key),
         erlfdb:set(Tx, ChunkKey, Chunk),
         Id + 1
-    end, 0, Chunks);
+    end, 0, Chunks),
+
+    % We update nodes in place, so it's possible that
+    % a node shrank. This clears any keys that we haven't
+    % just overwritten for the provided key.
+    LastIdKey = erlfdb_tuple:pack({LastId}, Key),
+    EndRange = <<Key/binary, 16#FF>>,
+    erlfdb:clear_range(Tx, LastIdKey, EndRange);
 
 persist_chunks(Tx, get, Key) ->
     Rows = erlfdb:get_range_startswith(Tx, Key),
@@ -522,6 +521,72 @@ to_red_opts(Options) ->
     {Dir, StartKey, EndKey, InclusiveEnd, GroupKeyFun}.
 
 
+gather_update_info(Tx, Mrst, Docs) ->
+    DocIds = [DocId || #{id := DocId} <- Docs],
+
+    % ExistingViewKeys is a list of `[{DocId, [{ViewId, [Keys]} | _]} | _]`
+    % This conversion is to a map of `#{ViewId => [{Key, DocId} | _]}`
+    ExistingViewKeys = ebtree:lookup_multi(Tx, Mrst#mrst.id_btree, DocIds),
+    ViewKeysToRemove1 = lists:foldl(fun({DocId, ViewIdKeys}, EVKAcc1) ->
+        lists:foldl(fun({ViewId, Keys}, EVKAcc2) ->
+            ViewKeys = [{Key, DocId} || Key <- Keys],
+            maps:update_with(ViewId, fun(RestViewKeys) ->
+                ViewKeys ++ RestViewKeys
+            end, ViewKeys, EVKAcc2)
+        end, EVKAcc1, ViewIdKeys)
+    end, #{}, ExistingViewKeys),
+
+    % Build our base accumulator
+    InfoAcc1 = #{
+        id_keys_to_remove => DocIds,
+        id_rows_to_insert => [],
+        view_keys_to_remove => ViewKeysToRemove1,
+        view_rows_to_insert => #{}
+    },
+
+    lists:foldl(fun(Doc, InfoAcc2) ->
+        #{
+            id := DocId,
+            deleted := Deleted,
+            results := Results
+        } = Doc,
+
+        if Deleted -> InfoAcc2; true ->
+            Out = lists:foldl(fun({View, RawNewRows}, {IdKeyAcc, InfoAcc3}) ->
+                #mrview{
+                    id_num = ViewId
+                } = View,
+                DedupedRows = dedupe_rows(View, RawNewRows),
+
+                IdKeys = lists:usort([K || {K, _V} <- DedupedRows]),
+                ViewRows = [{{K, DocId}, V} || {K, V} <- DedupedRows],
+
+                #{
+                    view_rows_to_insert := ViewRowsToInsert2
+                } = InfoAcc3,
+                ViewRowsToInsert3 = maps:update_with(ViewId, fun(Rows) ->
+                    ViewRows ++ Rows
+                end, ViewRows, ViewRowsToInsert2),
+
+                {[{ViewId, IdKeys} | IdKeyAcc], InfoAcc3#{
+                    view_rows_to_insert := ViewRowsToInsert3
+                }}
+            end, {[], InfoAcc2}, lists:zip(Mrst#mrst.views, Results)),
+
+            {IdRows, InfoAcc4} = Out,
+
+            % Don't store a row in the id_btree if it hasn't got any
+            % keys that will need to be deleted.
+            NonEmptyRows = [1 || {_ViewId, Rows} <- IdRows, Rows /= []],
+            if length(NonEmptyRows) == 0 -> InfoAcc4; true ->
+                maps:update_with(id_rows_to_insert, fun(OtherIdRows) ->
+                    [{DocId, IdRows} | OtherIdRows]
+                end, [{DocId, IdRows}], InfoAcc4)
+            end
+        end
+    end, InfoAcc1, Docs).
+
+
 dedupe_rows(View, KVs0) ->
     CollateFun = couch_views_util:collate_fun(View),
     KVs1 = lists:sort(fun({KeyA, ValA}, {KeyB, ValB}) ->
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index ae7e6ff..c88185a 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -102,6 +102,8 @@ init() ->
         update_stats => #{}
     },
 
+    process_flag(sensitive, false),
+
     try
         update(Db, Mrst, State)
     catch
@@ -358,7 +360,7 @@ map_docs(Mrst, Docs) ->
     end, Docs),
 
     Deleted1 = lists:map(fun(Doc) ->
-        Doc#{results => []}
+        Doc#{results => [[] || _ <- Mrst1#mrst.views]}
     end, Deleted0),
 
     DocsToMap = lists:map(fun(Doc) ->
@@ -389,7 +391,7 @@ map_docs(Mrst, Docs) ->
     {Mrst1, MappedDocs}.
 
 
-write_docs(TxDb, Mrst, Docs, State) ->
+write_docs(TxDb, Mrst, Docs0, State) ->
     #mrst{
         sig = Sig
     } = Mrst,
@@ -401,11 +403,12 @@ write_docs(TxDb, Mrst, Docs, State) ->
     KeyLimit = key_size_limit(),
     ValLimit = value_size_limit(),
 
-    TotalKVCount = lists:foldl(fun(Doc0, KVCount) ->
-        Doc1 = calculate_kv_sizes(Mrst, Doc0, KeyLimit, ValLimit),
-        couch_views_fdb:write_doc(TxDb, Sig, ViewIds, Doc1),
-        KVCount + count_kvs(Doc1)
-    end, 0, Docs),
+    {Docs1, TotalKVCount} = lists:foldl(fun(Doc0, KVCount) ->
+        Doc1 = check_kv_size_limit(Mrst, Doc0, KeyLimit, ValLimit)
+        {Doc1, KVCount + count_kvs(Doc1)}
+    end, 0, Docs0),
+
+    couch_views_fdb:update_views(TxDb, Mrst, Docs1),
 
     if LastSeq == false -> ok; true ->
         couch_views_fdb:set_update_seq(TxDb, Sig, LastSeq)
@@ -525,7 +528,11 @@ check_kv_size_limit(Mrst, Doc, KeyLimit, ValLimit) ->
         Fmt = "View ~s size error for docid `~s`, excluded from indexing "
             "in db `~s` for design doc `~s`",
         couch_log:error(Fmt, [Type, DocId, DbName, IdxName]),
-        Doc#{deleted := true, results := [], kv_sizes => []}
+        Doc#{
+            deleted := true,
+            results := [[] || _ <- Mrst#mrst.views],
+            kv_sizes => []
+        }
     end.