You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2019/10/21 18:34:39 UTC
[couchdb] 01/01: Chunkify local docs
This is an automated email from the ASF dual-hosted git repository.
vatamane pushed a commit to branch chunkify-local-docs
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit ef8d621578a3b1adceff9e381aceac5739416ed8
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Mon Oct 21 14:24:27 2019 -0400
Chunkify local docs
Previously local docs were not chunkified and it was possible for replications
which checkpointed a few dozen time to created local documents above the 100KB
limit. So local documents are chunkiefied according to the same scheme as the
regular docs:
Key = erlfdb_tuple:pack({?DB_LOCAL_DOCS, Id, ChunkId}, DbPrefix)
Where `ChunkId` is just an incrementing integer starting with 0. When documents
are read from FDB all chunks from the `{?DB_LOCAL_DOCS, Id}` range are read and
combined into one binary then deserialized as before with `binary_to_term/2`.
We also go to some lengths to read and silently upgrade docs written with the
old encoding. Upgrades happen on doc writes as a first step, to ensure stats
update logic is not affected.
---
src/fabric/src/fabric2_db.erl | 39 +++++++++++----
src/fabric/src/fabric2_fdb.erl | 80 +++++++++++++++++++++++-------
src/fabric/test/fabric2_doc_crud_tests.erl | 65 ++++++++++++++++++++++++
3 files changed, 155 insertions(+), 29 deletions(-)
diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl
index 9ef0bd3..8d9ea5b 100644
--- a/src/fabric/src/fabric2_db.erl
+++ b/src/fabric/src/fabric2_db.erl
@@ -794,18 +794,35 @@ fold_local_docs(Db, UserFun, UserAcc0, Options) ->
UserAcc1 = maybe_stop(UserFun({meta, Meta}, UserAcc0)),
- UserAcc2 = fabric2_fdb:fold_range(TxDb, Prefix, fun({K, V}, Acc) ->
- {DocId} = erlfdb_tuple:unpack(K, Prefix),
- LDoc = fabric2_fdb:get_local_doc(TxDb, DocId, V),
- #doc{revs = {Pos, [Rev]}} = LDoc,
- maybe_stop(UserFun({row, [
- {id, DocId},
- {key, DocId},
- {value, {[{rev, couch_doc:rev_to_str({Pos, Rev})}]}}
- ]}, Acc))
- end, UserAcc1, Options),
+ DocCbk = fun
+ (_Tx, nil, [], UAcc) ->
+ UAcc;
+ (Tx, DocId, Chunks0, UAcc) ->
+ Chunks = lists:reverse(Chunks0),
+ LDoc = fabric2_fdb:get_local_doc(Tx, DocId, Chunks),
+ #doc{revs = {Pos, [Rev]}} = LDoc,
+ maybe_stop(UserFun({row, [
+ {id, DocId},
+ {key, DocId},
+ {value, {[{rev, couch_doc:rev_to_str({Pos, Rev})}]}}
+ ]}, UAcc))
+ end,
- {ok, maybe_stop(UserFun(complete, UserAcc2))}
+ {FinId, FinChunks, UserAcc2} = fabric2_fdb:fold_range(TxDb, Prefix,
+ fun({K, V}, {CurId, Chunks, UAcc}) ->
+ DocId = case erlfdb_tuple:unpack(K, Prefix) of
+ {Id} -> Id; % compatibility clause
+ {Id, N} when is_integer(N), N >= 0 -> Id
+ end,
+ case DocId == CurId of
+ true -> {CurId, [V | Chunks], UAcc};
+ false -> {DocId, [V], DocCbk(TxDb, CurId, Chunks, UAcc)}
+ end
+ end, {nil, [], UserAcc1}, Options),
+
+ UserAcc3 = DocCbk(TxDb, FinId, FinChunks, UserAcc2),
+
+ {ok, maybe_stop(UserFun(complete, UserAcc3))}
catch throw:{stop, FinalUserAcc} ->
{ok, FinalUserAcc}
end
diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl
index 2ccde1c..829e964 100644
--- a/src/fabric/src/fabric2_fdb.erl
+++ b/src/fabric/src/fabric2_fdb.erl
@@ -518,14 +518,27 @@ get_local_doc(#{} = Db0, <<?LOCAL_DOC_PREFIX, _/binary>> = DocId) ->
db_prefix := DbPrefix
} = Db = ensure_current(Db0),
- Key = erlfdb_tuple:pack({?DB_LOCAL_DOCS, DocId}, DbPrefix),
- Val = erlfdb:wait(erlfdb:get(Tx, Key)),
- fdb_to_local_doc(Db, DocId, Val).
+ Prefix = erlfdb_tuple:pack({?DB_LOCAL_DOCS}, DbPrefix),
+ OldKey = erlfdb_tuple:pack({DocId}, Prefix),
+ case erlfdb:wait(erlfdb:get(Tx, OldKey)) of
+ <<_/binary>> = Val ->
+ % Compatibility clause for a previous encoding scheme
+ fdb_to_local_doc(Db, DocId, [Val]);
+ not_found ->
+ {Start, End} = erlfdb_tuple:range({DocId}, Prefix),
+ Chunks = erlfdb:fold_range(Tx, Start, End, fun({K, V}, Acc) ->
+ % Assert the shape of the encoding is correct
+ {DocId, _} = erlfdb_tuple:unpack(K, Prefix),
+ [V | Acc]
+ end, []),
+ fdb_to_local_doc(Db, DocId, lists:reverse(Chunks))
+ end.
-get_local_doc(#{} = Db, <<?LOCAL_DOC_PREFIX, _/binary>> = DocId, Val)
- when is_binary(Val) orelse Val =:= not_found ->
- fdb_to_local_doc(ensure_current(Db), DocId, Val).
+
+get_local_doc(#{} = Db, <<?LOCAL_DOC_PREFIX, _/binary>> = DocId, Rows)
+ when is_list(Rows) ->
+ fdb_to_local_doc(ensure_current(Db), DocId, Rows).
write_doc(#{} = Db0, Doc, NewWinner0, OldWinner, ToUpdate, ToRemove) ->
@@ -647,19 +660,22 @@ write_doc(#{} = Db0, Doc, NewWinner0, OldWinner, ToUpdate, ToRemove) ->
write_local_doc(#{} = Db0, Doc) ->
#{
- tx := Tx
+ tx := Tx,
+ db_prefix := DbPrefix
} = Db = ensure_current(Db0),
- {LDocKey, LDocVal} = local_doc_to_fdb(Db, Doc),
+ upgrade_local_doc(Db, Doc#doc.id),
- WasDeleted = case erlfdb:wait(erlfdb:get(Tx, LDocKey)) of
+ [{K0, _} | _] = Rows = local_doc_to_fdb(Db, Doc),
+ WasDeleted = case erlfdb:wait(erlfdb:get(Tx, K0)) of
<<_/binary>> -> false;
not_found -> true
end,
+ Prefix = erlfdb_tuple:pack({?DB_LOCAL_DOCS, Doc#doc.id}, DbPrefix),
case Doc#doc.deleted of
- true -> erlfdb:clear(Tx, LDocKey);
- false -> erlfdb:set(Tx, LDocKey, LDocVal)
+ true -> erlfdb:clear_range_startswith(Tx, Prefix);
+ false -> lists:foreach(fun({K, V}) -> erlfdb:set(Tx, K, V) end, Rows)
end,
case {WasDeleted, Doc#doc.deleted} of
@@ -674,6 +690,28 @@ write_local_doc(#{} = Db0, Doc) ->
ok.
+% Upgrade local doc from an older encoding. This should be called before local
+% docs are updated. We find the previous local doc and write out using the new
+% encoding.
+upgrade_local_doc(#{} = Db, Id) when is_binary(Id) ->
+ #{
+ tx := Tx,
+ name := DbName,
+ db_prefix := DbPrefix
+ } = Db,
+
+ OldKey = erlfdb_tuple:pack({?DB_LOCAL_DOCS, Id}, DbPrefix),
+ case erlfdb:wait(erlfdb:get(Tx, OldKey)) of
+ <<_/binary>> = Val ->
+ NewKey = erlfdb_tuple:pack({?DB_LOCAL_DOCS, Id, 0}, DbPrefix),
+ erlfdb:clear(Tx, OldKey),
+ erlfdb:set(Tx, NewKey, Val),
+ couch_log:notice("~p : upgraded local doc ~p:~p", [?MODULE, DbName, Id]);
+ not_found ->
+ ok
+ end.
+
+
read_attachment(#{} = Db, DocId, AttId) ->
#{
tx := Tx,
@@ -1071,21 +1109,27 @@ local_doc_to_fdb(Db, #doc{} = Doc) ->
_ when is_binary(Rev) -> Rev
end,
- Key = erlfdb_tuple:pack({?DB_LOCAL_DOCS, Id}, DbPrefix),
- Val = {StoreRev, Body},
- {Key, term_to_binary(Val, [{minor_version, 1}])}.
+ Value = term_to_binary({StoreRev, Body}, [{minor_version, 1}]),
+ {Rows, _} = lists:mapfoldl(fun(Chunk, ChunkId) ->
+ Key = erlfdb_tuple:pack({?DB_LOCAL_DOCS, Id, ChunkId}, DbPrefix),
+ {{Key, Chunk}, ChunkId + 1}
+ end, 0, chunkify_binary(Value)),
+ Rows.
-fdb_to_local_doc(_Db, DocId, Bin) when is_binary(Bin) ->
+
+fdb_to_local_doc(_Db, _DocId, []) ->
+ {not_found, missing};
+
+fdb_to_local_doc(_Db, DocId, BinRows) when is_list(BinRows) ->
+ Bin = iolist_to_binary(BinRows),
{Rev, Body} = binary_to_term(Bin, [safe]),
#doc{
id = DocId,
revs = {0, [Rev]},
deleted = false,
body = Body
- };
-fdb_to_local_doc(_Db, _DocId, not_found) ->
- {not_found, missing}.
+ }.
chunkify_binary(Data) ->
diff --git a/src/fabric/test/fabric2_doc_crud_tests.erl b/src/fabric/test/fabric2_doc_crud_tests.erl
index 3cb3808..5050036 100644
--- a/src/fabric/test/fabric2_doc_crud_tests.erl
+++ b/src/fabric/test/fabric2_doc_crud_tests.erl
@@ -16,6 +16,7 @@
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch/include/couch_eunit.hrl").
-include_lib("eunit/include/eunit.hrl").
+-include("fabric2.hrl").
doc_crud_test_() ->
@@ -61,6 +62,8 @@ doc_crud_test_() ->
fun recreate_local_doc/1,
fun create_local_doc_bad_rev/1,
fun create_local_doc_random_rev/1,
+ fun create_large_local_doc/1,
+ fun local_doc_with_previous_encoding/1,
fun before_doc_update_skips_local_docs/1
]}
}
@@ -765,6 +768,68 @@ create_local_doc_random_rev({Db, _}) ->
?assertEqual(Doc5#doc{revs = {0, [<<"2">>]}}, Doc6).
+create_large_local_doc({Db, _}) ->
+ UUID = fabric2_util:uuid(),
+ LDocId = <<?LOCAL_DOC_PREFIX, UUID/binary>>,
+ Body = << <<"x">> || _ <- lists:seq(1, 3072) >>,
+ Doc1 = #doc{
+ id = LDocId,
+ revs = {0, []},
+ body = Body
+ },
+ ?assertEqual({ok, {0, <<"1">>}}, fabric2_db:update_doc(Db, Doc1)),
+ {ok, Doc2} = fabric2_db:open_doc(Db, Doc1#doc.id, []),
+ ?assertEqual(Doc1#doc{revs = {0, [<<"1">>]}}, Doc2).
+
+
+local_doc_with_previous_encoding({Db, _}) ->
+ #{db_prefix := DbPrefix} = Db,
+
+ Id = <<"_local/old_doc">>,
+ Body = {[{<<"x">>, 5}]},
+ Rev = <<"1">>,
+ Key = erlfdb_tuple:pack({?DB_LOCAL_DOCS, Id}, DbPrefix),
+
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+ #{tx := Tx} = TxDb,
+ Term = term_to_binary({Rev, Body}, [{minor_version, 1}]),
+ ok = erlfdb:set(Tx, Key, Term)
+ end),
+
+ % Read old doc
+ {ok, Doc1} = fabric2_db:open_doc(Db, Id, []),
+ ?assertEqual({0, [<<"1">>]}, Doc1#doc.revs),
+ ?assertEqual({[{<<"x">>, 5}]}, Doc1#doc.body),
+
+ % Read via fold_local_docs. Normally _local_docs is tested in integration
+ % tests but since we inserted the local docs by hand directly into FDB we
+ % test it here as well
+ {ok, Result} = fabric2_db:fold_local_docs(Db, fun(Data, Acc) ->
+ case Data of
+ {row, [{id, DocId} | _]} when Id =:= DocId ->
+ {ok, [Data | Acc]};
+ _ ->
+ {ok, Acc}
+ end
+ end, [], []),
+ ?assertMatch([{row, [{id, Id} | _]}], Result),
+
+ % Update doc
+ NewBody = {[{<<"y">>, 6}]},
+ Doc2 = Doc1#doc{body = NewBody},
+ ?assertEqual({ok, {0, <<"2">>}}, fabric2_db:update_doc(Db, Doc2)),
+ {ok, Doc3} = fabric2_db:open_doc(Db, Doc2#doc.id, []),
+ ?assertEqual({0, [<<"2">>]}, Doc3#doc.revs),
+ ?assertEqual(NewBody, Doc3#doc.body),
+
+ % Ensure old local doc was cleared
+ OldDocBin = fabric2_fdb:transactional(Db, fun(TxDb) ->
+ #{tx := Tx} = TxDb,
+ erlfdb:wait(erlfdb:get(Tx, Key))
+ end),
+ ?assertEqual(not_found, OldDocBin).
+
+
before_doc_update_skips_local_docs({Db0, _}) ->
BduFun = fun(Doc, _, _) ->