You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2019/03/08 22:35:53 UTC
[couchdb] branch prototype/fdb-layer updated: Implement update_docs
This is an automated email from the ASF dual-hosted git repository.
davisp pushed a commit to branch prototype/fdb-layer
in repository https://gitbox.apache.org/repos/asf/couchdb.git
The following commit(s) were added to refs/heads/prototype/fdb-layer by this push:
new 73ce010 Implement update_docs
73ce010 is described below
commit 73ce010c0fc20d9fa84eec7cf39a3c81e7ba73e0
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Fri Mar 8 16:34:43 2019 -0600
Implement update_docs
This is a first pass attempt at an implementation of update_docs.
There's probably a lot wrong here and a number of things are expected to
blow up if features like VDUs are used.
---
FDB_NOTES.md | 45 +++++
src/chttpd/src/chttpd_db.erl | 9 +-
src/fabric/src/fabric2.erl | 237 ++++++++++++++---------
src/fabric/src/fabric2_db.erl | 162 ++++++++++++++++
src/fabric/src/fabric2_doc.erl | 420 +++++++++++++++++++++++++++++++++++++++++
5 files changed, 778 insertions(+), 95 deletions(-)
diff --git a/FDB_NOTES.md b/FDB_NOTES.md
new file mode 100644
index 0000000..9278a2f
--- /dev/null
+++ b/FDB_NOTES.md
@@ -0,0 +1,45 @@
+Things of Note
+===
+
+
+1. If a replication sends us two revisions A and B where one is an
+ ancestor of the other, we likely have divergent behavior. However,
+ this should never happen In Theory.
+
+2. For multiple updates to the same document in a _bulk_docs request (or
+   if they just happen to be in the same update batch in non-fdb CouchDB)
+   we likely have subtly different behavior.
+
+3. I'm relying on repeated reads in an fdb transaction to be "cheap"
+ in that the reads would be cached in the fdb_transaction object.
+ This needs to be checked for certainty but that appeared to
+ be how things behaved in testing.
+
+4. When attempting to create a doc from scratch in an interactive_edit
+ update, with revisions specified *and* attachment stubs, the reported
+ error is now a conflict. Previously the missing_stubs error was
+ raised earlier.
+
+5. There may be a difference in behavior if there are no VDU functions
+ set on a db and no design documents in a batch. This is because in
+ this situation we don't run the prep_and_validate code on pre-fdb
+ CouchDB. The new code always checks stubs before merging revision trees.
+ I'm sure the old way would fail somehow, but it would fail further on
+ which means we may have failed with a different reason (conflict, etc)
+ before we got to the next place we check for missing stubs.
+
+6. I still need to add paging for the DirectoryLayer results so that we
+ can use that for the _all_dbs end point.
+
+7. For multi-doc updates we'll need to investigate user versions on
+ versionstamps within a transaction. Also this likely prevents the
+ ability to have multiple updates to the same doc in a single
+ _bulk_docs transaction
+
+8. I'm not currently decreasing size after updating an existing document.
+ It looks like we're gonna have to do some sort of cleverness with the
+ existing FDI sizes field maybe?
+
+9. I'm cheating really bad with term_to_binary and ignoring serialization
+ but given that's all going to change I'm not too concerned about it
+ at this point.
\ No newline at end of file
diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl
index cceeac0..7cc1e42 100644
--- a/src/chttpd/src/chttpd_db.erl
+++ b/src/chttpd/src/chttpd_db.erl
@@ -370,16 +370,13 @@ delete_db_req(#httpd{}=Req, DbName) ->
end.
do_db_req(#httpd{path_parts=[DbName|_], user_ctx=Ctx}=Req, Fun) ->
- Db = #{
- name => DbName,
- user_ctx => Ctx
- },
+ Db = fabric2:open_db(DbName, [{user_ctx, Ctx}]),
Fun(Req, Db).
-db_req(#httpd{method='GET',path_parts=[DbName]}=Req, _Db) ->
+db_req(#httpd{method='GET',path_parts=[DbName]}=Req, Db) ->
% measure the time required to generate the etag, see if it's worth it
T0 = os:timestamp(),
- {ok, DbInfo} = fabric2:get_db_info(DbName),
+ {ok, DbInfo} = fabric2:get_db_info(Db),
DeltaT = timer:now_diff(os:timestamp(), T0) / 1000,
couch_stats:update_histogram([couchdb, dbinfo], DeltaT),
send_json(Req, {DbInfo});
diff --git a/src/fabric/src/fabric2.erl b/src/fabric/src/fabric2.erl
index 924fb89..27aadd2 100644
--- a/src/fabric/src/fabric2.erl
+++ b/src/fabric/src/fabric2.erl
@@ -24,6 +24,9 @@
delete_db/1,
delete_db/2,
+ open_db/1,
+ open_db/2,
+
get_db_info/1,
get_doc_count/1,
get_doc_count/2,
@@ -32,7 +35,7 @@
%% set_revs_limit/3,
get_security/1,
- set_security/2
+ set_security/2,
%% get_purge_infos_limit/1,
%% set_purge_infos_limit/3,
@@ -51,9 +54,10 @@
%%
%% get_missing_revs/2,
%% get_missing_revs/3,
- %%
- %% update_doc/3,
- %% update_docs/3,
+
+ update_doc/3,
+ update_docs/3
+
%% purge_docs/3,
%%
%% att_receiver/2,
@@ -100,7 +104,7 @@ create_db(DbName, _Options) ->
fabric_server:transactional(fun(Tx) ->
try
DbDir = erlfdb_directory:create(Tx, DbsDir, DbName),
- init_db(Tx, DbDir)
+ fabric2_db:init(Tx, DbDir)
catch error:{erlfdb_directory, {create_error, path_exists, _}} ->
{error, file_exists}
end
@@ -122,60 +126,76 @@ delete_db(DbName, _Options) ->
end.
-get_db_info(DbName) ->
- [DbDir, MetaRows, LastChangeRow] = fabric_server:transactional(fun(Tx) ->
- DbDir = open_db(Tx, DbName),
- Meta = erlfdb_directory:pack(DbDir, {<<"meta">>, <<"stats">>}),
- MetaFuture = erlfdb:get_range_startswith(Tx, Meta),
- {CStart, CEnd} = erlfdb_directory:range(DbDir, {<<"changes">>}),
- ChangesFuture = erlfdb:get_range(Tx, CStart, CEnd, [
- {streaming_mode, exact},
- {limit, 1},
- {reverse, true}
- ]),
- [DbDir] ++ erlfdb:wait_for_all([MetaFuture, ChangesFuture])
+open_db(DbName) ->
+ open_db(DbName, []).
+
+
+open_db(DbName, Options) ->
+ fabric2_db:open(DbName, Options).
+
+
%% Return {ok, Props} describing the database: update_seq, doc counts
%% and size stats read inside one transaction, layered over static
%% BaseProps defaults. Accepts either a binary db name (opened on the
%% fly) or an already-opened Db handle.
get_db_info(DbName) when is_binary(DbName) ->
    get_db_info(open_db(DbName));

get_db_info(Db) ->
    DbProps = fabric2_db:with_tx(Db, fun(TxDb) ->
        % Fix: must use TxDb, not Db — only the transactional handle
        % has the opened database directory needed to compute ranges.
        {CStart, CEnd} = fabric2_db:range_bounds(TxDb, {<<"changes">>}),
        ChangesFuture = fabric2_db:get_range(TxDb, {CStart}, {CEnd}, [
            {streaming_mode, exact},
            {limit, 1},
            {reverse, true}
        ]),

        % Fix: stats keys are written as {<<"meta">>, <<"stats">>, _}
        % by fabric2_db:init/3 (and read that way by get_doc_count/2);
        % the prefix tuple was swapped and would match nothing.
        StatsPrefix = {<<"meta">>, <<"stats">>},
        MetaFuture = fabric2_db:get_range_startswith(TxDb, StatsPrefix),

        % Latest changes-feed entry (reverse, limit 1) gives the
        % current update sequence; all-zero sequence for an empty db.
        RawSeq = case erlfdb:wait(ChangesFuture) of
            [] ->
                <<0:80>>;
            [{SeqKey, _}] ->
                {<<"changes">>, SeqBin} = fabric2_db:unpack(TxDb, SeqKey),
                SeqBin
        end,
        CProp = {update_seq, ?l2b(couch_util:to_hex(RawSeq))},

        % Fix: lists:flatmap/2 takes an arity-1 fun; each element of
        % the range result is a {Key, Value} pair. Also unpack/2 needs
        % the TxDb argument to reach the directory.
        MProps = lists:flatmap(fun({K, V}) ->
            case fabric2_db:unpack(TxDb, K) of
                {_, _, <<"doc_count">>} ->
                    [{doc_count, ?bin2uint(V)}];
                {_, _, <<"doc_del_count">>} ->
                    [{doc_del_count, ?bin2uint(V)}];
                {_, _, <<"size">>} ->
                    Val = ?bin2uint(V),
                    [
                        {other, {[{data_size, Val}]}},
                        {sizes, {[
                            {active, 0},
                            {external, Val},
                            {file, 0}
                        ]}}
                    ];
                _ ->
                    % Stats we don't report (design/local counts, etc.)
                    []
            end
        end, erlfdb:wait(MetaFuture)),

        [CProp | MProps]
    end),

    BaseProps = [
        {cluster, {[{n, 0}, {q, 0}, {r, 0}, {w, 0}]}},
        {compact_running, false},
        {data_size, 0},
        {db_name, fabric2_db:name(Db)},
        {disk_format_version, 0},
        {disk_size, 0},
        {instance_start_time, <<"0">>},
        {purge_seq, 0}
    ],

    % Transaction-derived props override the defaults.
    {ok, lists:foldl(fun({Key, Val}, Acc) ->
        lists:keystore(Key, 1, Acc, {Key, Val})
    end, BaseProps, DbProps)}.
get_doc_count(DbName) ->
@@ -191,57 +211,96 @@ get_doc_count(DbName, <<"_design">>) ->
get_doc_count(DbName, <<"_local">>) ->
get_doc_count(DbName, <<"doc_local_count">>);
-get_doc_count(DbName, Key) ->
- fabric_server:transactional(fun(Tx) ->
- DbDir = open_db(Tx, DbName),
- Key = erlfdb_directory:pack(DbDir, {<<"meta">>, <<"stats">>, Key}),
- VBin = erlfdb:wait(erlfdb:get(Tx, Key)),
- ?bin2uint(VBin)
+get_doc_count(Db, Key) ->
+ fabric2_db:with_tx(Db, fun(TxDb) ->
+ Future = fabric2_db:get(TxDb, {<<"meta">>, <<"stats">>, Key}),
+ ?bin2uint(erlfdb:wait(Future))
end).
-get_security(DbName) ->
- SecJson = fabric_server:transactional(fun(Tx) ->
- DbDir = open_db(Tx, DbName),
- Tuple = {<<"meta">>, <<"config">>, <<"security_doc">>},
- Key = erlfdb_directory:pack(DbDir, Tuple),
- erlfdb:wait(erlfdb:get(Tx, Key))
+get_security(DbName) when is_binary(DbName) ->
+ get_security(open_db(DbName));
+
+get_security(Db) ->
+ Key = {<<"meta">>, <<"config">>, <<"security_doc">>},
+ SecJson = fabric2_db:with_tx(Db, fun(TxDb) ->
+ erlfdb:wait(fabric2_db:get(TxDb, Key))
end),
?JSON_DECODE(SecJson).
-set_security(DbName, ErlJson) ->
+set_security(DbName, ErlJson) when is_binary(DbName) ->
+ set_security(open_db(DbName), ErlJson);
+
+set_security(Db, ErlJson) ->
+ Key = {<<"meta">>, <<"config">>, <<"security_doc">>},
SecJson = ?JSON_ENCODE(ErlJson),
- fabric_server:transactional(fun(Tx) ->
- DbDir = open_db(Tx, DbName),
- Tuple = {<<"meta">>, <<"config">>, <<"security_doc">>},
- Key = erlfdb_directory:pack(DbDir, Tuple),
- erlfdb:set(Tx, Key, SecJson)
+ fabric2_db:with_tx(Db, fun(TxDb) ->
+ fabric2_db:set(TxDb, Key, SecJson)
end).
-init_db(Tx, DbDir) ->
- Defaults = [
- {{<<"meta">>, <<"config">>, <<"revs_limit">>}, ?uint2bin(1000)},
- {{<<"meta">>, <<"config">>, <<"security_doc">>}, ?DEFAULT_SECURITY},
- {{<<"meta">>, <<"stats">>, <<"doc_count">>}, ?uint2bin(0)},
- {{<<"meta">>, <<"stats">>, <<"doc_del_count">>}, ?uint2bin(0)},
- {{<<"meta">>, <<"stats">>, <<"doc_design_count">>}, ?uint2bin(0)},
- {{<<"meta">>, <<"stats">>, <<"doc_local_count">>}, ?uint2bin(0)},
- {{<<"meta">>, <<"stats">>, <<"size">>}, ?uint2bin(2)}
- ],
- lists:foreach(fun({K, V}) ->
- BinKey = erlfdb_directory:pack(DbDir, K),
- erlfdb:set(Tx, BinKey, V)
- end, Defaults).
+update_doc(Db, Doc, Options) ->
+ fabric2_db:with_tx(Db, fun(TxDb) ->
+ case fabric2_doc:update(TxDb, Doc, opts(Options)) of
+ {ok, []} ->
+ % replication no-op
+ #doc{revs = {Pos, [RevId | _]}} = doc(Db, Doc),
+ {ok, {Pos, RevId}};
+ {ok, NewRev} ->
+ {ok, NewRev};
+ {error, Error} ->
+ throw(Error)
+ end
+ end).
-open_db(Tx, DbName) ->
- % We'll eventually want to cache this in the
- % fabric_server ets table.
- DbsDir = fabric_server:get_dir(dbs),
- try
- erlfdb_directory:open(Tx, DbsDir, DbName)
- catch error:{erlfdb_directory, {open_error, path_missing, _}} ->
- erlang:error(database_does_not_exist)
+update_docs(DbName, Docs, Options) when is_binary(DbName) ->
+ update_docs(open_db(DbName, Options), Docs, Options);
+
+update_docs(Db, Docs, Options) ->
+ fabric2_db:with_tx(Db, fun(TxDb) ->
+ {Resps, Status} = lists:mapfoldl(fun(Doc, Acc) ->
+ case fabric2_doc:update(TxDb, Doc, opts(Options)) of
+ {ok, _} = Resp ->
+ {Resp, Acc};
+ {error, _} = Resp ->
+ {Resp, error}
+ end
+ end, ok, Docs),
+ {Status, Resps}
+ end).
+
+
+docs(Db, Docs) ->
+ lists:map(fun(Doc) -> doc(Db, Doc) end, Docs).
+
+
+doc(_Db, #doc{} = Doc) ->
+ Doc;
+
+doc(Db, {_} = Doc) ->
+ couch_db:doc_from_json_obj_validate(Db, Doc);
+
+doc(_Db, Doc) ->
+ erlang:error({illegal_doc_format, Doc}).
+
+
+opts(Options) ->
+ lists:foldl(fun(Opt, Acc) ->
+ add_option(Opt, Acc)
+ end, Options, [user_ctx, io_priority]).
+
+
+add_option(Key, Options) ->
+ case couch_util:get_value(Key, Options) of
+ undefined ->
+ case erlang:get(Key) of
+ undefined ->
+ Options;
+ Value ->
+ [{Key, Value} | Options]
+ end;
+ _ ->
+ Options
end.
diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl
new file mode 100644
index 0000000..1e5cb18
--- /dev/null
+++ b/src/fabric/src/fabric2_db.erl
@@ -0,0 +1,162 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric2_db).
+
+
+-export([
+ open/2,
+ init/3,
+
+ name/1,
+
+ with_tx/2,
+
+ pack/2,
+ unpack/2,
+ range_bounds/2,
+
+ get/2,
+ get_range/3,
+ get_range/4,
+ get_range_startswith/2,
+ get_range_startswith/3
+]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("fabric/include/fabric.hrl").
+
+
+open(DbName, Options) when is_binary(DbName), is_list(Options) ->
+ BaseDb = #{
+ name => DbName,
+ tx => undefined,
+ dir => undefined,
+ user_ctx => #user_ctx{},
+ validate_doc_update => undefined
+ },
+ lists:foldl(fun({K, V}, DbAcc) ->
+ maps:put(K, V, DbAcc)
+ end, BaseDb, Options).
+
+
+name(#{name := Name}) ->
+ Name.
+
+
+init(DbName, Tx, DbDir) ->
+ TxDb = open(DbName, [{tx, Tx}, {dir, DbDir}]),
+ Defaults = [
+ {{<<"meta">>, <<"config">>, <<"revs_limit">>}, ?uint2bin(1000)},
+ {{<<"meta">>, <<"config">>, <<"security_doc">>}, <<"{}">>},
+ {{<<"meta">>, <<"stats">>, <<"doc_count">>}, ?uint2bin(0)},
+ {{<<"meta">>, <<"stats">>, <<"doc_del_count">>}, ?uint2bin(0)},
+ {{<<"meta">>, <<"stats">>, <<"doc_design_count">>}, ?uint2bin(0)},
+ {{<<"meta">>, <<"stats">>, <<"doc_local_count">>}, ?uint2bin(0)},
+ {{<<"meta">>, <<"stats">>, <<"size">>}, ?uint2bin(2)}
+ ],
+ lists:foreach(fun({K, V}) ->
+ erlfdb:set(Tx, pack(TxDb, K), V)
+ end, Defaults).
+
+
+with_tx(Db, Fun) when is_function(Fun, 1) ->
+ DbName = maps:get(name, Db),
+ DbsDir = fabric_server:get_dir(dbs),
+ fabric_server:transactional(fun(Tx) ->
+ % We'll eventually want to cache this in the
+ % fabric_server ets table.
+ DbDir = try
+ erlfdb_directory:open(Tx, DbsDir, DbName)
+ catch error:{erlfdb_directory, {open_error, path_missing, _}} ->
+ erlang:error(database_does_not_exist)
+ end,
+ TxDb = Db#{tx := Tx, dir := DbDir},
+ Fun(TxDb)
+ end).
+
+
+pack(#{dir := undefined} = Db, _Tuple) ->
+ erlang:error({no_directory, Db});
+
+pack(Db, Tuple) ->
+ #{
+ dir := Dir
+ } = Db,
+ erlfdb_directory:pack(Dir, Tuple).
+
+
+unpack(#{dir := undefined} = Db, _Key) ->
+ erlang:error({no_directory, Db});
+
+unpack(Db, Key) ->
+ #{
+ dir := Dir
+ } = Db,
+ erlfdb_directory:unpack(Dir, Key).
+
+
+range_bounds(#{dir := undefined} = Db, _Key) ->
+ erlang:error({no_directory, Db});
+
+range_bounds(Db, Key) ->
+ #{
+ dir := Dir
+ } = Db,
+ erlfdb_directory:range(Dir, Key).
+
+
+
%% Read the value stored under the tuple Key inside the database's
%% directory. Returns an erlfdb future; callers erlfdb:wait/1 on it.
%% Crashes with {invalid_tx_db, Db} when called outside with_tx/2.
get(#{tx := undefined} = Db, _Key) ->
    erlang:error({invalid_tx_db, Db});

get(Db, Key) ->
    #{
        tx := Tx,
        dir := Dir
    } = Db,
    % Fix: module name was misspelled `erlfdb_direcotry`, which would
    % raise undef on every single read.
    BinKey = erlfdb_directory:pack(Dir, Key),
    erlfdb:get(Tx, BinKey).
+
+
+get_range(Db, StartKey, EndKey) ->
+ get_range(Db, StartKey, EndKey, []).
+
+
+get_range(#{tx := undefined} = Db, _, _, _) ->
+ erlang:error({invalid_tx_db, Db});
+
+get_range(Db, StartKey, EndKey, Options) ->
+ #{
+ tx := Tx,
+ dir := Dir
+ } = Db,
+ BinStartKey = erlfdb_directory:pack(Dir, StartKey),
+ BinEndKey= erlfdb_directory:pack(Dir, EndKey),
+ erlfdb:get_range(Tx, BinStartKey, BinEndKey, Options).
+
+
+get_range_startswith(Db, Prefix) ->
+ get_range_startswith(Db, Prefix, []).
+
+
+get_range_startswith(#{tx := undefined} = Db, _, _) ->
+ erlang:error({invalid_tx_db, Db});
+
+get_range_startswith(Db, Prefix, Options) ->
+ #{
+ tx := Tx,
+ dir := Dir
+ } = Db,
+ BinPrefix = erlfdb_directory:pack(Dir, Prefix),
+ erlfdb:get_range_startswith(Tx, BinPrefix, Options).
diff --git a/src/fabric/src/fabric2_doc.erl b/src/fabric/src/fabric2_doc.erl
new file mode 100644
index 0000000..b9ad2e1
--- /dev/null
+++ b/src/fabric/src/fabric2_doc.erl
@@ -0,0 +1,420 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric2_doc).
+
+
+-export([
+ get_fdi/2,
+ open/3,
+ update/3
+]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+
+
+-define(RETURN(Term), throw({?MODULE, Term})).
+
+
+get_fdi(TxDb, DocId) ->
+ Future = fabric2_db:get(TxDb, {<<"revs">>, DocId}),
+ fdb_to_fdi(DocId, erlfdb:wait(Future)).
+
+
+open(TxDb, DocId, {Pos, [Rev | _] = Path}) ->
+ Key = {<<"docs">>, DocId, Pos, Rev},
+ Future = fabric2_db:get(TxDb, Key),
+ fdb_to_doc(DocId, Pos, Path, erlfdb:wait(Future)).
+
+
%% Apply one document update inside an open transaction.
%% Returns {ok, {Pos, RevId}} on success, {ok, []} for a replicated
%% no-op, or {error, Reason} (conflict etc.) via the ?RETURN throw.
update(TxDb, #doc{} = Doc0, Options) ->
    % replicated_changes merges revisions without conflict checks;
    % interactive_edit generates a new revision id.
    UpdateType = case lists:member(replicated_changes, Options) of
        true -> replicated_changes;
        false -> interactive_edit
    end,

    try
        FDI1 = get_fdi(TxDb, Doc0#doc.id),
        Doc1 = prep_and_validate(TxDb, FDI1, Doc0, UpdateType),
        Doc2 = case UpdateType of
            interactive_edit -> new_revid(Doc1);
            replicated_changes -> Doc1
        end,
        {FDI2, Doc3} = merge_rev_tree(FDI1, Doc2, UpdateType),

        #{tx := Tx} = TxDb,

        % Delete old entry in changes feed.
        % NOTE(review): get_fdi/2 returns the atom not_found for a
        % brand-new doc, so this record access would crash on first
        % insert — TODO confirm how new docs are meant to be handled.
        OldSeqKey = {<<"changes">>, FDI1#full_doc_info.update_seq},
        OldSeqKeyBin = fabric2_db:pack(TxDb, OldSeqKey),
        erlfdb:clear(Tx, OldSeqKeyBin),

        % Add new entry to changes feed; the all-ones placeholder is
        % replaced with the transaction versionstamp at commit.
        NewSeqKey = {<<"changes">>, <<16#FFFFFFFFFFFFFFFFFFFF:80>>},
        NewSeqKeyBin = fabric2_db:pack_vs(TxDb, NewSeqKey),
        erlfdb:set_versionstamped_key(Tx, NewSeqKeyBin, Doc3#doc.id),

        {RevStart, [Rev | _]} = Doc3#doc.revs,

        % Write document data
        {NewDocKey, NewDocVal} = doc_to_fdb(TxDb, Doc3),
        erlfdb:set(Tx, NewDocKey, NewDocVal),

        % Update revision tree entry.
        % Fix: function name was misspelled `set_versionstampled_value`,
        % an undef crash on every update.
        {NewFDIKey, NewFDIVal} = fdi_to_fdb(TxDb, FDI2),
        erlfdb:set_versionstamped_value(Tx, NewFDIKey, NewFDIVal),

        % And done.
        {ok, {RevStart, Rev}}
    catch throw:{?MODULE, Return} ->
        Return
    end.
+
+
+prep_and_validate(TxDb, not_found, Doc, UpdateType) ->
+ case Doc#doc.revs of
+ {0, []} when UpdateType == interactive_edit ->
+ ?RETURN({error, conflict});
+ _ ->
+ ok
+ end,
+ prep_and_validate(TxDb, Doc, fun() -> nil end);
+
+prep_and_validate(TxDb, FDI, Doc, interactive_edit) ->
+ #doc{
+ revs = {Start, Revs}
+ } = Doc,
+
+ Leafs = couch_key_tree:get_all_leafs(FDI#full_doc_info.rev_tree),
+ LeafRevs = lists:map(fun({_Leaf, {LeafStart, [LeafRev | _] = Path}}) ->
+ {{LeafStart, LeafRev}, Path}
+ end, Leafs),
+
+ GetDocFun = case Revs of
+ [PrevRev | _] ->
+ case lists:keyfind({Start, PrevRev}, 1, LeafRevs) of
+ {{Start, PrevRev}, Path} ->
+ fun() -> open(TxDb, Doc#doc.id, {Start, Path}) end;
+ false ->
+ ?RETURN({error, conflict})
+ end;
+ [] ->
+ case FDI#full_doc_info.deleted of
+ true ->
+ fun() -> nil end;
+ false ->
+ ?RETURN({error, conflict})
+ end
+ end,
+ prep_and_validate(TxDb, Doc, GetDocFun);
+
+prep_and_validate(TxDb, FDI, Doc, replicated_changes) ->
+ #full_doc_info{
+ rev_tree = RevTree
+ } = FDI,
+ OldLeafs = couch_key_tree:get_all_leafs_full(RevTree),
+ OldLeafsLU = [{Start, RevId} || {Start, [{RevId, _} | _]} <- OldLeafs],
+
+ NewPath = couch_doc:to_path(Doc),
+ NewRevTree = couch_key_tree:merge(RevTree, NewPath),
+
+ Leafs = couch_key_tree:get_all_leafs_full(NewRevTree),
+ LeafRevsFull = lists:map(fun({Start, [{RevId, _} | _]} = FullPath) ->
+ [{{Start, RevId}, FullPath}]
+ end, Leafs),
+ LeafRevsFullDict = dict:from_list(LeafRevsFull),
+
+ #doc{
+ revs = {DocStart, [DocRev | _]}
+ } = Doc,
+ DocRevId = {DocStart, DocRev},
+
+ IsOldLeaf = lists:member(DocRevId, OldLeafsLU),
+ GetDocFun = case dict:find(DocRevId, LeafRevsFullDict) of
+ {ok, {DocStart, RevPath}} when not IsOldLeaf ->
+ % An incoming replicated edit only sends us leaf
+ % nodes which may have included multiple updates
+ % we haven't seen locally. Thus we have to search
+ % back through the tree to find the first edit
+ % we do know about.
+ case find_prev_known_rev(DocStart, RevPath) of
+ not_found -> fun() -> nil end;
+ PrevRevs -> fun() -> open(TxDb, Doc#doc.id, PrevRevs) end
+ end;
+ _ ->
+ % The update merged to an internal node that we
+ % already know about which means we're done with
+ % this update.
+ ?RETURN({ok, []})
+ end,
+
+ prep_and_validate(TxDb, Doc, GetDocFun).
+
+
%% Shared final step for every update type: merge attachment stubs
%% against the previously stored revision (fetched lazily through
%% GetDocBody) and run the validate-doc-update hooks. Returns the
%% merged doc.
prep_and_validate(TxDb, Doc, GetDocBody) ->
    NewDoc = case couch_doc:has_stubs(Doc) of
        true ->
            case GetDocBody() of
                #doc{} = PrevDoc ->
                    couch_doc:merge_stubs(Doc, PrevDoc);
                _ ->
                    % Force a missing_stubs error by merging against an
                    % empty doc. Fix: was misspelled `mege_stubs`, which
                    % turned the intended missing_stubs error into an
                    % undef crash.
                    couch_doc:merge_stubs(Doc, #doc{})
            end;
        false ->
            Doc
    end,
    validate_doc_update(TxDb, NewDoc, GetDocBody),
    NewDoc.
+
+
+merge_rev_tree(FDI, Doc, interactive_edit) when FDI#full_doc_info.deleted ->
+ % We're recreating a document that was previously
+ % deleted. To check that this is a recreation from
+ % the root we assert that the new document has a
+ % revision depth of 1 (this is to avoid recreating a
+ % doc from a previous internal revision) and is also
+ % not deleted. To avoid expanding the revision tree
+ % unnecessarily we create a new revision based on
+ % the winning deleted revision.
+
+ {RevDepth, _} = Doc#doc.revs,
+ case RevDepth == 1 andalso not Doc#doc.deleted of
+ true ->
+ % Update the new doc based on revisions in OldInfo
+ #doc_info{revs=[WinningRev | _]} = couch_doc:to_doc_info(FDI),
+ #rev_info{rev={OldPos, OldRev}} = WinningRev,
+ Body = case couch_util:get_value(comp_body, Doc#doc.meta) of
+ CompBody when is_binary(CompBody) ->
+ couch_compress:decompress(CompBody);
+ _ ->
+ Doc#doc.body
+ end,
+ NewDoc = new_revid(Doc#doc{
+ revs = {OldPos, [OldRev]},
+ body = Body
+ }),
+
+ % Merge our modified new doc into the tree
+ #full_doc_info{rev_tree = RevTree} = FDI,
+ case couch_key_tree:merge(RevTree, couch_doc:to_path(NewDoc)) of
+ {NewRevTree, new_leaf} ->
+ % We changed the revision id so inform the caller
+ NewFDI = FDI#full_doc_info{
+ rev_tree = NewRevTree,
+ deleted = false
+ },
+ {NewFDI, NewDoc};
+ _ ->
+ throw(doc_recreation_failed)
+ end;
+ _ ->
+ ?RETURN({error, conflict})
+ end;
+merge_rev_tree(FDI, Doc, interactive_edit) ->
+ % We're attempting to merge a new revision into an
+ % undeleted document. To not be a conflict we require
+ % that the merge results in extending a branch.
+
+ RevTree = FDI#full_doc_info.rev_tree,
+ case couch_key_tree:merge(RevTree, couch_doc:to_path(Doc)) of
+ {NewRevTree, new_leaf} when not Doc#doc.deleted ->
+ NewFDI = FDI#full_doc_info{
+ rev_tree = NewRevTree,
+ deleted = false
+ },
+ {NewFDI, Doc};
+ {NewRevTree, new_leaf} when Doc#doc.deleted ->
+ % We have to check if we just deleted this
+ % document completely or if it was a conflict
+ % resolution.
+ NewFDI = FDI#full_doc_info{
+ rev_tree = NewRevTree,
+ deleted = couch_doc:is_deleted(NewRevTree)
+ },
+ {NewFDI, Doc};
+ _ ->
+ ?RETURN({error, conflict})
+ end;
+merge_rev_tree(FDI, Doc, replicated_changes) ->
+ % We're merging in revisions without caring about
+ % conflicts. Most likely this is a replication update.
+ RevTree = FDI#full_doc_info.rev_tree,
+ {NewRevTree, _} = couch_key_tree:merge(RevTree, couch_doc:to_path(Doc)),
+ NewFDI = FDI#full_doc_info{
+ rev_tree = NewRevTree,
+ deleted = couch_doc:is_deleted(NewRevTree)
+ },
+ % If a replicated change did not modify the revision
+ % tree then we've got nothing else to do.
+ if NewFDI /= FDI -> ok; true ->
+ ?RETURN({ok, []})
+ end,
+ {NewFDI, Doc}.
+
+
+
+validate_doc_update(TxDb, #doc{id = <<"_design/", _/binary>>} = Doc, _) ->
+ case catch fabric2_db:check_is_admin(TxDb) of
+ ok -> validate_ddoc(TxDb, Doc);
+ Error -> ?RETURN(Error)
+ end;
+validate_doc_update(_TxDb, #doc{id = <<"_local/", _/binary>>}, _) ->
+ ok;
+validate_doc_update(TxDb, Doc, GetDiskDocFun) ->
+ Fun = fun() ->
+ DiskDoc = GetDiskDocFun(),
+ JsonCtx = couch_util:json_user_ctx(TxDb),
+ SecObj = fabric2_db:get_security(TxDb),
+ try
+ lists:map(fun(VDU) ->
+ case VDU(Doc, DiskDoc, JsonCtx, SecObj) of
+ ok -> ok;
+ Error -> ?RETURN(Error)
+ end
+ end, fabric2_db:get_vdus(TxDb)),
+ ok
+ catch
+ throw:Error ->
+ Error
+ end
+ end,
+ Stat = [couchdb, query_server, vdu_process_time],
+ couch_stats:update_histogram(Stat, Fun).
+
+
+validate_ddoc(TxDb, DDoc) ->
+ try
+ ok = couch_index_server:validate(TxDb, couch_doc:with_ejson_body(DDoc))
+ catch
+ throw:{invalid_design_doc, Reason} ->
+ throw({bad_request, invalid_design_doc, Reason});
+ throw:{compilation_error, Reason} ->
+ throw({bad_request, compilation_error, Reason});
+ throw:Error ->
+ ?RETURN(Error)
+ end.
+
+
+find_prev_known_rev(_Pos, []) ->
+ not_found;
+find_prev_known_rev(Pos, [{_Rev, #doc{}} | RestPath]) ->
+ % doc records are skipped because these are the result
+ % of replication sending us an update. We're only interested
+ % in what we have locally since we're comparing attachment
+ % stubs. The replicator should never do this because it
+ % should only ever send leaves but the possibility exists
+ % so we account for it.
+ find_prev_known_rev(Pos - 1, RestPath);
+find_prev_known_rev(Pos, [{_Rev, ?REV_MISSING} | RestPath]) ->
+ find_prev_known_rev(Pos - 1, RestPath);
+find_prev_known_rev(Pos, [{_Rev, #leaf{}} | _] = DocPath) ->
+ {Pos, [Rev || {Rev, _Val} <- DocPath]}.
+
+
%% Compute a new revision id for Doc and push it onto the revision
%% path (bumping the start position by one).
%%
%% The rev id is an md5 over {Deleted, OldStart, OldRev, Body, Atts2}
%% unless any attachment lacks an md5 digest (old-style attachments),
%% in which case a random rev id is generated instead.
new_revid(Doc) ->
    #doc{
        body = Body,
        revs = {OldStart, OldRevs},
        atts = Atts,
        deleted = Deleted
    } = Doc,

    % Collect {Name, Type, Md5} for each attachment that has a digest;
    % a length mismatch below means some attachment had no md5.
    DigestedAtts = lists:foldl(fun(Att, Acc) ->
        [N, T, M] = couch_att:fetch([name, type, md5], Att),
        case M == <<>> of
            true -> Acc;
            false -> [{N, T, M} | Acc]
        end
    end, [], Atts),

    Rev = case DigestedAtts of
        Atts2 when length(Atts) =/= length(Atts2) ->
            % We must have old style non-md5 attachments
            list_to_binary(integer_to_list(couch_util:rand32()));
        Atts2 ->
            OldRev = case OldRevs of [] -> 0; [OldRev0 | _] -> OldRev0 end,
            SigTerm = [Deleted, OldStart, OldRev, Body, Atts2],
            couch_hash:md5_hash(term_to_binary(SigTerm, [{minor_version, 1}]))
    end,

    % Prepend the new rev and extend the revision start position.
    Doc#doc{revs = {OldStart + 1, [Rev | OldRevs]}}.
+
+
+doc_to_fdb(TxDb, #doc{} = Doc) ->
+ #doc{
+ id = Id,
+ revs = {Start, [Rev | _]},
+ body = Body,
+ atts = Atts,
+ deleted = Deleted
+ } = Doc,
+ Key = {<<"docs">>, Id, Start, Rev},
+ KeyBin = fabric2_db:pack(TxDb, Key),
+ Val = {Body, Atts, Deleted},
+ {KeyBin, term_to_binary(Val, [{minor_version, 1}])}.
+
+
+fdb_to_doc(DocId, Pos, Path, Bin) when is_binary(Bin) ->
+ {Body, Atts, Deleted} = binary_to_term(Bin, [safe]),
+ #doc{
+ id = DocId,
+ revs = {Pos, Path},
+ body = Body,
+ atts = Atts,
+ deleted = Deleted
+ };
+fdb_to_doc(_DocId, _Pos, _Path, not_found) ->
+ {not_found, missing}.
+
+
+fdi_to_fdb(TxDb, #full_doc_info{} = FDI) ->
+ #full_doc_info{
+ id = Id,
+ deleted = Deleted,
+ rev_tree = RevTree
+ } = flush_tree(FDI),
+
+ Key = {<<"revs">>, Id},
+ KeyBin = fabric2_db:pack(TxDb, Key),
+ Val = {Deleted, RevTree, <<16#FFFFFFFFFFFFFFFFFFFF:80>>},
+ {KeyBin, term_to_binary(Val, [{minor_version, 1}])}.
+
+
+fdb_to_fdi(Id, Bin) when is_binary(Bin) ->
+ {Deleted, RevTree, UpdateSeq} = binary_to_term(Bin, [safe]),
+ #full_doc_info{
+ id = Id,
+ deleted = Deleted,
+ rev_tree = RevTree,
+ update_seq = UpdateSeq
+ };
+fdb_to_fdi(_Id, not_found) ->
+ not_found.
+
+
+flush_tree(FDI) ->
+ #full_doc_info{
+ rev_tree = Unflushed
+ } = FDI,
+
+ Flushed = couch_key_tree:map(fun(_Rev, Value) ->
+ case Value of
+ #doc{deleted = Del} -> #leaf{deleted = Del};
+ _ -> Value
+ end
+ end, Unflushed),
+
+ FDI#full_doc_info{
+ rev_tree = Flushed
+ }.