You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2019/10/21 18:34:39 UTC

[couchdb] 01/01: Chunkify local docs

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch chunkify-local-docs
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit ef8d621578a3b1adceff9e381aceac5739416ed8
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Mon Oct 21 14:24:27 2019 -0400

    Chunkify local docs
    
    Previously local docs were not chunkified and it was possible for replications
    which checkpointed a few dozen time to created local documents above the 100KB
    limit. So local documents are chunkiefied according to the same scheme as the
    regular docs:
    
        Key = erlfdb_tuple:pack({?DB_LOCAL_DOCS, Id, ChunkId}, DbPrefix)
    
    Where `ChunkId` is just an incrementing integer starting with 0. When documents
    are read from FDB all chunks from the `{?DB_LOCAL_DOCS, Id}` range are read and
    combined into one binary then deserialized as before with `binary_to_term/2`.
    
    We also go to some lengths to read and silently upgrade docs written with the
    old encoding. Upgrades happen on doc writes as a first step, to ensure stats
    update logic is not affected.
---
 src/fabric/src/fabric2_db.erl              | 39 +++++++++++----
 src/fabric/src/fabric2_fdb.erl             | 80 +++++++++++++++++++++++-------
 src/fabric/test/fabric2_doc_crud_tests.erl | 65 ++++++++++++++++++++++++
 3 files changed, 155 insertions(+), 29 deletions(-)

diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl
index 9ef0bd3..8d9ea5b 100644
--- a/src/fabric/src/fabric2_db.erl
+++ b/src/fabric/src/fabric2_db.erl
@@ -794,18 +794,35 @@ fold_local_docs(Db, UserFun, UserAcc0, Options) ->
 
             UserAcc1 = maybe_stop(UserFun({meta, Meta}, UserAcc0)),
 
-            UserAcc2 = fabric2_fdb:fold_range(TxDb, Prefix, fun({K, V}, Acc) ->
-                {DocId} = erlfdb_tuple:unpack(K, Prefix),
-                LDoc = fabric2_fdb:get_local_doc(TxDb, DocId, V),
-                #doc{revs = {Pos, [Rev]}} = LDoc,
-                maybe_stop(UserFun({row, [
-                    {id, DocId},
-                    {key, DocId},
-                    {value, {[{rev, couch_doc:rev_to_str({Pos, Rev})}]}}
-                ]}, Acc))
-            end, UserAcc1, Options),
+            DocCbk = fun
+                (_Tx, nil, [], UAcc) ->
+                    UAcc;
+                (Tx, DocId, Chunks0, UAcc) ->
+                    Chunks = lists:reverse(Chunks0),
+                    LDoc = fabric2_fdb:get_local_doc(Tx, DocId, Chunks),
+                    #doc{revs = {Pos, [Rev]}} = LDoc,
+                    maybe_stop(UserFun({row, [
+                        {id, DocId},
+                        {key, DocId},
+                        {value, {[{rev, couch_doc:rev_to_str({Pos, Rev})}]}}
+                    ]}, UAcc))
+                end,
 
-            {ok, maybe_stop(UserFun(complete, UserAcc2))}
+            {FinId, FinChunks, UserAcc2} = fabric2_fdb:fold_range(TxDb, Prefix,
+                fun({K, V}, {CurId, Chunks, UAcc}) ->
+                    DocId = case erlfdb_tuple:unpack(K, Prefix) of
+                        {Id} -> Id;  % compatibility clause
+                        {Id, N} when is_integer(N), N >= 0 -> Id
+                    end,
+                    case DocId == CurId of
+                        true -> {CurId, [V | Chunks], UAcc};
+                        false -> {DocId, [V], DocCbk(TxDb, CurId, Chunks, UAcc)}
+                    end
+            end, {nil, [], UserAcc1}, Options),
+
+            UserAcc3 = DocCbk(TxDb, FinId, FinChunks, UserAcc2),
+
+            {ok, maybe_stop(UserFun(complete, UserAcc3))}
         catch throw:{stop, FinalUserAcc} ->
             {ok, FinalUserAcc}
         end
diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl
index 2ccde1c..829e964 100644
--- a/src/fabric/src/fabric2_fdb.erl
+++ b/src/fabric/src/fabric2_fdb.erl
@@ -518,14 +518,27 @@ get_local_doc(#{} = Db0, <<?LOCAL_DOC_PREFIX, _/binary>> = DocId) ->
         db_prefix := DbPrefix
     } = Db = ensure_current(Db0),
 
-    Key = erlfdb_tuple:pack({?DB_LOCAL_DOCS, DocId}, DbPrefix),
-    Val = erlfdb:wait(erlfdb:get(Tx, Key)),
-    fdb_to_local_doc(Db, DocId, Val).
+    Prefix = erlfdb_tuple:pack({?DB_LOCAL_DOCS}, DbPrefix),
 
+    OldKey = erlfdb_tuple:pack({DocId}, Prefix),
+    case erlfdb:wait(erlfdb:get(Tx, OldKey)) of
+        <<_/binary>> = Val ->
+            % Compatibility clause for a previous encoding scheme
+            fdb_to_local_doc(Db, DocId, [Val]);
+        not_found ->
+            {Start, End} = erlfdb_tuple:range({DocId}, Prefix),
+            Chunks = erlfdb:fold_range(Tx, Start, End, fun({K, V}, Acc) ->
+                % Assert the shape of the encoding is correct
+                {DocId, _} = erlfdb_tuple:unpack(K, Prefix),
+                [V | Acc]
+            end, []),
+            fdb_to_local_doc(Db, DocId, lists:reverse(Chunks))
+    end.
 
-get_local_doc(#{} = Db, <<?LOCAL_DOC_PREFIX, _/binary>> = DocId, Val)
-        when is_binary(Val) orelse Val =:= not_found ->
-    fdb_to_local_doc(ensure_current(Db), DocId, Val).
+
+get_local_doc(#{} = Db, <<?LOCAL_DOC_PREFIX, _/binary>> = DocId, Rows)
+        when is_list(Rows) ->
+    fdb_to_local_doc(ensure_current(Db), DocId, Rows).
 
 
 write_doc(#{} = Db0, Doc, NewWinner0, OldWinner, ToUpdate, ToRemove) ->
@@ -647,19 +660,22 @@ write_doc(#{} = Db0, Doc, NewWinner0, OldWinner, ToUpdate, ToRemove) ->
 
 write_local_doc(#{} = Db0, Doc) ->
     #{
-        tx := Tx
+        tx := Tx,
+        db_prefix := DbPrefix
     } = Db = ensure_current(Db0),
 
-    {LDocKey, LDocVal} = local_doc_to_fdb(Db, Doc),
+    upgrade_local_doc(Db, Doc#doc.id),
 
-    WasDeleted = case erlfdb:wait(erlfdb:get(Tx, LDocKey)) of
+    [{K0, _} | _] = Rows = local_doc_to_fdb(Db, Doc),
+    WasDeleted = case erlfdb:wait(erlfdb:get(Tx, K0)) of
         <<_/binary>> -> false;
         not_found -> true
     end,
 
+    Prefix = erlfdb_tuple:pack({?DB_LOCAL_DOCS, Doc#doc.id}, DbPrefix),
     case Doc#doc.deleted of
-        true -> erlfdb:clear(Tx, LDocKey);
-        false -> erlfdb:set(Tx, LDocKey, LDocVal)
+        true -> erlfdb:clear_range_startswith(Tx, Prefix);
+        false -> lists:foreach(fun({K, V}) -> erlfdb:set(Tx, K, V) end, Rows)
     end,
 
     case {WasDeleted, Doc#doc.deleted} of
@@ -674,6 +690,28 @@ write_local_doc(#{} = Db0, Doc) ->
     ok.
 
 
+% Upgrade local doc from an older encoding. This should be called before local
+% docs are updated. We find the previous local doc and write out using the new
+% encoding.
+upgrade_local_doc(#{} = Db, Id) when is_binary(Id) ->
+    #{
+        tx := Tx,
+        name := DbName,
+        db_prefix := DbPrefix
+    } = Db,
+
+    OldKey = erlfdb_tuple:pack({?DB_LOCAL_DOCS, Id}, DbPrefix),
+    case erlfdb:wait(erlfdb:get(Tx, OldKey)) of
+        <<_/binary>> = Val ->
+            NewKey = erlfdb_tuple:pack({?DB_LOCAL_DOCS, Id, 0}, DbPrefix),
+            erlfdb:clear(Tx, OldKey),
+            erlfdb:set(Tx, NewKey, Val),
+            couch_log:notice("~p : upgraded local doc ~p:~p", [?MODULE, DbName, Id]);
+        not_found ->
+            ok
+    end.
+
+
 read_attachment(#{} = Db, DocId, AttId) ->
     #{
         tx := Tx,
@@ -1071,21 +1109,27 @@ local_doc_to_fdb(Db, #doc{} = Doc) ->
         _ when is_binary(Rev) -> Rev
     end,
 
-    Key = erlfdb_tuple:pack({?DB_LOCAL_DOCS, Id}, DbPrefix),
-    Val = {StoreRev, Body},
-    {Key, term_to_binary(Val, [{minor_version, 1}])}.
+    Value  = term_to_binary({StoreRev, Body}, [{minor_version, 1}]),
 
+    {Rows, _} = lists:mapfoldl(fun(Chunk, ChunkId) ->
+        Key = erlfdb_tuple:pack({?DB_LOCAL_DOCS, Id, ChunkId}, DbPrefix),
+        {{Key, Chunk}, ChunkId + 1}
+    end, 0, chunkify_binary(Value)),
+    Rows.
 
-fdb_to_local_doc(_Db, DocId, Bin) when is_binary(Bin) ->
+
+fdb_to_local_doc(_Db, _DocId, []) ->
+    {not_found, missing};
+
+fdb_to_local_doc(_Db, DocId, BinRows) when is_list(BinRows) ->
+    Bin = iolist_to_binary(BinRows),
     {Rev, Body} = binary_to_term(Bin, [safe]),
     #doc{
         id = DocId,
         revs = {0, [Rev]},
         deleted = false,
         body = Body
-    };
-fdb_to_local_doc(_Db, _DocId, not_found) ->
-    {not_found, missing}.
+    }.
 
 
 chunkify_binary(Data) ->
diff --git a/src/fabric/test/fabric2_doc_crud_tests.erl b/src/fabric/test/fabric2_doc_crud_tests.erl
index 3cb3808..5050036 100644
--- a/src/fabric/test/fabric2_doc_crud_tests.erl
+++ b/src/fabric/test/fabric2_doc_crud_tests.erl
@@ -16,6 +16,7 @@
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("couch/include/couch_eunit.hrl").
 -include_lib("eunit/include/eunit.hrl").
+-include("fabric2.hrl").
 
 
 doc_crud_test_() ->
@@ -61,6 +62,8 @@ doc_crud_test_() ->
                 fun recreate_local_doc/1,
                 fun create_local_doc_bad_rev/1,
                 fun create_local_doc_random_rev/1,
+                fun create_large_local_doc/1,
+                fun local_doc_with_previous_encoding/1,
                 fun before_doc_update_skips_local_docs/1
             ]}
         }
@@ -765,6 +768,68 @@ create_local_doc_random_rev({Db, _}) ->
     ?assertEqual(Doc5#doc{revs = {0, [<<"2">>]}}, Doc6).
 
 
+create_large_local_doc({Db, _}) ->
+    UUID = fabric2_util:uuid(),
+    LDocId = <<?LOCAL_DOC_PREFIX, UUID/binary>>,
+    Body = << <<"x">> || _ <- lists:seq(1, 3072) >>,
+    Doc1 = #doc{
+        id = LDocId,
+        revs = {0, []},
+        body = Body
+    },
+    ?assertEqual({ok, {0, <<"1">>}}, fabric2_db:update_doc(Db, Doc1)),
+    {ok, Doc2} = fabric2_db:open_doc(Db, Doc1#doc.id, []),
+    ?assertEqual(Doc1#doc{revs = {0, [<<"1">>]}}, Doc2).
+
+
+local_doc_with_previous_encoding({Db, _}) ->
+    #{db_prefix := DbPrefix} = Db,
+
+    Id = <<"_local/old_doc">>,
+    Body = {[{<<"x">>, 5}]},
+    Rev = <<"1">>,
+    Key = erlfdb_tuple:pack({?DB_LOCAL_DOCS, Id}, DbPrefix),
+
+    fabric2_fdb:transactional(Db, fun(TxDb) ->
+        #{tx := Tx} = TxDb,
+        Term = term_to_binary({Rev, Body}, [{minor_version, 1}]),
+        ok = erlfdb:set(Tx, Key, Term)
+    end),
+
+    % Read old doc
+    {ok, Doc1} = fabric2_db:open_doc(Db, Id, []),
+    ?assertEqual({0, [<<"1">>]}, Doc1#doc.revs),
+    ?assertEqual({[{<<"x">>, 5}]}, Doc1#doc.body),
+
+    % Read via fold_local_docs. Normally _local_docs is tested in integration
+    % tests but since we inserted the local docs by hand directly into FDB we
+    % test it here as well
+    {ok, Result} = fabric2_db:fold_local_docs(Db, fun(Data, Acc) ->
+        case Data of
+            {row, [{id, DocId} | _]} when Id =:= DocId ->
+                {ok, [Data | Acc]};
+            _ ->
+                {ok, Acc}
+        end
+    end, [], []),
+    ?assertMatch([{row, [{id, Id} | _]}], Result),
+
+    % Update doc
+    NewBody = {[{<<"y">>, 6}]},
+    Doc2 = Doc1#doc{body = NewBody},
+    ?assertEqual({ok, {0, <<"2">>}}, fabric2_db:update_doc(Db, Doc2)),
+    {ok, Doc3} = fabric2_db:open_doc(Db, Doc2#doc.id, []),
+    ?assertEqual({0, [<<"2">>]}, Doc3#doc.revs),
+    ?assertEqual(NewBody, Doc3#doc.body),
+
+    % Ensure old local doc was cleared
+    OldDocBin = fabric2_fdb:transactional(Db, fun(TxDb) ->
+        #{tx := Tx} = TxDb,
+        erlfdb:wait(erlfdb:get(Tx, Key))
+    end),
+    ?assertEqual(not_found, OldDocBin).
+
+
 before_doc_update_skips_local_docs({Db0, _}) ->
 
     BduFun = fun(Doc, _, _) ->