You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2020/09/16 17:06:40 UTC
[couchdb] 08/12: Update to use new ebtree multi functions
This is an automated email from the ASF dual-hosted git repository.
davisp pushed a commit to branch feature-ebtree-views
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 786614d1c1f87d8b61d77096a246eec8d3c62ae3
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Fri Aug 21 13:30:31 2020 -0500
Update to use new ebtree multi functions
---
src/couch_views/src/couch_views_fdb.erl | 183 +++++++++++++++++++---------
src/couch_views/src/couch_views_indexer.erl | 23 ++--
2 files changed, 139 insertions(+), 67 deletions(-)
diff --git a/src/couch_views/src/couch_views_fdb.erl b/src/couch_views/src/couch_views_fdb.erl
index a66f138..83b6df5 100644
--- a/src/couch_views/src/couch_views_fdb.erl
+++ b/src/couch_views/src/couch_views_fdb.erl
@@ -30,7 +30,7 @@
fold_map_idx/5,
fold_red_idx/6,
- write_doc/3,
+ update_views/3,
list_signatures/1,
clear_index/2
@@ -285,62 +285,67 @@ fold_red_idx(TxDb, View, Idx, Options, Callback, Acc0) ->
end.
-write_doc(TxDb, Mrst, #{deleted := true} = Doc) ->
+% Apply the view updates for a whole batch of docs inside the transaction
+% held in TxDb: first delete id_btree/view rows that will not be rewritten,
+% then write the new rows with ebtree:insert_multi/3. Replaces the per-doc
+% write_doc/3 shown as removed lines in this hunk.
+update_views(TxDb, Mrst, Docs) ->
#{
tx := Tx
} = TxDb,
- #{
- id := DocId
- } = Doc,
-
- ExistingViewKeys = get_view_keys(TxDb, Mrst, DocId),
- ebtree:delete(Tx, Mrst#mrst.id_btree, DocId),
- lists:foreach(fun(#mrview{id_num = ViewId, btree = Btree}) ->
- ViewKeys = case lists:keyfind(ViewId, 1, ExistingViewKeys) of
- {ViewId, Keys} -> Keys;
- false -> []
- end,
- lists:foreach(fun(Key) ->
- ebtree:delete(Tx, Btree, {Key, DocId})
- end, ViewKeys)
- end, Mrst#mrst.views);
+ % Collect update information for the whole batch (see gather_update_info/3)
-write_doc(TxDb, Mrst, Doc) ->
#{
- tx := Tx
- } = TxDb,
- #{
- id := DocId,
- results := Results
- } = Doc,
+ id_keys_to_remove := IdKeysToRemove,
+ id_rows_to_insert := IdRowsToInsert,
+ view_keys_to_remove := ViewKeysToRemove,
+ view_rows_to_insert := ViewRowsToInsert
+ } = gather_update_info(Tx, Mrst, Docs),
+
+ % Remove any Key from Keys that appears in Rows as a Key
+ % (lists:delete/2 drops only the first matching element per row)
+ DeleteKeys = fun(Keys, Rows) ->
+ lists:foldl(fun({Key, _Val}, KeyAcc) ->
+ lists:delete(Key, KeyAcc)
+ end, Keys, Rows)
+ end,
- ExistingViewKeys = get_view_keys(TxDb, Mrst, DocId),
+ % IdKeysToRemove holds every doc id in the batch; subtracting the ids present
+ % in IdRowsToInsert leaves only docs that get no replacement id row.
- NewIdKeys = lists:foldl(fun({View, RawNewRows}, IdKeyAcc) ->
+ % Delete any id rows that won't be overwritten
+ lists:foreach(fun(DocId) ->
+ ebtree:delete(Tx, Mrst#mrst.id_btree, DocId)
+ end, DeleteKeys(IdKeysToRemove, IdRowsToInsert)),
+
+ % Delete all views rows that won't be overwritten
+ lists:foreach(fun(View) ->
#mrview{
- id_num = ViewId
+ id_num = ViewId,
+ btree = BTree
} = View,
- % Remove old keys in the view
- ExistingKeys = case lists:keyfind(ViewId, 1, ExistingViewKeys) of
- {ViewId, Keys} -> Keys;
- false -> []
- end,
- lists:foreach(fun(K) ->
- ebtree:delete(Tx, View#mrview.btree, {K, DocId})
- end, ExistingKeys),
+ Keys = maps:get(ViewId, ViewKeysToRemove, []),
+ Rows = maps:get(ViewId, ViewRowsToInsert, []),
+
+ % Keys are the {Key, DocId} pairs recorded in the id_btree for this view;
+ % Rows are the {{Key, DocId}, Value} rows about to be written for it.
- % Insert new rows
- NewRows = dedupe_rows(View, RawNewRows),
- lists:foreach(fun({K, V}) ->
- ebtree:insert(Tx, View#mrview.btree, {K, DocId}, V)
- end, NewRows),
- ViewKeys = {View#mrview.id_num, lists:usort([K || {K, _V} <- NewRows])},
- [ViewKeys | IdKeyAcc]
- end, [], lists:zip(Mrst#mrst.views, Results)),
+ lists:foreach(fun(IdKey) ->
+ ebtree:delete(Tx, BTree, IdKey)
+ end, DeleteKeys(Keys, Rows))
+ end, Mrst#mrst.views),
- ebtree:insert(Tx, Mrst#mrst.id_btree, DocId, NewIdKeys).
+ % Insert our new id_btree rows
+ % IdRowsToInsert is a list of {DocId, [{ViewId, Keys}]} rows.
+ ebtree:insert_multi(Tx, Mrst#mrst.id_btree, IdRowsToInsert),
+
+ % Update each view
+ lists:foreach(fun(View) ->
+ #mrview{
+ id_num = ViewId,
+ btree = BTree
+ } = View,
+ Rows = maps:get(ViewId, ViewRowsToInsert, []),
+ % Rows is [] for a view that has no entry in ViewRowsToInsert.
+ ebtree:insert_multi(Tx, BTree, Rows)
+ end, Mrst#mrst.views).
list_signatures(Db) ->
@@ -383,19 +388,6 @@ clear_index(Db, Signature) ->
erlfdb:clear_range_startswith(Tx, TreePrefix).
-get_view_keys(TxDb, Mrst, DocId) ->
- #{
- tx := Tx
- } = TxDb,
- #mrst{
- id_btree = IdTree
- } = Mrst,
- case ebtree:lookup(Tx, IdTree, DocId) of
- {DocId, ViewKeys} -> ViewKeys;
- false -> []
- end.
-
-
open_id_tree(TxDb, Sig) ->
#{
tx := Tx,
@@ -470,11 +462,18 @@ make_reduce_fun(Lang, #mrview{} = View) ->
persist_chunks(Tx, set, [Key, Value]) ->
Chunks = fabric2_fdb:chunkify_binary(Value),
- lists:foldl(fun(Chunk, Id) ->
+ LastId = lists:foldl(fun(Chunk, Id) ->
ChunkKey = erlfdb_tuple:pack({Id}, Key),
erlfdb:set(Tx, ChunkKey, Chunk),
Id + 1
- end, 0, Chunks);
+ end, 0, Chunks),
+
+ % We update nodes in place, so it's possible that
+ % a node shrank. This clears any keys that we haven't
+ % just overwritten for the provided key.
+ LastIdKey = erlfdb_tuple:pack({LastId}, Key),
+ EndRange = <<Key/binary, 16#FF>>,
+ erlfdb:clear_range(Tx, LastIdKey, EndRange);
persist_chunks(Tx, get, Key) ->
Rows = erlfdb:get_range_startswith(Tx, Key),
@@ -522,6 +521,72 @@ to_red_opts(Options) ->
{Dir, StartKey, EndKey, InclusiveEnd, GroupKeyFun}.
+% Build the batch description consumed by update_views/3. Returns a map with:
+%   id_keys_to_remove   - every doc id found in Docs
+%   id_rows_to_insert   - [{DocId, [{ViewId, Keys}]}] for non-deleted docs that
+%                         emitted at least one key
+%   view_keys_to_remove - #{ViewId => [{Key, DocId}]} taken from the id_btree
+%   view_rows_to_insert - #{ViewId => [{{Key, DocId}, Value}]} from the results
+% Each doc map must carry the id, deleted and results keys (matched with :=).
+gather_update_info(Tx, Mrst, Docs) ->
+ DocIds = [DocId || #{id := DocId} <- Docs],
+
+ % ExistingViewKeys is a list of `[{DocId, [{ViewId, [Keys]} | _]} | _]`
+ % This conversion is to a map of `#{ViewId => [{Key, DocId} | _]}`
+ % The id_btree is read once for the whole batch via lookup_multi.
+ ExistingViewKeys = ebtree:lookup_multi(Tx, Mrst#mrst.id_btree, DocIds),
+ ViewKeysToRemove1 = lists:foldl(fun({DocId, ViewIdKeys}, EVKAcc1) ->
+ lists:foldl(fun({ViewId, Keys}, EVKAcc2) ->
+ ViewKeys = [{Key, DocId} || Key <- Keys],
+ maps:update_with(ViewId, fun(RestViewKeys) ->
+ ViewKeys ++ RestViewKeys
+ end, ViewKeys, EVKAcc2)
+ end, EVKAcc1, ViewIdKeys)
+ end, #{}, ExistingViewKeys),
+
+ % Build our base accumulator
+ % Every doc id starts out as a removal candidate; update_views/3 later
+ % subtracts the ids that also appear in id_rows_to_insert.
+ InfoAcc1 = #{
+ id_keys_to_remove => DocIds,
+ id_rows_to_insert => [],
+ view_keys_to_remove => ViewKeysToRemove1,
+ view_rows_to_insert => #{}
+ },
+
+ lists:foldl(fun(Doc, InfoAcc2) ->
+ #{
+ id := DocId,
+ deleted := Deleted,
+ results := Results
+ } = Doc,
+
+ % Deleted docs contribute nothing to insert; they only appear in the
+ % removal sets built above.
+ if Deleted -> InfoAcc2; true ->
+ % lists:zip/2 requires Results to hold exactly one entry per view.
+ Out = lists:foldl(fun({View, RawNewRows}, {IdKeyAcc, InfoAcc3}) ->
+ #mrview{
+ id_num = ViewId
+ } = View,
+ DedupedRows = dedupe_rows(View, RawNewRows),
+
+ IdKeys = lists:usort([K || {K, _V} <- DedupedRows]),
+ ViewRows = [{{K, DocId}, V} || {K, V} <- DedupedRows],
+
+ #{
+ view_rows_to_insert := ViewRowsToInsert2
+ } = InfoAcc3,
+ ViewRowsToInsert3 = maps:update_with(ViewId, fun(Rows) ->
+ ViewRows ++ Rows
+ end, ViewRows, ViewRowsToInsert2),
+
+ % Prepending means the {ViewId, Keys} entries end up in reverse
+ % view order within the id row.
+ {[{ViewId, IdKeys} | IdKeyAcc], InfoAcc3#{
+ view_rows_to_insert := ViewRowsToInsert3
+ }}
+ end, {[], InfoAcc2}, lists:zip(Mrst#mrst.views, Results)),
+
+ {IdRows, InfoAcc4} = Out,
+
+ % Don't store a row in the id_btree if it hasn't got any
+ % keys that will need to be deleted.
+ NonEmptyRows = [1 || {_ViewId, Rows} <- IdRows, Rows /= []],
+ if length(NonEmptyRows) == 0 -> InfoAcc4; true ->
+ maps:update_with(id_rows_to_insert, fun(OtherIdRows) ->
+ [{DocId, IdRows} | OtherIdRows]
+ end, [{DocId, IdRows}], InfoAcc4)
+ end
+ end
+ end, InfoAcc1, Docs).
+
+
dedupe_rows(View, KVs0) ->
CollateFun = couch_views_util:collate_fun(View),
KVs1 = lists:sort(fun({KeyA, ValA}, {KeyB, ValB}) ->
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index ae7e6ff..c88185a 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -102,6 +102,8 @@ init() ->
update_stats => #{}
},
+ process_flag(sensitive, false),
+
try
update(Db, Mrst, State)
catch
@@ -358,7 +360,7 @@ map_docs(Mrst, Docs) ->
end, Docs),
Deleted1 = lists:map(fun(Doc) ->
- Doc#{results => []}
+ Doc#{results => [[] || _ <- Mrst1#mrst.views]}
end, Deleted0),
DocsToMap = lists:map(fun(Doc) ->
@@ -389,7 +391,7 @@ map_docs(Mrst, Docs) ->
{Mrst1, MappedDocs}.
-write_docs(TxDb, Mrst, Docs, State) ->
+write_docs(TxDb, Mrst, Docs0, State) ->
#mrst{
sig = Sig
} = Mrst,
@@ -401,11 +403,12 @@ write_docs(TxDb, Mrst, Docs, State) ->
KeyLimit = key_size_limit(),
ValLimit = value_size_limit(),
- TotalKVCount = lists:foldl(fun(Doc0, KVCount) ->
- Doc1 = calculate_kv_sizes(Mrst, Doc0, KeyLimit, ValLimit),
- couch_views_fdb:write_doc(TxDb, Sig, ViewIds, Doc1),
- KVCount + count_kvs(Doc1)
- end, 0, Docs),
+ % Size-check every doc and sum the KV count in one pass. This must be
+ % lists:mapfoldl/3: the fun returns {MappedDoc, NewCount} and the result is
+ % {MappedDocs, FinalCount}; lists:foldl/3 would thread the tuple through as
+ % the accumulator and never yield the list of checked docs.
+ {Docs1, TotalKVCount} = lists:mapfoldl(fun(Doc0, KVCount) ->
+ Doc1 = check_kv_size_limit(Mrst, Doc0, KeyLimit, ValLimit),
+ {Doc1, KVCount + count_kvs(Doc1)}
+ end, 0, Docs0),
+
+ couch_views_fdb:update_views(TxDb, Mrst, Docs1),
if LastSeq == false -> ok; true ->
couch_views_fdb:set_update_seq(TxDb, Sig, LastSeq)
@@ -525,7 +528,11 @@ check_kv_size_limit(Mrst, Doc, KeyLimit, ValLimit) ->
Fmt = "View ~s size error for docid `~s`, excluded from indexing "
"in db `~s` for design doc `~s`",
couch_log:error(Fmt, [Type, DocId, DbName, IdxName]),
- Doc#{deleted := true, results := [], kv_sizes => []}
+ Doc#{
+ deleted := true,
+ results := [[] || _ <- Mrst#mrst.views],
+ kv_sizes => []
+ }
end.