You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2019/03/08 22:35:53 UTC

[couchdb] branch prototype/fdb-layer updated: Implement update_docs

This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch prototype/fdb-layer
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/prototype/fdb-layer by this push:
     new 73ce010  Implement update_docs
73ce010 is described below

commit 73ce010c0fc20d9fa84eec7cf39a3c81e7ba73e0
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Fri Mar 8 16:34:43 2019 -0600

    Implement update_docs
    
    This is a first pass attempt at an implementation of update_docs.
    There's probably a lot wrong here and a number of things are expected to
    blow up if features like VDUs are used.
---
 FDB_NOTES.md                   |  45 +++++
 src/chttpd/src/chttpd_db.erl   |   9 +-
 src/fabric/src/fabric2.erl     | 237 ++++++++++++++---------
 src/fabric/src/fabric2_db.erl  | 162 ++++++++++++++++
 src/fabric/src/fabric2_doc.erl | 420 +++++++++++++++++++++++++++++++++++++++++
 5 files changed, 778 insertions(+), 95 deletions(-)

diff --git a/FDB_NOTES.md b/FDB_NOTES.md
new file mode 100644
index 0000000..9278a2f
--- /dev/null
+++ b/FDB_NOTES.md
@@ -0,0 +1,45 @@
+Things of Note
+===
+
+
+1. If a replication sends us two revisions A and B where one is an
+   ancestor of the other, we likely have divergent behavior. However,
+   this should never happen In Theory.
+
+2. Multiple updates to the same document in a _bulk_docs (or if they
+   just happen to be in the same update batch in non-fdb CouchDB)
+   we likely have subtly different behavior.
+
+3. I'm relying on repeated reads in an fdb transaction to be "cheap"
+   in that the reads would be cached in the fdb_transaction object.
+   This needs to be checked for certainty but that appeared to
+   be how things behaved in testing.
+
+4. When attempting to create a doc from scratch in an interactive_edit
+   update, with revisions specified *and* attachment stubs, the reported
+   error is now a conflict. Previously the missing_stubs error was
+   raised earlier.
+
+5. There may be a difference in behavior if there are no VDU functions
+   set on a db and no design documents in a batch. This is because in
+   this situation we don't run the prep_and_validate code on pre-fdb
+   CouchDB. The new code always checks stubs before merging revision trees.
+   I'm sure the old way would fail somehow, but it would fail further on
+   which means we may have failed with a different reason (conflict, etc)
+   before we got to the next place we check for missing stubs.
+
+6. I still need to add paging for the DirectoryLayer results so that we
+   can use that for the _all_dbs end point.
+
+7. For multi-doc updates we'll need to investigate user versions on
+   versionstamps within a transaction. Also this likely prevents the
+   ability to have multiple updates to the same doc in a single
+   _bulk_docs transaction
+
+8. I'm not currently decreasing size after updating an existing document.
+   It looks like we're gonna have to do some sort of cleverness with the
+   existing FDI sizes field maybe?
+
+9. I'm cheating really bad with term_to_binary and ignoring serialization
+   but given that's all going to change I'm not too concerned about it
+   at this point.
\ No newline at end of file
diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl
index cceeac0..7cc1e42 100644
--- a/src/chttpd/src/chttpd_db.erl
+++ b/src/chttpd/src/chttpd_db.erl
@@ -370,16 +370,13 @@ delete_db_req(#httpd{}=Req, DbName) ->
     end.
 
 do_db_req(#httpd{path_parts=[DbName|_], user_ctx=Ctx}=Req, Fun) ->
-    Db = #{
-        name => DbName,
-        user_ctx => Ctx
-    },
+    Db = fabric2:open_db(DbName, [{user_ctx, Ctx}]),
     Fun(Req, Db).
 
-db_req(#httpd{method='GET',path_parts=[DbName]}=Req, _Db) ->
+db_req(#httpd{method='GET',path_parts=[DbName]}=Req, Db) ->
     % measure the time required to generate the etag, see if it's worth it
     T0 = os:timestamp(),
-    {ok, DbInfo} = fabric2:get_db_info(DbName),
+    {ok, DbInfo} = fabric2:get_db_info(Db),
     DeltaT = timer:now_diff(os:timestamp(), T0) / 1000,
     couch_stats:update_histogram([couchdb, dbinfo], DeltaT),
     send_json(Req, {DbInfo});
diff --git a/src/fabric/src/fabric2.erl b/src/fabric/src/fabric2.erl
index 924fb89..27aadd2 100644
--- a/src/fabric/src/fabric2.erl
+++ b/src/fabric/src/fabric2.erl
@@ -24,6 +24,9 @@
     delete_db/1,
     delete_db/2,
 
+    open_db/1,
+    open_db/2,
+
     get_db_info/1,
     get_doc_count/1,
     get_doc_count/2,
@@ -32,7 +35,7 @@
     %% set_revs_limit/3,
 
     get_security/1,
-    set_security/2
+    set_security/2,
 
     %% get_purge_infos_limit/1,
     %% set_purge_infos_limit/3,
@@ -51,9 +54,10 @@
     %%
     %% get_missing_revs/2,
     %% get_missing_revs/3,
-    %%
-    %% update_doc/3,
-    %% update_docs/3,
+
+    update_doc/3,
+    update_docs/3
+
     %% purge_docs/3,
     %%
     %% att_receiver/2,
@@ -100,7 +104,7 @@ create_db(DbName, _Options) ->
     fabric_server:transactional(fun(Tx) ->
         try
             DbDir = erlfdb_directory:create(Tx, DbsDir, DbName),
-            init_db(Tx, DbDir)
+            % fabric2_db exports init/3 as init(DbName, Tx, DbDir);
+            % calling it with two arguments would be an undef error.
+            fabric2_db:init(DbName, Tx, DbDir)
         catch error:{erlfdb_directory, {create_error, path_exists, _}} ->
             {error, file_exists}
         end
@@ -122,60 +126,76 @@ delete_db(DbName, _Options) ->
     end.
 
 
-get_db_info(DbName) ->
-    [DbDir, MetaRows, LastChangeRow] = fabric_server:transactional(fun(Tx) ->
-        DbDir = open_db(Tx, DbName),
-        Meta = erlfdb_directory:pack(DbDir, {<<"meta">>, <<"stats">>}),
-        MetaFuture = erlfdb:get_range_startswith(Tx, Meta),
-        {CStart, CEnd} = erlfdb_directory:range(DbDir, {<<"changes">>}),
-        ChangesFuture = erlfdb:get_range(Tx, CStart, CEnd, [
-                {streaming_mode, exact},
-                {limit, 1},
-                {reverse, true}
-            ]),
-        [DbDir] ++ erlfdb:wait_for_all([MetaFuture, ChangesFuture])
+open_db(DbName) ->
+    open_db(DbName, []).
+
+
+open_db(DbName, Options) ->
+    fabric2_db:open(DbName, Options).
+
+
+% Return the database info proplist. Accepts either a db name binary
+% (opens a handle first) or an already-opened db handle map.
+get_db_info(DbName) when is_binary(DbName) ->
+    get_db_info(open_db(DbName));
+
+get_db_info(Db) ->
+    DbProps = fabric2_db:with_tx(Db, fun(TxDb) ->
+        % range_bounds/2 needs the opened directory, which is only set
+        % on the transactional TxDb handle; the outer Db has
+        % dir => undefined and would error with {no_directory, _}.
+        {CStart, CEnd} = fabric2_db:range_bounds(TxDb, {<<"changes">>}),
+        % NOTE(review): CStart/CEnd are already packed range bounds;
+        % confirm fabric2_db:get_range/4 expects them re-wrapped in
+        % tuples rather than passed through as-is.
+        ChangesFuture = fabric2_db:get_range(TxDb, {CStart}, {CEnd}, [
+            {streaming_mode, exact},
+            {limit, 1},
+            {reverse, true}
+        ]),
+
+        % Stats keys are written as {<<"meta">>, <<"stats">>, Key} (see
+        % fabric2_db:init/3 and get_doc_count/2), so the prefix must use
+        % the same order or the range read returns nothing.
+        StatsPrefix = {<<"meta">>, <<"stats">>},
+        MetaFuture = fabric2_db:get_range_startswith(TxDb, StatsPrefix),
+
+        RawSeq = case erlfdb:wait(ChangesFuture) of
+            [] ->
+                % No changes yet: the zero versionstamp (80 bits).
+                <<0:80>>;
+            [{SeqKey, _}] ->
+                {<<"changes">>, SeqBin} = fabric2_db:unpack(TxDb, SeqKey),
+                SeqBin
+        end,
+        CProp = {update_seq, ?l2b(couch_util:to_hex(RawSeq))},
+
+        % lists:flatmap/2 takes a fun/1; each row from the range read is
+        % a {Key, Value} tuple, and unpack is unpack/2 (needs TxDb).
+        MProps = lists:flatmap(fun({K, V}) ->
+            case fabric2_db:unpack(TxDb, K) of
+                {_, _, <<"doc_count">>} ->
+                    [{doc_count, ?bin2uint(V)}];
+                {_, _, <<"doc_del_count">>} ->
+                    [{doc_del_count, ?bin2uint(V)}];
+                {_, _, <<"size">>} ->
+                    Val = ?bin2uint(V),
+                    [
+                        {other, {[{data_size, Val}]}},
+                        {sizes, {[
+                            {active, 0},
+                            {external, Val},
+                            {file, 0}
+                        ]}}
+                    ];
+                _ ->
+                    []
+            end
+        end, erlfdb:wait(MetaFuture)),
+
+        [CProp | MProps]
     end),
+
     BaseProps = [
         {cluster, {[{n, 0}, {q, 0}, {r, 0}, {w, 0}]}},
         {compact_running, false},
         {data_size, 0},
-        {db_name, DbName},
+        {db_name, fabric2_db:name(Db)},
         {disk_format_version, 0},
         {disk_size, 0},
         {instance_start_time, <<"0">>},
         {purge_seq, 0}
     ],
-    WithMeta = lists:foldl(fun({KBin, VBin}, PropAcc) ->
-        case erlfdb_directory:unpack(DbDir, KBin) of
-            {_, _, <<"doc_count">>} ->
-                Val = ?bin2uint(VBin),
-                lists:keystore(doc_count, 1, PropAcc, {doc_count, Val});
-            {_, _, <<"doc_del_count">>} ->
-                Val = ?bin2uint(VBin),
-                lists:keystore(doc_del_count, 1, PropAcc, {doc_del_count, Val});
-            {_, _, <<"size">>} ->
-                Val = ?bin2uint(VBin),
-                Other = {other, {[{data_size, Val}]}},
-                Sizes = {sizes, {[
-                        {active, 0},
-                        {external, Val},
-                        {file, 0}
-                    ]}},
-                PA1 = lists:keystore(other, 1, PropAcc, Other),
-                lists:keystore(sizes, 1, PA1, Sizes);
-            _ ->
-                PropAcc
-        end
-    end, BaseProps, MetaRows),
-    RawSeq = case LastChangeRow of
-        [] ->
-            <<0:80>>;
-        [{KBin, _}] ->
-            {<<"changes">>, SeqBin} = erlfdb_directory:unpack(DbDir, KBin),
-            SeqBin
-    end,
-    Seq = ?l2b(couch_util:to_hex(RawSeq)),
-    {ok, lists:keystore(update_seq, 1, WithMeta, {update_seq, Seq})}.
+
+    {ok, lists:foldl(fun({Key, Val}, Acc) ->
+        lists:keystore(Key, 1, Acc, {Key, Val})
+    end, BaseProps, DbProps)}.
 
 
 get_doc_count(DbName) ->
@@ -191,57 +211,96 @@ get_doc_count(DbName, <<"_design">>) ->
 get_doc_count(DbName, <<"_local">>) ->
     get_doc_count(DbName, <<"doc_local_count">>);
 
-get_doc_count(DbName, Key) ->
-    fabric_server:transactional(fun(Tx) ->
-        DbDir = open_db(Tx, DbName),
-        Key = erlfdb_directory:pack(DbDir, {<<"meta">>, <<"stats">>, Key}),
-        VBin = erlfdb:wait(erlfdb:get(Tx, Key)),
-        ?bin2uint(VBin)
+get_doc_count(Db, Key) ->
+    fabric2_db:with_tx(Db, fun(TxDb) ->
+        Future = fabric2_db:get(TxDb, {<<"meta">>, <<"stats">>, Key}),
+        ?bin2uint(erlfdb:wait(Future))
     end).
 
 
-get_security(DbName) ->
-    SecJson = fabric_server:transactional(fun(Tx) ->
-        DbDir = open_db(Tx, DbName),
-        Tuple = {<<"meta">>, <<"config">>, <<"security_doc">>},
-        Key = erlfdb_directory:pack(DbDir, Tuple),
-        erlfdb:wait(erlfdb:get(Tx, Key))
+get_security(DbName) when is_binary(DbName) ->
+    get_security(open_db(DbName));
+
+get_security(Db) ->
+    Key = {<<"meta">>, <<"config">>, <<"security_doc">>},
+    SecJson = fabric2_db:with_tx(Db, fun(TxDb) ->
+        erlfdb:wait(fabric2_db:get(TxDb, Key))
     end),
     ?JSON_DECODE(SecJson).
 
 
-set_security(DbName, ErlJson) ->
+set_security(DbName, ErlJson) when is_binary(DbName) ->
+    set_security(open_db(DbName), ErlJson);
+
+set_security(Db, ErlJson) ->
+    Key = {<<"meta">>, <<"config">>, <<"security_doc">>},
     SecJson = ?JSON_ENCODE(ErlJson),
-    fabric_server:transactional(fun(Tx) ->
-        DbDir = open_db(Tx, DbName),
-        Tuple = {<<"meta">>, <<"config">>, <<"security_doc">>},
-        Key = erlfdb_directory:pack(DbDir, Tuple),
-        erlfdb:set(Tx, Key, SecJson)
+    fabric2_db:with_tx(Db, fun(TxDb) ->
+        fabric2_db:set(TxDb, Key, SecJson)
     end).
 
 
-init_db(Tx, DbDir) ->
-    Defaults = [
-        {{<<"meta">>, <<"config">>, <<"revs_limit">>}, ?uint2bin(1000)},
-        {{<<"meta">>, <<"config">>, <<"security_doc">>}, ?DEFAULT_SECURITY},
-        {{<<"meta">>, <<"stats">>, <<"doc_count">>}, ?uint2bin(0)},
-        {{<<"meta">>, <<"stats">>, <<"doc_del_count">>}, ?uint2bin(0)},
-        {{<<"meta">>, <<"stats">>, <<"doc_design_count">>}, ?uint2bin(0)},
-        {{<<"meta">>, <<"stats">>, <<"doc_local_count">>}, ?uint2bin(0)},
-        {{<<"meta">>, <<"stats">>, <<"size">>}, ?uint2bin(2)}
-    ],
-    lists:foreach(fun({K, V}) ->
-        BinKey = erlfdb_directory:pack(DbDir, K),
-        erlfdb:set(Tx, BinKey, V)
-    end, Defaults).
+update_doc(Db, Doc, Options) ->
+    fabric2_db:with_tx(Db, fun(TxDb) ->
+        case fabric2_doc:update(TxDb, Doc, opts(Options)) of
+            {ok, []} ->
+                % replication no-op
+                #doc{revs = {Pos, [RevId | _]}} = doc(Db, Doc),
+                {ok, {Pos, RevId}};
+            {ok, NewRev} ->
+                {ok, NewRev};
+           {error, Error} ->
+               throw(Error)
+        end
+    end).
 
 
-open_db(Tx, DbName) ->
-    % We'll eventually want to cache this in the
-    % fabric_server ets table.
-    DbsDir = fabric_server:get_dir(dbs),
-    try
-        erlfdb_directory:open(Tx, DbsDir, DbName)
-    catch error:{erlfdb_directory, {open_error, path_missing, _}} ->
-        erlang:error(database_does_not_exist)
+update_docs(DbName, Docs, Options) when is_binary(DbName) ->
+    update_docs(open_db(DbName, Options), Docs, Options);
+
+update_docs(Db, Docs, Options) ->
+    fabric2_db:with_tx(Db, fun(TxDb) ->
+        {Resps, Status} = lists:mapfoldl(fun(Doc, Acc) ->
+            case fabric2_doc:update(TxDb, Doc, opts(Options)) of
+                {ok, _} = Resp ->
+                    {Resp, Acc};
+                {error, _} = Resp ->
+                    {Resp, error}
+            end
+        end, ok, Docs),
+        {Status, Resps}
+    end).
+
+
+docs(Db, Docs) ->
+    lists:map(fun(Doc) -> doc(Db, Doc) end, Docs).
+
+
+doc(_Db, #doc{} = Doc) ->
+    Doc;
+
+doc(Db, {_} = Doc) ->
+    couch_db:doc_from_json_obj_validate(Db, Doc);
+
+doc(_Db, Doc) ->
+    erlang:error({illegal_doc_format, Doc}).
+
+
+opts(Options) ->
+    lists:foldl(fun(Opt, Acc) ->
+        add_option(Opt, Acc)
+    end, Options, [user_ctx, io_priority]).
+
+
+add_option(Key, Options) ->
+    case couch_util:get_value(Key, Options) of
+        undefined ->
+            case erlang:get(Key) of
+                undefined ->
+                    Options;
+                Value ->
+                    [{Key, Value} | Options]
+            end;
+        _ ->
+            Options
     end.
diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl
new file mode 100644
index 0000000..1e5cb18
--- /dev/null
+++ b/src/fabric/src/fabric2_db.erl
@@ -0,0 +1,162 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric2_db).
+
+
+-export([
+    open/2,
+    init/3,
+
+    name/1,
+
+    with_tx/2,
+
+    pack/2,
+    unpack/2,
+    range_bounds/2,
+
+    get/2,
+    get_range/3,
+    get_range/4,
+    get_range_startswith/2,
+    get_range_startswith/3
+]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("fabric/include/fabric.hrl").
+
+
+open(DbName, Options) when is_binary(DbName), is_list(Options) ->
+    BaseDb = #{
+        name => DbName,
+        tx => undefined,
+        dir => undefined,
+        user_ctx => #user_ctx{},
+        validate_doc_update => undefined
+    },
+    lists:foldl(fun({K, V}, DbAcc) ->
+        maps:put(K, V, DbAcc)
+    end, BaseDb, Options).
+
+
+name(#{name := Name}) ->
+    Name.
+
+
+init(DbName, Tx, DbDir) ->
+    TxDb = open(DbName, [{tx, Tx}, {dir, DbDir}]),
+    Defaults = [
+        {{<<"meta">>, <<"config">>, <<"revs_limit">>}, ?uint2bin(1000)},
+        {{<<"meta">>, <<"config">>, <<"security_doc">>}, <<"{}">>},
+        {{<<"meta">>, <<"stats">>, <<"doc_count">>}, ?uint2bin(0)},
+        {{<<"meta">>, <<"stats">>, <<"doc_del_count">>}, ?uint2bin(0)},
+        {{<<"meta">>, <<"stats">>, <<"doc_design_count">>}, ?uint2bin(0)},
+        {{<<"meta">>, <<"stats">>, <<"doc_local_count">>}, ?uint2bin(0)},
+        {{<<"meta">>, <<"stats">>, <<"size">>}, ?uint2bin(2)}
+    ],
+    lists:foreach(fun({K, V}) ->
+        erlfdb:set(Tx, pack(TxDb, K), V)
+    end, Defaults).
+
+
+with_tx(Db, Fun) when is_function(Fun, 1) ->
+    DbName = maps:get(name, Db),
+    DbsDir = fabric_server:get_dir(dbs),
+    fabric_server:transactional(fun(Tx) ->
+        % We'll eventually want to cache this in the
+        % fabric_server ets table.
+        DbDir = try
+            erlfdb_directory:open(Tx, DbsDir, DbName)
+        catch error:{erlfdb_directory, {open_error, path_missing, _}} ->
+            erlang:error(database_does_not_exist)
+        end,
+        TxDb = Db#{tx := Tx, dir := DbDir},
+        Fun(TxDb)
+    end).
+
+
+pack(#{dir := undefined} = Db, _Tuple) ->
+    erlang:error({no_directory, Db});
+
+pack(Db, Tuple) ->
+    #{
+        dir := Dir
+    } = Db,
+    erlfdb_directory:pack(Dir, Tuple).
+
+
+unpack(#{dir := undefined} = Db, _Key) ->
+    erlang:error({no_directory, Db});
+
+unpack(Db, Key) ->
+    #{
+        dir := Dir
+    } = Db,
+    erlfdb_directory:unpack(Dir, Key).
+
+
+range_bounds(#{dir := undefined} = Db, _Key) ->
+    erlang:error({no_directory, Db});
+
+range_bounds(Db, Key) ->
+    #{
+        dir := Dir
+    } = Db,
+    erlfdb_directory:range(Dir, Key).
+
+
+
+% Read a single packed key inside an open transaction; returns an
+% erlfdb future (callers erlfdb:wait/1 on it).
+get(#{tx := undefined} = Db, _Key) ->
+    erlang:error({invalid_tx_db, Db});
+
+get(Db, Key) ->
+    #{
+        tx := Tx,
+        dir := Dir
+    } = Db,
+    % Module is erlfdb_directory (was misspelled erlfdb_direcotry,
+    % which would fail with undef at runtime).
+    BinKey = erlfdb_directory:pack(Dir, Key),
+    erlfdb:get(Tx, BinKey).
+
+
+get_range(Db, StartKey, EndKey) ->
+    get_range(Db, StartKey, EndKey, []).
+
+
+get_range(#{tx := undefined} = Db, _, _, _) ->
+    erlang:error({invalid_tx_db, Db});
+
+get_range(Db, StartKey, EndKey, Options) ->
+    #{
+        tx := Tx,
+        dir := Dir
+    } = Db,
+    BinStartKey = erlfdb_directory:pack(Dir, StartKey),
+    BinEndKey= erlfdb_directory:pack(Dir, EndKey),
+    erlfdb:get_range(Tx, BinStartKey, BinEndKey, Options).
+
+
+get_range_startswith(Db, Prefix) ->
+    get_range_startswith(Db, Prefix, []).
+
+
+get_range_startswith(#{tx := undefined} = Db, _, _) ->
+    erlang:error({invalid_tx_db, Db});
+
+get_range_startswith(Db, Prefix, Options) ->
+    #{
+        tx := Tx,
+        dir := Dir
+    } = Db,
+    BinPrefix = erlfdb_directory:pack(Dir, Prefix),
+    erlfdb:get_range_startswith(Tx, BinPrefix, Options).
diff --git a/src/fabric/src/fabric2_doc.erl b/src/fabric/src/fabric2_doc.erl
new file mode 100644
index 0000000..b9ad2e1
--- /dev/null
+++ b/src/fabric/src/fabric2_doc.erl
@@ -0,0 +1,420 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric2_doc).
+
+
+-export([
+    get_fdi/2,
+    open/3,
+    update/3
+]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+
+
+-define(RETURN(Term), throw({?MODULE, Term})).
+
+
+get_fdi(TxDb, DocId) ->
+    Future = fabric2_db:get(TxDb, {<<"revs">>, DocId}),
+    fdb_to_fdi(DocId, erlfdb:wait(Future)).
+
+
+open(TxDb, DocId, {Pos, [Rev | _] = Path}) ->
+    Key = {<<"docs">>, DocId, Pos, Rev},
+    Future = fabric2_db:get(TxDb, Key),
+    fdb_to_doc(DocId, Pos, Path, erlfdb:wait(Future)).
+
+
+% Apply a single document update inside the transaction carried by
+% TxDb. Returns {ok, {Pos, Rev}} on success, or an {error, _} /
+% {ok, []} result thrown via ?RETURN from the helpers.
+update(TxDb, #doc{} = Doc0, Options) ->
+    UpdateType = case lists:member(replicated_changes, Options) of
+        true -> replicated_changes;
+        false -> interactive_edit
+    end,
+
+    try
+        FDI1 = get_fdi(TxDb, Doc0#doc.id),
+        Doc1 = prep_and_validate(TxDb, FDI1, Doc0, UpdateType),
+        Doc2 = case UpdateType of
+            interactive_edit -> new_revid(Doc1);
+            replicated_changes -> Doc1
+        end,
+        {FDI2, Doc3} = merge_rev_tree(FDI1, Doc2, UpdateType),
+
+        #{tx := Tx} = TxDb,
+
+        % Delete old entry in changes feed. A brand new document has no
+        % prior update_seq (get_fdi/2 returned not_found), so skip the
+        % clear instead of crashing on a badrecord access.
+        case FDI1 of
+            not_found ->
+                ok;
+            #full_doc_info{update_seq = OldSeq} ->
+                OldSeqKeyBin = fabric2_db:pack(TxDb, {<<"changes">>, OldSeq}),
+                erlfdb:clear(Tx, OldSeqKeyBin)
+        end,
+
+        % Add new entry to changes feed; the all-ones 80-bit value is
+        % the versionstamp placeholder replaced at commit time.
+        NewSeqKey = {<<"changes">>, <<16#FFFFFFFFFFFFFFFFFFFF:80>>},
+        NewSeqKeyBin = fabric2_db:pack_vs(TxDb, NewSeqKey),
+        erlfdb:set_versionstamped_key(Tx, NewSeqKeyBin, Doc3#doc.id),
+
+        {RevStart, [Rev | _]} = Doc3#doc.revs,
+
+        % Write document data
+        {NewDocKey, NewDocVal} = doc_to_fdb(TxDb, Doc3),
+        erlfdb:set(Tx, NewDocKey, NewDocVal),
+
+        % Update revision tree entry. The erlfdb atomic op is
+        % set_versionstamped_value (was misspelled
+        % set_versionstampled_value, an undef at runtime).
+        {NewFDIKey, NewFDIVal} = fdi_to_fdb(TxDb, FDI2),
+        erlfdb:set_versionstamped_value(Tx, NewFDIKey, NewFDIVal),
+
+        % And done.
+        {ok, {RevStart, Rev}}
+    catch throw:{?MODULE, Return} ->
+        Return
+    end.
+
+
+prep_and_validate(TxDb, not_found, Doc, UpdateType) ->
+    case Doc#doc.revs of
+        {0, []} when UpdateType == interactive_edit ->
+            ?RETURN({error, conflict});
+        _ ->
+            ok
+    end,
+    prep_and_validate(TxDb, Doc, fun() -> nil end);
+
+prep_and_validate(TxDb, FDI, Doc, interactive_edit) ->
+    #doc{
+        revs = {Start, Revs}
+    } = Doc,
+
+    Leafs = couch_key_tree:get_all_leafs(FDI#full_doc_info.rev_tree),
+    LeafRevs = lists:map(fun({_Leaf, {LeafStart, [LeafRev | _] = Path}}) ->
+        {{LeafStart, LeafRev}, Path}
+    end, Leafs),
+
+    GetDocFun = case Revs of
+        [PrevRev | _] ->
+            case lists:keyfind({Start, PrevRev}, 1, LeafRevs) of
+                {{Start, PrevRev}, Path} ->
+                    fun() -> open(TxDb, Doc#doc.id, {Start, Path}) end;
+                false ->
+                    ?RETURN({error, conflict})
+            end;
+        [] ->
+            case FDI#full_doc_info.deleted of
+                true ->
+                    fun() -> nil end;
+                false ->
+                    ?RETURN({error, conflict})
+            end
+    end,
+    prep_and_validate(TxDb, Doc, GetDocFun);
+
+prep_and_validate(TxDb, FDI, Doc, replicated_changes) ->
+    #full_doc_info{
+        rev_tree = RevTree
+    } = FDI,
+    OldLeafs = couch_key_tree:get_all_leafs_full(RevTree),
+    OldLeafsLU = [{Start, RevId} || {Start, [{RevId, _} | _]} <- OldLeafs],
+
+    NewPath = couch_doc:to_path(Doc),
+    NewRevTree = couch_key_tree:merge(RevTree, NewPath),
+
+    Leafs = couch_key_tree:get_all_leafs_full(NewRevTree),
+    LeafRevsFull = lists:map(fun({Start, [{RevId, _} | _]} = FullPath) ->
+        [{{Start, RevId}, FullPath}]
+    end, Leafs),
+    LeafRevsFullDict = dict:from_list(LeafRevsFull),
+
+    #doc{
+        revs = {DocStart, [DocRev | _]}
+    } = Doc,
+    DocRevId = {DocStart, DocRev},
+
+    IsOldLeaf = lists:member(DocRevId, OldLeafsLU),
+    GetDocFun = case dict:find(DocRevId, LeafRevsFullDict) of
+        {ok, {DocStart, RevPath}} when not IsOldLeaf ->
+            % An incoming replicated edit only sends us leaf
+            % nodes which may have included multiple updates
+            % we haven't seen locally. Thus we have to search
+            % back through the tree to find the first edit
+            % we do know about.
+            case find_prev_known_rev(DocStart, RevPath) of
+                not_found -> fun() -> nil end;
+                PrevRevs -> fun() -> open(TxDb, Doc#doc.id, PrevRevs) end
+            end;
+        _ ->
+            % The update merged to an internal node that we
+            % already know about which means we're done with
+            % this update.
+            ?RETURN({ok, []})
+    end,
+
+    prep_and_validate(TxDb, Doc, GetDocFun).
+
+
+% Merge attachment stubs against the previous on-disk doc body (if any)
+% and run the VDU checks; returns the merged doc.
+prep_and_validate(TxDb, Doc, GetDocBody) ->
+    NewDoc = case couch_doc:has_stubs(Doc) of
+        true ->
+            case GetDocBody() of
+                #doc{} = PrevDoc ->
+                    couch_doc:merge_stubs(Doc, PrevDoc);
+                _ ->
+                    % Force a missing_stubs error: merging stubs against
+                    % an empty doc throws it. (Function name fixed from
+                    % the misspelled couch_doc:mege_stubs, which would
+                    % have raised undef instead of missing_stubs.)
+                    couch_doc:merge_stubs(Doc, #doc{})
+            end;
+        false ->
+            Doc
+    end,
+    validate_doc_update(TxDb, NewDoc, GetDocBody),
+    NewDoc.
+
+
+merge_rev_tree(FDI, Doc, interactive_edit) when FDI#full_doc_info.deleted ->
+    % We're recreating a document that was previously
+    % deleted. To check that this is a recreation from
+    % the root we assert that the new document has a
+    % revision depth of 1 (this is to avoid recreating a
+    % doc from a previous internal revision) and is also
+    % not deleted. To avoid expanding the revision tree
+    % unnecessarily we create a new revision based on
+    % the winning deleted revision.
+
+    {RevDepth, _} = Doc#doc.revs,
+    case RevDepth == 1 andalso not Doc#doc.deleted of
+        true ->
+            % Update the new doc based on revisions in OldInfo
+            #doc_info{revs=[WinningRev | _]} = couch_doc:to_doc_info(FDI),
+            #rev_info{rev={OldPos, OldRev}} = WinningRev,
+            Body = case couch_util:get_value(comp_body, Doc#doc.meta) of
+                CompBody when is_binary(CompBody) ->
+                    couch_compress:decompress(CompBody);
+                _ ->
+                    Doc#doc.body
+            end,
+            NewDoc = new_revid(Doc#doc{
+                revs = {OldPos, [OldRev]},
+                body = Body
+            }),
+
+            % Merge our modified new doc into the tree
+            #full_doc_info{rev_tree = RevTree} = FDI,
+            case couch_key_tree:merge(RevTree, couch_doc:to_path(NewDoc)) of
+                {NewRevTree, new_leaf} ->
+                    % We changed the revision id so inform the caller
+                    NewFDI = FDI#full_doc_info{
+                        rev_tree = NewRevTree,
+                        deleted = false
+                    },
+                    {NewFDI, NewDoc};
+                _ ->
+                    throw(doc_recreation_failed)
+            end;
+        _ ->
+            ?RETURN({error, conflict})
+    end;
+merge_rev_tree(FDI, Doc, interactive_edit) ->
+    % We're attempting to merge a new revision into an
+    % undeleted document. To not be a conflict we require
+    % that the merge results in extending a branch.
+
+    RevTree = FDI#full_doc_info.rev_tree,
+    case couch_key_tree:merge(RevTree, couch_doc:to_path(Doc)) of
+        {NewRevTree, new_leaf} when not Doc#doc.deleted ->
+            NewFDI = FDI#full_doc_info{
+                rev_tree = NewRevTree,
+                deleted = false
+            },
+            {NewFDI, Doc};
+        {NewRevTree, new_leaf} when Doc#doc.deleted ->
+            % We have to check if we just deleted this
+            % document completely or if it was a conflict
+            % resolution.
+            NewFDI = FDI#full_doc_info{
+                rev_tree = NewRevTree,
+                deleted = couch_doc:is_deleted(NewRevTree)
+            },
+            {NewFDI, Doc};
+        _ ->
+            ?RETURN({error, conflict})
+    end;
+merge_rev_tree(FDI, Doc, replicated_changes) ->
+    % We're merging in revisions without caring about
+    % conflicts. Most likely this is a replication update.
+    RevTree = FDI#full_doc_info.rev_tree,
+    {NewRevTree, _} = couch_key_tree:merge(RevTree, couch_doc:to_path(Doc)),
+    NewFDI = FDI#full_doc_info{
+        rev_tree = NewRevTree,
+        deleted = couch_doc:is_deleted(NewRevTree)
+    },
+    % If a replicated change did not modify the revision
+    % tree then we've got nothing else to do.
+    if NewFDI /= FDI -> ok; true ->
+        ?RETURN({ok, []})
+    end,
+    {NewFDI, Doc}.
+
+
+
+validate_doc_update(TxDb, #doc{id = <<"_design/", _/binary>>} = Doc, _) ->
+    case catch fabric2_db:check_is_admin(TxDb) of
+        ok -> validate_ddoc(TxDb, Doc);
+        Error -> ?RETURN(Error)
+    end;
+validate_doc_update(_TxDb, #doc{id = <<"_local/", _/binary>>}, _) ->
+    ok;
+validate_doc_update(TxDb, Doc, GetDiskDocFun) ->
+    Fun = fun() ->
+        DiskDoc = GetDiskDocFun(),
+        JsonCtx = couch_util:json_user_ctx(TxDb),
+        SecObj = fabric2_db:get_security(TxDb),
+        try
+            lists:map(fun(VDU) ->
+                case VDU(Doc, DiskDoc, JsonCtx, SecObj) of
+                    ok -> ok;
+                    Error -> ?RETURN(Error)
+                end
+            end, fabric2_db:get_vdus(TxDb)),
+            ok
+        catch
+            throw:Error ->
+                Error
+        end
+    end,
+    Stat = [couchdb, query_server, vdu_process_time],
+    couch_stats:update_histogram(Stat, Fun).
+
+
+validate_ddoc(TxDb, DDoc) ->
+    try
+        ok = couch_index_server:validate(TxDb, couch_doc:with_ejson_body(DDoc))
+    catch
+        throw:{invalid_design_doc, Reason} ->
+            throw({bad_request, invalid_design_doc, Reason});
+        throw:{compilation_error, Reason} ->
+            throw({bad_request, compilation_error, Reason});
+        throw:Error ->
+            ?RETURN(Error)
+    end.
+
+
+find_prev_known_rev(_Pos, []) ->
+    not_found;
+find_prev_known_rev(Pos, [{_Rev, #doc{}} | RestPath]) ->
+    % doc records are skipped because these are the result
+    % of replication sending us an update. We're only interested
+    % in what we have locally since we're comparing attachment
+    % stubs. The replicator should never do this because it
+    % should only ever send leaves but the possibility exists
+    % so we account for it.
+    find_prev_known_rev(Pos - 1, RestPath);
+find_prev_known_rev(Pos, [{_Rev, ?REV_MISSING} | RestPath]) ->
+    find_prev_known_rev(Pos - 1, RestPath);
+find_prev_known_rev(Pos, [{_Rev, #leaf{}} | _] = DocPath) ->
+    {Pos, [Rev || {Rev, _Val} <- DocPath]}.
+
+
+new_revid(Doc) ->
+    #doc{
+        body = Body,
+        revs = {OldStart, OldRevs},
+        atts = Atts,
+        deleted = Deleted
+    } = Doc,
+
+    DigestedAtts = lists:foldl(fun(Att, Acc) ->
+        [N, T, M] = couch_att:fetch([name, type, md5], Att),
+        case M == <<>> of
+            true -> Acc;
+            false -> [{N, T, M} | Acc]
+        end
+    end, [], Atts),
+
+    Rev = case DigestedAtts of
+        Atts2 when length(Atts) =/= length(Atts2) ->
+            % We must have old style non-md5 attachments
+            list_to_binary(integer_to_list(couch_util:rand32()));
+        Atts2 ->
+            OldRev = case OldRevs of [] -> 0; [OldRev0 | _] -> OldRev0 end,
+            SigTerm = [Deleted, OldStart, OldRev, Body, Atts2],
+            couch_hash:md5_hash(term_to_binary(SigTerm, [{minor_version, 1}]))
+    end,
+
+    Doc#doc{revs = {OldStart + 1, [Rev | OldRevs]}}.
+
+
+doc_to_fdb(TxDb, #doc{} = Doc) ->
+    #doc{
+        id = Id,
+        revs = {Start, [Rev | _]},
+        body = Body,
+        atts = Atts,
+        deleted = Deleted
+    } = Doc,
+    Key = {<<"docs">>, Id, Start, Rev},
+    KeyBin = fabric2_db:pack(TxDb, Key),
+    Val = {Body, Atts, Deleted},
+    {KeyBin, term_to_binary(Val, [{minor_version, 1}])}.
+
+
+fdb_to_doc(DocId, Pos, Path, Bin) when is_binary(Bin) ->
+    {Body, Atts, Deleted} = binary_to_term(Bin, [safe]),
+    #doc{
+        id = DocId,
+        revs = {Pos, Path},
+        body = Body,
+        atts = Atts,
+        deleted = Deleted
+    };
+fdb_to_doc(_DocId, _Pos, _Path, not_found) ->
+    {not_found, missing}.
+
+
+fdi_to_fdb(TxDb, #full_doc_info{} = FDI) ->
+    #full_doc_info{
+        id = Id,
+        deleted = Deleted,
+        rev_tree = RevTree
+    } = flush_tree(FDI),
+
+    Key = {<<"revs">>, Id},
+    KeyBin = fabric2_db:pack(TxDb, Key),
+    Val = {Deleted, RevTree, <<16#FFFFFFFFFFFFFFFFFFFF:80>>},
+    {KeyBin, term_to_binary(Val, [{minor_version, 1}])}.
+
+
+fdb_to_fdi(Id, Bin) when is_binary(Bin) ->
+    {Deleted, RevTree, UpdateSeq} = binary_to_term(Bin, [safe]),
+    #full_doc_info{
+        id = Id,
+        deleted = Deleted,
+        rev_tree = RevTree,
+        update_seq = UpdateSeq
+    };
+fdb_to_fdi(_Id, not_found) ->
+    not_found.
+
+
+flush_tree(FDI) ->
+    #full_doc_info{
+        rev_tree = Unflushed
+    } = FDI,
+
+    Flushed = couch_key_tree:map(fun(_Rev, Value) ->
+        case Value of
+            #doc{deleted = Del} -> #leaf{deleted = Del};
+            _ -> Value
+        end
+    end, Unflushed),
+
+    FDI#full_doc_info{
+        rev_tree = Flushed
+    }.