You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2017/04/25 16:35:52 UTC
[couchdb] branch COUCHDB-3287-pluggable-storage-engines updated
(5e4b68a -> c340fb0)
This is an automated email from the ASF dual-hosted git repository.
davisp pushed a change to branch COUCHDB-3287-pluggable-storage-engines
in repository https://gitbox.apache.org/repos/asf/couchdb.git.
discards 5e4b68a Ensure deterministic revisions for attachments
discards 56ae588 Add storage engine test suite
discards 0fec3a9 Implement pluggable storage engines
new ac1deae Implement pluggable storage engines
new ee1ece9 Add storage engine test suite
new c340fb0 Ensure deterministic revisions for attachments
This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version. This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:
* -- * -- B -- O -- O -- O (5e4b68a)
\
N -- N -- N refs/heads/COUCHDB-3287-pluggable-storage-engines (c340fb0)
You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.
Any revisions marked "omits" are not gone; other references still
refer to them. Any revisions marked "discards" are gone forever.
The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "adds" were already present in the repository and have only
been added to this reference.
Summary of changes:
src/couch_mrview/src/couch_mrview.erl | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
--
To stop receiving notification emails like this one, please contact
['"commits@couchdb.apache.org" <co...@couchdb.apache.org>'].
[couchdb] 02/03: Add storage engine test suite
Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
davisp pushed a commit to branch COUCHDB-3287-pluggable-storage-engines
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit ee1ece9d8a283d63d1ed277d6104f478168b996c
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Fri Feb 5 12:21:39 2016 -0600
Add storage engine test suite
This allows other storage engine implementations to reuse the same exact
test suite without having to resort to shenanigans like keeping vendored
copies up to date.
COUCHDB-3287
---
src/couch/.gitignore | 5 +
src/couch/src/test_engine_attachments.erl | 85 ++++
src/couch/src/test_engine_compaction.erl | 181 +++++++
src/couch/src/test_engine_fold_changes.erl | 190 ++++++++
src/couch/src/test_engine_fold_docs.erl | 390 +++++++++++++++
src/couch/src/test_engine_get_set_props.erl | 70 +++
src/couch/src/test_engine_open_close_delete.erl | 81 ++++
src/couch/src/test_engine_purge_docs.erl | 158 +++++++
src/couch/src/test_engine_read_write_docs.erl | 317 +++++++++++++
src/couch/src/test_engine_ref_counting.erl | 103 ++++
src/couch/src/test_engine_util.erl | 602 ++++++++++++++++++++++++
src/couch/test/couch_bt_engine_tests.erl | 20 +
12 files changed, 2202 insertions(+)
diff --git a/src/couch/.gitignore b/src/couch/.gitignore
index 30aa173..73fb0b6 100644
--- a/src/couch/.gitignore
+++ b/src/couch/.gitignore
@@ -11,5 +11,10 @@ priv/*.dll
priv/*.exe
vc120.pdb
+test/engines/coverage/
+test/engines/data/
+test/engines/etc/
+test/engines/log/
+
.rebar/
.eunit
diff --git a/src/couch/src/test_engine_attachments.erl b/src/couch/src/test_engine_attachments.erl
new file mode 100644
index 0000000..a19322d
--- /dev/null
+++ b/src/couch/src/test_engine_attachments.erl
@@ -0,0 +1,85 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(test_engine_attachments).
+-compile(export_all).
+
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+
+% Verify that an attachment written through the engine survives a
+% commit/terminate/re-init cycle and reads back byte-for-byte.
+cet_write_attachment() ->
+    {ok, Engine, DbPath, St1} = test_engine_util:init_engine(dbpath),
+
+    % crypto:rand_bytes/1 is deprecated (and removed in newer OTP
+    % releases); crypto:strong_rand_bytes/1 is the supported replacement.
+    AttBin = crypto:strong_rand_bytes(32768),
+
+    [Att0] = test_engine_util:prep_atts(Engine, St1, [
+        {<<"ohai.txt">>, AttBin}
+    ]),
+
+    % The stream backing the attachment must be active in the state
+    % that was used to write it.
+    {stream, Stream} = couch_att:fetch(data, Att0),
+    ?assertEqual(true, Engine:is_active_stream(St1, Stream)),
+
+    DocId = <<"first">>,
+    Actions = [{create, {DocId, [], [Att0]}}],
+    {ok, St2} = test_engine_util:apply_actions(Engine, St1, Actions),
+    {ok, St3} = Engine:commit_data(St2),
+    Engine:terminate(normal, St3),
+
+    % Re-open the database and locate the revision that was just written.
+    {ok, St4} = Engine:init(DbPath, []),
+    [FDI] = Engine:open_docs(St4, [DocId]),
+
+    #rev_info{
+        rev = {RevPos, PrevRevId},
+        deleted = Deleted,
+        body_sp = DocPtr
+    } = test_engine_util:prev_rev(FDI),
+
+    % Describe the document that was actually written (the original used
+    % an unrelated id here) so the read is also valid for engines that
+    % might locate bodies by document id.
+    Doc0 = #doc{
+        id = DocId,
+        revs = {RevPos, [PrevRevId]},
+        deleted = Deleted,
+        body = DocPtr
+    },
+
+    % The attachment list may come back either as a term or as a
+    % compressed binary; normalize it to a term.
+    Doc1 = Engine:read_doc_body(St4, Doc0),
+    Atts1 = if not is_binary(Doc1#doc.atts) -> Doc1#doc.atts; true ->
+        couch_compress:decompress(Doc1#doc.atts)
+    end,
+
+    StreamSrc = fun(Sp) -> Engine:open_read_stream(St4, Sp) end,
+    [Att1] = [couch_att:from_disk_term(StreamSrc, T) || T <- Atts1],
+    ReadBin = couch_att:to_binary(Att1),
+    ?assertEqual(AttBin, ReadBin).
+
+
+% N.B. This test may be overly specific for some theoretical
+% storage engines that don't re-initialize their
+% attachment streams when restarting (for instance if
+% we ever have something that stores attachments in
+% an external object store)
+%
+% A stream created before the engine is terminated must no longer be
+% reported as active once the engine has been re-initialized.
+cet_inactive_stream() ->
+    {ok, Engine, DbPath, St1} = test_engine_util:init_engine(dbpath),
+
+    % crypto:rand_bytes/1 is deprecated (and removed in newer OTP
+    % releases); crypto:strong_rand_bytes/1 is the supported replacement.
+    AttBin = crypto:strong_rand_bytes(32768),
+
+    [Att0] = test_engine_util:prep_atts(Engine, St1, [
+        {<<"ohai.txt">>, AttBin}
+    ]),
+
+    {stream, Stream} = couch_att:fetch(data, Att0),
+    ?assertEqual(true, Engine:is_active_stream(St1, Stream)),
+
+    % Restart the engine without ever attaching the stream to a document.
+    Engine:terminate(normal, St1),
+    {ok, St2} = Engine:init(DbPath, []),
+
+    ?assertEqual(false, Engine:is_active_stream(St2, Stream)).
diff --git a/src/couch/src/test_engine_compaction.erl b/src/couch/src/test_engine_compaction.erl
new file mode 100644
index 0000000..b178bae
--- /dev/null
+++ b/src/couch/src/test_engine_compaction.erl
@@ -0,0 +1,181 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(test_engine_compaction).
+-compile(export_all).
+
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+
+% Compacting a freshly initialized database must not change its
+% term representation.
+cet_compact_empty() ->
+    {ok, Engine, Path, St1} = test_engine_util:init_engine(dbpath),
+    Before = test_engine_util:db_as_term(Engine, St1),
+    {ok, St2, DbName, _, Term} = test_engine_util:compact(Engine, St1, Path),
+    {ok, St3, undefined} = Engine:finish_compaction(St2, DbName, [], Term),
+    After = test_engine_util:db_as_term(Engine, St3),
+    ?assertEqual(nodiff, test_engine_util:term_diff(Before, After)).
+
+
+% Compacting a database holding a single regular document must not
+% change its term representation.
+cet_compact_doc() ->
+    {ok, Engine, Path, St1} = test_engine_util:init_engine(dbpath),
+    {ok, St2} = test_engine_util:apply_actions(Engine, St1, [
+        {create, {<<"foo">>, []}}
+    ]),
+    Before = test_engine_util:db_as_term(Engine, St2),
+    {ok, St3, DbName, _, Term} = test_engine_util:compact(Engine, St2, Path),
+    {ok, St4, undefined} = Engine:finish_compaction(St3, DbName, [], Term),
+    After = test_engine_util:db_as_term(Engine, St4),
+    ?assertEqual(nodiff, test_engine_util:term_diff(Before, After)).
+
+
+% Compacting a database holding a single _local document must not
+% change its term representation.
+cet_compact_local_doc() ->
+    {ok, Engine, Path, St1} = test_engine_util:init_engine(dbpath),
+    {ok, St2} = test_engine_util:apply_actions(Engine, St1, [
+        {create, {<<"_local/foo">>, []}}
+    ]),
+    Before = test_engine_util:db_as_term(Engine, St2),
+    {ok, St3, DbName, _, Term} = test_engine_util:compact(Engine, St2, Path),
+    {ok, St4, undefined} = Engine:finish_compaction(St3, DbName, [], Term),
+    After = test_engine_util:db_as_term(Engine, St4),
+    ?assertEqual(nodiff, test_engine_util:term_diff(Before, After)).
+
+
+% Build a database that contains regular docs, local docs, a security
+% object, a custom revs limit, a conflict, purged revisions and
+% attachments of several sizes, then check that compaction leaves the
+% term representation of the database unchanged.
+cet_compact_with_everything() ->
+    {ok, Engine, Path, St1} = test_engine_util:init_engine(dbpath),
+
+    % Add a whole bunch of docs
+    DocActions = lists:map(fun(Seq) ->
+        {create, {docid(Seq), [{<<"int">>, Seq}]}}
+    end, lists:seq(1, 1000)),
+
+    LocalActions = lists:map(fun(I) ->
+        {create, {local_docid(I), [{<<"int">>, I}]}}
+    end, lists:seq(1, 25)),
+
+    Actions1 = DocActions ++ LocalActions,
+
+    {ok, St2} = test_engine_util:apply_actions(Engine, St1, Actions1),
+    {ok, St3} = Engine:set_security(St2, [{<<"readers">>, <<"ohai">>}]),
+    {ok, St4} = Engine:set_revs_limit(St3, 500),
+
+    Actions2 = [
+        {create, {<<"foo">>, []}},
+        {create, {<<"bar">>, [{<<"hooray">>, <<"purple">>}]}},
+        {conflict, {<<"bar">>, [{<<"booo">>, false}]}}
+    ],
+
+    {ok, St5} = test_engine_util:apply_actions(Engine, St4, Actions2),
+
+    [FooFDI, BarFDI] = Engine:open_docs(St5, [<<"foo">>, <<"bar">>]),
+
+    FooRev = test_engine_util:prev_rev(FooFDI),
+    BarRev = test_engine_util:prev_rev(BarFDI),
+
+    % Purge one revision of each of the two documents in a single batch.
+    Actions3 = [
+        {batch, [
+            {purge, {<<"foo">>, FooRev#rev_info.rev}},
+            {purge, {<<"bar">>, BarRev#rev_info.rev}}
+        ]}
+    ],
+
+    {ok, St6} = test_engine_util:apply_actions(Engine, St5, Actions3),
+
+    % Listed in sorted order because the engine's result is sorted
+    % before comparing.
+    PurgedIdRevs = [
+        {<<"bar">>, [BarRev#rev_info.rev]},
+        {<<"foo">>, [FooRev#rev_info.rev]}
+    ],
+
+    ?assertEqual(PurgedIdRevs, lists:sort(Engine:get_last_purged(St6))),
+
+    % crypto:rand_bytes/1 is deprecated (and removed in newer OTP
+    % releases); crypto:strong_rand_bytes/1 is the supported replacement.
+    [Att0, Att1, Att2, Att3, Att4] = test_engine_util:prep_atts(Engine, St6, [
+        {<<"ohai.txt">>, crypto:strong_rand_bytes(2048)},
+        {<<"stuff.py">>, crypto:strong_rand_bytes(32768)},
+        {<<"a.erl">>, crypto:strong_rand_bytes(29)},
+        {<<"a.hrl">>, crypto:strong_rand_bytes(5000)},
+        {<<"a.app">>, crypto:strong_rand_bytes(400)}
+    ]),
+
+    Actions4 = [
+        {create, {<<"small_att">>, [], [Att0]}},
+        {create, {<<"large_att">>, [], [Att1]}},
+        {create, {<<"multi_att">>, [], [Att2, Att3, Att4]}}
+    ],
+    {ok, St7} = test_engine_util:apply_actions(Engine, St6, Actions4),
+    {ok, St8} = Engine:commit_data(St7),
+
+    Db1 = test_engine_util:db_as_term(Engine, St8),
+
+    % Run the compaction with these two database_compaction settings
+    % overridden for the duration of the call.
+    Config = [
+        {"database_compaction", "doc_buffer_size", "1024"},
+        {"database_compaction", "checkpoint_after", "2048"}
+    ],
+
+    {ok, St9, DbName, _, Term} = test_engine_util:with_config(Config, fun() ->
+        test_engine_util:compact(Engine, St8, Path)
+    end),
+
+    {ok, St10, undefined} = Engine:finish_compaction(St9, DbName, [], Term),
+    Db2 = test_engine_util:db_as_term(Engine, St10),
+    Diff = test_engine_util:term_diff(Db1, Db2),
+    ?assertEqual(nodiff, Diff).
+
+
+% Apply updates after a compaction run has produced its result but before
+% finish_compaction is called. The first finish_compaction call is then
+% expected to return a pid; once that process reports compact_done, a
+% second finish_compaction call must complete and leave the database
+% contents unchanged.
+cet_recompact_updates() ->
+ {ok, Engine, Path, St1} = test_engine_util:init_engine(dbpath),
+
+ Actions1 = [
+ {create, {<<"foo">>, []}},
+ {create, {<<"bar">>, []}}
+ ],
+
+ {ok, St2} = test_engine_util:apply_actions(Engine, St1, Actions1),
+ {ok, St3, DbName, _, Term} = test_engine_util:compact(Engine, St2, Path),
+
+ % These writes happen after compact/3 returned, so the compaction
+ % result does not include them.
+ Actions2 = [
+ {update, {<<"foo">>, [{<<"updated">>, true}]}},
+ {create, {<<"baz">>, []}}
+ ],
+
+ {ok, St4} = test_engine_util:apply_actions(Engine, St3, Actions2),
+ Db1 = test_engine_util:db_as_term(Engine, St4),
+
+ % NOTE(review): NewPid is presumably a compactor restarted to catch up
+ % with the writes above -- confirm against the engine implementation.
+ {ok, St5, NewPid} = Engine:finish_compaction(St4, DbName, [], Term),
+
+ ?assertEqual(true, is_pid(NewPid)),
+ Ref = erlang:monitor(process, NewPid),
+
+ % Wait for either the compact_done cast addressed to this process or
+ % the death of the monitored process, giving up after ten seconds.
+ NewTerm = receive
+ {'$gen_cast', {compact_done, Engine, Term0}} ->
+ Term0;
+ {'DOWN', Ref, _, _, Reason} ->
+ erlang:error({compactor_died, Reason})
+ after 10000 ->
+ erlang:error(compactor_timed_out)
+ end,
+
+ % This time finish_compaction must not hand back another pid.
+ {ok, St6, undefined} = Engine:finish_compaction(St5, DbName, [], NewTerm),
+ Db2 = test_engine_util:db_as_term(Engine, St6),
+ Diff = test_engine_util:term_diff(Db1, Db2),
+ ?assertEqual(nodiff, Diff).
+
+
+% Format an integer as a binary doc id, zero padded to four digits.
+docid(I) ->
+    iolist_to_binary(io_lib:format("~4..0b", [I])).
+
+
+% Format an integer as a _local doc id binary, with the numeric part
+% zero padded to four digits.
+local_docid(I) ->
+    iolist_to_binary(io_lib:format("_local/~4..0b", [I])).
diff --git a/src/couch/src/test_engine_fold_changes.erl b/src/couch/src/test_engine_fold_changes.erl
new file mode 100644
index 0000000..6e97fda
--- /dev/null
+++ b/src/couch/src/test_engine_fold_changes.erl
@@ -0,0 +1,190 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(test_engine_fold_changes).
+-compile(export_all).
+
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+
+-define(NUM_DOCS, 100).
+
+
+% A new database reports no changes and folds over nothing.
+cet_empty_changes() ->
+    {ok, Engine, St} = test_engine_util:init_engine(),
+
+    ?assertEqual(0, Engine:count_changes_since(St, 0)),
+    FoldResult = Engine:fold_changes(St, 0, fun fold_fun/2, [], []),
+    ?assertEqual({ok, []}, FoldResult).
+
+
+% Creating one document yields exactly one change at sequence 1.
+cet_single_change() ->
+    {ok, Engine, St1} = test_engine_util:init_engine(),
+    {ok, St2} = test_engine_util:apply_actions(Engine, St1, [
+        {create, {<<"a">>, []}}
+    ]),
+
+    ?assertEqual(1, Engine:count_changes_since(St2, 0)),
+    FoldResult = Engine:fold_changes(St2, 0, fun fold_fun/2, [], []),
+    ?assertEqual({ok, [{<<"a">>, 1}]}, FoldResult).
+
+
+% Two documents created one after the other appear in the changes feed
+% in creation order with sequences 1 and 2.
+cet_two_changes() ->
+    {ok, Engine, St1} = test_engine_util:init_engine(),
+    Creates = [{create, {Id, []}} || Id <- [<<"a">>, <<"b">>]],
+    {ok, St2} = test_engine_util:apply_actions(Engine, St1, Creates),
+
+    ?assertEqual(2, Engine:count_changes_since(St2, 0)),
+    {ok, RevChanges} = Engine:fold_changes(St2, 0, fun fold_fun/2, [], []),
+    % fold_fun/2 prepends, so the accumulator is newest-first.
+    ?assertEqual([{<<"a">>, 1}, {<<"b">>, 2}], lists:reverse(RevChanges)).
+
+
+% Documents written in a single batch receive update sequences in the
+% order they are listed in the batch, regardless of doc id ordering.
+cet_two_changes_batch() ->
+    % Batch listed as a, b
+    {ok, Engine, St1} = test_engine_util:init_engine(),
+    BatchAB = [
+        {batch, [
+            {create, {<<"a">>, []}},
+            {create, {<<"b">>, []}}
+        ]}
+    ],
+    {ok, St2} = test_engine_util:apply_actions(Engine, St1, BatchAB),
+
+    ?assertEqual(2, Engine:count_changes_since(St2, 0)),
+    {ok, RevAB} = Engine:fold_changes(St2, 0, fun fold_fun/2, [], []),
+    ?assertEqual([{<<"a">>, 1}, {<<"b">>, 2}], lists:reverse(RevAB)),
+
+    % Batch listed as b, a against a second engine instance; the match
+    % on Engine asserts the same engine module is returned.
+    {ok, Engine, St3} = test_engine_util:init_engine(),
+    BatchBA = [
+        {batch, [
+            {create, {<<"b">>, []}},
+            {create, {<<"a">>, []}}
+        ]}
+    ],
+    {ok, St4} = test_engine_util:apply_actions(Engine, St3, BatchBA),
+
+    ?assertEqual(2, Engine:count_changes_since(St4, 0)),
+    {ok, RevBA} = Engine:fold_changes(St4, 0, fun fold_fun/2, [], []),
+    ?assertEqual([{<<"b">>, 1}, {<<"a">>, 2}], lists:reverse(RevBA)).
+
+
+% Updating a document replaces its entry in the changes feed: still one
+% change, now at sequence 2.
+cet_update_one() ->
+    {ok, Engine, St1} = test_engine_util:init_engine(),
+    {ok, St2} = test_engine_util:apply_actions(Engine, St1, [
+        {create, {<<"a">>, []}},
+        {update, {<<"a">>, []}}
+    ]),
+
+    ?assertEqual(1, Engine:count_changes_since(St2, 0)),
+    FoldResult = Engine:fold_changes(St2, 0, fun fold_fun/2, [], []),
+    ?assertEqual({ok, [{<<"a">>, 2}]}, FoldResult).
+
+
+% Updating the first of two documents moves it behind the second in the
+% changes feed.
+cet_update_first_of_two() ->
+    {ok, Engine, St1} = test_engine_util:init_engine(),
+    {ok, St2} = test_engine_util:apply_actions(Engine, St1, [
+        {create, {<<"a">>, []}},
+        {create, {<<"b">>, []}},
+        {update, {<<"a">>, []}}
+    ]),
+
+    ?assertEqual(2, Engine:count_changes_since(St2, 0)),
+    {ok, RevChanges} = Engine:fold_changes(St2, 0, fun fold_fun/2, [], []),
+    ?assertEqual([{<<"b">>, 2}, {<<"a">>, 3}], lists:reverse(RevChanges)).
+
+
+% Updating the second of two documents keeps the feed order but bumps
+% that document's sequence.
+cet_update_second_of_two() ->
+    {ok, Engine, St1} = test_engine_util:init_engine(),
+    {ok, St2} = test_engine_util:apply_actions(Engine, St1, [
+        {create, {<<"a">>, []}},
+        {create, {<<"b">>, []}},
+        {update, {<<"b">>, []}}
+    ]),
+
+    ?assertEqual(2, Engine:count_changes_since(St2, 0)),
+    {ok, RevChanges} = Engine:fold_changes(St2, 0, fun fold_fun/2, [], []),
+    ?assertEqual([{<<"a">>, 1}, {<<"b">>, 3}], lists:reverse(RevChanges)).
+
+
+% Create ?NUM_DOCS documents in random order, check forward and reverse
+% folds from every starting sequence, then update every document in turn
+% and check the feed ordering after each update.
+cet_check_mutation_ordering() ->
+    Actions = shuffle([
+        {create, {docid(Seq), []}} || Seq <- lists:seq(1, ?NUM_DOCS)
+    ]),
+
+    % The Nth action performed is expected to get update sequence N.
+    DocIdOrder = [DocId || {_, {DocId, _}} <- Actions],
+    DocSeqs = lists:zip(DocIdOrder, lists:seq(1, ?NUM_DOCS)),
+
+    {ok, Engine, St1} = test_engine_util:init_engine(),
+    {ok, St2} = test_engine_util:apply_actions(Engine, St1, Actions),
+
+    % First, let's see that we get the correct suffix/prefix
+    % when starting at every update sequence.
+    CheckFromSeq = fun(Seq) ->
+        {ok, Suffix} = Engine:fold_changes(St2, Seq, fun fold_fun/2, [], []),
+        ?assertEqual(lists:nthtail(Seq, DocSeqs), lists:reverse(Suffix)),
+
+        RevOpts = [{dir, rev}],
+        {ok, Prefix} = Engine:fold_changes(St2, Seq, fun fold_fun/2, [], RevOpts),
+        ?assertEqual(lists:sublist(DocSeqs, Seq + 1), Prefix)
+    end,
+    lists:foreach(CheckFromSeq, lists:seq(0, ?NUM_DOCS)),
+
+    ok = do_mutation_ordering(Engine, St2, ?NUM_DOCS + 1, DocSeqs, []).
+
+
+% Update each remaining document in turn. After every update the changes
+% feed must list the not-yet-updated documents first (in their original
+% order) followed by the updated ones with their new sequences.
+do_mutation_ordering(Engine, St, _NextSeq, [], Expected) ->
+    {ok, Folded} = Engine:fold_changes(St, 0, fun fold_fun/2, [], []),
+    ?assertEqual(Expected, lists:reverse(Folded)),
+    ok;
+
+do_mutation_ordering(Engine, St0, NextSeq, [{DocId, _} | Pending], Updated0) ->
+    Update = [{update, {DocId, []}}],
+    {ok, St1} = test_engine_util:apply_actions(Engine, St0, Update),
+    Updated1 = Updated0 ++ [{DocId, NextSeq}],
+    {ok, Folded} = Engine:fold_changes(St1, 0, fun fold_fun/2, [], []),
+    ?assertEqual(Pending ++ Updated1, lists:reverse(Folded)),
+    do_mutation_ordering(Engine, St1, NextSeq + 1, Pending, Updated1).
+
+
+% Return the elements of List in random order by tagging each element
+% with a random float and sorting on the tag.
+%
+% Uses the rand module instead of the deprecated random module; rand
+% seeds itself on first use, so no explicit seeding is required.
+shuffle(List) ->
+    Tagged = [{rand:uniform(), Item} || Item <- List],
+    [Item || {_, Item} <- lists:sort(Tagged)].
+
+
+% Remove one randomly chosen element from a non-empty list, returning
+% {Item, Remaining} with the order of the remaining elements preserved.
+%
+% Uses rand:uniform/1 instead of the deprecated random:uniform/1.
+remove_random(List) ->
+    Pos = rand:uniform(length(List)),
+    remove_random(Pos, List).
+
+
+% Remove the element at 1-based position N from the list.
+remove_random(1, [Item | Rest]) ->
+    {Item, Rest};
+
+remove_random(N, [Skip | Rest]) when N > 1 ->
+    {Item, Tail} = remove_random(N - 1, Rest),
+    {Item, [Skip | Tail]}.
+
+
+% Changes fold callback: prepend {Id, UpdateSeq} of each visited
+% #full_doc_info{} to the accumulator.
+fold_fun(#full_doc_info{} = FDI, Acc) ->
+    Entry = {FDI#full_doc_info.id, FDI#full_doc_info.update_seq},
+    {ok, [Entry | Acc]}.
+
+
+% Format an integer as a binary doc id, zero padded to four digits.
+docid(I) ->
+    iolist_to_binary(io_lib:format("~4..0b", [I])).
diff --git a/src/couch/src/test_engine_fold_docs.erl b/src/couch/src/test_engine_fold_docs.erl
new file mode 100644
index 0000000..34d7f3e
--- /dev/null
+++ b/src/couch/src/test_engine_fold_docs.erl
@@ -0,0 +1,390 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(test_engine_fold_docs).
+-compile(export_all).
+
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+
+-define(NUM_DOCS, 100).
+
+
+% Each test below runs one shared scenario helper twice: once against
+% Engine:fold_docs with ids produced by docid/1, and once (the *_local
+% variant) against Engine:fold_local_docs with ids produced by
+% local_docid/1.
+cet_fold_all() ->
+ fold_all(fold_docs, fun docid/1).
+
+
+cet_fold_all_local() ->
+ fold_all(fold_local_docs, fun local_docid/1).
+
+
+cet_fold_start_key() ->
+ fold_start_key(fold_docs, fun docid/1).
+
+
+cet_fold_start_key_local() ->
+ fold_start_key(fold_local_docs, fun local_docid/1).
+
+
+cet_fold_end_key() ->
+ fold_end_key(fold_docs, fun docid/1).
+
+
+cet_fold_end_key_local() ->
+ fold_end_key(fold_local_docs, fun local_docid/1).
+
+
+cet_fold_end_key_gt() ->
+ fold_end_key_gt(fold_docs, fun docid/1).
+
+
+cet_fold_end_key_gt_local() ->
+ fold_end_key_gt(fold_local_docs, fun local_docid/1).
+
+
+cet_fold_range() ->
+ fold_range(fold_docs, fun docid/1).
+
+
+cet_fold_range_local() ->
+ fold_range(fold_local_docs, fun local_docid/1).
+
+
+cet_fold_stop() ->
+ fold_stop(fold_docs, fun docid/1).
+
+
+cet_fold_stop_local() ->
+ fold_stop(fold_local_docs, fun local_docid/1).
+
+
+% This is deliberately a loose test. It has to stay that way until it is
+% decided what to do about the total_rows/offset metadata included in
+% _all_docs: for now only check that folding with include_reductions
+% returns a non-negative integer alongside the accumulator.
+cet_fold_include_reductions() ->
+    {ok, Engine, St} = init_st(fun docid/1),
+    NilFold = fun(_, _, nil) -> {ok, nil} end,
+    Opts = [include_reductions],
+    {ok, Count, nil} = Engine:fold_docs(St, NilFold, nil, Opts),
+    ?assert(is_integer(Count)),
+    ?assert(Count >= 0).
+
+
+% Fold over every document with no range options, forward and reverse,
+% and check that all ?NUM_DOCS ids are visited in the expected order.
+% FoldFun is the name of the engine fold function to call; DocIdFun
+% maps an integer to a doc id.
+fold_all(FoldFun, DocIdFun) ->
+    Expected = [DocIdFun(I) || I <- lists:seq(1, ?NUM_DOCS)],
+    {ok, Engine, St} = init_st(DocIdFun),
+
+    % fold_fun/2 prepends, so a forward fold accumulates in reverse.
+    {ok, FwdAcc} = Engine:FoldFun(St, fun fold_fun/2, [], []),
+    ?assertEqual(?NUM_DOCS, length(FwdAcc)),
+    ?assertEqual(Expected, lists:reverse(FwdAcc)),
+
+    RevOpts = [{dir, rev}],
+    {ok, RevAcc} = Engine:FoldFun(St, fun fold_fun/2, [], RevOpts),
+    ?assertEqual(?NUM_DOCS, length(RevAcc)),
+    ?assertEqual(Expected, RevAcc).
+
+
+% Exercise the start_key option in both directions. FoldFun is the name
+% of the engine fold function to call; DocIdFun maps an integer to a
+% doc id.
+fold_start_key(FoldFun, DocIdFun) ->
+    {ok, Engine, St} = init_st(DocIdFun),
+
+    StartKeyNum = ?NUM_DOCS div 4,
+    StartKey = DocIdFun(StartKeyNum),
+
+    AllDocIds = [DocIdFun(I) || I <- lists:seq(1, ?NUM_DOCS)],
+    DocIdsFwd = [DocIdFun(I) || I <- lists:seq(StartKeyNum, ?NUM_DOCS)],
+    DocIdsRev = [DocIdFun(I) || I <- lists:seq(1, StartKeyNum)],
+
+    % Starting past the last id (forward) or before the first id
+    % (reverse) must visit nothing.
+    ?assertEqual({ok, []}, Engine:FoldFun(St, fun fold_fun/2, [], [
+        {start_key, <<255>>}
+    ])),
+
+    ?assertEqual({ok, []}, Engine:FoldFun(St, fun fold_fun/2, [], [
+        {dir, rev},
+        {start_key, <<"">>}
+    ])),
+
+    % Starting at the extreme end for the direction visits everything.
+    {ok, AllDocIdAccFwd} = Engine:FoldFun(St, fun fold_fun/2, [], [
+        {start_key, <<"">>}
+    ]),
+    ?assertEqual(length(AllDocIds), length(AllDocIdAccFwd)),
+    ?assertEqual(AllDocIds, lists:reverse(AllDocIdAccFwd)),
+
+    {ok, AllDocIdAccRev} = Engine:FoldFun(St, fun fold_fun/2, [], [
+        {dir, rev},
+        {start_key, <<255>>}
+    ]),
+    % Check the reverse accumulator here; the original re-checked the
+    % forward accumulator by mistake.
+    ?assertEqual(length(AllDocIds), length(AllDocIdAccRev)),
+    ?assertEqual(AllDocIds, AllDocIdAccRev),
+
+    % The start key itself is included in both directions.
+    {ok, DocIdAccFwd} = Engine:FoldFun(St, fun fold_fun/2, [], [
+        {start_key, StartKey}
+    ]),
+    ?assertEqual(length(DocIdsFwd), length(DocIdAccFwd)),
+    ?assertEqual(DocIdsFwd, lists:reverse(DocIdAccFwd)),
+
+    {ok, DocIdAccRev} = Engine:FoldFun(St, fun fold_fun/2, [], [
+        {dir, rev},
+        {start_key, StartKey}
+    ]),
+    ?assertEqual(length(DocIdsRev), length(DocIdAccRev)),
+    ?assertEqual(DocIdsRev, DocIdAccRev).
+
+
+% Exercise the (inclusive) end_key option in both directions. FoldFun is
+% the name of the engine fold function to call; DocIdFun maps an integer
+% to a doc id.
+fold_end_key(FoldFun, DocIdFun) ->
+    {ok, Engine, St} = init_st(DocIdFun),
+
+    EndKeyNum = ?NUM_DOCS div 4,
+    EndKey = DocIdFun(EndKeyNum),
+
+    % Ending before the first id (forward) or past the last id (reverse)
+    % must visit nothing.
+    ?assertEqual({ok, []}, Engine:FoldFun(St, fun fold_fun/2, [], [
+        {end_key, <<"">>}
+    ])),
+
+    ?assertEqual({ok, []}, Engine:FoldFun(St, fun fold_fun/2, [], [
+        {dir, rev},
+        {end_key, <<255>>}
+    ])),
+
+    AllDocIds = [DocIdFun(I) || I <- lists:seq(1, ?NUM_DOCS)],
+
+    % Ending at the extreme end for the direction visits everything.
+    {ok, AllDocIdAccFwd} = Engine:FoldFun(St, fun fold_fun/2, [], [
+        {end_key, <<255>>}
+    ]),
+    ?assertEqual(length(AllDocIds), length(AllDocIdAccFwd)),
+    ?assertEqual(AllDocIds, lists:reverse(AllDocIdAccFwd)),
+
+    {ok, AllDocIdAccRev} = Engine:FoldFun(St, fun fold_fun/2, [], [
+        {dir, rev},
+        {end_key, <<"">>}
+    ]),
+    % Check the reverse accumulator here; the original re-checked the
+    % forward accumulator by mistake.
+    ?assertEqual(length(AllDocIds), length(AllDocIdAccRev)),
+    ?assertEqual(AllDocIds, AllDocIdAccRev),
+
+    % The end key itself is included in both directions.
+    DocIdsFwd = [DocIdFun(I) || I <- lists:seq(1, EndKeyNum)],
+
+    {ok, DocIdAccFwd} = Engine:FoldFun(St, fun fold_fun/2, [], [
+        {end_key, EndKey}
+    ]),
+    ?assertEqual(length(DocIdsFwd), length(DocIdAccFwd)),
+    ?assertEqual(DocIdsFwd, lists:reverse(DocIdAccFwd)),
+
+    DocIdsRev = [DocIdFun(I) || I <- lists:seq(EndKeyNum, ?NUM_DOCS)],
+
+    {ok, DocIdAccRev} = Engine:FoldFun(St, fun fold_fun/2, [], [
+        {dir, rev},
+        {end_key, EndKey}
+    ]),
+    ?assertEqual(length(DocIdsRev), length(DocIdAccRev)),
+    ?assertEqual(DocIdsRev, DocIdAccRev).
+
+
+% Exercise the (exclusive) end_key_gt option in both directions. FoldFun
+% is the name of the engine fold function to call; DocIdFun maps an
+% integer to a doc id.
+fold_end_key_gt(FoldFun, DocIdFun) ->
+    {ok, Engine, St} = init_st(DocIdFun),
+
+    EndKeyNum = ?NUM_DOCS div 4,
+    EndKey = DocIdFun(EndKeyNum),
+
+    % Ending before the first id (forward) or past the last id (reverse)
+    % must visit nothing.
+    ?assertEqual({ok, []}, Engine:FoldFun(St, fun fold_fun/2, [], [
+        {end_key_gt, <<"">>}
+    ])),
+
+    ?assertEqual({ok, []}, Engine:FoldFun(St, fun fold_fun/2, [], [
+        {dir, rev},
+        {end_key_gt, <<255>>}
+    ])),
+
+    AllDocIds = [DocIdFun(I) || I <- lists:seq(1, ?NUM_DOCS)],
+
+    % Ending at the extreme end for the direction visits everything.
+    {ok, AllDocIdAccFwd} = Engine:FoldFun(St, fun fold_fun/2, [], [
+        {end_key_gt, <<255>>}
+    ]),
+    ?assertEqual(length(AllDocIds), length(AllDocIdAccFwd)),
+    ?assertEqual(AllDocIds, lists:reverse(AllDocIdAccFwd)),
+
+    {ok, AllDocIdAccRev} = Engine:FoldFun(St, fun fold_fun/2, [], [
+        {dir, rev},
+        {end_key_gt, <<"">>}
+    ]),
+    % Check the reverse accumulator here; the original re-checked the
+    % forward accumulator by mistake.
+    ?assertEqual(length(AllDocIds), length(AllDocIdAccRev)),
+    ?assertEqual(AllDocIds, AllDocIdAccRev),
+
+    % Unlike end_key, the end key itself is excluded in both directions.
+    DocIdsFwd = [DocIdFun(I) || I <- lists:seq(1, EndKeyNum - 1)],
+
+    {ok, DocIdAccFwd} = Engine:FoldFun(St, fun fold_fun/2, [], [
+        {end_key_gt, EndKey}
+    ]),
+    ?assertEqual(length(DocIdsFwd), length(DocIdAccFwd)),
+    ?assertEqual(DocIdsFwd, lists:reverse(DocIdAccFwd)),
+
+    DocIdsRev = [DocIdFun(I) || I <- lists:seq(EndKeyNum + 1, ?NUM_DOCS)],
+
+    {ok, DocIdAccRev} = Engine:FoldFun(St, fun fold_fun/2, [], [
+        {dir, rev},
+        {end_key_gt, EndKey}
+    ]),
+    ?assertEqual(length(DocIdsRev), length(DocIdAccRev)),
+    ?assertEqual(DocIdsRev, DocIdAccRev).
+
+
+% Exercise start_key combined with end_key/end_key_gt in both
+% directions. FoldFun is the name of the engine fold function to call;
+% DocIdFun maps an integer to a doc id.
+fold_range(FoldFun, DocIdFun) ->
+    {ok, Engine, St} = init_st(DocIdFun),
+
+    StartKeyNum = ?NUM_DOCS div 4,
+    EndKeyNum = StartKeyNum * 3,
+
+    StartKey = DocIdFun(StartKeyNum),
+    EndKey = DocIdFun(EndKeyNum),
+
+    % Ranges that contain no ids for the given direction visit nothing.
+    ?assertEqual({ok, []}, Engine:FoldFun(St, fun fold_fun/2, [], [
+        {start_key, <<"">>},
+        {end_key, <<"">>}
+    ])),
+
+    ?assertEqual({ok, []}, Engine:FoldFun(St, fun fold_fun/2, [], [
+        {dir, rev},
+        {start_key, <<"">>},
+        {end_key, <<255>>}
+    ])),
+
+    AllDocIds = [DocIdFun(I) || I <- lists:seq(1, ?NUM_DOCS)],
+
+    % Ranges spanning the whole key space visit everything.
+    {ok, AllDocIdAccFwd} = Engine:FoldFun(St, fun fold_fun/2, [], [
+        {start_key, <<"">>},
+        {end_key, <<255>>}
+    ]),
+    ?assertEqual(length(AllDocIds), length(AllDocIdAccFwd)),
+    ?assertEqual(AllDocIds, lists:reverse(AllDocIdAccFwd)),
+
+    {ok, AllDocIdAccRev} = Engine:FoldFun(St, fun fold_fun/2, [], [
+        {dir, rev},
+        {start_key, <<255>>},
+        {end_key_gt, <<"">>}
+    ]),
+    % Check the reverse accumulator here; the original re-checked the
+    % forward accumulator by mistake.
+    ?assertEqual(length(AllDocIds), length(AllDocIdAccRev)),
+    ?assertEqual(AllDocIds, AllDocIdAccRev),
+
+    DocIdsFwd = [DocIdFun(I) || I <- lists:seq(StartKeyNum, EndKeyNum)],
+
+    {ok, DocIdAccFwd} = Engine:FoldFun(St, fun fold_fun/2, [], [
+        {start_key, StartKey},
+        {end_key, EndKey}
+    ]),
+    ?assertEqual(length(DocIdsFwd), length(DocIdAccFwd)),
+    ?assertEqual(DocIdsFwd, lists:reverse(DocIdAccFwd)),
+
+    DocIdsRev = [DocIdFun(I) || I <- lists:seq(StartKeyNum, EndKeyNum)],
+
+    % In reverse, a start key below the end key is an empty range ...
+    ?assertEqual({ok, []}, Engine:FoldFun(St, fun fold_fun/2, [], [
+        {dir, rev},
+        {start_key, StartKey},
+        {end_key, EndKey}
+    ])),
+
+    % ... so the bounds have to be swapped to cover the same ids.
+    {ok, DocIdAccRev} = Engine:FoldFun(St, fun fold_fun/2, [], [
+        {dir, rev},
+        {start_key, EndKey},
+        {end_key, StartKey}
+    ]),
+    ?assertEqual(length(DocIdsRev), length(DocIdAccRev)),
+    ?assertEqual(DocIdsRev, DocIdAccRev).
+
+
+% Check that a fold callback returning {stop, Acc} ends the fold early.
+% fold_fun_stop/2 stops once it has collected six ids, so folds that
+% have fewer ids available simply run to completion. FoldFun is the name
+% of the engine fold function to call; DocIdFun maps an integer to a
+% doc id.
+fold_stop(FoldFun, DocIdFun) ->
+    {ok, Engine, St} = init_st(DocIdFun),
+    Fold = fun(Opts) -> Engine:FoldFun(St, fun fold_fun_stop/2, [], Opts) end,
+
+    PivotNum = ?NUM_DOCS div 4,
+    PivotKey = DocIdFun(PivotNum),
+
+    % Empty ranges stay empty.
+    ?assertEqual({ok, []}, Fold([{start_key, <<255>>}])),
+    ?assertEqual({ok, []}, Fold([{dir, rev}, {start_key, <<"">>}])),
+
+    % Only four ids remain from here, so the callback never stops.
+    TailIds = [DocIdFun(I) || I <- lists:seq(?NUM_DOCS - 3, ?NUM_DOCS)],
+    {ok, TailAcc} = Fold([{start_key, DocIdFun(?NUM_DOCS - 3)}]),
+    ?assertEqual(length(TailIds), length(TailAcc)),
+    ?assertEqual(TailIds, lists:reverse(TailAcc)),
+
+    % Only three ids remain in reverse from the third id.
+    HeadIds = [DocIdFun(I) || I <- lists:seq(1, 3)],
+    {ok, HeadAcc} = Fold([{dir, rev}, {start_key, DocIdFun(3)}]),
+    ?assertEqual(3, length(HeadAcc)),
+    ?assertEqual(HeadIds, HeadAcc),
+
+    % Plenty of ids remain forward from the pivot: the fold is cut off.
+    StoppedFwdIds = [DocIdFun(I) || I <- lists:seq(PivotNum, PivotNum + 5)],
+    {ok, StoppedFwdAcc} = Fold([{start_key, PivotKey}]),
+    ?assertEqual(length(StoppedFwdIds), length(StoppedFwdAcc)),
+    ?assertEqual(StoppedFwdIds, lists:reverse(StoppedFwdAcc)),
+
+    % Same in reverse.
+    StoppedRevIds = [DocIdFun(I) || I <- lists:seq(PivotNum - 5, PivotNum)],
+    {ok, StoppedRevAcc} = Fold([{dir, rev}, {start_key, PivotKey}]),
+    ?assertEqual(length(StoppedRevIds), length(StoppedRevAcc)),
+    ?assertEqual(StoppedRevIds, StoppedRevAcc).
+
+
+% Initialize an engine and create ?NUM_DOCS documents whose ids come
+% from DocIdFun and whose bodies record the integer they were built
+% from. Returns {ok, Engine, St}.
+init_st(DocIdFun) ->
+    {ok, Engine, St0} = test_engine_util:init_engine(),
+    Creates = [
+        {create, {DocIdFun(N), [{<<"int">>, N}]}}
+        || N <- lists:seq(1, ?NUM_DOCS)
+    ],
+    {ok, St1} = test_engine_util:apply_actions(Engine, St0, Creates),
+    {ok, Engine, St1}.
+
+
+% Fold callback: prepend the id of the visited #doc{} or
+% #full_doc_info{} to the accumulator and keep going.
+fold_fun(Doc, Acc) ->
+    case Doc of
+        #doc{id = DocId} -> {ok, [DocId | Acc]};
+        #full_doc_info{id = DocId} -> {ok, [DocId | Acc]}
+    end.
+
+
+% Fold callback that collects ids like fold_fun/2 but asks the fold to
+% stop once the accumulator already held more than four ids, i.e. after
+% the sixth id has been collected.
+fold_fun_stop(Doc, Acc) ->
+    DocId = case Doc of
+        #doc{id = Id0} -> Id0;
+        #full_doc_info{id = Id0} -> Id0
+    end,
+    Tag = case length(Acc) =< 4 of
+        true -> ok;
+        false -> stop
+    end,
+    {Tag, [DocId | Acc]}.
+
+
+% Format an integer as a binary doc id, zero padded to four digits.
+docid(I) ->
+    iolist_to_binary(io_lib:format("~4..0b", [I])).
+
+
+% Format an integer as a _local doc id binary, with the numeric part
+% zero padded to four digits.
+local_docid(I) ->
+    iolist_to_binary(io_lib:format("_local/~4..0b", [I])).
diff --git a/src/couch/src/test_engine_get_set_props.erl b/src/couch/src/test_engine_get_set_props.erl
new file mode 100644
index 0000000..6d2a447
--- /dev/null
+++ b/src/couch/src/test_engine_get_set_props.erl
@@ -0,0 +1,70 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(test_engine_get_set_props).
+-compile(export_all).
+
+
+-include_lib("eunit/include/eunit.hrl").
+
+
+% A newly created database must report the expected default value for
+% every property getter.
+cet_default_props() ->
+    Engine = test_engine_util:get_engine(),
+    DbPath = test_engine_util:dbpath(),
+    InitOpts = [create, {default_security_object, dso}],
+
+    {ok, St} = Engine:init(DbPath, InitOpts),
+
+    ?assertEqual(0, Engine:get_doc_count(St)),
+    ?assertEqual(0, Engine:get_del_doc_count(St)),
+    ?assertEqual(true, is_list(Engine:get_size_info(St))),
+    ?assertEqual(true, is_integer(Engine:get_disk_version(St))),
+    ?assertEqual(0, Engine:get_update_seq(St)),
+    ?assertEqual(0, Engine:get_purge_seq(St)),
+    ?assertEqual([], Engine:get_last_purged(St)),
+    % The security object is whatever was passed as the default.
+    ?assertEqual(dso, Engine:get_security(St)),
+    ?assertEqual(1000, Engine:get_revs_limit(St)),
+    ?assertMatch(<<_:32/binary>>, Engine:get_uuid(St)),
+    ?assertEqual([{node(), 0}], Engine:get_epochs(St)),
+    ?assertEqual(0, Engine:get_compacted_seq(St)).
+
+
+% Both tests delegate to check_prop_set/4, passing the getter name, the
+% setter name, the value expected before the setter is called, and the
+% value to set.
+cet_set_security() ->
+ check_prop_set(get_security, set_security, dso, [{<<"readers">>, []}]).
+
+
+cet_set_revs_limit() ->
+ check_prop_set(get_revs_limit, set_revs_limit, 1000, 50).
+
+
+% Check a getter/setter pair: the getter returns Default on a new
+% database, returns Value right after the setter is called, and still
+% returns Value after a commit, terminate and re-init of the engine.
+check_prop_set(GetFun, SetFun, Default, Value) ->
+    Engine = test_engine_util:get_engine(),
+    DbPath = test_engine_util:dbpath(),
+    InitOpts = [create, {default_security_object, dso}],
+
+    {ok, Fresh} = Engine:init(DbPath, InitOpts),
+    ?assertEqual(Default, Engine:GetFun(Fresh)),
+
+    {ok, Updated} = Engine:SetFun(Fresh, Value),
+    ?assertEqual(Value, Engine:GetFun(Updated)),
+
+    {ok, Committed} = Engine:commit_data(Updated),
+    Engine:terminate(normal, Committed),
+
+    {ok, Reopened} = Engine:init(DbPath, []),
+    ?assertEqual(Value, Engine:GetFun(Reopened)).
diff --git a/src/couch/src/test_engine_open_close_delete.erl b/src/couch/src/test_engine_open_close_delete.erl
new file mode 100644
index 0000000..b099d9f
--- /dev/null
+++ b/src/couch/src/test_engine_open_close_delete.erl
@@ -0,0 +1,81 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(test_engine_open_close_delete).
+-compile(export_all).
+
+
+-include_lib("eunit/include/eunit.hrl").
+
+
+% Opening a missing database without the create option must throw
+% {not_found, no_db_file} and must not create it as a side effect.
+cet_open_non_existent() ->
+    Engine = test_engine_util:get_engine(),
+    Path = test_engine_util:dbpath(),
+    NoOpts = [],
+
+    ?assertEqual(false, Engine:exists(Path)),
+    ?assertThrow({not_found, no_db_file}, Engine:init(Path, NoOpts)),
+    ?assertEqual(false, Engine:exists(Path)).
+
+
+% Opening with the create option must succeed and make the database
+% visible to Engine:exists/1.
+cet_open_create() ->
+    process_flag(trap_exit, true),
+    Engine = test_engine_util:get_engine(),
+    Path = test_engine_util:dbpath(),
+    CreateOpts = [create],
+
+    ?assertEqual(false, Engine:exists(Path)),
+    ?assertMatch({ok, _}, Engine:init(Path, CreateOpts)),
+    ?assertEqual(true, Engine:exists(Path)).
+
+
+% Creating a database that already exists must throw {error, eexist}.
+cet_open_when_exists() ->
+    Engine = test_engine_util:get_engine(),
+    Path = test_engine_util:dbpath(),
+    CreateOpts = [create],
+
+    ?assertEqual(false, Engine:exists(Path)),
+    ?assertMatch({ok, _}, Engine:init(Path, CreateOpts)),
+    ?assertThrow({error, eexist}, Engine:init(Path, CreateOpts)).
+
+
+% Terminating the engine must leave the database in place.
+cet_terminate() ->
+    Engine = test_engine_util:get_engine(),
+    Path = test_engine_util:dbpath(),
+
+    ?assertEqual(false, Engine:exists(Path)),
+    {ok, State} = Engine:init(Path, [create]),
+    Engine:terminate(normal, State),
+    ?assertEqual(true, Engine:exists(Path)).
+
+
+% Create a database, close it, then open and close it again 100 times
+% in a row; every reopen must succeed.
+cet_rapid_recycle() ->
+    Engine = test_engine_util:get_engine(),
+    Path = test_engine_util:dbpath(),
+
+    {ok, Created} = Engine:init(Path, [create]),
+    Engine:terminate(normal, Created),
+
+    Recycle = fun(_Round) ->
+        {ok, Reopened} = Engine:init(Path, []),
+        Engine:terminate(normal, Reopened)
+    end,
+    lists:foreach(Recycle, lists:seq(1, 100)).
+
+
+% A created and closed database must be removable with Engine:delete/3,
+% after which Engine:exists/1 reports false again.
+cet_delete() ->
+    Engine = test_engine_util:get_engine(),
+    Root = test_engine_util:rootdir(),
+    Path = test_engine_util:dbpath(),
+
+    ?assertEqual(false, Engine:exists(Path)),
+    {ok, State} = Engine:init(Path, [create]),
+    Engine:terminate(normal, State),
+    ?assertEqual(true, Engine:exists(Path)),
+    ?assertEqual(ok, Engine:delete(Root, Path, [async])),
+    ?assertEqual(false, Engine:exists(Path)).
diff --git a/src/couch/src/test_engine_purge_docs.erl b/src/couch/src/test_engine_purge_docs.erl
new file mode 100644
index 0000000..e5bf249
--- /dev/null
+++ b/src/couch/src/test_engine_purge_docs.erl
@@ -0,0 +1,158 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(test_engine_purge_docs).
+-compile(export_all).
+
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+
+% Create one document and purge its only revision. Afterwards the
+% document count is zero and both the update and purge sequences have
+% advanced by one.
+cet_purge_simple() ->
+    {ok, Engine, Empty} = test_engine_util:init_engine(),
+
+    {ok, Created} = test_engine_util:apply_actions(Engine, Empty, [
+        {create, {<<"foo">>, [{<<"vsn">>, 1}]}}
+    ]),
+
+    ?assertEqual(1, Engine:get_doc_count(Created)),
+    ?assertEqual(0, Engine:get_del_doc_count(Created)),
+    ?assertEqual(1, Engine:get_update_seq(Created)),
+    ?assertEqual(0, Engine:get_purge_seq(Created)),
+    ?assertEqual([], Engine:get_last_purged(Created)),
+
+    [FDI] = Engine:open_docs(Created, [<<"foo">>]),
+    #rev_info{rev = Rev} = test_engine_util:prev_rev(FDI),
+
+    {ok, Purged} = test_engine_util:apply_actions(Engine, Created, [
+        {purge, {<<"foo">>, Rev}}
+    ]),
+
+    ?assertEqual(0, Engine:get_doc_count(Purged)),
+    ?assertEqual(0, Engine:get_del_doc_count(Purged)),
+    ?assertEqual(2, Engine:get_update_seq(Purged)),
+    ?assertEqual(1, Engine:get_purge_seq(Purged)),
+    ?assertEqual([{<<"foo">>, [Rev]}], Engine:get_last_purged(Purged)).
+
+
+% Create a document with a conflicting revision, then purge the two
+% revisions one after the other. The document stays visible after the
+% first purge and is gone after the second.
+cet_purge_conflicts() ->
+    {ok, Engine, Empty} = test_engine_util:init_engine(),
+
+    {ok, Conflicted} = test_engine_util:apply_actions(Engine, Empty, [
+        {create, {<<"foo">>, [{<<"vsn">>, 1}]}},
+        {conflict, {<<"foo">>, [{<<"vsn">>, 2}]}}
+    ]),
+
+    ?assertEqual(1, Engine:get_doc_count(Conflicted)),
+    ?assertEqual(0, Engine:get_del_doc_count(Conflicted)),
+    ?assertEqual(2, Engine:get_update_seq(Conflicted)),
+    ?assertEqual(0, Engine:get_purge_seq(Conflicted)),
+    ?assertEqual([], Engine:get_last_purged(Conflicted)),
+
+    [FirstFDI] = Engine:open_docs(Conflicted, [<<"foo">>]),
+    #rev_info{rev = FirstRev} = test_engine_util:prev_rev(FirstFDI),
+
+    {ok, OnePurged} = test_engine_util:apply_actions(Engine, Conflicted, [
+        {purge, {<<"foo">>, FirstRev}}
+    ]),
+
+    ?assertEqual(1, Engine:get_doc_count(OnePurged)),
+    ?assertEqual(0, Engine:get_del_doc_count(OnePurged)),
+    ?assertEqual(4, Engine:get_update_seq(OnePurged)),
+    ?assertEqual(1, Engine:get_purge_seq(OnePurged)),
+    ?assertEqual(
+        [{<<"foo">>, [FirstRev]}],
+        Engine:get_last_purged(OnePurged)
+    ),
+
+    [SecondFDI] = Engine:open_docs(OnePurged, [<<"foo">>]),
+    #rev_info{rev = SecondRev} = test_engine_util:prev_rev(SecondFDI),
+
+    {ok, AllPurged} = test_engine_util:apply_actions(Engine, OnePurged, [
+        {purge, {<<"foo">>, SecondRev}}
+    ]),
+
+    ?assertEqual(0, Engine:get_doc_count(AllPurged)),
+    ?assertEqual(0, Engine:get_del_doc_count(AllPurged)),
+    ?assertEqual(5, Engine:get_update_seq(AllPurged)),
+    ?assertEqual(2, Engine:get_purge_seq(AllPurged)),
+    ?assertEqual(
+        [{<<"foo">>, [SecondRev]}],
+        Engine:get_last_purged(AllPurged)
+    ).
+
+
+% Create a document, delete it, then purge the deleted revision. The
+% deleted-document count must drop back to zero after the purge.
+cet_add_delete_purge() ->
+    {ok, Engine, Empty} = test_engine_util:init_engine(),
+
+    {ok, Deleted} = test_engine_util:apply_actions(Engine, Empty, [
+        {create, {<<"foo">>, [{<<"vsn">>, 1}]}},
+        {delete, {<<"foo">>, [{<<"vsn">>, 2}]}}
+    ]),
+
+    ?assertEqual(0, Engine:get_doc_count(Deleted)),
+    ?assertEqual(1, Engine:get_del_doc_count(Deleted)),
+    ?assertEqual(2, Engine:get_update_seq(Deleted)),
+    ?assertEqual(0, Engine:get_purge_seq(Deleted)),
+    ?assertEqual([], Engine:get_last_purged(Deleted)),
+
+    [FDI] = Engine:open_docs(Deleted, [<<"foo">>]),
+    #rev_info{rev = Rev} = test_engine_util:prev_rev(FDI),
+
+    {ok, Purged} = test_engine_util:apply_actions(Engine, Deleted, [
+        {purge, {<<"foo">>, Rev}}
+    ]),
+
+    ?assertEqual(0, Engine:get_doc_count(Purged)),
+    ?assertEqual(0, Engine:get_del_doc_count(Purged)),
+    ?assertEqual(3, Engine:get_update_seq(Purged)),
+    ?assertEqual(1, Engine:get_purge_seq(Purged)),
+    ?assertEqual([{<<"foo">>, [Rev]}], Engine:get_last_purged(Purged)).
+
+
+% Create two documents and purge only one of them; the other document
+% must still be counted afterwards.
+cet_add_two_purge_one() ->
+    {ok, Engine, Empty} = test_engine_util:init_engine(),
+
+    {ok, TwoDocs} = test_engine_util:apply_actions(Engine, Empty, [
+        {create, {<<"foo">>, [{<<"vsn">>, 1}]}},
+        {create, {<<"bar">>, []}}
+    ]),
+
+    ?assertEqual(2, Engine:get_doc_count(TwoDocs)),
+    ?assertEqual(0, Engine:get_del_doc_count(TwoDocs)),
+    ?assertEqual(2, Engine:get_update_seq(TwoDocs)),
+    ?assertEqual(0, Engine:get_purge_seq(TwoDocs)),
+    ?assertEqual([], Engine:get_last_purged(TwoDocs)),
+
+    [FDI] = Engine:open_docs(TwoDocs, [<<"foo">>]),
+    #rev_info{rev = Rev} = test_engine_util:prev_rev(FDI),
+
+    {ok, Purged} = test_engine_util:apply_actions(Engine, TwoDocs, [
+        {purge, {<<"foo">>, Rev}}
+    ]),
+
+    ?assertEqual(1, Engine:get_doc_count(Purged)),
+    ?assertEqual(0, Engine:get_del_doc_count(Purged)),
+    ?assertEqual(3, Engine:get_update_seq(Purged)),
+    ?assertEqual(1, Engine:get_purge_seq(Purged)),
+    ?assertEqual([{<<"foo">>, [Rev]}], Engine:get_last_purged(Purged)).
diff --git a/src/couch/src/test_engine_read_write_docs.erl b/src/couch/src/test_engine_read_write_docs.erl
new file mode 100644
index 0000000..4307702
--- /dev/null
+++ b/src/couch/src/test_engine_read_write_docs.erl
@@ -0,0 +1,317 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(test_engine_read_write_docs).
+-compile(export_all).
+
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+
+% Looking up documents in an empty database yields one not_found per
+% requested id.
+cet_read_empty_docs() ->
+    {ok, Engine, Empty} = test_engine_util:init_engine(),
+
+    OneMiss = Engine:open_docs(Empty, [<<"foo">>]),
+    ?assertEqual([not_found], OneMiss),
+
+    TwoMisses = Engine:open_docs(Empty, [<<"a">>, <<"b">>]),
+    ?assertEqual([not_found, not_found], TwoMisses).
+
+
+% Looking up local documents in an empty database yields one not_found
+% per requested id.
+cet_read_empty_local_docs() ->
+    {ok, Engine, Empty} = test_engine_util:init_engine(),
+
+    OneMiss = Engine:open_local_docs(Empty, [<<"_local/foo">>]),
+    ?assertEqual([not_found], OneMiss),
+
+    TwoMisses = Engine:open_local_docs(
+        Empty,
+        [<<"_local/a">>, <<"_local/b">>]
+    ),
+    ?assertEqual([not_found, not_found], TwoMisses).
+
+
+% Write a single document, commit, close and reopen the database, then
+% check the counters and read the stored body back.
+cet_write_one_doc() ->
+    {ok, Engine, DbPath, Fresh} = test_engine_util:init_engine(dbpath),
+
+    ?assertEqual(0, Engine:get_doc_count(Fresh)),
+    ?assertEqual(0, Engine:get_del_doc_count(Fresh)),
+    ?assertEqual(0, Engine:get_update_seq(Fresh)),
+
+    {ok, Written} = test_engine_util:apply_actions(Engine, Fresh, [
+        {create, {<<"foo">>, [{<<"vsn">>, 1}]}}
+    ]),
+    {ok, Committed} = Engine:commit_data(Written),
+    Engine:terminate(normal, Committed),
+    {ok, Reopened} = Engine:init(DbPath, []),
+
+    ?assertEqual(1, Engine:get_doc_count(Reopened)),
+    ?assertEqual(0, Engine:get_del_doc_count(Reopened)),
+    ?assertEqual(1, Engine:get_update_seq(Reopened)),
+
+    [FDI] = Engine:open_docs(Reopened, [<<"foo">>]),
+    #rev_info{
+        rev = {Pos, RevId},
+        deleted = IsDeleted,
+        body_sp = BodyPtr
+    } = test_engine_util:prev_rev(FDI),
+
+    Stub = #doc{
+        id = <<"foo">>,
+        revs = {Pos, [RevId]},
+        deleted = IsDeleted,
+        body = BodyPtr
+    },
+
+    % A binary body is passed through couch_compress:decompress/1; any
+    % other term is used as is.
+    Body = case Engine:read_doc_body(Reopened, Stub) of
+        #doc{body = Bin} when is_binary(Bin) ->
+            couch_compress:decompress(Bin);
+        #doc{body = Term} ->
+            Term
+    end,
+    ?assertEqual([{<<"vsn">>, 1}], Body).
+
+
+% Write two documents as separate actions and check that both can be
+% found after commit and reopen.
+cet_write_two_docs() ->
+    {ok, Engine, DbPath, Fresh} = test_engine_util:init_engine(dbpath),
+
+    ?assertEqual(0, Engine:get_doc_count(Fresh)),
+    ?assertEqual(0, Engine:get_del_doc_count(Fresh)),
+    ?assertEqual(0, Engine:get_update_seq(Fresh)),
+
+    {ok, Written} = test_engine_util:apply_actions(Engine, Fresh, [
+        {create, {<<"foo">>, [{<<"vsn">>, 1}]}},
+        {create, {<<"bar">>, [{<<"stuff">>, true}]}}
+    ]),
+    {ok, Committed} = Engine:commit_data(Written),
+    Engine:terminate(normal, Committed),
+    {ok, Reopened} = Engine:init(DbPath, []),
+
+    ?assertEqual(2, Engine:get_doc_count(Reopened)),
+    ?assertEqual(0, Engine:get_del_doc_count(Reopened)),
+    ?assertEqual(2, Engine:get_update_seq(Reopened)),
+
+    Found = Engine:open_docs(Reopened, [<<"foo">>, <<"bar">>]),
+    ?assertEqual(false, lists:member(not_found, Found)).
+
+
+% Write three documents in a single batch and check that all of them
+% can be found after commit and reopen.
+cet_write_three_doc_batch() ->
+    {ok, Engine, DbPath, Fresh} = test_engine_util:init_engine(dbpath),
+
+    ?assertEqual(0, Engine:get_doc_count(Fresh)),
+    ?assertEqual(0, Engine:get_del_doc_count(Fresh)),
+    ?assertEqual(0, Engine:get_update_seq(Fresh)),
+
+    Batch = {batch, [
+        {create, {<<"foo">>, [{<<"vsn">>, 1}]}},
+        {create, {<<"bar">>, [{<<"stuff">>, true}]}},
+        {create, {<<"baz">>, []}}
+    ]},
+    {ok, Written} = test_engine_util:apply_actions(Engine, Fresh, [Batch]),
+    {ok, Committed} = Engine:commit_data(Written),
+    Engine:terminate(normal, Committed),
+    {ok, Reopened} = Engine:init(DbPath, []),
+
+    ?assertEqual(3, Engine:get_doc_count(Reopened)),
+    ?assertEqual(0, Engine:get_del_doc_count(Reopened)),
+    ?assertEqual(3, Engine:get_update_seq(Reopened)),
+
+    Found = Engine:open_docs(Reopened, [<<"foo">>, <<"bar">>, <<"baz">>]),
+    ?assertEqual(false, lists:member(not_found, Found)).
+
+
+% Create and then update a document; after commit and reopen the body
+% read back must be the updated one.
+cet_update_doc() ->
+    {ok, Engine, DbPath, Fresh} = test_engine_util:init_engine(dbpath),
+
+    ?assertEqual(0, Engine:get_doc_count(Fresh)),
+    ?assertEqual(0, Engine:get_del_doc_count(Fresh)),
+    ?assertEqual(0, Engine:get_update_seq(Fresh)),
+
+    {ok, Written} = test_engine_util:apply_actions(Engine, Fresh, [
+        {create, {<<"foo">>, [{<<"vsn">>, 1}]}},
+        {update, {<<"foo">>, [{<<"vsn">>, 2}]}}
+    ]),
+    {ok, Committed} = Engine:commit_data(Written),
+    Engine:terminate(normal, Committed),
+    {ok, Reopened} = Engine:init(DbPath, []),
+
+    ?assertEqual(1, Engine:get_doc_count(Reopened)),
+    ?assertEqual(0, Engine:get_del_doc_count(Reopened)),
+    ?assertEqual(2, Engine:get_update_seq(Reopened)),
+
+    [FDI] = Engine:open_docs(Reopened, [<<"foo">>]),
+
+    #rev_info{
+        rev = {Pos, RevId},
+        deleted = IsDeleted,
+        body_sp = BodyPtr
+    } = test_engine_util:prev_rev(FDI),
+
+    Stub = #doc{
+        id = <<"foo">>,
+        revs = {Pos, [RevId]},
+        deleted = IsDeleted,
+        body = BodyPtr
+    },
+
+    % A binary body is passed through couch_compress:decompress/1; any
+    % other term is used as is.
+    Body = case Engine:read_doc_body(Reopened, Stub) of
+        #doc{body = Bin} when is_binary(Bin) ->
+            couch_compress:decompress(Bin);
+        #doc{body = Term} ->
+            Term
+    end,
+
+    ?assertEqual([{<<"vsn">>, 2}], Body).
+
+
+% Create and then delete a document; after commit and reopen it is
+% counted as deleted and the body read back is the empty delete body.
+cet_delete_doc() ->
+    {ok, Engine, DbPath, Fresh} = test_engine_util:init_engine(dbpath),
+
+    ?assertEqual(0, Engine:get_doc_count(Fresh)),
+    ?assertEqual(0, Engine:get_del_doc_count(Fresh)),
+    ?assertEqual(0, Engine:get_update_seq(Fresh)),
+
+    {ok, Written} = test_engine_util:apply_actions(Engine, Fresh, [
+        {create, {<<"foo">>, [{<<"vsn">>, 1}]}},
+        {delete, {<<"foo">>, []}}
+    ]),
+    {ok, Committed} = Engine:commit_data(Written),
+    Engine:terminate(normal, Committed),
+    {ok, Reopened} = Engine:init(DbPath, []),
+
+    ?assertEqual(0, Engine:get_doc_count(Reopened)),
+    ?assertEqual(1, Engine:get_del_doc_count(Reopened)),
+    ?assertEqual(2, Engine:get_update_seq(Reopened)),
+
+    [FDI] = Engine:open_docs(Reopened, [<<"foo">>]),
+
+    #rev_info{
+        rev = {Pos, RevId},
+        deleted = IsDeleted,
+        body_sp = BodyPtr
+    } = test_engine_util:prev_rev(FDI),
+
+    Stub = #doc{
+        id = <<"foo">>,
+        revs = {Pos, [RevId]},
+        deleted = IsDeleted,
+        body = BodyPtr
+    },
+
+    % A binary body is passed through couch_compress:decompress/1; any
+    % other term is used as is.
+    Body = case Engine:read_doc_body(Reopened, Stub) of
+        #doc{body = Bin} when is_binary(Bin) ->
+            couch_compress:decompress(Bin);
+        #doc{body = Term} ->
+            Term
+    end,
+
+    ?assertEqual([], Body).
+
+
+% Write one local document. It must not change the document counters
+% or the update sequence, and must be readable only through
+% open_local_docs/2.
+cet_write_local_doc() ->
+    {ok, Engine, DbPath, Fresh} = test_engine_util:init_engine(dbpath),
+
+    ?assertEqual(0, Engine:get_doc_count(Fresh)),
+    ?assertEqual(0, Engine:get_del_doc_count(Fresh)),
+    ?assertEqual(0, Engine:get_update_seq(Fresh)),
+
+    {ok, Written} = test_engine_util:apply_actions(Engine, Fresh, [
+        {create, {<<"_local/foo">>, [{<<"yay">>, false}]}}
+    ]),
+    {ok, Committed} = Engine:commit_data(Written),
+    Engine:terminate(normal, Committed),
+    {ok, Reopened} = Engine:init(DbPath, []),
+
+    ?assertEqual(0, Engine:get_doc_count(Reopened)),
+    ?assertEqual(0, Engine:get_del_doc_count(Reopened)),
+    ?assertEqual(0, Engine:get_update_seq(Reopened)),
+
+    LocalId = <<"_local/foo">>,
+    [not_found] = Engine:open_docs(Reopened, [LocalId]),
+    [#doc{body = LocalBody}] = Engine:open_local_docs(Reopened, [LocalId]),
+    ?assertEqual([{<<"yay">>, false}], LocalBody).
+
+
+% Write a regular and a local document in one batch. Only the regular
+% document is counted, and each one is visible only through its own
+% lookup function.
+cet_write_mixed_batch() ->
+    {ok, Engine, DbPath, Fresh} = test_engine_util:init_engine(dbpath),
+
+    ?assertEqual(0, Engine:get_doc_count(Fresh)),
+    ?assertEqual(0, Engine:get_del_doc_count(Fresh)),
+    ?assertEqual(0, Engine:get_update_seq(Fresh)),
+
+    Batch = {batch, [
+        {create, {<<"bar">>, []}},
+        {create, {<<"_local/foo">>, [{<<"yay">>, false}]}}
+    ]},
+    {ok, Written} = test_engine_util:apply_actions(Engine, Fresh, [Batch]),
+    {ok, Committed} = Engine:commit_data(Written),
+    Engine:terminate(normal, Committed),
+    {ok, Reopened} = Engine:init(DbPath, []),
+
+    ?assertEqual(1, Engine:get_doc_count(Reopened)),
+    ?assertEqual(0, Engine:get_del_doc_count(Reopened)),
+    ?assertEqual(1, Engine:get_update_seq(Reopened)),
+
+    DocId = <<"bar">>,
+    LocalId = <<"_local/foo">>,
+
+    [#full_doc_info{}] = Engine:open_docs(Reopened, [DocId]),
+    [not_found] = Engine:open_docs(Reopened, [LocalId]),
+
+    [not_found] = Engine:open_local_docs(Reopened, [DocId]),
+    [#doc{}] = Engine:open_local_docs(Reopened, [LocalId]).
+
+
+% Create and then update a local document; after commit and reopen the
+% updated body must be returned by open_local_docs/2.
+cet_update_local_doc() ->
+    {ok, Engine, DbPath, Fresh} = test_engine_util:init_engine(dbpath),
+
+    ?assertEqual(0, Engine:get_doc_count(Fresh)),
+    ?assertEqual(0, Engine:get_del_doc_count(Fresh)),
+    ?assertEqual(0, Engine:get_update_seq(Fresh)),
+
+    {ok, Written} = test_engine_util:apply_actions(Engine, Fresh, [
+        {create, {<<"_local/foo">>, []}},
+        {update, {<<"_local/foo">>, [{<<"stuff">>, null}]}}
+    ]),
+    {ok, Committed} = Engine:commit_data(Written),
+    Engine:terminate(normal, Committed),
+    {ok, Reopened} = Engine:init(DbPath, []),
+
+    ?assertEqual(0, Engine:get_doc_count(Reopened)),
+    ?assertEqual(0, Engine:get_del_doc_count(Reopened)),
+    ?assertEqual(0, Engine:get_update_seq(Reopened)),
+
+    LocalId = <<"_local/foo">>,
+    [not_found] = Engine:open_docs(Reopened, [LocalId]),
+    [#doc{body = LocalBody}] = Engine:open_local_docs(Reopened, [LocalId]),
+    ?assertEqual([{<<"stuff">>, null}], LocalBody).
+
+
+% Create and then delete a local document; after commit and reopen it
+% must be reported as not_found by both lookup functions.
+cet_delete_local_doc() ->
+    {ok, Engine, DbPath, Fresh} = test_engine_util:init_engine(dbpath),
+
+    ?assertEqual(0, Engine:get_doc_count(Fresh)),
+    ?assertEqual(0, Engine:get_del_doc_count(Fresh)),
+    ?assertEqual(0, Engine:get_update_seq(Fresh)),
+
+    {ok, Written} = test_engine_util:apply_actions(Engine, Fresh, [
+        {create, {<<"_local/foo">>, []}},
+        {delete, {<<"_local/foo">>, []}}
+    ]),
+    {ok, Committed} = Engine:commit_data(Written),
+    Engine:terminate(normal, Committed),
+    {ok, Reopened} = Engine:init(DbPath, []),
+
+    ?assertEqual(0, Engine:get_doc_count(Reopened)),
+    ?assertEqual(0, Engine:get_del_doc_count(Reopened)),
+    ?assertEqual(0, Engine:get_update_seq(Reopened)),
+
+    LocalId = <<"_local/foo">>,
+    [not_found] = Engine:open_docs(Reopened, [LocalId]),
+    ?assertEqual([not_found], Engine:open_local_docs(Reopened, [LocalId])).
diff --git a/src/couch/src/test_engine_ref_counting.erl b/src/couch/src/test_engine_ref_counting.erl
new file mode 100644
index 0000000..18e75fb
--- /dev/null
+++ b/src/couch/src/test_engine_ref_counting.erl
@@ -0,0 +1,103 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(test_engine_ref_counting).
+-compile(export_all).
+
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+
+-define(NUM_CLIENTS, 1000).
+
+
+% With no clients attached, the only processes allowed in the list
+% returned by monitored_by/1 are this test process and
+% couch_stats_process_tracker.
+cet_empty_monitors() ->
+    {ok, Engine, St} = test_engine_util:init_engine(),
+    Monitors = Engine:monitored_by(St),
+    ?assert(is_list(Monitors)),
+    Allowed = [self(), whereis(couch_stats_process_tracker)],
+    ?assertEqual([], Monitors -- Allowed).
+
+
+% A client that called incref must show up in monitored_by/1 and must
+% be gone from it once the client has exited.
+cet_incref_decref() ->
+    {ok, Engine, St} = test_engine_util:init_engine(),
+
+    Client = start_client(Engine, St),
+    {ClientPid, _MonitorRef} = Client,
+    wait_client(Client),
+
+    WhileOpen = Engine:monitored_by(St),
+    ?assert(lists:member(ClientPid, WhileOpen)),
+
+    close_client(Client),
+
+    AfterClose = Engine:monitored_by(St),
+    ?assert(not lists:member(ClientPid, AfterClose)).
+
+
+% Same check as cet_incref_decref/0, but with ?NUM_CLIENTS clients
+% holding a reference at the same time.
+cet_incref_decref_many() ->
+    {ok, Engine, St} = test_engine_util:init_engine(),
+    Clients = [start_client(Engine, St) || _ <- lists:seq(1, ?NUM_CLIENTS)],
+
+    lists:foreach(fun wait_client/1, Clients),
+
+    WhileOpen = Engine:monitored_by(St),
+    % +2 for db pid and process tracker
+    ?assertEqual(?NUM_CLIENTS + 2, length(WhileOpen)),
+
+    lists:foreach(fun close_client/1, Clients),
+
+    AfterClose = Engine:monitored_by(St),
+    ?assertEqual(2, length(AfterClose)).
+
+
+% Spawn a monitored client process that calls Engine:incref/1, answers
+% one {waiting, Pid} message with go, waits for close and finally calls
+% Engine:decref/1. Each wait gives up with error(timeout) after one
+% second. Returns the {Pid, MonitorRef} pair from spawn_monitor/1.
+start_client(Engine, St1) ->
+    Client = fun() ->
+        {ok, RefSt} = Engine:incref(St1),
+
+        receive
+            {waiting, Waiter} ->
+                Waiter ! go
+        after 1000 ->
+            erlang:error(timeout)
+        end,
+
+        receive
+            close ->
+                ok
+        after 1000 ->
+            erlang:error(timeout)
+        end,
+
+        Engine:decref(RefSt)
+    end,
+    spawn_monitor(Client).
+
+
+% Ask the client to confirm it is running and block until it answers
+% with go. Raises error(timeout) if no answer arrives within a second.
+wait_client({Pid, _Ref}) ->
+    Request = {waiting, self()},
+    Pid ! Request,
+    receive
+        go ->
+            ok
+    after 1000 ->
+        erlang:error(timeout)
+    end.
+
+
+% Tell the client to close and block until its monitor reports that the
+% process is down. Raises error(timeout) if that takes over a second.
+close_client({Pid, Ref}) ->
+    Pid ! close,
+    receive
+        {'DOWN', Ref, _Type, _Object, _Info} -> ok
+    after 1000 ->
+        erlang:error(timeout)
+    end.
+
diff --git a/src/couch/src/test_engine_util.erl b/src/couch/src/test_engine_util.erl
new file mode 100644
index 0000000..33048d3
--- /dev/null
+++ b/src/couch/src/test_engine_util.erl
@@ -0,0 +1,602 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(test_engine_util).
+-compile(export_all).
+
+
+-include_lib("couch/include/couch_db.hrl").
+
+
+-define(TEST_MODULES, [
+ test_engine_open_close_delete,
+ test_engine_get_set_props,
+ test_engine_read_write_docs,
+ test_engine_attachments,
+ test_engine_fold_docs,
+ test_engine_fold_changes,
+ test_engine_purge_docs,
+ test_engine_compaction,
+ test_engine_ref_counting
+]).
+
+
+% Convenience entry point for engines whose application and module
+% share the same name.
+create_tests(EngineApp) ->
+    EngineModule = EngineApp,
+    create_tests(EngineApp, EngineModule).
+
+
+% Build the EUnit setup fixture for the engine suite. The engine under
+% test is recorded in the couch application environment, one named
+% group of tests is gathered per module in ?TEST_MODULES, and the whole
+% set runs between test_util:start_couch/0 and test_util:stop_couch/1.
+create_tests(EngineApp, EngineModule) ->
+    application:set_env(couch, test_engine, {EngineApp, EngineModule}),
+    Suites = [{atom_to_list(Mod), gather(Mod)} || Mod <- ?TEST_MODULES],
+    StartCouch = fun() ->
+        CouchCtx = test_util:start_couch(),
+        config:set("log", "include_sasl", "false", false),
+        CouchCtx
+    end,
+    StopCouch = fun test_util:stop_couch/1,
+    {setup, StartCouch, StopCouch, fun(_) -> Suites end}.
+
+
+% Collect every exported zero-arity function of Module whose name
+% starts with "cet_" and wrap each one as a {spawn, Test} EUnit entry,
+% keeping the order of Module:module_info(exports).
+gather(Module) ->
+    [
+        {spawn, make_test_fun(Module, Name)}
+        || {Name, 0} <- Module:module_info(exports),
+            lists:prefix("cet_", atom_to_list(Name))
+    ].
+
+
+% Build a {Title, Fun} EUnit test for Module:Fun/0. The title is
+% "Module:Fun" and the wrapper enables trap_exit before calling the
+% test function.
+make_test_fun(Module, Fun) ->
+    Title = lists:flatten(io_lib:format("~s:~s", [Module, Fun])),
+    {Title, fun() ->
+        process_flag(trap_exit, true),
+        Module:Fun()
+    end}.
+
+% Directory for test databases: the couchdb/database_dir config value,
+% falling back to ".".
+rootdir() ->
+    Fallback = ".",
+    config:get("couchdb", "database_dir", Fallback).
+
+
+% Return a new database path, as a string, made of rootdir() joined
+% with a value from couch_uuids:random/0.
+dbpath() ->
+    Root = rootdir(),
+    Name = couch_uuids:random(),
+    binary_to_list(filename:join(Root, Name)).
+
+
+% Return the engine module recorded by create_tests/2 in the couch
+% application environment, or couch_bt_engine when none is set.
+get_engine() ->
+    Configured = application:get_env(couch, test_engine),
+    case Configured of
+        {ok, {_App, Module}} -> Module;
+        _NotConfigured -> couch_bt_engine
+    end.
+
+
+% Create a new database and return {ok, Engine, St}.
+init_engine() ->
+    Mode = default,
+    init_engine(Mode).
+
+
+% Create a new database with an empty default security object.
+%   default -> {ok, Engine, St}
+%   dbpath  -> {ok, Engine, DbPath, St}, for tests that reopen the file
+init_engine(default) ->
+    {ok, Engine, _DbPath, St} = init_engine(dbpath),
+    {ok, Engine, St};
+
+init_engine(dbpath) ->
+    Engine = get_engine(),
+    DbPath = dbpath(),
+    CreateOpts = [create, {default_security_object, []}],
+    {ok, St} = Engine:init(DbPath, CreateOpts),
+    {ok, Engine, DbPath, St}.
+
+
+% Apply each action in order, threading the engine state through, and
+% return {ok, FinalSt}.
+apply_actions(Engine, St, Actions) ->
+    Step = fun(Action, StAcc) ->
+        apply_action(Engine, StAcc, Action)
+    end,
+    {ok, lists:foldl(Step, St, Actions)}.
+
+
+% Apply one action. A {batch, Actions} tuple is written as a single
+% batch; any other action is written as a batch of one.
+apply_action(Engine, St, Action) ->
+    Batch = case Action of
+        {batch, BatchActions} -> BatchActions;
+        Single -> [Single]
+    end,
+    apply_batch(Engine, St, Batch).
+
+
+% Turn a list of actions into one Engine:write_doc_infos/4 call and
+% return the resulting engine state.
+%
+% Every action is generated against St, the state from before the
+% batch, so actions in the same batch do not see each other's writes.
+apply_batch(Engine, St, Actions) ->
+ % The first write of the batch gets the next free update sequence.
+ UpdateSeq = Engine:get_update_seq(St) + 1,
+ % Accumulator: {NextSeq, DocPairs, LocalDocs, PurgeInfos}
+ AccIn = {UpdateSeq, [], [], []},
+ AccOut = lists:foldl(fun(Action, Acc) ->
+ {SeqAcc, DocAcc, LDocAcc, PurgeAcc} = Acc,
+ case Action of
+ % Actions on "_local/" ids do not consume an update sequence.
+ {_, {<<"_local/", _/binary>>, _}} ->
+ LDoc = gen_local_write(Engine, St, Action),
+ {SeqAcc, DocAcc, [LDoc | LDocAcc], PurgeAcc};
+ _ ->
+ case gen_write(Engine, St, Action, SeqAcc) of
+ % Plain write: a two-tuple, consumes one sequence.
+ {_OldFDI, _NewFDI} = Pair ->
+ {SeqAcc + 1, [Pair | DocAcc], LDocAcc, PurgeAcc};
+ % Purge: gen_write reports the next sequence itself.
+ {Pair, NewSeqAcc, NewPurgeInfo} ->
+ NewPurgeAcc = [NewPurgeInfo | PurgeAcc],
+ {NewSeqAcc, [Pair | DocAcc], LDocAcc, NewPurgeAcc}
+ end
+ end
+ end, AccIn, Actions),
+ {_, Docs0, LDocs, PurgeIdRevs} = AccOut,
+ % Only the doc pairs are put back into action order; LDocs and
+ % PurgeIdRevs are passed on in reverse action order.
+ Docs = lists:reverse(Docs0),
+ {ok, NewSt} = Engine:write_doc_infos(St, Docs, LDocs, PurgeIdRevs),
+ NewSt.
+
+
+% Build the #doc{} to write for an action on a local document.
+% create and update use the previous numeric revision plus one (a
+% missing doc or an empty revision list counts as 0); delete uses the
+% revision <<"0">> and sets the deleted flag.
+gen_local_write(Engine, St, {Action, {DocId, Body}}) ->
+    LastRev = case Engine:open_local_docs(St, [DocId]) of
+        [not_found] -> 0;
+        [#doc{revs = {0, []}}] -> 0;
+        [#doc{revs = {0, [RevBin | _]}}] ->
+            list_to_integer(binary_to_list(RevBin))
+    end,
+    case Action of
+        delete ->
+            #doc{
+                id = DocId,
+                revs = {0, [<<"0">>]},
+                body = Body,
+                deleted = true
+            };
+        _ when Action == create; Action == update ->
+            NextRev = list_to_binary(integer_to_list(LastRev + 1)),
+            #doc{
+                id = DocId,
+                revs = {0, [NextRev]},
+                body = Body,
+                deleted = false
+            }
+    end.
+
+% Build the write for one action on a regular (non-local) document.
+%
+% Returns {OldFDI, NewFDI} for create, update, conflict and delete,
+% where OldFDI is not_found for create. For purge it returns
+% {{OldFDI, NewFDI}, NextUpdateSeq, {DocId, PurgedRevs}}, where NewFDI
+% is not_found when the whole document was purged.
+%
+% This first clause accepts the short {DocId, Body} payload and adds an
+% empty attachment list.
+gen_write(Engine, St, {Action, {DocId, Body}}, UpdateSeq) ->
+ gen_write(Engine, St, {Action, {DocId, Body, []}}, UpdateSeq);
+
+% create: the document must not exist yet.
+gen_write(Engine, St, {create, {DocId, Body, Atts0}}, UpdateSeq) ->
+ [not_found] = Engine:open_docs(St, [DocId]),
+ Atts = [couch_att:to_disk_term(Att) || Att <- Atts0],
+
+ % The revision id is derived from the doc content only.
+ Rev = crypto:hash(md5, term_to_binary({DocId, Body, Atts})),
+
+ Doc0 = #doc{
+ id = DocId,
+ revs = {0, [Rev]},
+ deleted = false,
+ body = Body,
+ atts = Atts
+ },
+
+ Doc1 = make_doc_summary(Engine, St, Doc0),
+ {ok, Doc2, Len} = Engine:write_doc_body(St, Doc1),
+
+ Sizes = #size_info{
+ active = Len,
+ external = erlang:external_size(Doc1#doc.body)
+ },
+
+ % The leaf points at whatever write_doc_body/2 left in #doc.body.
+ Leaf = #leaf{
+ deleted = false,
+ ptr = Doc2#doc.body,
+ seq = UpdateSeq,
+ sizes = Sizes,
+ atts = Atts
+ },
+
+ {not_found, #full_doc_info{
+ id = DocId,
+ deleted = false,
+ update_seq = UpdateSeq,
+ rev_tree = [{0, {Rev, Leaf, []}}],
+ sizes = Sizes
+ }};
+
+% purge: the payload carries one revision or a list of revisions in
+% place of the body.
+gen_write(Engine, St, {purge, {DocId, PrevRevs0, _}}, UpdateSeq) ->
+ [#full_doc_info{} = PrevFDI] = Engine:open_docs(St, [DocId]),
+ PrevRevs = if is_list(PrevRevs0) -> PrevRevs0; true -> [PrevRevs0] end,
+
+ #full_doc_info{
+ rev_tree = PrevTree
+ } = PrevFDI,
+
+ {NewTree, RemRevs} = couch_key_tree:remove_leafs(PrevTree, PrevRevs),
+ RemovedAll = lists:sort(RemRevs) == lists:sort(PrevRevs),
+ if RemovedAll -> ok; true ->
+ % If we didn't purge all the requested revisions
+ % then it's a bug in the test.
+ erlang:error({invalid_purge_test_revs, PrevRevs})
+ end,
+
+ case NewTree of
+ [] ->
+ % We've completely purged the document
+ {{PrevFDI, not_found}, UpdateSeq, {DocId, RemRevs}};
+ _ ->
+ % We have to relabel the update_seq of all
+ % leaves. See couch_db_updater for details.
+ {NewNewTree, NewUpdateSeq} = couch_key_tree:mapfold(fun
+ (_RevId, Leaf, leaf, InnerSeqAcc) ->
+ {Leaf#leaf{seq = InnerSeqAcc}, InnerSeqAcc + 1};
+ (_RevId, Value, _Type, InnerSeqAcc) ->
+ {Value, InnerSeqAcc}
+ end, UpdateSeq, NewTree),
+ % NewUpdateSeq is one past the last sequence handed to a leaf.
+ NewFDI = PrevFDI#full_doc_info{
+ update_seq = NewUpdateSeq - 1,
+ rev_tree = NewNewTree
+ },
+ {{PrevFDI, NewFDI}, NewUpdateSeq, {DocId, RemRevs}}
+ end;
+
+% update, conflict and delete: the document must already exist.
+gen_write(Engine, St, {Action, {DocId, Body, Atts0}}, UpdateSeq) ->
+ [#full_doc_info{} = PrevFDI] = Engine:open_docs(St, [DocId]),
+ Atts = [couch_att:to_disk_term(Att) || Att <- Atts0],
+
+ % DocId is already bound, so this also checks the id of PrevFDI.
+ #full_doc_info{
+ id = DocId,
+ rev_tree = PrevRevTree
+ } = PrevFDI,
+
+ #rev_info{
+ rev = PrevRev
+ } = prev_rev(PrevFDI),
+
+ {RevPos, PrevRevId} = PrevRev,
+
+ Rev = gen_revision(Action, DocId, PrevRev, Body, Atts),
+
+ % NOTE(review): deleted is false here even when Action is delete; the
+ % leaf below carries the real Deleted flag. Confirm that
+ % serialize_doc ignores #doc.deleted.
+ Doc0 = #doc{
+ id = DocId,
+ revs = {RevPos + 1, [Rev, PrevRevId]},
+ deleted = false,
+ body = Body,
+ atts = Atts
+ },
+
+ Doc1 = make_doc_summary(Engine, St, Doc0),
+ {ok, Doc2, Len} = Engine:write_doc_body(St, Doc1),
+
+ Deleted = case Action of
+ update -> false;
+ conflict -> false;
+ delete -> true
+ end,
+
+ Sizes = #size_info{
+ active = Len,
+ external = erlang:external_size(Doc1#doc.body)
+ },
+
+ Leaf = #leaf{
+ deleted = Deleted,
+ ptr = Doc2#doc.body,
+ seq = UpdateSeq,
+ sizes = Sizes,
+ atts = Atts
+ },
+
+ Path = gen_path(Action, RevPos, PrevRevId, Rev, Leaf),
+ RevsLimit = Engine:get_revs_limit(St),
+ NodeType = case Action of
+ conflict -> new_branch;
+ _ -> new_leaf
+ end,
+ % NodeType is already bound, so this match fails unless the merge
+ % produced the expected kind of node.
+ {NewTree, NodeType} = couch_key_tree:merge(PrevRevTree, Path, RevsLimit),
+
+ NewFDI = PrevFDI#full_doc_info{
+ deleted = couch_doc:is_deleted(NewTree),
+ update_seq = UpdateSeq,
+ rev_tree = NewTree,
+ sizes = Sizes
+ },
+
+ {PrevFDI, NewFDI}.
+
+
+% Derive a revision id as the MD5 of the document content. Conflicts
+% leave the previous revision out of the hashed term; update and
+% delete include it, so both hash the same term.
+gen_revision(conflict, DocId, _PrevRev, Body, Atts) ->
+    Seed = term_to_binary({DocId, Body, Atts}),
+    crypto:hash(md5, Seed);
+gen_revision(update, DocId, PrevRev, Body, Atts) ->
+    Seed = term_to_binary({DocId, PrevRev, Body, Atts}),
+    crypto:hash(md5, Seed);
+gen_revision(delete, DocId, PrevRev, Body, Atts) ->
+    gen_revision(update, DocId, PrevRev, Body, Atts).
+
+
+% Build the revision path handed to couch_key_tree:merge/3. A conflict
+% is a path of its own starting at position 0; update and delete hang
+% the new leaf under a ?REV_MISSING stub for the previous revision.
+gen_path(conflict, _RevPos, _PrevRevId, Rev, Leaf) ->
+    NewNode = {Rev, Leaf, []},
+    {0, NewNode};
+gen_path(update, RevPos, PrevRevId, Rev, Leaf) ->
+    NewNode = {Rev, Leaf, []},
+    {RevPos, {PrevRevId, ?REV_MISSING, [NewNode]}};
+gen_path(delete, RevPos, PrevRevId, Rev, Leaf) ->
+    gen_path(update, RevPos, PrevRevId, Rev, Leaf).
+
+
+% Run Engine:serialize_doc/2 in a separate monitored process and return
+% its result. The worker hands the result back as its exit reason.
+% Raises {make_doc_summary_error, Reason} if the worker exits with
+% anything else, and make_doc_summary_timeout after one second.
+make_doc_summary(Engine, St, DocData) ->
+    Serialize = fun() ->
+        exit({result, Engine:serialize_doc(St, DocData)})
+    end,
+    {_Pid, MRef} = spawn_monitor(Serialize),
+    receive
+        {'DOWN', MRef, _, _, {result, Summary}} ->
+            Summary;
+        {'DOWN', MRef, _, _, Failure} ->
+            erlang:error({make_doc_summary_error, Failure})
+    after 1000 ->
+        erlang:error(make_doc_summary_timeout)
+    end.
+
+
+% Write each {FileName, Data} pair to the engine as an attachment and
+% return the attachments in input order.
+%
+% Each attachment is written by a separate monitored process, which
+% opens a write stream and hands the finished attachment back as a
+% tagged exit reason.
+% NOTE(review): presumably the stream must be written by the process
+% that opened it - confirm against couch_stream.
+%
+% Raises {attachment_write_error, Reason} if the writer exits with
+% anything but a tagged result, and attachment_write_timeout if it
+% does not finish within five seconds.
+prep_atts(_Engine, _St, []) ->
+    [];
+
+prep_atts(Engine, St, [{FileName, Data} | Rest]) ->
+    {_, Ref} = spawn_monitor(fun() ->
+        {ok, Stream} = Engine:open_write_stream(St, []),
+        exit({result, write_att(Stream, FileName, Data, Data)})
+    end),
+    Att = receive
+        {'DOWN', Ref, _, _, {result, Resp}} ->
+            Resp;
+        % Without the tag, a crash reason would be returned as if it
+        % were the attachment.
+        {'DOWN', Ref, _, _, Error} ->
+            erlang:error({attachment_write_error, Error})
+    after 5000 ->
+        erlang:error(attachment_write_timeout)
+    end,
+    [Att | prep_atts(Engine, St, Rest)].
+
+
+% Stream the data to Stream in chunks of at most 4096 bytes. Once no
+% data is left, close the stream, check its length and MD5 against
+% OrigData and return a new attachment for FileName.
+write_att(Stream, FileName, OrigData, <<>>) ->
+    {StreamEngine, Len, Len, Md5, Md5} = couch_stream:close(Stream),
+    couch_util:check_md5(Md5, crypto:hash(md5, OrigData)),
+    Len = size(OrigData),
+    couch_att:new([
+        {name, FileName},
+        {type, <<"application/octet-stream">>},
+        {data, {stream, StreamEngine}},
+        {att_len, Len},
+        {disk_len, Len},
+        {md5, Md5},
+        {encoding, identity}
+    ]);
+
+% At least one full chunk is left.
+write_att(Stream, FileName, OrigData, <<Chunk:4096/binary, Rest/binary>>) ->
+    ok = couch_stream:write(Stream, Chunk),
+    write_att(Stream, FileName, OrigData, Rest);
+
+% Less than a full chunk is left; write it and finish.
+write_att(Stream, FileName, OrigData, LastChunk) ->
+    ok = couch_stream:write(Stream, LastChunk),
+    write_att(Stream, FileName, OrigData, <<>>).
+
+
+% Return the first #rev_info{} of the #doc_info{} that
+% couch_doc:to_doc_info/1 builds from FDI.
+prev_rev(#full_doc_info{} = FDI) ->
+    DocInfo = couch_doc:to_doc_info(FDI),
+    #doc_info{revs = [#rev_info{} = First | _]} = DocInfo,
+    First.
+
+
+% Dump the database as a property list with its properties, documents,
+% local documents and changes feed.
+db_as_term(Engine, St) ->
+    Props = db_props_as_term(Engine, St),
+    Docs = db_docs_as_term(Engine, St),
+    LocalDocs = db_local_docs_as_term(Engine, St),
+    Changes = db_changes_as_term(Engine, St),
+    [
+        {props, Props},
+        {docs, Docs},
+        {local_docs, LocalDocs},
+        {changes, Changes}
+    ].
+
+
+% Call each property getter listed below on the engine and return the
+% results as {GetterName, Value} pairs, in that order.
+db_props_as_term(Engine, St) ->
+    Getters = [
+        get_doc_count,
+        get_del_doc_count,
+        get_disk_version,
+        get_update_seq,
+        get_purge_seq,
+        get_last_purged,
+        get_security,
+        get_revs_limit,
+        get_uuid,
+        get_epochs
+    ],
+    [{Getter, Engine:Getter(St)} || Getter <- Getters].
+
+
+% Fold over all documents and return them, in fold order, with each
+% #full_doc_info{} expanded by fdi_to_term/3.
+db_docs_as_term(Engine, St) ->
+    Collect = fun(FDI, Acc) -> {ok, [FDI | Acc]} end,
+    {ok, Reversed} = Engine:fold_docs(St, Collect, [], []),
+    lists:reverse([fdi_to_term(Engine, St, FDI) || FDI <- Reversed]).
+
+
+% Fold over all local documents and return them in fold order.
+db_local_docs_as_term(Engine, St) ->
+    Collect = fun(Doc, Acc) -> {ok, [Doc | Acc]} end,
+    {ok, Reversed} = Engine:fold_local_docs(St, Collect, [], []),
+    lists:reverse(Reversed).
+
+
+% Fold over the changes feed from sequence 0 and return the entries,
+% in fold order, with each one expanded by fdi_to_term/3.
+db_changes_as_term(Engine, St) ->
+    Collect = fun(FDI, Acc) -> {ok, [FDI | Acc]} end,
+    {ok, Reversed} = Engine:fold_changes(St, 0, Collect, [], []),
+    lists:reverse([fdi_to_term(Engine, St, FDI) || FDI <- Reversed]).
+
+
+% Return FDI with every node of its revision tree rewritten by
+% tree_to_term/5 and its sizes blanked out.
+fdi_to_term(Engine, St, FDI) ->
+    #full_doc_info{id = DocId, rev_tree = RevTree} = FDI,
+    ExpandNode = fun(Rev, Node, Type, Acc) ->
+        tree_to_term(Rev, Node, Type, Acc, DocId)
+    end,
+    {Expanded, _} = couch_key_tree:mapfold(ExpandNode, {Engine, St}, RevTree),
+    % Blank out sizes because we allow storage
+    % engines to handle this with their own
+    % definition until further notice.
+    Blank = #size_info{active = -1, external = -1},
+    FDI#full_doc_info{rev_tree = Expanded, sizes = Blank}.
+
+
+% couch_key_tree:mapfold/3 callback used by fdi_to_term/3. The fold
+% accumulator is the {Engine, St} pair and is passed through unchanged.
+%
+% Branch nodes are replaced by ?REV_MISSING.
+tree_to_term(_Rev, _Leaf, branch, Acc, _DocId) ->
+ {?REV_MISSING, Acc};
+
+% Leaf nodes get their body pointer replaced by the body read from the
+% engine, their attachments replaced by attachments holding the data,
+% and their sizes blanked out.
+tree_to_term({Pos, RevId}, #leaf{} = Leaf, leaf, {Engine, St}, DocId) ->
+ #leaf{
+ deleted = Deleted,
+ ptr = Ptr
+ } = Leaf,
+
+ % Minimal #doc{} carrying the body pointer for read_doc_body/2.
+ Doc0 = #doc{
+ id = DocId,
+ revs = {Pos, [RevId]},
+ deleted = Deleted,
+ body = Ptr
+ },
+
+ Doc1 = Engine:read_doc_body(St, Doc0),
+
+ % A binary body or attachment list is passed through
+ % couch_compress:decompress/1; any other term is used as is.
+ Body = if not is_binary(Doc1#doc.body) -> Doc1#doc.body; true ->
+ couch_compress:decompress(Doc1#doc.body)
+ end,
+
+ Atts1 = if not is_binary(Doc1#doc.atts) -> Doc1#doc.atts; true ->
+ couch_compress:decompress(Doc1#doc.atts)
+ end,
+
+ % Rebuild the attachments from their disk terms, then load their data
+ % with att_to_term/1.
+ StreamSrc = fun(Sp) -> Engine:open_read_stream(St, Sp) end,
+ Atts2 = [couch_att:from_disk_term(StreamSrc, Att) || Att <- Atts1],
+ Atts = [att_to_term(Att) || Att <- Atts2],
+
+ NewLeaf = Leaf#leaf{
+ ptr = Body,
+ sizes = #size_info{active = -1, external = -1},
+ atts = Atts
+ },
+ {NewLeaf, {Engine, St}}.
+
+
+% Replace the attachment's data field with the binary returned by
+% couch_att:to_binary/1.
+att_to_term(Att) ->
+    couch_att:store(data, couch_att:to_binary(Att), Att).
+
+
+% Compare two terms. Returns nodiff when they are equal, otherwise a
+% description of the first difference found. Two tuples or two lists
+% are compared element by element; anything else is compared with ==.
+term_diff(Left, Right) when is_tuple(Left), is_tuple(Right) ->
+    tuple_diff(tuple_to_list(Left), tuple_to_list(Right));
+
+term_diff(Left, Right) when is_list(Left), is_list(Right) ->
+    list_diff(Left, Right);
+
+term_diff(Left, Right) ->
+    case Left == Right of
+        true -> nodiff;
+        false -> {Left, Right}
+    end.
+
+
+% Compare the element lists of two tuples. The elements are walked
+% exactly as list_diff/2 walks a list, so the work is delegated to it.
+tuple_diff(Elems1, Elems2) ->
+    list_diff(Elems1, Elems2).
+
+
+% Compare two lists element by element. Returns nodiff when they
+% match, {longer, Elem} or {shorter, Elem} when one list runs out
+% first, or {Elem1, Diff} for the first pair of elements that differ.
+list_diff([], []) ->
+    nodiff;
+
+list_diff([Extra | _], []) ->
+    {longer, Extra};
+
+list_diff([], [Missing | _]) ->
+    {shorter, Missing};
+
+list_diff([Head1 | Tail1], [Head2 | Tail2]) ->
+    case term_diff(Head1, Head2) of
+        nodiff -> list_diff(Tail1, Tail2);
+        Diff -> {Head1, Diff}
+    end.
+
+
+% Start a compaction and wait for it to report completion.
+%
+% The calling process is passed to Engine:start_compaction/4 and the
+% compact_done notification is received here as a gen_server cast.
+% Returns {ok, St2, DbName, CompactorPid, Term}, where Term is the
+% payload of the compact_done cast.
+%
+% Raises {compactor_died, Reason} if the compactor exits before
+% reporting, and compactor_timed_out after ten seconds.
+compact(Engine, St1, DbPath) ->
+    DbName = filename:basename(DbPath),
+    {ok, St2, Pid} = Engine:start_compaction(St1, DbName, [], self()),
+    Ref = erlang:monitor(process, Pid),
+
+    % Ideally I'd assert that Pid is linked to us
+    % at this point but it's technically possible
+    % that it could have finished compacting by
+    % the time we check... Quite the quandary.
+
+    Term = receive
+        {'$gen_cast', {compact_done, Engine, Term0}} ->
+            Term0;
+        {'DOWN', Ref, _, _, Reason} ->
+            erlang:error({compactor_died, Reason})
+    after 10000 ->
+        erlang:error(compactor_timed_out)
+    end,
+
+    % The monitor has done its job. Drop it and flush any 'DOWN' message
+    % so it does not linger in the caller's mailbox.
+    erlang:demonitor(Ref, [flush]),
+
+    {ok, St2, DbName, Pid, Term}.
+
+
+% Run Fun with the {Section, Key, Value} settings in Config applied,
+% and put the previous values back afterwards, even if Fun raises.
+with_config(Config, Fun) ->
+    Saved = apply_config(Config),
+    try Fun() after apply_config(Saved) end.
+
+
+% Apply a list of {Section, Key, Value} settings and return the same
+% list with each Value replaced by the value config:get/2 returned
+% before the change. A Value of undefined deletes the key.
+% NOTE(review): config:set/3 is called without the explicit persist
+% flag that create_tests/2 passes - confirm these test settings are not
+% meant to be written to disk.
+apply_config([]) ->
+    [];
+
+apply_config([{Section, Key, Value} | Remaining]) ->
+    Previous = config:get(Section, Key),
+    if
+        Value =:= undefined -> config:delete(Section, Key);
+        true -> config:set(Section, Key, Value)
+    end,
+    [{Section, Key, Previous} | apply_config(Remaining)].
diff --git a/src/couch/test/couch_bt_engine_tests.erl b/src/couch/test/couch_bt_engine_tests.erl
new file mode 100644
index 0000000..df200df
--- /dev/null
+++ b/src/couch/test/couch_bt_engine_tests.erl
@@ -0,0 +1,20 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_bt_engine_tests).
+
+
+-include_lib("eunit/include/eunit.hrl").
+
+
+% EUnit generator: returns whatever test_engine_util:create_tests/2
+% produces for the `couch` / `couch_bt_engine` pair.
+couch_bt_engine_test_() ->
+    EngineModule = couch_bt_engine,
+    test_engine_util:create_tests(couch, EngineModule).
--
To stop receiving notification emails like this one, please contact
"commits@couchdb.apache.org" <co...@couchdb.apache.org>.
[couchdb] 01/03: Implement pluggable storage engines
Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
davisp pushed a commit to branch COUCHDB-3287-pluggable-storage-engines
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit ac1deae6074909080b125ddee1b020814226bc7a
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Fri Feb 5 12:04:20 2016 -0600
Implement pluggable storage engines
This change moves the main work of storage engines to run through the
new couch_db_engine behavior. This allows us to replace the storage
engine with different implementations that can be tailored to specific
work loads and environments.
COUCHDB-3287
---
src/chttpd/src/chttpd_db.erl | 9 +-
src/couch/include/couch_db.hrl | 5 +-
src/couch/src/couch_att.erl | 130 +-
src/couch/src/couch_auth_cache.erl | 9 +-
src/couch/src/couch_bt_engine.erl | 10 +-
src/couch/src/couch_changes.erl | 21 +-
src/couch/src/couch_compaction_daemon.erl | 32 +-
src/couch/src/couch_db.erl | 682 +++++------
src/couch/src/couch_db_engine.erl | 3 -
src/couch/src/couch_db_int.hrl | 66 +-
src/couch/src/couch_db_updater.erl | 1269 +++++---------------
src/couch/src/couch_httpd_db.erl | 8 +-
src/couch/src/couch_httpd_misc_handlers.erl | 13 -
src/couch/src/couch_server.erl | 191 ++-
src/couch/src/couch_stream.erl | 256 ++--
src/couch/src/couch_util.erl | 40 +-
src/couch/src/test_util.erl | 2 +
src/couch/test/couch_db_plugin_tests.erl | 2 +-
src/couch/test/couch_stream_tests.erl | 32 +-
src/couch/test/couchdb_compaction_daemon_tests.erl | 2 +-
src/couch/test/couchdb_views_tests.erl | 43 +-
src/couch_index/src/couch_index_updater.erl | 6 +-
src/couch_mrview/src/couch_mrview.erl | 38 +-
src/couch_mrview/src/couch_mrview_util.erl | 17 +-
.../test/couch_replicator_compact_tests.erl | 4 +-
.../test/couch_replicator_filtered_tests.erl | 4 +-
.../test/couch_replicator_missing_stubs_tests.erl | 4 +-
.../test/couch_replicator_selector_tests.erl | 4 +-
.../test/couch_replicator_test_helper.erl | 4 +-
.../couch_replicator_use_checkpoints_tests.erl | 4 +-
src/fabric/src/fabric_db_create.erl | 29 +-
src/fabric/src/fabric_rpc.erl | 11 +-
src/mem3/include/mem3.hrl | 6 +-
src/mem3/src/mem3.erl | 20 +-
src/mem3/src/mem3_nodes.erl | 3 +-
src/mem3/src/mem3_rep.erl | 13 +-
src/mem3/src/mem3_shards.erl | 61 +-
src/mem3/src/mem3_util.erl | 17 +-
src/mem3/test/mem3_util_test.erl | 16 +-
39 files changed, 1260 insertions(+), 1826 deletions(-)
diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl
index 8f143fe..efb2f6f 100644
--- a/src/chttpd/src/chttpd_db.erl
+++ b/src/chttpd/src/chttpd_db.erl
@@ -284,8 +284,15 @@ create_db_req(#httpd{}=Req, DbName) ->
N = chttpd:qs_value(Req, "n", config:get("cluster", "n", "3")),
Q = chttpd:qs_value(Req, "q", config:get("cluster", "q", "8")),
P = chttpd:qs_value(Req, "placement", config:get("cluster", "placement")),
+ E = iolist_to_binary(chttpd:qs_value(Req, "engine", "couch")),
+ Options = [
+ {n, N},
+ {q, Q},
+ {placement, P},
+ {engine, E}
+ ],
DocUrl = absolute_uri(Req, "/" ++ couch_util:url_encode(DbName)),
- case fabric:create_db(DbName, [{n,N}, {q,Q}, {placement,P}]) of
+ case fabric:create_db(DbName, Options) of
ok ->
send_json(Req, 201, [{"Location", DocUrl}], {[{ok, true}]});
accepted ->
diff --git a/src/couch/include/couch_db.hrl b/src/couch/include/couch_db.hrl
index 5abb316..ca57cce 100644
--- a/src/couch/include/couch_db.hrl
+++ b/src/couch/include/couch_db.hrl
@@ -112,7 +112,10 @@
% the json body object.
body = {[]},
- atts = [] :: [couch_att:att()], % attachments
+ % Atts can be a binary when a storage engine
+ % returns attachment info blob in compressed
+ % form.
+ atts = [] :: [couch_att:att()] | binary(), % attachments
deleted = false,
diff --git a/src/couch/src/couch_att.erl b/src/couch/src/couch_att.erl
index 9d38cfa..e6ed7df 100644
--- a/src/couch/src/couch_att.erl
+++ b/src/couch/src/couch_att.erl
@@ -18,7 +18,8 @@
fetch/2,
store/2,
store/3,
- transform/3
+ transform/3,
+ copy/2
]).
-export([
@@ -233,6 +234,14 @@ transform(Field, Fun, Att) ->
store(Field, NewValue, Att).
+% Copy the stream backing Att into DstStream and return Att with its
+% `data` field pointing at the stream returned by closing DstStream.
+%
+% The match on ExpectedLen asserts that the length reported by
+% couch_stream:close/1 equals the attachment's `att_len`. The stored md5
+% and the md5 of the copy are handed to couch_util:check_md5/2
+% (presumably raising on mismatch; confirm against couch_util).
+copy(Att, DstStream) ->
+    [{stream, Source}, ExpectedLen, StoredMd5] =
+        fetch([data, att_len, md5], Att),
+    ok = couch_stream:copy(Source, DstStream),
+    {Copied, ExpectedLen, _, CopiedMd5, _} = couch_stream:close(DstStream),
+    couch_util:check_md5(StoredMd5, CopiedMd5),
+    store(data, {stream, Copied}, Att).
+
+
is_stub(Att) ->
stub == fetch(data, Att).
@@ -292,11 +301,12 @@ size_info(Atts) ->
%% as safe as possible, avoiding the need for complicated disk versioning
%% schemes.
to_disk_term(#att{} = Att) ->
- {_, StreamIndex} = fetch(data, Att),
+ {stream, StreamEngine} = fetch(data, Att),
+ {ok, Sp} = couch_stream:to_disk_term(StreamEngine),
{
fetch(name, Att),
fetch(type, Att),
- StreamIndex,
+ Sp,
fetch(att_len, Att),
fetch(disk_len, Att),
fetch(revpos, Att),
@@ -309,9 +319,13 @@ to_disk_term(Att) ->
fun
(data, {Props, Values}) ->
case lists:keytake(data, 1, Props) of
- {value, {_, {_Fd, Sp}}, Other} -> {Other, [Sp | Values]};
- {value, {_, Value}, Other} -> {Other, [Value | Values]};
- false -> {Props, [undefined |Values ]}
+ {value, {_, {stream, StreamEngine}}, Other} ->
+ {ok, Sp} = couch_stream:to_disk_term(StreamEngine),
+ {Other, [Sp | Values]};
+ {value, {_, Value}, Other} ->
+ {Other, [Value | Values]};
+ false ->
+ {Props, [undefined |Values ]}
end;
(Key, {Props, Values}) ->
case lists:keytake(Key, 1, Props) of
@@ -332,9 +346,11 @@ to_disk_term(Att) ->
%% compression to remove these sorts of common bits (block level compression
%% with something like a shared dictionary that is checkpointed every now and
%% then).
-from_disk_term(Fd, {Base, Extended}) when is_tuple(Base), is_list(Extended) ->
- store(Extended, from_disk_term(Fd, Base));
-from_disk_term(Fd, {Name,Type,Sp,AttLen,DiskLen,RevPos,Md5,Enc}) ->
+from_disk_term(StreamSrc, {Base, Extended})
+ when is_tuple(Base), is_list(Extended) ->
+ store(Extended, from_disk_term(StreamSrc, Base));
+from_disk_term(StreamSrc, {Name,Type,Sp,AttLen,DiskLen,RevPos,Md5,Enc}) ->
+ {ok, Stream} = open_stream(StreamSrc, Sp),
#att{
name=Name,
type=Type,
@@ -342,10 +358,11 @@ from_disk_term(Fd, {Name,Type,Sp,AttLen,DiskLen,RevPos,Md5,Enc}) ->
disk_len=DiskLen,
md5=Md5,
revpos=RevPos,
- data={Fd,Sp},
+ data={stream, Stream},
encoding=upgrade_encoding(Enc)
};
-from_disk_term(Fd, {Name,Type,Sp,AttLen,RevPos,Md5}) ->
+from_disk_term(StreamSrc, {Name,Type,Sp,AttLen,RevPos,Md5}) ->
+ {ok, Stream} = open_stream(StreamSrc, Sp),
#att{
name=Name,
type=Type,
@@ -353,9 +370,10 @@ from_disk_term(Fd, {Name,Type,Sp,AttLen,RevPos,Md5}) ->
disk_len=AttLen,
md5=Md5,
revpos=RevPos,
- data={Fd,Sp}
+ data={stream, Stream}
};
-from_disk_term(Fd, {Name,{Type,Sp,AttLen}}) ->
+from_disk_term(StreamSrc, {Name,{Type,Sp,AttLen}}) ->
+ {ok, Stream} = open_stream(StreamSrc, Sp),
#att{
name=Name,
type=Type,
@@ -363,7 +381,7 @@ from_disk_term(Fd, {Name,{Type,Sp,AttLen}}) ->
disk_len=AttLen,
md5= <<>>,
revpos=0,
- data={Fd,Sp}
+ data={stream, Stream}
}.
@@ -477,32 +495,18 @@ to_json(Att, OutputData, DataToFollow, ShowEncoding) ->
{Name, {Props ++ DigestProp ++ DataProps ++ EncodingProps ++ HeadersProp}}.
-flush(Fd, Att) ->
- flush_data(Fd, fetch(data, Att), Att).
+flush(Db, Att) ->
+ flush_data(Db, fetch(data, Att), Att).
-flush_data(Fd, {Fd0, _}, Att) when Fd0 == Fd ->
- % already written to our file, nothing to write
- Att;
-flush_data(Fd, {OtherFd, StreamPointer}, Att) ->
- [InMd5, InDiskLen] = fetch([md5, disk_len], Att),
- {NewStreamData, Len, _IdentityLen, Md5, IdentityMd5} =
- couch_stream:copy_to_new_stream(OtherFd, StreamPointer, Fd),
- couch_db:check_md5(IdentityMd5, InMd5),
- store([
- {data, {Fd, NewStreamData}},
- {md5, Md5},
- {att_len, Len},
- {disk_len, InDiskLen}
- ], Att);
-flush_data(Fd, Data, Att) when is_binary(Data) ->
- couch_db:with_stream(Fd, Att, fun(OutputStream) ->
+flush_data(Db, Data, Att) when is_binary(Data) ->
+ couch_db:with_stream(Db, Att, fun(OutputStream) ->
couch_stream:write(OutputStream, Data)
end);
-flush_data(Fd, Fun, Att) when is_function(Fun) ->
+flush_data(Db, Fun, Att) when is_function(Fun) ->
case fetch(att_len, Att) of
undefined ->
- couch_db:with_stream(Fd, Att, fun(OutputStream) ->
+ couch_db:with_stream(Db, Att, fun(OutputStream) ->
% Fun(MaxChunkSize, WriterFun) must call WriterFun
% once for each chunk of the attachment,
Fun(4096,
@@ -523,11 +527,11 @@ flush_data(Fd, Fun, Att) when is_function(Fun) ->
end, ok)
end);
AttLen ->
- couch_db:with_stream(Fd, Att, fun(OutputStream) ->
+ couch_db:with_stream(Db, Att, fun(OutputStream) ->
write_streamed_attachment(OutputStream, Fun, AttLen)
end)
end;
-flush_data(Fd, {follows, Parser, Ref}, Att) ->
+flush_data(Db, {follows, Parser, Ref}, Att) ->
ParserRef = erlang:monitor(process, Parser),
Fun = fun() ->
Parser ! {get_bytes, Ref, self()},
@@ -541,9 +545,23 @@ flush_data(Fd, {follows, Parser, Ref}, Att) ->
end
end,
try
- flush_data(Fd, Fun, store(data, Fun, Att))
+ flush_data(Db, Fun, store(data, Fun, Att))
after
erlang:demonitor(ParserRef, [flush])
+ end;
+flush_data(Db, {stream, StreamEngine}, Att) ->
+ case couch_db:is_active_stream(Db, StreamEngine) of
+ true ->
+ % Already written
+ Att;
+ false ->
+ NewAtt = couch_db:with_stream(Db, Att, fun(OutputStream) ->
+ couch_stream:copy(StreamEngine, OutputStream)
+ end),
+ InMd5 = fetch(md5, Att),
+ OutMd5 = fetch(md5, NewAtt),
+ couch_util:check_md5(OutMd5, InMd5),
+ NewAtt
end.
@@ -572,9 +590,9 @@ foldl(Att, Fun, Acc) ->
foldl(Bin, _Att, Fun, Acc) when is_binary(Bin) ->
Fun(Bin, Acc);
-foldl({Fd, Sp}, Att, Fun, Acc) ->
+foldl({stream, StreamEngine}, Att, Fun, Acc) ->
Md5 = fetch(md5, Att),
- couch_stream:foldl(Fd, Sp, Md5, Fun, Acc);
+ couch_stream:foldl(StreamEngine, Md5, Fun, Acc);
foldl(DataFun, Att, Fun, Acc) when is_function(DataFun) ->
Len = fetch(att_len, Att),
fold_streamed_data(DataFun, Len, Fun, Acc);
@@ -599,14 +617,15 @@ foldl({follows, Parser, Ref}, Att, Fun, Acc) ->
range_foldl(Att, From, To, Fun, Acc) ->
- {Fd, Sp} = fetch(data, Att),
- couch_stream:range_foldl(Fd, Sp, From, To, Fun, Acc).
+ {stream, StreamEngine} = fetch(data, Att),
+ couch_stream:range_foldl(StreamEngine, From, To, Fun, Acc).
foldl_decode(Att, Fun, Acc) ->
case fetch([data, encoding], Att) of
- [{Fd, Sp}, Enc] ->
- couch_stream:foldl_decode(Fd, Sp, fetch(md5, Att), Enc, Fun, Acc);
+ [{stream, StreamEngine}, Enc] ->
+ couch_stream:foldl_decode(
+ StreamEngine, fetch(md5, Att), Enc, Fun, Acc);
[Fun2, identity] ->
fold_streamed_data(Fun2, fetch(att_len, Att), Fun, Acc)
end.
@@ -620,7 +639,7 @@ to_binary(Bin, _Att) when is_binary(Bin) ->
Bin;
to_binary(Iolist, _Att) when is_list(Iolist) ->
iolist_to_binary(Iolist);
-to_binary({_Fd,_Sp}, Att) ->
+to_binary({stream, _StreamEngine}, Att) ->
iolist_to_binary(
lists:reverse(foldl(Att, fun(Bin,Acc) -> [Bin|Acc] end, []))
);
@@ -680,9 +699,25 @@ upgrade_encoding(false) -> identity;
upgrade_encoding(Encoding) -> Encoding.
+% Open a read stream for the disk term Data.
+%
+% StreamSrc is either a database handle (as judged by couch_db:is_db/1),
+% in which case couch_db:open_read_stream/2 is used, or a fun of arity 1
+% that is called with Data. Anything else raises
+% `{invalid_stream_source, StreamSrc}`.
+open_stream(StreamSrc, Data) ->
+    case couch_db:is_db(StreamSrc) of
+        true ->
+            couch_db:open_read_stream(StreamSrc, Data);
+        false when is_function(StreamSrc, 1) ->
+            StreamSrc(Data);
+        false ->
+            erlang:error({invalid_stream_source, StreamSrc})
+    end.
+
+
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
+% Eww...
+-include("couch_bt_engine.hrl").
%% Test utilities
@@ -737,7 +772,7 @@ attachment_disk_term_test_() ->
{disk_len, 0},
{md5, <<212,29,140,217,143,0,178,4,233,128,9,152,236,248,66,126>>},
{revpos, 4},
- {data, {fake_fd, fake_sp}},
+ {data, {stream, {couch_bt_engine_stream, {fake_fd, fake_sp}}}},
{encoding, identity}
]),
BaseDiskTerm = {
@@ -751,11 +786,12 @@ attachment_disk_term_test_() ->
Headers = [{<<"X-Foo">>, <<"bar">>}],
ExtendedAttachment = store(headers, Headers, BaseAttachment),
ExtendedDiskTerm = {BaseDiskTerm, [{headers, Headers}]},
+ FakeDb = test_util:fake_db([{engine, {couch_bt_engine, #st{fd=fake_fd}}}]),
{"Disk term tests", [
?_assertEqual(BaseDiskTerm, to_disk_term(BaseAttachment)),
- ?_assertEqual(BaseAttachment, from_disk_term(fake_fd, BaseDiskTerm)),
+ ?_assertEqual(BaseAttachment, from_disk_term(FakeDb, BaseDiskTerm)),
?_assertEqual(ExtendedDiskTerm, to_disk_term(ExtendedAttachment)),
- ?_assertEqual(ExtendedAttachment, from_disk_term(fake_fd, ExtendedDiskTerm))
+ ?_assertEqual(ExtendedAttachment, from_disk_term(FakeDb, ExtendedDiskTerm))
]}.
diff --git a/src/couch/src/couch_auth_cache.erl b/src/couch/src/couch_auth_cache.erl
index 16c59d1..d4c5762 100644
--- a/src/couch/src/couch_auth_cache.erl
+++ b/src/couch/src/couch_auth_cache.erl
@@ -326,13 +326,8 @@ refresh_entries(AuthDb) ->
AuthDb2Seq = couch_db:get_update_seq(AuthDb2),
case AuthDb2Seq > AuthDbSeq of
true ->
- {ok, _, _} = couch_db:enum_docs_since(
- AuthDb2,
- AuthDbSeq,
- fun(DocInfo, _, _) -> refresh_entry(AuthDb2, DocInfo) end,
- AuthDbSeq,
- []
- ),
+ Fun = fun(DocInfo, _) -> refresh_entry(AuthDb2, DocInfo) end,
+ {ok, _} = couch_db:fold_changes(AuthDb2, AuthDbSeq, Fun, nil),
true = ets:insert(?STATE, {auth_db, AuthDb2});
false ->
ok
diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl
index fef740b..374199f 100644
--- a/src/couch/src/couch_bt_engine.erl
+++ b/src/couch/src/couch_bt_engine.erl
@@ -445,11 +445,11 @@ is_active_stream(_, _) ->
fold_docs(St, UserFun, UserAcc, Options) ->
- fold_docs_int(St#st.id_tree, UserFun, UserAcc, Options).
+ fold_docs_int(St, St#st.id_tree, UserFun, UserAcc, Options).
fold_local_docs(St, UserFun, UserAcc, Options) ->
- fold_docs_int(St#st.local_tree, UserFun, UserAcc, Options).
+ fold_docs_int(St, St#st.local_tree, UserFun, UserAcc, Options).
fold_changes(St, SinceSeq, UserFun, UserAcc, Options) ->
@@ -838,7 +838,7 @@ active_size(#st{} = St, #size_info{} = SI) ->
end, SI#size_info.active, Trees).
-fold_docs_int(Tree, UserFun, UserAcc, Options) ->
+fold_docs_int(St, Tree, UserFun, UserAcc, Options) ->
Fun = case lists:member(include_deleted, Options) of
true -> fun include_deleted/4;
false -> fun skip_deleted/4
@@ -851,8 +851,10 @@ fold_docs_int(Tree, UserFun, UserAcc, Options) ->
{ok, Reds, OutAcc} = couch_btree:fold(Tree, Fun, InAcc, Options),
{_, {_, FinalUserAcc}} = OutAcc,
case lists:member(include_reductions, Options) of
- true ->
+ true when Tree == St#st.id_tree ->
{ok, fold_docs_reduce_to_count(Reds), FinalUserAcc};
+ true when Tree == St#st.local_tree ->
+ {ok, 0, FinalUserAcc};
false ->
{ok, FinalUserAcc}
end.
diff --git a/src/couch/src/couch_changes.erl b/src/couch/src/couch_changes.erl
index ea7f65c..7dfefed 100644
--- a/src/couch/src/couch_changes.erl
+++ b/src/couch/src/couch_changes.erl
@@ -536,7 +536,8 @@ send_changes(Acc, Dir, FirstRound) ->
{#mrview{}, {fast_view, _, _, _}} ->
couch_mrview:view_changes_since(View, StartSeq, DbEnumFun, [{dir, Dir}], Acc);
{undefined, _} ->
- couch_db:changes_since(Db, StartSeq, DbEnumFun, [{dir, Dir}], Acc);
+ Opts = [{dir, Dir}],
+ couch_db:fold_changes(Db, StartSeq, DbEnumFun, Acc, Opts);
{#mrview{}, _} ->
ViewEnumFun = fun view_changes_enumerator/2,
{Go, Acc0} = couch_mrview:view_changes_since(View, StartSeq, ViewEnumFun, [{dir, Dir}], Acc),
@@ -571,18 +572,22 @@ can_optimize(_, _) ->
send_changes_doc_ids(Db, StartSeq, Dir, Fun, Acc0, {doc_ids, _Style, DocIds}) ->
- Lookups = couch_db:get_full_doc_infos(Db, DocIds),
+ Results = couch_db:get_full_doc_infos(Db, DocIds),
FullInfos = lists:foldl(fun
- ({ok, FDI}, Acc) -> [FDI | Acc];
+ (#full_doc_info{}=FDI, Acc) -> [FDI | Acc];
(not_found, Acc) -> Acc
- end, [], Lookups),
+ end, [], Results),
send_lookup_changes(FullInfos, StartSeq, Dir, Db, Fun, Acc0).
send_changes_design_docs(Db, StartSeq, Dir, Fun, Acc0, {design_docs, _Style}) ->
FoldFun = fun(FDI, Acc) -> {ok, [FDI | Acc]} end,
- KeyOpts = [{start_key, <<"_design/">>}, {end_key_gt, <<"_design0">>}],
- {ok, FullInfos} = couch_db:fold_docs(Db, FoldFun, [], KeyOpts),
+ Opts = [
+ include_deleted,
+ {start_key, <<"_design/">>},
+ {end_key_gt, <<"_design0">>}
+ ],
+ {ok, FullInfos} = couch_db:fold_docs(Db, FoldFun, [], Opts),
send_lookup_changes(FullInfos, StartSeq, Dir, Db, Fun, Acc0).
@@ -757,6 +762,8 @@ changes_enumerator(Value0, Acc) ->
end,
Results = [Result || Result <- Results0, Result /= null],
Seq = case Value of
+ #full_doc_info{} ->
+ Value#full_doc_info.update_seq;
#doc_info{} ->
Value#doc_info.high_seq;
{{Seq0, _}, _} ->
@@ -816,6 +823,8 @@ view_changes_row(Results, KVs, Acc) ->
] ++ maybe_get_changes_doc({Id, Rev}, Acc)}.
+changes_row(Results, #full_doc_info{} = FDI, Acc) ->
+ changes_row(Results, couch_doc:to_doc_info(FDI), Acc);
changes_row(Results, DocInfo, Acc) ->
#doc_info{
id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]
diff --git a/src/couch/src/couch_compaction_daemon.erl b/src/couch/src/couch_compaction_daemon.erl
index f3b646d..da7481c 100644
--- a/src/couch/src/couch_compaction_daemon.erl
+++ b/src/couch/src/couch_compaction_daemon.erl
@@ -236,17 +236,18 @@ maybe_compact_views(DbName, [DDocName | Rest], Config) ->
db_ddoc_names(Db) ->
- {ok, _, DDocNames} = couch_db:enum_docs(
- Db,
- fun(#full_doc_info{id = <<"_design/", _/binary>>, deleted = true}, _, Acc) ->
- {ok, Acc};
- (#full_doc_info{id = <<"_design/", Id/binary>>}, _, Acc) ->
- {ok, [Id | Acc]};
- (_, _, Acc) ->
- {stop, Acc}
- end, [], [{start_key, <<"_design/">>}, {end_key_gt, <<"_design0">>}]),
+ FoldFun = fun ddoc_name/2,
+ Opts = [{start_key, <<"_design/">>}],
+ {ok, DDocNames} = couch_db:fold_docs(Db, FoldFun, [], Opts),
DDocNames.
+% Fold callback that collects design document names.
+%
+% For ids starting with "_design/" the remainder of the id is prepended to
+% Acc unless the document is flagged deleted, and the fold continues.
+% Any other input stops the fold with Acc unchanged.
+ddoc_name(#full_doc_info{id = <<"_design/", Name/binary>>} = FDI, Acc) ->
+    case FDI#full_doc_info.deleted of
+        true -> {ok, Acc};
+        _ -> {ok, [Name | Acc]}
+    end;
+ddoc_name(_, Acc) ->
+    {stop, Acc}.
+
maybe_compact_view(DbName, GroupId, Config) ->
DDocId = <<"_design/", GroupId/binary>>,
@@ -391,21 +392,22 @@ check_frag(Threshold, Frag) ->
frag(Props) ->
- FileSize = couch_util:get_value(disk_size, Props),
+ {Sizes} = couch_util:get_value(sizes, Props),
+ FileSize = couch_util:get_value(file, Sizes),
MinFileSize = list_to_integer(
config:get("compaction_daemon", "min_file_size", "131072")),
case FileSize < MinFileSize of
true ->
{0, FileSize};
false ->
- case couch_util:get_value(data_size, Props) of
- null ->
- {100, FileSize};
+ case couch_util:get_value(active, Sizes) of
0 ->
{0, FileSize};
- DataSize ->
+ DataSize when is_integer(DataSize), DataSize > 0 ->
Frag = round(((FileSize - DataSize) / FileSize * 100)),
- {Frag, space_required(DataSize)}
+ {Frag, space_required(DataSize)};
+ _ ->
+ {100, FileSize}
end
end.
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index ddf211e..7fc22b2 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -16,12 +16,10 @@
create/2,
open/2,
open_int/2,
+ incref/1,
reopen/1,
close/1,
- incref/1,
- decref/1,
-
clustered_db/2,
clustered_db/3,
@@ -34,13 +32,13 @@
check_is_member/1,
name/1,
- compression/1,
get_after_doc_read_fun/1,
get_before_doc_update_fun/1,
get_committed_update_seq/1,
get_compacted_seq/1,
get_compactor_pid/1,
get_db_info/1,
+ get_del_doc_count/1,
get_doc_count/1,
get_epochs/1,
get_filepath/1,
@@ -58,7 +56,6 @@
is_system_db/1,
is_clustered/1,
- increment_update_seq/1,
set_revs_limit/2,
set_security/2,
set_user_ctx/2,
@@ -67,12 +64,12 @@
ensure_full_commit/2,
load_validation_funs/1,
+ reload_validation_funs/1,
open_doc/2,
open_doc/3,
open_doc_revs/4,
open_doc_int/3,
- read_doc/2,
get_doc_info/2,
get_full_doc_info/2,
get_full_doc_infos/2,
@@ -89,16 +86,16 @@
purge_docs/2,
with_stream/3,
+ open_write_stream/2,
+ open_read_stream/2,
+ is_active_stream/2,
+ fold_docs/3,
fold_docs/4,
fold_local_docs/4,
- enum_docs/4,
- enum_docs_reduce_to_count/1,
-
- enum_docs_since/5,
- enum_docs_since_reduce_to_count/1,
- changes_since/4,
- changes_since/5,
+ fold_design_docs/4,
+ fold_changes/4,
+ fold_changes/5,
count_changes_since/2,
calculate_start_seq/3,
@@ -113,14 +110,13 @@
normalize_dbname/1,
validate_dbname/1,
- check_md5/2,
make_doc/5,
new_revid/1
]).
-export([
- start_link/3
+ start_link/4
]).
@@ -132,38 +128,9 @@
"(\\.[0-9]{10,})?$" % but allow an optional shard timestamp at the end
).
-start_link(DbName, Filepath, Options) ->
- case open_db_file(Filepath, Options) of
- {ok, Fd} ->
- {ok, UpdaterPid} = gen_server:start_link(couch_db_updater, {DbName,
- Filepath, Fd, Options}, []),
- unlink(Fd),
- gen_server:call(UpdaterPid, get_db);
- Else ->
- Else
- end.
-
-open_db_file(Filepath, Options) ->
- case couch_file:open(Filepath, Options) of
- {ok, Fd} ->
- {ok, Fd};
- {error, enoent} ->
- % couldn't find file. is there a compact version? This can happen if
- % crashed during the file switch.
- case couch_file:open(Filepath ++ ".compact", [nologifmissing]) of
- {ok, Fd} ->
- couch_log:info("Found ~s~s compaction file, using as primary"
- " storage.", [Filepath, ".compact"]),
- ok = file:rename(Filepath ++ ".compact", Filepath),
- ok = couch_file:sync(Fd),
- {ok, Fd};
- {error, enoent} ->
- {not_found, no_db_file}
- end;
- Error ->
- Error
- end.
-
+% Start a linked couch_db_updater process via proc_lib, passing
+% `{Engine, DbName, Filepath, Options}` as the single argument to
+% couch_db_updater:init/1.
+start_link(Engine, DbName, Filepath, Options) ->
+    InitArgs = [{Engine, DbName, Filepath, Options}],
+    proc_lib:start_link(couch_db_updater, init, InitArgs).
create(DbName, Options) ->
couch_server:create(DbName, Options).
@@ -189,16 +156,20 @@ open(DbName, Options) ->
Else -> Else
end.
-reopen(#db{main_pid = Pid, fd = Fd, fd_monitor = OldRef, user_ctx = UserCtx}) ->
- {ok, #db{fd = NewFd} = NewDb} = gen_server:call(Pid, get_db, infinity),
- case NewFd =:= Fd of
- true ->
- {ok, NewDb#db{user_ctx = UserCtx}};
- false ->
- erlang:demonitor(OldRef, [flush]),
- NewRef = erlang:monitor(process, NewFd),
- {ok, NewDb#db{user_ctx = UserCtx, fd_monitor = NewRef}}
- end.
+
+reopen(#db{name = DbName, user_ctx = UserCtx, options = Options} = Db) ->
+    % The storage engine backing this database may have been swapped
+    % out by a compaction, so rather than refreshing the handle in
+    % place we simply close it and open the database again with the
+    % same user context and options.
+    close(Db),
+    open(DbName, [{user_ctx, UserCtx} | Options]).
+
+
+% You shouldn't call this. It's part of the ref counting between
+% couch_server and couch_db instances. Delegates to couch_db_engine:incref/1.
+incref(#db{} = Db) ->
+ couch_db_engine:incref(Db).
+
clustered_db(DbName, UserCtx) ->
clustered_db(DbName, UserCtx, []).
@@ -206,14 +177,6 @@ clustered_db(DbName, UserCtx) ->
clustered_db(DbName, UserCtx, SecProps) ->
{ok, #db{name = DbName, user_ctx = UserCtx, security = SecProps}}.
-incref(#db{fd = Fd} = Db) ->
- Ref = erlang:monitor(process, Fd),
- {ok, Db#db{fd_monitor = Ref}}.
-
-decref(#db{fd_monitor = Monitor}) ->
- erlang:demonitor(Monitor, [flush]),
- ok.
-
is_db(#db{}) ->
true;
is_db(_) ->
@@ -226,8 +189,8 @@ is_clustered(#db{main_pid = nil}) ->
true;
is_clustered(#db{}) ->
false;
-is_clustered(?NEW_PSE_DB = Db) ->
- ?PSE_DB_MAIN_PID(Db) == undefined.
+is_clustered(?OLD_DB_REC = Db) ->
+ ?OLD_DB_MAIN_PID(Db) == undefined.
ensure_full_commit(#db{main_pid=Pid, instance_start_time=StartTime}) ->
ok = gen_server:call(Pid, full_commit, infinity),
@@ -238,10 +201,9 @@ ensure_full_commit(Db, RequiredSeq) ->
ok = gen_server:call(Pid, {full_commit, RequiredSeq}, infinity),
{ok, StartTime}.
-close(#db{fd_monitor=Ref}) ->
- erlang:demonitor(Ref, [flush]),
- ok;
-close(?NEW_PSE_DB) ->
+close(#db{} = Db) ->
+ ok = couch_db_engine:decref(Db);
+close(?OLD_DB_REC) ->
ok.
is_idle(#db{compactor_pid=nil, waiting_delayed_commit=nil} = Db) ->
@@ -250,20 +212,31 @@ is_idle(_Db) ->
false.
monitored_by(Db) ->
- case erlang:process_info(Db#db.fd, monitored_by) of
- undefined ->
- [];
- {monitored_by, Pids} ->
- PidTracker = whereis(couch_stats_process_tracker),
- Pids -- [Db#db.main_pid, PidTracker]
+ case couch_db_engine:monitored_by(Db) of
+ Pids when is_list(Pids) ->
+ PidTracker = whereis(couch_stats_process_tracker),
+ Pids -- [Db#db.main_pid, PidTracker];
+ undefined ->
+ []
end.
monitor(#db{main_pid=MainPid}) ->
erlang:monitor(process, MainPid).
-start_compact(#db{main_pid=Pid}) ->
- gen_server:call(Pid, start_compact).
+% Request a compaction with no extra options.
+start_compact(#db{} = Db) ->
+    start_compact(Db, []).
+
+% Request a compaction from the database's main process.
+%
+% When Opts contains `{notify, Pid, Term}` the request is sent
+% asynchronously and `ok` is returned immediately; otherwise this is a
+% plain synchronous gen_server call.
+start_compact(#db{main_pid = MainPid}, Opts) ->
+    case lists:keyfind(notify, 1, Opts) of
+        {notify, Pid, Term} ->
+            % Hand-build the gen_server call message so that the
+            % reply is delivered to the specified pid instead of us.
+            MainPid ! {'$gen_call', {Pid, Term}, start_compact},
+            ok;
+        _ ->
+            gen_server:call(MainPid, start_compact)
+    end.
cancel_compact(#db{main_pid=Pid}) ->
gen_server:call(Pid, cancel_compact).
@@ -363,7 +336,8 @@ get_missing_revs(Db, IdRevsList) ->
find_missing([], []) ->
[];
-find_missing([{Id, Revs}|RestIdRevs], [{ok, FullInfo} | RestLookupInfo]) ->
+find_missing([{Id, Revs}|RestIdRevs], [FullInfo | RestLookupInfo])
+ when is_record(FullInfo, full_doc_info) ->
case couch_key_tree:find_missing(FullInfo#full_doc_info.rev_tree, Revs) of
[] ->
find_missing(RestIdRevs, RestLookupInfo);
@@ -391,8 +365,8 @@ find_missing([{Id, Revs}|RestIdRevs], [not_found | RestLookupInfo]) ->
get_doc_info(Db, Id) ->
case get_full_doc_info(Db, Id) of
- {ok, DocInfo} ->
- {ok, couch_doc:to_doc_info(DocInfo)};
+ #full_doc_info{} = FDI ->
+ {ok, couch_doc:to_doc_info(FDI)};
Else ->
Else
end.
@@ -403,10 +377,7 @@ get_full_doc_info(Db, Id) ->
Result.
get_full_doc_infos(Db, Ids) ->
- couch_btree:lookup(Db#db.id_tree, Ids).
-
-increment_update_seq(#db{main_pid=Pid}) ->
- gen_server:call(Pid, increment_update_seq).
+ couch_db_engine:open_docs(Db, Ids).
purge_docs(#db{main_pid=Pid}, IdsRevs) ->
gen_server:call(Pid, {purge_docs, IdsRevs}).
@@ -420,37 +391,34 @@ get_before_doc_update_fun(#db{before_doc_update = Fun}) ->
get_committed_update_seq(#db{committed_update_seq=Seq}) ->
Seq.
-get_update_seq(#db{update_seq=Seq})->
- Seq.
+get_update_seq(#db{} = Db)->
+ couch_db_engine:get_update_seq(Db).
get_user_ctx(#db{user_ctx = UserCtx}) ->
UserCtx;
-get_user_ctx(?NEW_PSE_DB = Db) ->
- ?PSE_DB_USER_CTX(Db).
+get_user_ctx(?OLD_DB_REC = Db) ->
+ ?OLD_DB_USER_CTX(Db).
get_purge_seq(#db{}=Db) ->
- couch_db_header:purge_seq(Db#db.header).
+ {ok, couch_db_engine:get_purge_seq(Db)}.
get_last_purged(#db{}=Db) ->
- case couch_db_header:purged_docs(Db#db.header) of
- nil ->
- {ok, []};
- Pointer ->
- couch_file:pread_term(Db#db.fd, Pointer)
- end.
+ {ok, couch_db_engine:get_last_purged(Db)}.
get_pid(#db{main_pid = Pid}) ->
Pid.
+% Return `{ok, Count}` where Count is the value reported by
+% couch_db_engine:get_del_doc_count/1 for Db.
+get_del_doc_count(Db) ->
+    DelDocCount = couch_db_engine:get_del_doc_count(Db),
+    {ok, DelDocCount}.
+
get_doc_count(Db) ->
- {ok, {Count, _, _}} = couch_btree:full_reduce(Db#db.id_tree),
- {ok, Count}.
+ {ok, couch_db_engine:get_doc_count(Db)}.
get_uuid(#db{}=Db) ->
- couch_db_header:uuid(Db#db.header).
+ couch_db_engine:get_uuid(Db).
get_epochs(#db{}=Db) ->
- Epochs = couch_db_header:epochs(Db#db.header),
+ Epochs = couch_db_engine:get_epochs(Db),
validate_epochs(Epochs),
Epochs.
@@ -461,34 +429,25 @@ get_instance_start_time(#db{instance_start_time = IST}) ->
IST.
get_compacted_seq(#db{}=Db) ->
- couch_db_header:compacted_seq(Db#db.header).
+ couch_db_engine:get_compacted_seq(Db).
get_compactor_pid(#db{compactor_pid = Pid}) ->
Pid.
get_db_info(Db) ->
- #db{fd=Fd,
- header=Header,
- compactor_pid=Compactor,
- update_seq=SeqNum,
- name=Name,
- instance_start_time=StartTime,
- committed_update_seq=CommittedUpdateSeq,
- id_tree = IdBtree
+ #db{
+ name = Name,
+ compactor_pid = Compactor,
+ instance_start_time = StartTime,
+ committed_update_seq = CommittedUpdateSeq
} = Db,
- {ok, FileSize} = couch_file:bytes(Fd),
- {ok, DbReduction} = couch_btree:full_reduce(IdBtree),
- SizeInfo0 = element(3, DbReduction),
- SizeInfo = case SizeInfo0 of
- SI when is_record(SI, size_info) ->
- SI;
- {AS, ES} ->
- #size_info{active=AS, external=ES};
- AS ->
- #size_info{active=AS}
- end,
- ActiveSize = active_size(Db, SizeInfo),
- DiskVersion = couch_db_header:disk_version(Header),
+ {ok, DocCount} = get_doc_count(Db),
+ {ok, DelDocCount} = get_del_doc_count(Db),
+ SizeInfo = couch_db_engine:get_size_info(Db),
+ FileSize = couch_util:get_value(file, SizeInfo, null),
+ ActiveSize = couch_util:get_value(active, SizeInfo, null),
+ ExternalSize = couch_util:get_value(external, SizeInfo, null),
+ DiskVersion = couch_db_engine:get_disk_version(Db),
Uuid = case get_uuid(Db) of
undefined -> null;
Uuid0 -> Uuid0
@@ -499,63 +458,38 @@ get_db_info(Db) ->
end,
InfoList = [
{db_name, Name},
- {doc_count, element(1, DbReduction)},
- {doc_del_count, element(2, DbReduction)},
- {update_seq, SeqNum},
- {purge_seq, couch_db:get_purge_seq(Db)},
- {compact_running, Compactor/=nil},
+ {engine, couch_db_engine:get_engine(Db)},
+ {doc_count, DocCount},
+ {doc_del_count, DelDocCount},
+ {update_seq, get_update_seq(Db)},
+ {purge_seq, couch_db_engine:get_purge_seq(Db)},
+ {compact_running, Compactor /= nil},
+ {sizes, {SizeInfo}},
+ % TODO: Remove this in 3.0
+ % These are legacy and have been duplicated under
+ % the sizes key since 2.0. We should make a note
+ % in our release notes that we'll remove these
+ % old versions in 3.0
{disk_size, FileSize}, % legacy
- {other, {[{data_size, SizeInfo#size_info.external}]}}, % legacy
- {data_size, ActiveSize}, % legacy
- {sizes, {[
- {file, FileSize},
- {active, ActiveSize},
- {external, SizeInfo#size_info.external}
- ]}},
+ {data_size, ActiveSize},
+ {other, {[{data_size, ExternalSize}]}},
{instance_start_time, StartTime},
{disk_format_version, DiskVersion},
{committed_update_seq, CommittedUpdateSeq},
{compacted_seq, CompactedSeq},
{uuid, Uuid}
- ],
- {ok, InfoList}.
-
-active_size(#db{}=Db, Size) when is_integer(Size) ->
- active_size(Db, #size_info{active=Size});
-active_size(#db{}=Db, #size_info{}=SI) ->
- Trees = [
- Db#db.id_tree,
- Db#db.seq_tree,
- Db#db.local_tree
],
- lists:foldl(fun(T, Acc) ->
- case couch_btree:size(T) of
- _ when Acc == null ->
- null;
- nil ->
- null;
- Size ->
- Acc + Size
- end
- end, SI#size_info.active, Trees).
+ {ok, InfoList}.
get_design_docs(#db{name = <<"shards/", _:18/binary, DbName/binary>>}) ->
{_, Ref} = spawn_monitor(fun() -> exit(fabric:design_docs(DbName)) end),
receive {'DOWN', Ref, _, _, Response} ->
Response
end;
-get_design_docs(#db{id_tree = IdBtree}) ->
- FoldFun = pipe([fun skip_deleted/4], fun
- (#full_doc_info{deleted = true}, _Reds, Acc) ->
- {ok, Acc};
- (#full_doc_info{id= <<"_design/",_/binary>>}=FullDocInfo, _Reds, Acc) ->
- {ok, [FullDocInfo | Acc]};
- (_, _Reds, Acc) ->
- {stop, Acc}
- end),
- KeyOpts = [{start_key, <<"_design/">>}, {end_key_gt, <<"_design0">>}],
- {ok, _, Docs} = couch_btree:fold(IdBtree, FoldFun, [], KeyOpts),
- {ok, Docs}.
+get_design_docs(#db{} = Db) ->
+ FoldFun = fun(FDI, Acc) -> {ok, [FDI | Acc]} end,
+ {ok, Docs} = fold_design_docs(Db, FoldFun, [], []),
+ {ok, lists:reverse(Docs)}.
check_is_admin(#db{user_ctx=UserCtx}=Db) ->
@@ -645,8 +579,8 @@ get_members(#db{security=SecProps}) ->
get_security(#db{security=SecProps}) ->
{SecProps};
-get_security(?NEW_PSE_DB = Db) ->
- {?PSE_DB_SECURITY(Db)}.
+get_security(?OLD_DB_REC = Db) ->
+ {?OLD_DB_SECURITY(Db)}.
set_security(#db{main_pid=Pid}=Db, {NewSecProps}) when is_list(NewSecProps) ->
check_is_admin(Db),
@@ -685,8 +619,8 @@ validate_names_and_roles({Props}) when is_list(Props) ->
end,
ok.
-get_revs_limit(#db{revs_limit=Limit}) ->
- Limit.
+get_revs_limit(#db{} = Db) ->
+ couch_db_engine:get_revs_limit(Db).
set_revs_limit(#db{main_pid=Pid}=Db, Limit) when Limit > 0 ->
check_is_admin(Db),
@@ -694,13 +628,11 @@ set_revs_limit(#db{main_pid=Pid}=Db, Limit) when Limit > 0 ->
set_revs_limit(_Db, _Limit) ->
throw(invalid_revs_limit).
+
name(#db{name=Name}) ->
Name;
-name(?NEW_PSE_DB = Db) ->
- ?PSE_DB_NAME(Db).
-
-compression(#db{compression=Compression}) ->
- Compression.
+name(?OLD_DB_REC = Db) ->
+ ?OLD_DB_NAME(Db).
update_doc(Db, Doc, Options) ->
update_doc(Db, Doc, Options, interactive_edit).
@@ -831,6 +763,9 @@ load_validation_funs(#db{main_pid=Pid}=Db) ->
gen_server:cast(Pid, {load_validation_funs, Funs}),
Funs.
+reload_validation_funs(#db{} = Db) ->
+ gen_server:cast(Db#db.main_pid, {load_validation_funs, undefined}).
+
prep_and_validate_update(Db, #doc{id=Id,revs={RevStart, Revs}}=Doc,
OldFullDocInfo, LeafRevsDict, AllowConflict) ->
case Revs of
@@ -897,7 +832,7 @@ prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups],
prep_and_validate_updates(Db, RestBuckets, RestLookups, AllowConflict,
[PreppedBucket | AccPrepped], AccErrors3);
prep_and_validate_updates(Db, [DocBucket|RestBuckets],
- [{ok, #full_doc_info{rev_tree=OldRevTree}=OldFullDocInfo}|RestLookups],
+ [#full_doc_info{rev_tree=OldRevTree}=OldFullDocInfo|RestLookups],
AllowConflict, AccPrepped, AccErrors) ->
Leafs = couch_key_tree:get_all_leafs(OldRevTree),
LeafRevsDict = dict:from_list([
@@ -948,13 +883,14 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI
end,
{[], AccErrors}, Bucket),
prep_and_validate_replicated_updates(Db, RestBuckets, RestOldInfo, [ValidatedBucket | AccPrepped], AccErrors3);
- {ok, #full_doc_info{rev_tree=OldTree}} ->
+ #full_doc_info{rev_tree=OldTree} ->
+ RevsLimit = get_revs_limit(Db),
OldLeafs = couch_key_tree:get_all_leafs_full(OldTree),
OldLeafsLU = [{Start, RevId} || {Start, [{RevId, _}|_]} <- OldLeafs],
NewRevTree = lists:foldl(
fun(NewDoc, AccTree) ->
{NewTree, _} = couch_key_tree:merge(AccTree,
- couch_doc:to_path(NewDoc), Db#db.revs_limit),
+ couch_doc:to_path(NewDoc), RevsLimit),
NewTree
end,
OldTree, Bucket),
@@ -1090,7 +1026,7 @@ update_docs(Db, Docs0, Options, replicated_changes) ->
DocErrors = [],
DocBuckets3 = DocBuckets
end,
- DocBuckets4 = [[doc_flush_atts(check_dup_atts(Doc), Db#db.fd)
+ DocBuckets4 = [[doc_flush_atts(Db, check_dup_atts(Doc))
|| Doc <- Bucket] || Bucket <- DocBuckets3],
{ok, []} = write_and_commit(Db, DocBuckets4, [], [merge_conflicts | Options]),
{ok, DocErrors};
@@ -1144,8 +1080,8 @@ update_docs(Db, Docs0, Options, interactive_edit) ->
Options2 = if AllOrNothing -> [merge_conflicts];
true -> [] end ++ Options,
DocBuckets3 = [[
- doc_flush_atts(set_new_att_revpos(
- check_dup_atts(Doc)), Db#db.fd)
+ doc_flush_atts(Db, set_new_att_revpos(
+ check_dup_atts(Doc)))
|| Doc <- B] || B <- DocBuckets2],
{DocBuckets4, IdRevs} = new_revs(DocBuckets3, [], []),
@@ -1229,7 +1165,7 @@ write_and_commit(#db{main_pid=Pid, user_ctx=Ctx}=Db, DocBuckets1,
% compaction. Retry by reopening the db and writing to the current file
{ok, Db2} = open(Db#db.name, [{user_ctx, Ctx}]),
DocBuckets2 = [
- [doc_flush_atts(Doc, Db2#db.fd) || Doc <- Bucket] ||
+ [doc_flush_atts(Db2, Doc) || Doc <- Bucket] ||
Bucket <- DocBuckets1
],
% We only retry once
@@ -1248,18 +1184,24 @@ write_and_commit(#db{main_pid=Pid, user_ctx=Ctx}=Db, DocBuckets1,
prepare_doc_summaries(Db, BucketList) ->
[lists:map(
- fun(#doc{body = Body, atts = Atts} = Doc) ->
+ fun(#doc{atts = Atts} = Doc0) ->
DiskAtts = [couch_att:to_disk_term(Att) || Att <- Atts],
{ok, SizeInfo} = couch_att:size_info(Atts),
- AttsFd = case Atts of
- [Att | _] ->
- {Fd, _} = couch_att:fetch(data, Att),
- Fd;
- [] ->
- nil
+ AttsStream = case Atts of
+ [Att | _] ->
+ {stream, StreamEngine} = couch_att:fetch(data, Att),
+ StreamEngine;
+ [] ->
+ nil
end,
- SummaryChunk = couch_db_updater:make_doc_summary(Db, {Body, DiskAtts}),
- Doc#doc{body = {summary, SummaryChunk, SizeInfo, AttsFd}}
+ Doc1 = Doc0#doc{
+ atts = DiskAtts,
+ meta = [
+ {size_info, SizeInfo},
+ {atts_stream, AttsStream}
+ ] ++ Doc0#doc.meta
+ },
+ couch_db_engine:serialize_doc(Db, Doc1)
end,
Bucket) || Bucket <- BucketList].
@@ -1284,12 +1226,8 @@ set_new_att_revpos(#doc{revs={RevPos,_Revs},atts=Atts0}=Doc) ->
Doc#doc{atts = Atts}.
-doc_flush_atts(Doc, Fd) ->
- Doc#doc{atts=[couch_att:flush(Fd, Att) || Att <- Doc#doc.atts]}.
-
-check_md5(_NewSig, <<>>) -> ok;
-check_md5(Sig, Sig) -> ok;
-check_md5(_, _) -> throw(md5_mismatch).
+doc_flush_atts(Db, Doc) ->
+ Doc#doc{atts=[couch_att:flush(Db, Att) || Att <- Doc#doc.atts]}.
compressible_att_type(MimeType) when is_binary(MimeType) ->
@@ -1319,21 +1257,24 @@ compressible_att_type(MimeType) ->
% is present in the request, but there is no Content-MD5
% trailer, we're free to ignore this inconsistency and
% pretend that no Content-MD5 exists.
-with_stream(Fd, Att, Fun) ->
+with_stream(Db, Att, Fun) ->
[InMd5, Type, Enc] = couch_att:fetch([md5, type, encoding], Att),
BufferSize = list_to_integer(
config:get("couchdb", "attachment_stream_buffer_size", "4096")),
- {ok, OutputStream} = case (Enc =:= identity) andalso
- compressible_att_type(Type) of
- true ->
- CompLevel = list_to_integer(
- config:get("attachments", "compression_level", "0")
- ),
- couch_stream:open(Fd, [{buffer_size, BufferSize},
- {encoding, gzip}, {compression_level, CompLevel}]);
- _ ->
- couch_stream:open(Fd, [{buffer_size, BufferSize}])
+ Options = case (Enc =:= identity) andalso compressible_att_type(Type) of
+ true ->
+ CompLevel = list_to_integer(
+ config:get("attachments", "compression_level", "0")
+ ),
+ [
+ {buffer_size, BufferSize},
+ {encoding, gzip},
+ {compression_level, CompLevel}
+ ];
+ _ ->
+ [{buffer_size, BufferSize}]
end,
+ {ok, OutputStream} = open_write_stream(Db, Options),
ReqMd5 = case Fun(OutputStream) of
{md5, FooterMd5} ->
case InMd5 of
@@ -1343,9 +1284,9 @@ with_stream(Fd, Att, Fun) ->
_ ->
InMd5
end,
- {StreamInfo, Len, IdentityLen, Md5, IdentityMd5} =
+ {StreamEngine, Len, IdentityLen, Md5, IdentityMd5} =
couch_stream:close(OutputStream),
- check_md5(IdentityMd5, ReqMd5),
+ couch_util:check_md5(IdentityMd5, ReqMd5),
{AttLen, DiskLen, NewEnc} = case Enc of
identity ->
case {Md5, IdentityMd5} of
@@ -1367,7 +1308,7 @@ with_stream(Fd, Att, Fun) ->
end
end,
couch_att:store([
- {data, {Fd,StreamInfo}},
+ {data, {stream, StreamEngine}},
{att_len, AttLen},
{disk_len, DiskLen},
{md5, Md5},
@@ -1375,83 +1316,16 @@ with_stream(Fd, Att, Fun) ->
], Att).
-enum_docs_since_reduce_to_count(Reds) ->
- couch_btree:final_reduce(
- fun couch_db_updater:btree_by_seq_reduce/2, Reds).
+open_write_stream(Db, Options) ->
+ couch_db_engine:open_write_stream(Db, Options).
-enum_docs_reduce_to_count(Reds) ->
- FinalRed = couch_btree:final_reduce(
- fun couch_db_updater:btree_by_id_reduce/2, Reds),
- element(1, FinalRed).
-changes_since(Db, StartSeq, Fun, Acc) ->
- changes_since(Db, StartSeq, Fun, [], Acc).
+open_read_stream(Db, AttState) ->
+ couch_db_engine:open_read_stream(Db, AttState).
-changes_since(Db, StartSeq, Fun, Options, Acc) when is_record(Db, db) ->
- changes_since(Db#db.seq_tree, StartSeq, Fun, Options, Acc);
-changes_since(SeqTree, StartSeq, Fun, Options, Acc) ->
- Wrapper = fun(FullDocInfo, _Offset, Acc2) ->
- DocInfo = case FullDocInfo of
- #full_doc_info{} ->
- couch_doc:to_doc_info(FullDocInfo);
- #doc_info{} ->
- FullDocInfo
- end,
- Fun(DocInfo, Acc2)
- end,
- {ok, _LastReduction, AccOut} = couch_btree:fold(SeqTree,
- Wrapper, Acc, [{start_key, StartSeq + 1}] ++ Options),
- {ok, AccOut}.
-count_changes_since(Db, SinceSeq) ->
- BTree = Db#db.seq_tree,
- {ok, Changes} =
- couch_btree:fold_reduce(BTree,
- fun(_SeqStart, PartialReds, 0) ->
- {ok, couch_btree:final_reduce(BTree, PartialReds)}
- end,
- 0, [{start_key, SinceSeq + 1}]),
- Changes.
-
-enum_docs_since(Db, SinceSeq, InFun, Acc, Options) ->
- {ok, LastReduction, AccOut} = couch_btree:fold(
- Db#db.seq_tree, InFun, Acc,
- [{start_key, SinceSeq + 1} | Options]),
- {ok, enum_docs_since_reduce_to_count(LastReduction), AccOut}.
-
-
-fold_docs(Db, InFun, InAcc, Opts) ->
- Wrapper = fun(FDI, _, Acc) -> InFun(FDI, Acc) end,
- {ok, _, AccOut} = couch_btree:fold(Db#db.id_tree, Wrapper, InAcc, Opts),
- {ok, AccOut}.
-
-fold_local_docs(Db, InFun, InAcc, Opts) ->
- Wrapper = fun(FDI, _, Acc) -> InFun(FDI, Acc) end,
- {ok, _, AccOut} = couch_btree:fold(Db#db.local_tree, Wrapper, InAcc, Opts),
- {ok, AccOut}.
-
-enum_docs(Db, InFun, InAcc, Options0) ->
- {NS, Options} = extract_namespace(Options0),
- enum_docs(Db, NS, InFun, InAcc, Options).
-
-enum_docs(Db, undefined, InFun, InAcc, Options) ->
- FoldFun = pipe([fun skip_deleted/4], InFun),
- {ok, LastReduce, OutAcc} = couch_btree:fold(
- Db#db.id_tree, FoldFun, InAcc, Options),
- {ok, enum_docs_reduce_to_count(LastReduce), OutAcc};
-enum_docs(Db, <<"_local">>, InFun, InAcc, Options) ->
- FoldFun = pipe([fun skip_deleted/4], InFun),
- {ok, _LastReduce, OutAcc} = couch_btree:fold(
- Db#db.local_tree, FoldFun, InAcc, Options),
- {ok, 0, OutAcc};
-enum_docs(Db, NS, InFun, InAcc, Options0) ->
- FoldFun = pipe([
- fun skip_deleted/4,
- stop_on_leaving_namespace(NS)], InFun),
- Options = set_namespace_range(Options0, NS),
- {ok, LastReduce, OutAcc} = couch_btree:fold(
- Db#db.id_tree, FoldFun, InAcc, Options),
- {ok, enum_docs_reduce_to_count(LastReduce), OutAcc}.
+is_active_stream(Db, StreamEngine) ->
+ couch_db_engine:is_active_stream(Db, StreamEngine).
calculate_start_seq(_Db, _Node, Seq) when is_integer(Seq) ->
@@ -1525,22 +1399,43 @@ start_seq([], OrigNode, Seq) ->
erlang:error({epoch_mismatch, OrigNode, Seq}).
-extract_namespace(Options0) ->
- case proplists:split(Options0, [namespace]) of
- {[[{namespace, NS}]], Options} ->
- {NS, Options};
- {_, Options} ->
- {undefined, Options}
- end.
+fold_docs(Db, UserFun, UserAcc) ->
+ fold_docs(Db, UserFun, UserAcc, []).
+
+fold_docs(Db, UserFun, UserAcc, Options) ->
+ couch_db_engine:fold_docs(Db, UserFun, UserAcc, Options).
+
+
+fold_local_docs(Db, UserFun, UserAcc, Options) ->
+ couch_db_engine:fold_local_docs(Db, UserFun, UserAcc, Options).
+
+
+fold_design_docs(Db, UserFun, UserAcc, Options1) ->
+ Options2 = set_design_doc_keys(Options1),
+ couch_db_engine:fold_docs(Db, UserFun, UserAcc, Options2).
+
+
+fold_changes(Db, StartSeq, UserFun, UserAcc) ->
+ fold_changes(Db, StartSeq, UserFun, UserAcc, []).
+
+
+fold_changes(Db, StartSeq, UserFun, UserAcc, Opts) ->
+ couch_db_engine:fold_changes(Db, StartSeq, UserFun, UserAcc, Opts).
+
+
+count_changes_since(Db, SinceSeq) ->
+ couch_db_engine:count_changes_since(Db, SinceSeq).
+
%%% Internal function %%%
+
open_doc_revs_int(Db, IdRevs, Options) ->
Ids = [Id || {Id, _Revs} <- IdRevs],
LookupResults = get_full_doc_infos(Db, Ids),
lists:zipwith(
fun({Id, Revs}, Lookup) ->
case Lookup of
- {ok, #full_doc_info{rev_tree=RevTree}} ->
+ #full_doc_info{rev_tree=RevTree} ->
{FoundRevs, MissingRevs} =
case Revs of
all ->
@@ -1574,9 +1469,8 @@ open_doc_revs_int(Db, IdRevs, Options) ->
IdRevs, LookupResults).
open_doc_int(Db, <<?LOCAL_DOC_PREFIX, _/binary>> = Id, Options) ->
- case couch_btree:lookup(Db#db.local_tree, [Id]) of
- [{ok, {_, {Rev, BodyData}}}] ->
- Doc = #doc{id=Id, revs={0, [?l2b(integer_to_list(Rev))]}, body=BodyData},
+ case couch_db_engine:open_local_docs(Db, [Id]) of
+ [#doc{} = Doc] ->
apply_open_options({ok, Doc}, Options);
[not_found] ->
{not_found, missing}
@@ -1595,7 +1489,7 @@ open_doc_int(Db, #full_doc_info{id=Id,rev_tree=RevTree}=FullDocInfo, Options) ->
{ok, Doc#doc{meta=doc_meta_info(DocInfo, RevTree, Options)}}, Options);
open_doc_int(Db, Id, Options) ->
case get_full_doc_info(Db, Id) of
- {ok, FullDocInfo} ->
+ #full_doc_info{} = FullDocInfo ->
open_doc_int(Db, FullDocInfo, Options);
not_found ->
{not_found, missing}
@@ -1641,9 +1535,6 @@ doc_meta_info(#doc_info{high_seq=Seq,revs=[#rev_info{rev=Rev}|RestInfo]}, RevTre
true -> [{local_seq, Seq}]
end.
-read_doc(#db{fd=Fd}, Pos) ->
- couch_file:pread_term(Fd, Pos).
-
make_doc(_Db, Id, Deleted, nil = _Bp, RevisionPath) ->
#doc{
@@ -1653,34 +1544,32 @@ make_doc(_Db, Id, Deleted, nil = _Bp, RevisionPath) ->
atts = [],
deleted = Deleted
};
-make_doc(#db{fd=Fd, revs_limit=RevsLimit}=Db, Id, Deleted, Bp, {Pos, Revs}) ->
- {BodyData, Atts0} = case Bp of
- nil ->
- {[], []};
- _ ->
- case read_doc(Db, Bp) of
- {ok, {BodyData0, Atts1}} when is_binary(Atts1) ->
- {BodyData0, couch_compress:decompress(Atts1)};
- {ok, {BodyData0, Atts1}} when is_list(Atts1) ->
- % pre 1.2 format
- {BodyData0, Atts1}
- end
- end,
- Atts = [couch_att:from_disk_term(Fd, T) || T <- Atts0],
- Doc = #doc{
+make_doc(#db{} = Db, Id, Deleted, Bp, {Pos, Revs}) ->
+ RevsLimit = get_revs_limit(Db),
+ Doc0 = couch_db_engine:read_doc_body(Db, #doc{
id = Id,
revs = {Pos, lists:sublist(Revs, 1, RevsLimit)},
- body = BodyData,
- atts = Atts,
+ body = Bp,
deleted = Deleted
- },
- after_doc_read(Db, Doc).
+ }),
+ Doc1 = case Doc0#doc.atts of
+ BinAtts when is_binary(BinAtts) ->
+ Doc0#doc{
+ atts = couch_compress:decompress(BinAtts)
+ };
+ ListAtts when is_list(ListAtts) ->
+ Doc0
+ end,
+ after_doc_read(Db, Doc1#doc{
+ atts = [couch_att:from_disk_term(Db, T) || T <- Doc1#doc.atts]
+ }).
after_doc_read(#db{} = Db, Doc) ->
DocWithBody = couch_doc:with_ejson_body(Doc),
couch_db_plugin:after_doc_read(Db, DocWithBody).
+
increment_stat(#db{options = Options}, Stat) ->
case lists:member(sys_db, Options) of
true ->
@@ -1689,71 +1578,6 @@ increment_stat(#db{options = Options}, Stat) ->
couch_stats:increment_counter(Stat)
end.
-skip_deleted(traverse, LK, {Undeleted, _, _} = Reds, Acc) when Undeleted == 0 ->
- {skip, LK, Reds, Acc};
-skip_deleted(Case, A, B, C) ->
- {Case, A, B, C}.
-
-stop_on_leaving_namespace(NS) ->
- fun
- (visit, #full_doc_info{id = Key} = FullInfo, Reds, Acc) ->
- case has_prefix(Key, NS) of
- true ->
- {visit, FullInfo, Reds, Acc};
- false ->
- {stop, FullInfo, Reds, Acc}
- end;
- (Case, KV, Reds, Acc) ->
- {Case, KV, Reds, Acc}
- end.
-
-has_prefix(Bin, Prefix) ->
- S = byte_size(Prefix),
- case Bin of
- <<Prefix:S/binary, "/", _/binary>> ->
- true;
- _Else ->
- false
- end.
-
-pipe(Filters, Final) ->
- Wrap =
- fun
- (visit, KV, Reds, Acc) ->
- Final(KV, Reds, Acc);
- (skip, _KV, _Reds, Acc) ->
- {skip, Acc};
- (stop, _KV, _Reds, Acc) ->
- {stop, Acc};
- (traverse, _, _, Acc) ->
- {ok, Acc}
- end,
- do_pipe(Filters, Wrap).
-
-do_pipe([], Fun) -> Fun;
-do_pipe([Filter|Rest], F0) ->
- F1 = fun(C0, KV0, Reds0, Acc0) ->
- {C, KV, Reds, Acc} = Filter(C0, KV0, Reds0, Acc0),
- F0(C, KV, Reds, Acc)
- end,
- do_pipe(Rest, F1).
-
-set_namespace_range(Options, undefined) -> Options;
-set_namespace_range(Options, NS) ->
- %% FIXME depending on order we might need to swap keys
- SK = select_gt(
- proplists:get_value(start_key, Options, <<"">>),
- <<NS/binary, "/">>),
- EK = select_lt(
- proplists:get_value(end_key, Options, <<NS/binary, "0">>),
- <<NS/binary, "0">>),
- [{start_key, SK}, {end_key_gt, EK}].
-
-select_gt(V1, V2) when V1 < V2 -> V2;
-select_gt(V1, _V2) -> V1.
-
-select_lt(V1, V2) when V1 > V2 -> V2;
-select_lt(V1, _V2) -> V1.
-spec normalize_dbname(list() | binary()) -> binary().
@@ -1793,6 +1617,70 @@ is_systemdb(DbName) when is_list(DbName) ->
is_systemdb(DbName) when is_binary(DbName) ->
lists:member(dbname_suffix(DbName), ?SYSTEM_DATABASES).
+
+set_design_doc_keys(Options1) ->
+ Dir = case lists:keyfind(dir, 1, Options1) of
+ {dir, D0} -> D0;
+ _ -> fwd
+ end,
+ Options2 = set_design_doc_start_key(Options1, Dir),
+ set_design_doc_end_key(Options2, Dir).
+
+
+-define(FIRST_DDOC_KEY, <<"_design/">>).
+-define(LAST_DDOC_KEY, <<"_design0">>).
+
+
+set_design_doc_start_key(Options, fwd) ->
+ Key1 = couch_util:get_value(start_key, Options, ?FIRST_DDOC_KEY),
+ Key2 = case Key1 < ?FIRST_DDOC_KEY of
+ true -> ?FIRST_DDOC_KEY;
+ false -> Key1
+ end,
+ lists:keystore(start_key, 1, Options, {start_key, Key2});
+set_design_doc_start_key(Options, rev) ->
+ Key1 = couch_util:get_value(start_key, Options, ?LAST_DDOC_KEY),
+ Key2 = case Key1 > ?LAST_DDOC_KEY of
+ true -> ?LAST_DDOC_KEY;
+ false -> Key1
+ end,
+ lists:keystore(start_key, 1, Options, {start_key, Key2}).
+
+
+% Clamp the end of a design-document fold to the design-doc key range.
+%
+% Options is the fold option list; the second argument is the fold
+% direction (fwd | rev). If end_key_gt is present it is the option that gets
+% clamped, otherwise end_key is clamped (and added when missing). Returns
+% the updated option list.
+set_design_doc_end_key(Options, fwd) ->
+ case couch_util:get_value(end_key_gt, Options) of
+ undefined ->
+ Key1 = couch_util:get_value(end_key, Options, ?LAST_DDOC_KEY),
+ Key2 = case Key1 > ?LAST_DDOC_KEY of
+ true -> ?LAST_DDOC_KEY;
+ false -> Key1
+ end,
+ lists:keystore(end_key, 1, Options, {end_key, Key2});
+ EKeyGT ->
+ Key2 = case EKeyGT > ?LAST_DDOC_KEY of
+ true -> ?LAST_DDOC_KEY;
+ false -> EKeyGT
+ end,
+ lists:keystore(end_key_gt, 1, Options, {end_key_gt, Key2})
+ end;
+set_design_doc_end_key(Options, rev) ->
+ case couch_util:get_value(end_key_gt, Options) of
+ undefined ->
+ % set_design_doc_start_key/2 defaults the rev start key to
+ % ?LAST_DDOC_KEY, so the default end of a reverse fold has to be
+ % the opposite boundary. Defaulting to ?LAST_DDOC_KEY here would
+ % make start and end identical and yield no design docs.
+ Key1 = couch_util:get_value(end_key, Options, ?FIRST_DDOC_KEY),
+ Key2 = case Key1 < ?FIRST_DDOC_KEY of
+ true -> ?FIRST_DDOC_KEY;
+ false -> Key1
+ end,
+ lists:keystore(end_key, 1, Options, {end_key, Key2});
+ EKeyGT ->
+ Key2 = case EKeyGT < ?FIRST_DDOC_KEY of
+ true -> ?FIRST_DDOC_KEY;
+ false -> EKeyGT
+ end,
+ lists:keystore(end_key_gt, 1, Options, {end_key_gt, Key2})
+ end.
+
+
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
@@ -1863,19 +1751,19 @@ should_fail_validate_dbname(DbName) ->
ok
end)}.
-calculate_start_seq_test() ->
- %% uuid mismatch is always a rewind.
- Hdr1 = couch_db_header:new(),
- Hdr2 = couch_db_header:set(Hdr1, [{epochs, [{node1, 1}]}, {uuid, <<"uuid1">>}]),
- ?assertEqual(0, calculate_start_seq(#db{header=Hdr2}, node1, {1, <<"uuid2">>})),
- %% uuid matches and seq is owned by node.
- Hdr3 = couch_db_header:set(Hdr2, [{epochs, [{node1, 1}]}]),
- ?assertEqual(2, calculate_start_seq(#db{header=Hdr3}, node1, {2, <<"uuid1">>})),
- %% uuids match but seq is not owned by node.
- Hdr4 = couch_db_header:set(Hdr2, [{epochs, [{node2, 2}, {node1, 1}]}]),
- ?assertEqual(0, calculate_start_seq(#db{header=Hdr4}, node1, {3, <<"uuid1">>})),
- %% return integer if we didn't get a vector.
- ?assertEqual(4, calculate_start_seq(#db{}, foo, 4)).
+%calculate_start_seq_test() ->
+% %% uuid mismatch is always a rewind.
+% Hdr1 = couch_db_header:new(),
+% Hdr2 = couch_db_header:set(Hdr1, [{epochs, [{node1, 1}]}, {uuid, <<"uuid1">>}]),
+% ?assertEqual(0, calculate_start_seq(#db{header=Hdr2}, node1, {1, <<"uuid2">>})),
+% %% uuid matches and seq is owned by node.
+% Hdr3 = couch_db_header:set(Hdr2, [{epochs, [{node1, 1}]}]),
+% ?assertEqual(2, calculate_start_seq(#db{header=Hdr3}, node1, {2, <<"uuid1">>})),
+% %% uuids match but seq is not owned by node.
+% Hdr4 = couch_db_header:set(Hdr2, [{epochs, [{node2, 2}, {node1, 1}]}]),
+% ?assertEqual(0, calculate_start_seq(#db{header=Hdr4}, node1, {3, <<"uuid1">>})),
+% %% return integer if we didn't get a vector.
+% ?assertEqual(4, calculate_start_seq(#db{}, foo, 4)).
is_owner_test() ->
?assertNot(is_owner(foo, 1, [])),
diff --git a/src/couch/src/couch_db_engine.erl b/src/couch/src/couch_db_engine.erl
index 7718ac5..045e75c 100644
--- a/src/couch/src/couch_db_engine.erl
+++ b/src/couch/src/couch_db_engine.erl
@@ -563,9 +563,6 @@
{ok, CompactedDbHandle::db_handle(), CompactorPid::pid() | undefined}.
--include("couch_db_int.hrl").
-
-
-export([
exists/2,
delete/4,
diff --git a/src/couch/src/couch_db_int.hrl b/src/couch/src/couch_db_int.hrl
index 0bbb5e0..2b41b73 100644
--- a/src/couch/src/couch_db_int.hrl
+++ b/src/couch/src/couch_db_int.hrl
@@ -10,35 +10,8 @@
% License for the specific language governing permissions and limitations under
% the License.
--record(db, {
- main_pid = nil,
- compactor_pid = nil,
- instance_start_time, % number of microsecs since jan 1 1970 as a binary string
- fd,
- fd_monitor,
- header = couch_db_header:new(),
- committed_update_seq,
- id_tree,
- seq_tree,
- local_tree,
- update_seq,
- name,
- filepath,
- validate_doc_funs = undefined,
- security = [],
- security_ptr = nil,
- user_ctx = #user_ctx{},
- waiting_delayed_commit = nil,
- revs_limit = 1000,
- fsync_options = [],
- options = [],
- compression,
- before_doc_update = nil, % nil | fun(Doc, Db) -> NewDoc
- after_doc_read = nil % nil | fun(Doc, Db) -> NewDoc
-}).
-
--record(new_pse_db, {
+-record(db, {
name,
filepath,
@@ -65,27 +38,36 @@
}).
--define(NEW_PSE_DB, {
+-define(OLD_DB_REC, {
db,
- _, % Name
- _, % FilePath
- _, % Engine
_, % MainPid
_, % CompactorPid
- _, % CommittedUpdateSeq
_, % InstanceStartTime
- _, % UserCtx
- _, % Security
+ _, % Fd
+ _, % FdMonitor
+ _, % Header
+ _, % CommittedUpdateSeq
+ _, % IdTree
+ _, % SeqTree
+ _, % LocalTree
+ _, % UpdateSeq
+ _, % Name
+ _, % FilePath
_, % ValidateDocFuns
- _, % BeforeDocUpdate
- _, % AfterDocRead
+ _, % Security
+ _, % SecurityPtr
+ _, % UserCtx
_, % WaitingDelayedCommit
+ _, % RevsLimit
+ _, % FsyncOptions
_, % Options
- _ % Compression
+ _, % Compression
+ _, % BeforeDocUpdate
+ _ % AfterDocRead
}).
--define(PSE_DB_NAME(Db), element(2, Db)).
--define(PSE_DB_MAIN_PID(Db), element(5, Db)).
--define(PSE_DB_USER_CTX(Db), element(9, Db)).
--define(PSE_DB_SECURITY(Db), element(10, Db)).
+% Positional accessors for the legacy db tuple matched by ?OLD_DB_REC.
+% Element 1 is the record tag `db`, so the Nth field listed in ?OLD_DB_REC
+% sits at element N + 1: MainPid -> 2, Name -> 13, Security -> 16,
+% UserCtx -> 18.
+-define(OLD_DB_NAME(Db), element(13, Db)).
+-define(OLD_DB_MAIN_PID(Db), element(2, Db)).
+-define(OLD_DB_USER_CTX(Db), element(18, Db)).
+-define(OLD_DB_SECURITY(Db), element(16, Db)).
diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl
index 8f6fc35..d765458 100644
--- a/src/couch/src/couch_db_updater.erl
+++ b/src/couch/src/couch_db_updater.erl
@@ -14,69 +14,37 @@
-behaviour(gen_server).
-vsn(1).
--export([btree_by_id_split/1, btree_by_id_join/2, btree_by_id_reduce/2]).
--export([btree_by_seq_split/1, btree_by_seq_join/2, btree_by_seq_reduce/2]).
--export([make_doc_summary/2]).
+-export([add_sizes/3, upgrade_sizes/1]).
-export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
-include_lib("couch/include/couch_db.hrl").
-include("couch_db_int.hrl").
--record(comp_header, {
- db_header,
- meta_state
-}).
--record(merge_st, {
- id_tree,
- seq_tree,
- curr,
- rem_seqs,
- infos
-}).
-init({DbName, Filepath, Fd, Options}) ->
+init({Engine, DbName, FilePath, Options0}) ->
erlang:put(io_priority, {db_update, DbName}),
- case lists:member(create, Options) of
- true ->
- % create a new header and writes it to the file
- Header = couch_db_header:new(),
- ok = couch_file:write_header(Fd, Header),
- % delete any old compaction files that might be hanging around
- RootDir = config:get("couchdb", "database_dir", "."),
- couch_file:delete(RootDir, Filepath ++ ".compact"),
- couch_file:delete(RootDir, Filepath ++ ".compact.data"),
- couch_file:delete(RootDir, Filepath ++ ".compact.meta");
- false ->
- case couch_file:read_header(Fd) of
- {ok, Header} ->
- ok;
- no_valid_header ->
- % create a new header and writes it to the file
- Header = couch_db_header:new(),
- ok = couch_file:write_header(Fd, Header),
- % delete any old compaction files that might be hanging around
- file:delete(Filepath ++ ".compact"),
- file:delete(Filepath ++ ".compact.data"),
- file:delete(Filepath ++ ".compact.meta")
- end
- end,
- Db = init_db(DbName, Filepath, Fd, Header, Options),
- couch_stats_process_tracker:track([couchdb, open_databases]),
- % we don't load validation funs here because the fabric query is liable to
- % race conditions. Instead see couch_db:validate_doc_update, which loads
- % them lazily
- {ok, Db#db{main_pid = self()}}.
-
-
-terminate(_Reason, Db) ->
- % If the reason we died is because our fd disappeared
- % then we don't need to try closing it again.
- if Db#db.fd_monitor == closed -> ok; true ->
- ok = couch_file:close(Db#db.fd)
- end,
+ DefaultSecObj = default_security_object(DbName),
+ Options = [{default_security_object, DefaultSecObj} | Options0],
+ try
+ {ok, EngineState} = couch_db_engine:init(Engine, FilePath, Options),
+ Db = init_db(DbName, FilePath, EngineState, Options),
+ couch_stats_process_tracker:track([couchdb, open_databases]),
+ % we don't load validation funs here because the fabric query is liable to
+ % race conditions. Instead see couch_db:validate_doc_update, which loads
+ % them lazily
+ NewDb = Db#db{main_pid = self()},
+ proc_lib:init_ack({ok, NewDb}),
+ gen_server:enter_loop(?MODULE, [], NewDb)
+ catch
+ throw:InitError ->
+ proc_lib:init_ack(InitError)
+ end.
+
+
+terminate(Reason, Db) ->
couch_util:shutdown_sync(Db#db.compactor_pid),
- couch_util:shutdown_sync(Db#db.fd),
+ couch_db_engine:terminate(Reason, Db),
ok.
handle_call(get_db, _From, Db) ->
@@ -100,28 +68,21 @@ handle_call(cancel_compact, _From, #db{compactor_pid = nil} = Db) ->
handle_call(cancel_compact, _From, #db{compactor_pid = Pid} = Db) ->
unlink(Pid),
exit(Pid, kill),
- RootDir = config:get("couchdb", "database_dir", "."),
- ok = couch_file:delete(RootDir, Db#db.filepath ++ ".compact"),
+ couch_server:delete_compaction_files(Db#db.name),
Db2 = Db#db{compactor_pid = nil},
ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
{reply, ok, Db2};
-handle_call(increment_update_seq, _From, Db) ->
- Db2 = commit_data(Db#db{update_seq=Db#db.update_seq+1}),
- ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
- couch_event:notify(Db#db.name, updated),
- {reply, {ok, Db2#db.update_seq}, Db2};
-handle_call({set_security, NewSec}, _From, #db{compression = Comp} = Db) ->
- {ok, Ptr, _} = couch_file:append_term(
- Db#db.fd, NewSec, [{compression, Comp}]),
- Db2 = commit_data(Db#db{security=NewSec, security_ptr=Ptr,
- update_seq=Db#db.update_seq+1}),
- ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
- {reply, ok, Db2};
+handle_call({set_security, NewSec}, _From, #db{} = Db) ->
+ {ok, NewDb} = couch_db_engine:set_security(Db, NewSec),
+ NewSecDb = NewDb#db{
+ security = NewSec
+ },
+ ok = gen_server:call(couch_server, {db_updated, NewSecDb}, infinity),
+ {reply, ok, NewSecDb};
handle_call({set_revs_limit, Limit}, _From, Db) ->
- Db2 = commit_data(Db#db{revs_limit=Limit,
- update_seq=Db#db.update_seq+1}),
+ {ok, Db2} = couch_db_engine:set_revs_limit(Db, Limit),
ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
{reply, ok, Db2};
@@ -129,73 +90,78 @@ handle_call({purge_docs, _IdRevs}, _From,
#db{compactor_pid=Pid}=Db) when Pid /= nil ->
{reply, {error, purge_during_compaction}, Db};
handle_call({purge_docs, IdRevs}, _From, Db) ->
+ % Purge the requested revisions. IdRevs is a list of {DocId, Revs}.
+ % Steps: open the current #full_doc_info{} for each id, remove the
+ % requested leafs, re-label the seq of every remaining leaf, then pass
+ % the pairs built by pair_purge_info/2 to the storage engine.
+ % Replies {ok, PurgeSeq, PurgedIdRevs}.
- #db{
- fd = Fd,
- id_tree = DocInfoByIdBTree,
- seq_tree = DocInfoBySeqBTree,
- update_seq = LastSeq,
- header = Header,
- compression = Comp
- } = Db,
- DocLookups = couch_btree:lookup(DocInfoByIdBTree,
- [Id || {Id, _Revs} <- IdRevs]),
-
- NewDocInfos = lists:zipwith(
- fun({_Id, Revs}, {ok, #full_doc_info{rev_tree=Tree}=FullDocInfo}) ->
+ DocIds = [Id || {Id, _Revs} <- IdRevs],
+ OldDocInfos = couch_db_engine:open_docs(Db, DocIds),
+
+ NewDocInfos = lists:flatmap(fun
+ ({{Id, Revs}, #full_doc_info{id = Id, rev_tree = Tree} = FDI}) ->
case couch_key_tree:remove_leafs(Tree, Revs) of
- {_, []=_RemovedRevs} -> % no change
- nil;
- {NewTree, RemovedRevs} ->
- {FullDocInfo#full_doc_info{rev_tree=NewTree},RemovedRevs}
+ {_, [] = _RemovedRevs} -> % no change
+ [];
+ {NewTree, RemovedRevs} ->
+ NewFDI = FDI#full_doc_info{rev_tree = NewTree},
+ [{FDI, NewFDI, RemovedRevs}]
end;
- (_, not_found) ->
- nil
+ ({_, not_found}) ->
+ []
+ end, lists:zip(IdRevs, OldDocInfos)),
+
+ InitUpdateSeq = couch_db_engine:get_update_seq(Db),
+ InitAcc = {InitUpdateSeq, [], []},
+ % NOTE: the fold binds the *second* tuple element, i.e. the record that
+ % already carries the post-removal rev tree, even though the local names
+ % say "Old".
+ FinalAcc = lists:foldl(fun({_, #full_doc_info{} = OldFDI, RemRevs}, Acc) ->
+ #full_doc_info{
+ id = Id,
+ rev_tree = OldTree
+ } = OldFDI,
+ {SeqAcc0, FDIAcc, IdRevsAcc} = Acc,
+
+ {NewFDIAcc, NewSeqAcc} = case OldTree of
+ [] ->
+ % If we purged every #leaf{} in the doc record
+ % then we're removing it completely from the
+ % database.
+ % Nothing is re-labelled in this case, so both the
+ % #full_doc_info{} list and the sequence counter are
+ % passed through unchanged. Returning the pair (not
+ % the bare list) is required by the match above.
+ {FDIAcc, SeqAcc0};
+ _ ->
+ % It's possible to purge the #leaf{} that contains
+ % the update_seq where this doc sits in the update_seq
+ % sequence. Rather than do a bunch of complicated checks
+ % we just re-label every #leaf{} and reinsert it into
+ % the update_seq sequence.
+ {NewTree, SeqAcc1} = couch_key_tree:mapfold(fun
+ (_RevId, Leaf, leaf, InnerSeqAcc) ->
+ {Leaf#leaf{seq = InnerSeqAcc + 1}, InnerSeqAcc + 1};
+ (_RevId, Value, _Type, InnerSeqAcc) ->
+ {Value, InnerSeqAcc}
+ end, SeqAcc0, OldTree),
+
+ NewFDI = OldFDI#full_doc_info{
+ update_seq = SeqAcc1,
+ rev_tree = NewTree
+ },
+
+ {[NewFDI | FDIAcc], SeqAcc1}
end,
- IdRevs, DocLookups),
-
- SeqsToRemove = [Seq
- || {#full_doc_info{update_seq=Seq},_} <- NewDocInfos],
-
- FullDocInfoToUpdate = [FullInfo
- || {#full_doc_info{rev_tree=Tree}=FullInfo,_}
- <- NewDocInfos, Tree /= []],
-
- IdRevsPurged = [{Id, Revs}
- || {#full_doc_info{id=Id}, Revs} <- NewDocInfos],
-
- {DocInfoToUpdate, NewSeq} = lists:mapfoldl(
- fun(#full_doc_info{rev_tree=Tree}=FullInfo, SeqAcc) ->
- Tree2 = couch_key_tree:map_leafs(
- fun(_RevId, Leaf) ->
- Leaf#leaf{seq=SeqAcc+1}
- end, Tree),
- {FullInfo#full_doc_info{rev_tree=Tree2}, SeqAcc + 1}
- end, LastSeq, FullDocInfoToUpdate),
-
- IdsToRemove = [Id || {#full_doc_info{id=Id,rev_tree=[]},_}
- <- NewDocInfos],
-
- {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree,
- DocInfoToUpdate, SeqsToRemove),
- {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree,
- FullDocInfoToUpdate, IdsToRemove),
- {ok, Pointer, _} = couch_file:append_term(
- Fd, IdRevsPurged, [{compression, Comp}]),
-
- NewHeader = couch_db_header:set(Header, [
- {purge_seq, couch_db_header:purge_seq(Header) + 1},
- {purged_docs, Pointer}
- ]),
- Db2 = commit_data(
- Db#db{
- id_tree = DocInfoByIdBTree2,
- seq_tree = DocInfoBySeqBTree2,
- update_seq = NewSeq + 1,
- header=NewHeader}),
+ NewIdRevsAcc = [{Id, RemRevs} | IdRevsAcc],
+ {NewSeqAcc, NewFDIAcc, NewIdRevsAcc}
+ end, InitAcc, NewDocInfos),
+
+ {_FinalSeq, FDIs, PurgedIdRevs} = FinalAcc,
+
+ % We need to only use the list of #full_doc_info{} records
+ % that we have actually changed due to a purge.
+ PreviousFDIs = [PrevFDI || {PrevFDI, _, _} <- NewDocInfos],
+ Pairs = pair_purge_info(PreviousFDIs, FDIs),
+
+ {ok, Db2} = couch_db_engine:write_doc_infos(Db, Pairs, [], PurgedIdRevs),
ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
couch_event:notify(Db#db.name, updated),
- {reply, {ok, couch_db_header:purge_seq(NewHeader), IdRevsPurged}, Db2}.
+
+ PurgeSeq = couch_db_engine:get_purge_seq(Db2),
+ {reply, {ok, PurgeSeq, PurgedIdRevs}, Db2};
+
+handle_call(Msg, From, Db) ->
+ couch_db_engine:handle_call(Msg, From, Db).
handle_cast({load_validation_funs, ValidationFuns}, Db) ->
@@ -204,65 +170,29 @@ handle_cast({load_validation_funs, ValidationFuns}, Db) ->
{noreply, Db2};
handle_cast(start_compact, Db) ->
case Db#db.compactor_pid of
- nil ->
- couch_log:info("Starting compaction for db \"~s\"", [Db#db.name]),
- Pid = spawn_link(fun() -> start_copy_compact(Db) end),
- Db2 = Db#db{compactor_pid=Pid},
- ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
- {noreply, Db2};
- _ ->
- % compact currently running, this is a no-op
- {noreply, Db}
- end;
-handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) ->
- {ok, NewFd} = couch_file:open(CompactFilepath),
- {ok, NewHeader0} = couch_file:read_header(NewFd),
- NewHeader = couch_db_header:set(NewHeader0, [
- {compacted_seq, Db#db.update_seq}
- ]),
- #db{update_seq=NewSeq} = NewDb =
- init_db(Db#db.name, Filepath, NewFd, NewHeader, Db#db.options),
- unlink(NewFd),
- case Db#db.update_seq == NewSeq of
- true ->
- % suck up all the local docs into memory and write them to the new db
- {ok, _, LocalDocs} = couch_btree:foldl(Db#db.local_tree,
- fun(Value, _Offset, Acc) -> {ok, [Value | Acc]} end, []),
- {ok, NewLocalBtree} = couch_btree:add(NewDb#db.local_tree, LocalDocs),
-
- NewDb2 = commit_data(NewDb#db{
- local_tree = NewLocalBtree,
- main_pid = self(),
- filepath = Filepath,
- instance_start_time = Db#db.instance_start_time,
- revs_limit = Db#db.revs_limit
- }),
-
- couch_log:debug("CouchDB swapping files ~s and ~s.",
- [Filepath, CompactFilepath]),
- ok = file:rename(CompactFilepath, Filepath ++ ".compact"),
- RootDir = config:get("couchdb", "database_dir", "."),
- couch_file:delete(RootDir, Filepath),
- ok = file:rename(Filepath ++ ".compact", Filepath),
- % Delete the old meta compaction file after promoting
- % the compaction file.
- couch_file:delete(RootDir, Filepath ++ ".compact.meta"),
- close_db(Db),
- NewDb3 = refresh_validate_doc_funs(NewDb2),
- ok = gen_server:call(couch_server, {db_updated, NewDb3}, infinity),
- couch_event:notify(NewDb3#db.name, compacted),
- couch_log:info("Compaction for db \"~s\" completed.", [Db#db.name]),
- {noreply, NewDb3#db{compactor_pid=nil}};
- false ->
- couch_log:info("Compaction file still behind main file "
- "(update seq=~p. compact update seq=~p). Retrying.",
- [Db#db.update_seq, NewSeq]),
- close_db(NewDb),
- Pid = spawn_link(fun() -> start_copy_compact(Db) end),
- Db2 = Db#db{compactor_pid=Pid},
- ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
- {noreply, Db2}
+ nil ->
+ % For now we only support compacting to the same
+ % storage engine. After the first round of patches
+ % we'll add a field that sets the target engine
+ % type to compact to with a new copy compactor.
+ UpdateSeq = couch_db_engine:get_update_seq(Db),
+ Args = [Db#db.name, UpdateSeq],
+ couch_log:info("Starting compaction for db \"~s\" at ~p", Args),
+ {ok, Db2} = couch_db_engine:start_compaction(Db),
+ ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
+ {noreply, Db2};
+ _ ->
+ % compact currently running, this is a no-op
+ {noreply, Db}
end;
+handle_cast({compact_done, CompactEngine, CompactInfo}, #db{} = OldDb) ->
+ {ok, NewDb} = case couch_db_engine:get_engine(OldDb) of
+ CompactEngine ->
+ couch_db_engine:finish_compaction(OldDb, CompactInfo);
+ _ ->
+ finish_engine_swap(OldDb, CompactEngine, CompactInfo)
+ end,
+ {noreply, NewDb};
handle_cast(Msg, #db{name = Name} = Db) ->
couch_log:error("Database `~s` updater received unexpected cast: ~p",
@@ -286,9 +216,9 @@ handle_info({update_docs, Client, GroupedDocs, NonRepDocs, MergeConflicts,
FullCommit2) of
{ok, Db2, UpdatedDDocIds} ->
ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
- if Db2#db.update_seq /= Db#db.update_seq ->
- couch_event:notify(Db2#db.name, updated);
- true -> ok
+ case {couch_db:get_update_seq(Db), couch_db:get_update_seq(Db2)} of
+ {Seq, Seq} -> ok;
+ _ -> couch_event:notify(Db2#db.name, updated)
end,
if NonRepDocs2 /= [] ->
couch_event:notify(Db2#db.name, local_updated);
@@ -331,9 +261,8 @@ handle_info({'EXIT', _Pid, normal}, Db) ->
{noreply, Db};
handle_info({'EXIT', _Pid, Reason}, Db) ->
{stop, Reason, Db};
-handle_info({'DOWN', Ref, _, _, Reason}, #db{fd_monitor=Ref, name=Name} = Db) ->
- couch_log:error("DB ~s shutting down - Fd ~p", [Name, Reason]),
- {stop, normal, Db#db{fd=undefined, fd_monitor=closed}}.
+% Catch-all clause: any message not matched by an earlier handle_info
+% clause is forwarded to couch_db_engine:handle_info/2, and that call's
+% return value is returned unchanged as the callback result.
+handle_info(Msg, Db) ->
+ couch_db_engine:handle_info(Msg, Db).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -384,235 +313,32 @@ collect_updates(GroupedDocsAcc, ClientsAcc, MergeConflicts, FullCommit) ->
{GroupedDocsAcc, ClientsAcc, FullCommit}
end.
-rev_tree(DiskTree) ->
- couch_key_tree:map(fun
- (_RevId, {Del, Ptr, Seq}) ->
- #leaf{
- deleted = ?i2b(Del),
- ptr = Ptr,
- seq = Seq
- };
- (_RevId, {Del, Ptr, Seq, Size}) ->
- #leaf{
- deleted = ?i2b(Del),
- ptr = Ptr,
- seq = Seq,
- sizes = upgrade_sizes(Size)
- };
- (_RevId, {Del, Ptr, Seq, Sizes, Atts}) ->
- #leaf{
- deleted = ?i2b(Del),
- ptr = Ptr,
- seq = Seq,
- sizes = upgrade_sizes(Sizes),
- atts = Atts
- };
- (_RevId, ?REV_MISSING) ->
- ?REV_MISSING
- end, DiskTree).
-
-disk_tree(RevTree) ->
- couch_key_tree:map(fun
- (_RevId, ?REV_MISSING) ->
- ?REV_MISSING;
- (_RevId, #leaf{} = Leaf) ->
- #leaf{
- deleted = Del,
- ptr = Ptr,
- seq = Seq,
- sizes = Sizes,
- atts = Atts
- } = Leaf,
- {?b2i(Del), Ptr, Seq, split_sizes(Sizes), Atts}
- end, RevTree).
-upgrade_sizes(#size_info{}=SI) ->
- SI;
-upgrade_sizes({D, E}) ->
- #size_info{active=D, external=E};
-upgrade_sizes(S) when is_integer(S) ->
- #size_info{active=S, external=0}.
-
-split_sizes(#size_info{}=SI) ->
- {SI#size_info.active, SI#size_info.external}.
-
-join_sizes({Active, External}) when is_integer(Active), is_integer(External) ->
- #size_info{active=Active, external=External}.
-
-btree_by_seq_split(#full_doc_info{}=Info) ->
- #full_doc_info{
- id = Id,
- update_seq = Seq,
- deleted = Del,
- sizes = SizeInfo,
- rev_tree = Tree
- } = Info,
- {Seq, {Id, ?b2i(Del), split_sizes(SizeInfo), disk_tree(Tree)}}.
-
-btree_by_seq_join(Seq, {Id, Del, DiskTree}) when is_integer(Del) ->
- btree_by_seq_join(Seq, {Id, Del, {0, 0}, DiskTree});
-btree_by_seq_join(Seq, {Id, Del, Sizes, DiskTree}) when is_integer(Del) ->
- #full_doc_info{
- id = Id,
- update_seq = Seq,
- deleted = ?i2b(Del),
- sizes = join_sizes(Sizes),
- rev_tree = rev_tree(DiskTree)
- };
-btree_by_seq_join(KeySeq, {Id, RevInfos, DeletedRevInfos}) ->
- % Older versions stored #doc_info records in the seq_tree.
- % Compact to upgrade.
- #doc_info{
- id = Id,
- high_seq=KeySeq,
- revs =
- [#rev_info{rev=Rev,seq=Seq,deleted=false,body_sp = Bp} ||
- {Rev, Seq, Bp} <- RevInfos] ++
- [#rev_info{rev=Rev,seq=Seq,deleted=true,body_sp = Bp} ||
- {Rev, Seq, Bp} <- DeletedRevInfos]}.
-
-btree_by_id_split(#full_doc_info{}=Info) ->
- #full_doc_info{
- id = Id,
- update_seq = Seq,
- deleted = Deleted,
- sizes = SizeInfo,
- rev_tree = Tree
- } = Info,
- {Id, {Seq, ?b2i(Deleted), split_sizes(SizeInfo), disk_tree(Tree)}}.
-
-% Handle old formats before data_size was added
-btree_by_id_join(Id, {HighSeq, Deleted, DiskTree}) ->
- btree_by_id_join(Id, {HighSeq, Deleted, #size_info{}, DiskTree});
-
-btree_by_id_join(Id, {HighSeq, Deleted, Sizes, DiskTree}) ->
- #full_doc_info{
- id = Id,
- update_seq = HighSeq,
- deleted = ?i2b(Deleted),
- sizes = upgrade_sizes(Sizes),
- rev_tree = rev_tree(DiskTree)
- }.
-
-btree_by_id_reduce(reduce, FullDocInfos) ->
- lists:foldl(
- fun(Info, {NotDeleted, Deleted, Sizes}) ->
- Sizes2 = reduce_sizes(Sizes, Info#full_doc_info.sizes),
- case Info#full_doc_info.deleted of
- true ->
- {NotDeleted, Deleted + 1, Sizes2};
- false ->
- {NotDeleted + 1, Deleted, Sizes2}
- end
- end,
- {0, 0, #size_info{}}, FullDocInfos);
-btree_by_id_reduce(rereduce, Reds) ->
- lists:foldl(
- fun({NotDeleted, Deleted}, {AccNotDeleted, AccDeleted, _AccSizes}) ->
- % pre 1.2 format, will be upgraded on compaction
- {AccNotDeleted + NotDeleted, AccDeleted + Deleted, nil};
- ({NotDeleted, Deleted, Sizes}, {AccNotDeleted, AccDeleted, AccSizes}) ->
- AccSizes2 = reduce_sizes(AccSizes, Sizes),
- {AccNotDeleted + NotDeleted, AccDeleted + Deleted, AccSizes2}
- end,
- {0, 0, #size_info{}}, Reds).
-
-reduce_sizes(nil, _) ->
- nil;
-reduce_sizes(_, nil) ->
- nil;
-reduce_sizes(#size_info{}=S1, #size_info{}=S2) ->
- #size_info{
- active = S1#size_info.active + S2#size_info.active,
- external = S1#size_info.external + S2#size_info.external
- };
-reduce_sizes(S1, S2) ->
- reduce_sizes(upgrade_sizes(S1), upgrade_sizes(S2)).
-
-btree_by_seq_reduce(reduce, DocInfos) ->
- % count the number of documents
- length(DocInfos);
-btree_by_seq_reduce(rereduce, Reds) ->
- lists:sum(Reds).
-
-init_db(DbName, Filepath, Fd, Header0, Options) ->
- Header = couch_db_header:upgrade(Header0),
-
- {ok, FsyncOptions} = couch_util:parse_term(
- config:get("couchdb", "fsync_options",
- "[before_header, after_header, on_file_open]")),
-
- case lists:member(on_file_open, FsyncOptions) of
- true -> ok = couch_file:sync(Fd);
- _ -> ok
- end,
-
- Compression = couch_compress:get_compression_method(),
-
- IdTreeState = couch_db_header:id_tree_state(Header),
- SeqTreeState = couch_db_header:seq_tree_state(Header),
- LocalTreeState = couch_db_header:local_tree_state(Header),
- {ok, IdBtree} = couch_btree:open(IdTreeState, Fd,
- [{split, fun ?MODULE:btree_by_id_split/1},
- {join, fun ?MODULE:btree_by_id_join/2},
- {reduce, fun ?MODULE:btree_by_id_reduce/2},
- {compression, Compression}]),
- {ok, SeqBtree} = couch_btree:open(SeqTreeState, Fd,
- [{split, fun ?MODULE:btree_by_seq_split/1},
- {join, fun ?MODULE:btree_by_seq_join/2},
- {reduce, fun ?MODULE:btree_by_seq_reduce/2},
- {compression, Compression}]),
- {ok, LocalDocsBtree} = couch_btree:open(LocalTreeState, Fd,
- [{compression, Compression}]),
- case couch_db_header:security_ptr(Header) of
- nil ->
- Security = default_security_object(DbName),
- SecurityPtr = nil;
- SecurityPtr ->
- {ok, Security} = couch_file:pread_term(Fd, SecurityPtr)
- end,
+% Build the initial #db{} record for DbName from an engine state.
+%
+% DbName and FilePath are stored as-is in the name and filepath fields and
+% EngineState is stored in the engine field. Options is searched for
+% before_doc_update and after_doc_read (both default to nil), and the atom
+% `create` is filtered out before the list is stored in the options field.
+% Returns the #db{} record.
+init_db(DbName, FilePath, EngineState, Options) ->
% convert start time tuple to microsecs and store as a binary string
{MegaSecs, Secs, MicroSecs} = os:timestamp(),
StartTime = ?l2b(io_lib:format("~p",
[(MegaSecs*1000000*1000000) + (Secs*1000000) + MicroSecs])),
- ok = couch_file:set_db_pid(Fd, self()),
- Db = #db{
- fd=Fd,
- fd_monitor = erlang:monitor(process, Fd),
- header=Header,
- id_tree = IdBtree,
- seq_tree = SeqBtree,
- local_tree = LocalDocsBtree,
- committed_update_seq = couch_db_header:update_seq(Header),
- update_seq = couch_db_header:update_seq(Header),
+
+ BDU = couch_util:get_value(before_doc_update, Options, nil),
+ ADR = couch_util:get_value(after_doc_read, Options, nil),
+
+ % Keep every option except the bare atom `create`.
+ % NOTE(review): presumably so that a later reopen using these stored
+ % options does not try to create the database again - confirm.
+ CleanedOpts = [Opt || Opt <- Options, Opt /= create],
+
+ InitDb = #db{
name = DbName,
- filepath = Filepath,
- security = Security,
- security_ptr = SecurityPtr,
+ filepath = FilePath,
+ engine = EngineState,
instance_start_time = StartTime,
- revs_limit = couch_db_header:revs_limit(Header),
- fsync_options = FsyncOptions,
- options = Options,
- compression = Compression,
- before_doc_update = couch_util:get_value(before_doc_update, Options, nil),
- after_doc_read = couch_util:get_value(after_doc_read, Options, nil)
+ options = CleanedOpts,
+ before_doc_update = BDU,
+ after_doc_read = ADR
},
- % If we just created a new UUID while upgrading a
- % database then we want to flush that to disk or
- % we risk sending out the uuid and having the db
- % crash which would result in it generating a new
- % uuid each time it was reopened.
- case Header /= Header0 of
- true ->
- sync_header(Db, Header);
- false ->
- Db
- end.
-
-
-close_db(#db{fd_monitor = Ref}) ->
- erlang:demonitor(Ref).
+ % The committed update seq and the security object are read through
+ % couch_db_engine using InitDb, which already carries EngineState.
+ InitDb#db{
+ committed_update_seq = couch_db_engine:get_update_seq(InitDb),
+ security = couch_db_engine:get_security(InitDb)
+ }.
refresh_validate_doc_funs(#db{name = <<"shards/", _/binary>> = Name} = Db) ->
@@ -636,50 +362,36 @@ refresh_validate_doc_funs(Db0) ->
flush_trees(_Db, [], AccFlushedTrees) ->
{ok, lists:reverse(AccFlushedTrees)};
-flush_trees(#db{fd = Fd} = Db,
+flush_trees(#db{} = Db,
[InfoUnflushed | RestUnflushed], AccFlushed) ->
#full_doc_info{update_seq=UpdateSeq, rev_tree=Unflushed} = InfoUnflushed,
{Flushed, FinalAcc} = couch_key_tree:mapfold(
fun(_Rev, Value, Type, SizesAcc) ->
case Value of
- #doc{deleted = IsDeleted, body = {summary, _, _, _} = DocSummary} ->
- {summary, Summary, AttSizeInfo, AttsFd} = DocSummary,
- % this node value is actually an unwritten document summary,
- % write to disk.
- % make sure the Fd in the written bins is the same Fd we are
- % and convert bins, removing the FD.
- % All bins should have been written to disk already.
- case {AttsFd, Fd} of
- {nil, _} ->
- ok;
- {SameFd, SameFd} ->
- ok;
- _ ->
- % Fd where the attachments were written to is not the same
- % as our Fd. This can happen when a database is being
- % switched out during a compaction.
- couch_log:debug("File where the attachments are written has"
- " changed. Possibly retrying.", []),
- throw(retry)
- end,
- ExternalSize = ?term_size(Summary),
- {ok, NewSummaryPointer, SummarySize} =
- couch_file:append_raw_chunk(Fd, Summary),
- Leaf = #leaf{
- deleted = IsDeleted,
- ptr = NewSummaryPointer,
- seq = UpdateSeq,
- sizes = #size_info{
- active = SummarySize,
- external = ExternalSize
+ % This node is a document summary that needs to be
+ % flushed to disk.
+ #doc{} = Doc ->
+ check_doc_atts(Db, Doc),
+ ExternalSize = ?term_size(Doc#doc.body),
+ {size_info, AttSizeInfo} =
+ lists:keyfind(size_info, 1, Doc#doc.meta),
+ {ok, NewDoc, WrittenSize} =
+ couch_db_engine:write_doc_body(Db, Doc),
+ Leaf = #leaf{
+ deleted = Doc#doc.deleted,
+ ptr = NewDoc#doc.body,
+ seq = UpdateSeq,
+ sizes = #size_info{
+ active = WrittenSize,
+ external = ExternalSize
+ },
+ atts = AttSizeInfo
},
- atts = AttSizeInfo
- },
- {Leaf, add_sizes(Type, Leaf, SizesAcc)};
- #leaf{} ->
- {Value, add_sizes(Type, Value, SizesAcc)};
- _ ->
- {Value, SizesAcc}
+ {Leaf, add_sizes(Type, Leaf, SizesAcc)};
+ #leaf{} ->
+ {Value, add_sizes(Type, Value, SizesAcc)};
+ _ ->
+ {Value, SizesAcc}
end
end, {0, 0, []}, Unflushed),
{FinalAS, FinalES, FinalAtts} = FinalAcc,
@@ -693,6 +405,29 @@ flush_trees(#db{fd = Fd} = Db,
},
flush_trees(Db, RestUnflushed, [NewInfo | AccFlushed]).
+
+% Check that the attachment stream recorded for Doc is still the one Db
+% treats as active.
+%
+% The stream comes from the {atts_stream, Stream} entry of Doc#doc.meta.
+% The entry has to be present: lists:keyfind/3 returns false when it is
+% missing and the tuple match then fails with a badmatch. A Stream of nil
+% passes without calling couch_db:is_active_stream/2.
+%
+% Returns ok, or throws `retry` when the stream is no longer active.
+check_doc_atts(Db, Doc) ->
+ {atts_stream, Stream} = lists:keyfind(atts_stream, 1, Doc#doc.meta),
+ % Make sure that the attachments were written to the currently
+ % active attachment stream. If compaction swaps during a write
+ % request we may have to rewrite our attachment bodies.
+ if Stream == nil -> ok; true ->
+ case couch_db:is_active_stream(Db, Stream) of
+ true ->
+ ok;
+ false ->
+ % Stream where the attachments were written to is
+ % no longer the current attachment stream. This
+ % can happen when a database is switched at
+ % compaction time.
+ couch_log:debug("Stream where the attachments were"
+ " written has changed."
+ " Possibly retrying.", []),
+ throw(retry)
+ end
+ end.
+
+
add_sizes(Type, #leaf{sizes=Sizes, atts=AttSizes}, Acc) ->
% Maybe upgrade from disk_size only
#size_info{
@@ -705,6 +440,15 @@ add_sizes(Type, #leaf{sizes=Sizes, atts=AttSizes}, Acc) ->
NewAttsAcc = lists:umerge(AttSizes, AttsAcc),
{NewASAcc, NewESAcc, NewAttsAcc}.
+
+% Normalise a size value to a #size_info{} record.
+%   #size_info{}  -> returned unchanged
+%   {D, E}        -> active = D, external = E
+%   integer S     -> active = S, external = 0
+% Any other shape fails with function_clause.
+upgrade_sizes(#size_info{}=SI) ->
+ SI;
+upgrade_sizes({D, E}) ->
+ #size_info{active=D, external=E};
+upgrade_sizes(S) when is_integer(S) ->
+ #size_info{active=S, external=0}.
+
+
send_result(Client, Doc, NewResult) ->
% used to send a result to the client
catch(Client ! {result, self(), {doc_tag(Doc), NewResult}}).
@@ -831,58 +575,40 @@ merge_rev_tree(OldInfo, NewDoc, _Client, Limit, true) ->
{NewTree, _} = couch_key_tree:merge(OldTree, NewTree0, Limit),
OldInfo#full_doc_info{rev_tree = NewTree}.
-stem_full_doc_infos(#db{revs_limit=Limit}, DocInfos) ->
- [Info#full_doc_info{rev_tree=couch_key_tree:stem(Tree, Limit)} ||
- #full_doc_info{rev_tree=Tree}=Info <- DocInfos].
+update_docs_int(Db, DocsList, LocalDocs, MergeConflicts, FullCommit) ->
+ UpdateSeq = couch_db_engine:get_update_seq(Db),
+ RevsLimit = couch_db_engine:get_revs_limit(Db),
-update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) ->
- #db{
- id_tree = DocInfoByIdBTree,
- seq_tree = DocInfoBySeqBTree,
- update_seq = LastSeq,
- revs_limit = RevsLimit
- } = Db,
Ids = [Id || [{_Client, #doc{id=Id}}|_] <- DocsList],
% lookup up the old documents, if they exist.
- OldDocLookups = couch_btree:lookup(DocInfoByIdBTree, Ids),
- OldDocInfos = lists:zipwith(
- fun(_Id, {ok, FullDocInfo}) ->
- FullDocInfo;
+ OldDocLookups = couch_db_engine:open_docs(Db, Ids),
+ OldDocInfos = lists:zipwith(fun
+ (_Id, #full_doc_info{} = FDI) ->
+ FDI;
(Id, not_found) ->
#full_doc_info{id=Id}
- end,
- Ids, OldDocLookups),
+ end, Ids, OldDocLookups),
% Merge the new docs into the revision trees.
- {ok, NewFullDocInfos, RemoveSeqs, NewSeq} = merge_rev_trees(RevsLimit,
- MergeConflicts, DocsList, OldDocInfos, [], [], LastSeq),
-
- % All documents are now ready to write.
-
- {ok, Db2} = update_local_docs(Db, NonRepDocs),
+ {ok, NewFullDocInfos, RemSeqs, _} = merge_rev_trees(RevsLimit,
+ MergeConflicts, DocsList, OldDocInfos, [], [], UpdateSeq),
% Write out the document summaries (the bodies are stored in the nodes of
% the trees, the attachments are already written to disk)
- {ok, IndexFullDocInfos} = flush_trees(Db2, NewFullDocInfos, []),
-
- % and the indexes
- {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree, IndexFullDocInfos, []),
- {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, IndexFullDocInfos, RemoveSeqs),
+ {ok, IndexFDIs} = flush_trees(Db, NewFullDocInfos, []),
+ Pairs = pair_write_info(OldDocLookups, IndexFDIs),
+ LocalDocs2 = update_local_doc_revs(LocalDocs),
+ {ok, Db1} = couch_db_engine:write_doc_infos(Db, Pairs, LocalDocs2, []),
- WriteCount = length(IndexFullDocInfos),
+ WriteCount = length(IndexFDIs),
couch_stats:increment_counter([couchdb, document_inserts],
- WriteCount - length(RemoveSeqs)),
+ WriteCount - length(RemSeqs)),
couch_stats:increment_counter([couchdb, document_writes], WriteCount),
couch_stats:increment_counter(
[couchdb, local_document_writes],
- length(NonRepDocs)
+ length(LocalDocs2)
),
- Db3 = Db2#db{
- id_tree = DocInfoByIdBTree2,
- seq_tree = DocInfoBySeqBTree2,
- update_seq = NewSeq},
-
% Check if we just updated any design documents, and update the validation
% funs if we did.
UpdatedDDocIds = lists:flatmap(fun
@@ -890,549 +616,92 @@ update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) ->
(_) -> []
end, Ids),
- Db4 = case length(UpdatedDDocIds) > 0 of
+ Db2 = case length(UpdatedDDocIds) > 0 of
true ->
- couch_event:notify(Db3#db.name, ddoc_updated),
- ddoc_cache:evict(Db3#db.name, UpdatedDDocIds),
- refresh_validate_doc_funs(Db3);
+ ddoc_cache:evict(Db1#db.name, UpdatedDDocIds),
+ refresh_validate_doc_funs(Db1);
false ->
- Db3
+ Db1
end,
- {ok, commit_data(Db4, not FullCommit), UpdatedDDocIds}.
-
-update_local_docs(Db, []) ->
- {ok, Db};
-update_local_docs(#db{local_tree=Btree}=Db, Docs) ->
- BtreeEntries = lists:map(
- fun({Client, NewDoc}) ->
- #doc{
- id = Id,
- deleted = Delete,
- revs = {0, PrevRevs},
- body = Body
- } = NewDoc,
- case PrevRevs of
- [RevStr|_] ->
+ {ok, commit_data(Db2, not FullCommit), UpdatedDDocIds}.
+
+
+% Assign the next revision to each local document in Docs.
+%
+% Docs is a list of {Client, #doc{}} pairs whose revs field must be
+% {0, PrevRevs}. The previous revision is the first entry of PrevRevs
+% parsed as an integer, or 0 when PrevRevs is empty. The new revision is
+% the previous one plus one rendered as a binary, or <<"0">> when the
+% document is marked deleted.
+%
+% For every pair, {ok, {0, NewRev}} is sent to Client through send_result/3
+% as a side effect of this function. Returns the list of documents (without
+% the clients) with revs replaced by {0, [NewRev]}.
+update_local_doc_revs(Docs) ->
+ lists:map(fun({Client, NewDoc}) ->
+ #doc{
+ deleted = Delete,
+ revs = {0, PrevRevs}
+ } = NewDoc,
+ case PrevRevs of
+ [RevStr | _] ->
PrevRev = list_to_integer(?b2l(RevStr));
[] ->
PrevRev = 0
- end,
- case Delete of
- false ->
- send_result(Client, NewDoc, {ok,
- {0, ?l2b(integer_to_list(PrevRev + 1))}}),
- {update, {Id, {PrevRev + 1, Body}}};
- true ->
- send_result(Client, NewDoc,
- {ok, {0, <<"0">>}}),
- {remove, Id}
- end
- end, Docs),
-
- BtreeIdsRemove = [Id || {remove, Id} <- BtreeEntries],
- BtreeIdsUpdate = [{Key, Val} || {update, {Key, Val}} <- BtreeEntries],
-
- {ok, Btree2} =
- couch_btree:add_remove(Btree, BtreeIdsUpdate, BtreeIdsRemove),
-
- {ok, Db#db{local_tree = Btree2}}.
+ end,
+ NewRev = case Delete of
+ false ->
+ ?l2b(integer_to_list(PrevRev + 1));
+ true ->
+ <<"0">>
+ end,
+ send_result(Client, NewDoc, {ok, {0, NewRev}}),
+ NewDoc#doc{
+ revs = {0, [NewRev]}
+ }
+ end, Docs).
-db_to_header(Db, Header) ->
- couch_db_header:set(Header, [
- {update_seq, Db#db.update_seq},
- {seq_tree_state, couch_btree:get_state(Db#db.seq_tree)},
- {id_tree_state, couch_btree:get_state(Db#db.id_tree)},
- {local_tree_state, couch_btree:get_state(Db#db.local_tree)},
- {security_ptr, Db#db.security_ptr},
- {revs_limit, Db#db.revs_limit}
- ]).
+% commit_data(Db) commits immediately; it is commit_data(Db, false).
+%
+% commit_data(Db, Delay):
+%   Delay =:= true, no timer pending  -> schedule a `delayed_commit`
+%       message to self() in 1000 ms and remember the timer reference in
+%       waiting_delayed_commit; nothing is committed yet.
+%   Delay =:= true, timer pending     -> Db is returned unchanged.
+%   anything else                     -> cancel a pending timer, commit
+%       through couch_db_engine:commit_data/1 and clear
+%       waiting_delayed_commit.
commit_data(Db) ->
commit_data(Db, false).
-commit_data(#db{waiting_delayed_commit=nil} = Db, true) ->
- TRef = erlang:send_after(1000,self(),delayed_commit),
- Db#db{waiting_delayed_commit=TRef};
+commit_data(#db{waiting_delayed_commit = nil} = Db, true) ->
+ TRef = erlang:send_after(1000, self(), delayed_commit),
+ Db#db{waiting_delayed_commit = TRef};
commit_data(Db, true) ->
Db;
commit_data(Db, _) ->
#db{
- header = OldHeader,
- waiting_delayed_commit = Timer
- } = Db,
- if is_reference(Timer) -> erlang:cancel_timer(Timer); true -> ok end,
- case db_to_header(Db, OldHeader) of
- OldHeader -> Db#db{waiting_delayed_commit=nil};
- NewHeader -> sync_header(Db, NewHeader)
- end.
-
-sync_header(Db, NewHeader) ->
- #db{
- fd = Fd,
- filepath = FilePath,
- fsync_options = FsyncOptions,
waiting_delayed_commit = Timer
} = Db,
-
if is_reference(Timer) -> erlang:cancel_timer(Timer); true -> ok end,
-
- Before = lists:member(before_header, FsyncOptions),
- After = lists:member(after_header, FsyncOptions),
-
- if Before -> couch_file:sync(FilePath); true -> ok end,
- ok = couch_file:write_header(Fd, NewHeader),
- if After -> couch_file:sync(FilePath); true -> ok end,
-
- Db#db{
- header=NewHeader,
- committed_update_seq=Db#db.update_seq,
- waiting_delayed_commit=nil
+ % NOTE(review): committed_update_seq is read from the pre-commit Db, not
+ % from Db1; presumably committing leaves the update seq unchanged so
+ % both give the same value - confirm against the engine behaviour.
+ {ok, Db1} = couch_db_engine:commit_data(Db),
+ Db1#db{
+ waiting_delayed_commit = nil,
+ committed_update_seq = couch_db_engine:get_update_seq(Db)
}.
-copy_doc_attachments(#db{fd = SrcFd} = SrcDb, SrcSp, DestFd) ->
- {ok, {BodyData, BinInfos0}} = couch_db:read_doc(SrcDb, SrcSp),
- BinInfos = case BinInfos0 of
- _ when is_binary(BinInfos0) ->
- couch_compress:decompress(BinInfos0);
- _ when is_list(BinInfos0) ->
- % pre 1.2 file format
- BinInfos0
- end,
- % copy the bin values
- NewBinInfos = lists:map(
- fun({Name, Type, BinSp, AttLen, RevPos, ExpectedMd5}) ->
- % 010 UPGRADE CODE
- {NewBinSp, AttLen, AttLen, ActualMd5, _IdentityMd5} =
- couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
- check_md5(ExpectedMd5, ActualMd5),
- {Name, Type, NewBinSp, AttLen, AttLen, RevPos, ExpectedMd5, identity};
- ({Name, Type, BinSp, AttLen, DiskLen, RevPos, ExpectedMd5, Enc1}) ->
- {NewBinSp, AttLen, _, ActualMd5, _IdentityMd5} =
- couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
- check_md5(ExpectedMd5, ActualMd5),
- Enc = case Enc1 of
- true ->
- % 0110 UPGRADE CODE
- gzip;
- false ->
- % 0110 UPGRADE CODE
- identity;
- _ ->
- Enc1
- end,
- {Name, Type, NewBinSp, AttLen, DiskLen, RevPos, ExpectedMd5, Enc}
- end, BinInfos),
- {BodyData, NewBinInfos}.
-
-merge_lookups(Infos, []) ->
- Infos;
-merge_lookups([], _) ->
- [];
-merge_lookups([#doc_info{}=DI | RestInfos], [{ok, FDI} | RestLookups]) ->
- % Assert we've matched our lookups
- if DI#doc_info.id == FDI#full_doc_info.id -> ok; true ->
- erlang:error({mismatched_doc_infos, DI#doc_info.id})
- end,
- [FDI | merge_lookups(RestInfos, RestLookups)];
-merge_lookups([FDI | RestInfos], Lookups) ->
- [FDI | merge_lookups(RestInfos, Lookups)].
-
-check_md5(Md5, Md5) -> ok;
-check_md5(_, _) -> throw(md5_mismatch).
-
-copy_docs(Db, #db{fd = DestFd} = NewDb, MixedInfos, Retry) ->
- DocInfoIds = [Id || #doc_info{id=Id} <- MixedInfos],
- LookupResults = couch_btree:lookup(Db#db.id_tree, DocInfoIds),
- % COUCHDB-968, make sure we prune duplicates during compaction
- NewInfos0 = lists:usort(fun(#full_doc_info{id=A}, #full_doc_info{id=B}) ->
- A =< B
- end, merge_lookups(MixedInfos, LookupResults)),
-
- NewInfos1 = lists:map(fun(Info) ->
- {NewRevTree, FinalAcc} = couch_key_tree:mapfold(fun
- (_Rev, #leaf{ptr=Sp}=Leaf, leaf, SizesAcc) ->
- {Body, AttInfos} = copy_doc_attachments(Db, Sp, DestFd),
- SummaryChunk = make_doc_summary(NewDb, {Body, AttInfos}),
- ExternalSize = ?term_size(SummaryChunk),
- {ok, Pos, SummarySize} = couch_file:append_raw_chunk(
- DestFd, SummaryChunk),
- AttSizes = [{element(3,A), element(4,A)} || A <- AttInfos],
- NewLeaf = Leaf#leaf{
- ptr = Pos,
- sizes = #size_info{
- active = SummarySize,
- external = ExternalSize
- },
- atts = AttSizes
- },
- {NewLeaf, add_sizes(leaf, NewLeaf, SizesAcc)};
- (_Rev, _Leaf, branch, SizesAcc) ->
- {?REV_MISSING, SizesAcc}
- end, {0, 0, []}, Info#full_doc_info.rev_tree),
- {FinalAS, FinalES, FinalAtts} = FinalAcc,
- TotalAttSize = lists:foldl(fun({_, S}, A) -> S + A end, 0, FinalAtts),
- NewActiveSize = FinalAS + TotalAttSize,
- NewExternalSize = FinalES + TotalAttSize,
- Info#full_doc_info{
- rev_tree = NewRevTree,
- sizes = #size_info{
- active = NewActiveSize,
- external = NewExternalSize
- }
- }
- end, NewInfos0),
-
- NewInfos = stem_full_doc_infos(Db, NewInfos1),
- RemoveSeqs =
- case Retry of
- nil ->
- [];
- OldDocIdTree ->
- % Compaction is being rerun to catch up to writes during the
- % first pass. This means we may have docs that already exist
- % in the seq_tree in the .data file. Here we lookup any old
- % update_seqs so that they can be removed.
- Ids = [Id || #full_doc_info{id=Id} <- NewInfos],
- Existing = couch_btree:lookup(OldDocIdTree, Ids),
- [Seq || {ok, #full_doc_info{update_seq=Seq}} <- Existing]
- end,
-
- {ok, SeqTree} = couch_btree:add_remove(
- NewDb#db.seq_tree, NewInfos, RemoveSeqs),
-
- FDIKVs = lists:map(fun(#full_doc_info{id=Id, update_seq=Seq}=FDI) ->
- {{Id, Seq}, FDI}
- end, NewInfos),
- {ok, IdEms} = couch_emsort:add(NewDb#db.id_tree, FDIKVs),
- update_compact_task(length(NewInfos)),
- NewDb#db{id_tree=IdEms, seq_tree=SeqTree}.
-
-
-copy_compact(Db, NewDb0, Retry) ->
- Compression = couch_compress:get_compression_method(),
- NewDb = NewDb0#db{compression=Compression},
- TotalChanges = couch_db:count_changes_since(Db, NewDb#db.update_seq),
- BufferSize = list_to_integer(
- config:get("database_compaction", "doc_buffer_size", "524288")),
- CheckpointAfter = couch_util:to_integer(
- config:get("database_compaction", "checkpoint_after",
- BufferSize * 10)),
-
- EnumBySeqFun =
- fun(DocInfo, _Offset,
- {AccNewDb, AccUncopied, AccUncopiedSize, AccCopiedSize}) ->
-
- Seq = case DocInfo of
- #full_doc_info{} -> DocInfo#full_doc_info.update_seq;
- #doc_info{} -> DocInfo#doc_info.high_seq
- end,
- AccUncopiedSize2 = AccUncopiedSize + ?term_size(DocInfo),
- if AccUncopiedSize2 >= BufferSize ->
- NewDb2 = copy_docs(
- Db, AccNewDb, lists:reverse([DocInfo | AccUncopied]), Retry),
- AccCopiedSize2 = AccCopiedSize + AccUncopiedSize2,
- if AccCopiedSize2 >= CheckpointAfter ->
- CommNewDb2 = commit_compaction_data(NewDb2#db{update_seq=Seq}),
- {ok, {CommNewDb2, [], 0, 0}};
- true ->
- {ok, {NewDb2#db{update_seq = Seq}, [], 0, AccCopiedSize2}}
- end;
+maybe_track_db(#db{options = Options}) ->
+ case lists:member(sys_db, Options) of
true ->
- {ok, {AccNewDb, [DocInfo | AccUncopied], AccUncopiedSize2,
- AccCopiedSize}}
- end
- end,
-
- TaskProps0 = [
- {type, database_compaction},
- {database, Db#db.name},
- {progress, 0},
- {changes_done, 0},
- {total_changes, TotalChanges}
- ],
- case (Retry =/= nil) and couch_task_status:is_task_added() of
- true ->
- couch_task_status:update([
- {retry, true},
- {progress, 0},
- {changes_done, 0},
- {total_changes, TotalChanges}
- ]);
- false ->
- couch_task_status:add_task(TaskProps0),
- couch_task_status:set_update_frequency(500)
- end,
-
- {ok, _, {NewDb2, Uncopied, _, _}} =
- couch_btree:foldl(Db#db.seq_tree, EnumBySeqFun,
- {NewDb, [], 0, 0},
- [{start_key, NewDb#db.update_seq + 1}]),
-
- NewDb3 = copy_docs(Db, NewDb2, lists:reverse(Uncopied), Retry),
-
- % copy misc header values
- if NewDb3#db.security /= Db#db.security ->
- {ok, Ptr, _} = couch_file:append_term(
- NewDb3#db.fd, Db#db.security,
- [{compression, NewDb3#db.compression}]),
- NewDb4 = NewDb3#db{security=Db#db.security, security_ptr=Ptr};
- true ->
- NewDb4 = NewDb3
- end,
-
- commit_compaction_data(NewDb4#db{update_seq=Db#db.update_seq}).
-
-
-start_copy_compact(#db{}=Db) ->
- erlang:put(io_priority, {db_compact, Db#db.name}),
- #db{name=Name, filepath=Filepath, options=Options, header=Header} = Db,
- couch_log:debug("Compaction process spawned for db \"~s\"", [Name]),
-
- {ok, NewDb, DName, DFd, MFd, Retry} =
- open_compaction_files(Name, Header, Filepath, Options),
- erlang:monitor(process, MFd),
-
- % This is a bit worrisome. init_db/4 will monitor the data fd
- % but it doesn't know about the meta fd. For now I'll maintain
- % that the data fd is the old normal fd and meta fd is special
- % and hope everything works out for the best.
- unlink(DFd),
-
- NewDb1 = copy_purge_info(Db, NewDb),
- NewDb2 = copy_compact(Db, NewDb1, Retry),
- NewDb3 = sort_meta_data(NewDb2),
- NewDb4 = commit_compaction_data(NewDb3),
- NewDb5 = copy_meta_data(NewDb4),
- NewDb6 = sync_header(NewDb5, db_to_header(NewDb5, NewDb5#db.header)),
- close_db(NewDb6),
-
- ok = couch_file:close(MFd),
- gen_server:cast(Db#db.main_pid, {compact_done, DName}).
-
-
-open_compaction_files(DbName, SrcHdr, DbFilePath, Options) ->
- DataFile = DbFilePath ++ ".compact.data",
- MetaFile = DbFilePath ++ ".compact.meta",
- {ok, DataFd, DataHdr} = open_compaction_file(DataFile),
- {ok, MetaFd, MetaHdr} = open_compaction_file(MetaFile),
- DataHdrIsDbHdr = couch_db_header:is_header(DataHdr),
- case {DataHdr, MetaHdr} of
- {#comp_header{}=A, #comp_header{}=A} ->
- DbHeader = A#comp_header.db_header,
- Db0 = init_db(DbName, DataFile, DataFd, DbHeader, Options),
- Db1 = bind_emsort(Db0, MetaFd, A#comp_header.meta_state),
- {ok, Db1, DataFile, DataFd, MetaFd, Db0#db.id_tree};
- _ when DataHdrIsDbHdr ->
- ok = reset_compaction_file(MetaFd, couch_db_header:from(SrcHdr)),
- Db0 = init_db(DbName, DataFile, DataFd, DataHdr, Options),
- Db1 = bind_emsort(Db0, MetaFd, nil),
- {ok, Db1, DataFile, DataFd, MetaFd, Db0#db.id_tree};
- _ ->
- Header = couch_db_header:from(SrcHdr),
- ok = reset_compaction_file(DataFd, Header),
- ok = reset_compaction_file(MetaFd, Header),
- Db0 = init_db(DbName, DataFile, DataFd, Header, Options),
- Db1 = bind_emsort(Db0, MetaFd, nil),
- {ok, Db1, DataFile, DataFd, MetaFd, nil}
- end.
-
-
-open_compaction_file(FilePath) ->
- case couch_file:open(FilePath, [nologifmissing]) of
- {ok, Fd} ->
- case couch_file:read_header(Fd) of
- {ok, Header} -> {ok, Fd, Header};
- no_valid_header -> {ok, Fd, nil}
- end;
- {error, enoent} ->
- {ok, Fd} = couch_file:open(FilePath, [create]),
- {ok, Fd, nil}
- end.
-
-
-reset_compaction_file(Fd, Header) ->
- ok = couch_file:truncate(Fd, 0),
- ok = couch_file:write_header(Fd, Header).
-
-
-copy_purge_info(OldDb, NewDb) ->
- OldHdr = OldDb#db.header,
- NewHdr = NewDb#db.header,
- OldPurgeSeq = couch_db_header:purge_seq(OldHdr),
- if OldPurgeSeq > 0 ->
- {ok, PurgedIdsRevs} = couch_db:get_last_purged(OldDb),
- Opts = [{compression, NewDb#db.compression}],
- {ok, Ptr, _} = couch_file:append_term(NewDb#db.fd, PurgedIdsRevs, Opts),
- NewNewHdr = couch_db_header:set(NewHdr, [
- {purge_seq, OldPurgeSeq},
- {purged_docs, Ptr}
- ]),
- NewDb#db{header = NewNewHdr};
- true ->
- NewDb
+ ok;
+ false ->
+ couch_stats_process_tracker:track([couchdb, open_databases])
end.
-commit_compaction_data(#db{}=Db) ->
- % Compaction needs to write headers to both the data file
- % and the meta file so if we need to restart we can pick
- % back up from where we left off.
- commit_compaction_data(Db, couch_emsort:get_fd(Db#db.id_tree)),
- commit_compaction_data(Db, Db#db.fd).
-
-
-commit_compaction_data(#db{header=OldHeader}=Db0, Fd) ->
- % Mostly copied from commit_data/2 but I have to
- % replace the logic to commit and fsync to a specific
- % fd instead of the Filepath stuff that commit_data/2
- % does.
- DataState = couch_db_header:id_tree_state(OldHeader),
- MetaFd = couch_emsort:get_fd(Db0#db.id_tree),
- MetaState = couch_emsort:get_state(Db0#db.id_tree),
- Db1 = bind_id_tree(Db0, Db0#db.fd, DataState),
- Header = db_to_header(Db1, OldHeader),
- CompHeader = #comp_header{
- db_header = Header,
- meta_state = MetaState
- },
- ok = couch_file:sync(Fd),
- ok = couch_file:write_header(Fd, CompHeader),
- Db2 = Db1#db{
- waiting_delayed_commit=nil,
- header=Header,
- committed_update_seq=Db1#db.update_seq
- },
- bind_emsort(Db2, MetaFd, MetaState).
-
-
-bind_emsort(Db, Fd, nil) ->
- {ok, Ems} = couch_emsort:open(Fd),
- Db#db{id_tree=Ems};
-bind_emsort(Db, Fd, State) ->
- {ok, Ems} = couch_emsort:open(Fd, [{root, State}]),
- Db#db{id_tree=Ems}.
-
-
-bind_id_tree(Db, Fd, State) ->
- {ok, IdBtree} = couch_btree:open(State, Fd, [
- {split, fun ?MODULE:btree_by_id_split/1},
- {join, fun ?MODULE:btree_by_id_join/2},
- {reduce, fun ?MODULE:btree_by_id_reduce/2}
- ]),
- Db#db{id_tree=IdBtree}.
-
-
-sort_meta_data(Db0) ->
- {ok, Ems} = couch_emsort:merge(Db0#db.id_tree),
- Db0#db{id_tree=Ems}.
-
-
-copy_meta_data(#db{fd=Fd, header=Header}=Db) ->
- Src = Db#db.id_tree,
- DstState = couch_db_header:id_tree_state(Header),
- {ok, IdTree0} = couch_btree:open(DstState, Fd, [
- {split, fun ?MODULE:btree_by_id_split/1},
- {join, fun ?MODULE:btree_by_id_join/2},
- {reduce, fun ?MODULE:btree_by_id_reduce/2}
- ]),
- {ok, Iter} = couch_emsort:iter(Src),
- Acc0 = #merge_st{
- id_tree=IdTree0,
- seq_tree=Db#db.seq_tree,
- rem_seqs=[],
- infos=[]
- },
- Acc = merge_docids(Iter, Acc0),
- {ok, IdTree} = couch_btree:add(Acc#merge_st.id_tree, Acc#merge_st.infos),
- {ok, SeqTree} = couch_btree:add_remove(
- Acc#merge_st.seq_tree, [], Acc#merge_st.rem_seqs
- ),
- Db#db{id_tree=IdTree, seq_tree=SeqTree}.
-
-
-merge_docids(Iter, #merge_st{infos=Infos}=Acc) when length(Infos) > 1000 ->
- #merge_st{
- id_tree=IdTree0,
- seq_tree=SeqTree0,
- rem_seqs=RemSeqs
- } = Acc,
- {ok, IdTree1} = couch_btree:add(IdTree0, Infos),
- {ok, SeqTree1} = couch_btree:add_remove(SeqTree0, [], RemSeqs),
- Acc1 = Acc#merge_st{
- id_tree=IdTree1,
- seq_tree=SeqTree1,
- rem_seqs=[],
- infos=[]
- },
- merge_docids(Iter, Acc1);
-merge_docids(Iter, #merge_st{curr=Curr}=Acc) ->
- case next_info(Iter, Curr, []) of
- {NextIter, NewCurr, FDI, Seqs} ->
- Acc1 = Acc#merge_st{
- infos = [FDI | Acc#merge_st.infos],
- rem_seqs = Seqs ++ Acc#merge_st.rem_seqs,
- curr = NewCurr
- },
- merge_docids(NextIter, Acc1);
- {finished, FDI, Seqs} ->
- Acc#merge_st{
- infos = [FDI | Acc#merge_st.infos],
- rem_seqs = Seqs ++ Acc#merge_st.rem_seqs,
- curr = undefined
- };
- empty ->
- Acc
- end.
-
+% Placeholder for finishing a compaction that targets a different storage
+% engine. Not implemented: all arguments are ignored and the call always
+% raises error `explode`. It is reached from the compact_done cast when
+% the compacting engine differs from the database's current engine.
+finish_engine_swap(_OldDb, _NewEngine, _CompactFilePath) ->
+ erlang:error(explode).
-next_info(Iter, undefined, []) ->
- case couch_emsort:next(Iter) of
- {ok, {{Id, Seq}, FDI}, NextIter} ->
- next_info(NextIter, {Id, Seq, FDI}, []);
- finished ->
- empty
- end;
-next_info(Iter, {Id, Seq, FDI}, Seqs) ->
- case couch_emsort:next(Iter) of
- {ok, {{Id, NSeq}, NFDI}, NextIter} ->
- next_info(NextIter, {Id, NSeq, NFDI}, [Seq | Seqs]);
- {ok, {{NId, NSeq}, NFDI}, NextIter} ->
- {NextIter, {NId, NSeq, NFDI}, FDI, Seqs};
- finished ->
- {finished, FDI, Seqs}
- end.
+% Pair each #full_doc_info{} in New with the entry of Old that has the
+% same id (looked up with lists:keyfind/3 on the id field position).
+% Returns one tuple per element of New, in the order of New:
+%   {OldFDI, FDI}    when a matching #full_doc_info{} exists in Old
+%   {not_found, FDI} when Old has no entry with that id
+% The lookup is a linear scan of Old for every element of New.
+pair_write_info(Old, New) ->
+ lists:map(fun(FDI) ->
+ case lists:keyfind(FDI#full_doc_info.id, #full_doc_info.id, Old) of
+ #full_doc_info{} = OldFDI -> {OldFDI, FDI};
+ false -> {not_found, FDI}
+ end
+ end, New).
-update_compact_task(NumChanges) ->
- [Changes, Total] = couch_task_status:get([changes_done, total_changes]),
- Changes2 = Changes + NumChanges,
- Progress = case Total of
- 0 ->
- 0;
- _ ->
- (Changes2 * 100) div Total
- end,
- couch_task_status:update([{changes_done, Changes2}, {progress, Progress}]).
+% Pair each #full_doc_info{} in Old with the entry of New that has the
+% same id (looked up with lists:keyfind/3 on the id field position).
+% Returns one tuple per element of Old, in the order of Old:
+%   {OldFDI, NewFDI}    when a matching #full_doc_info{} exists in New
+%   {OldFDI, not_found} when New has no entry with that id
+% The lookup is a linear scan of New for every element of Old.
+pair_purge_info(Old, New) ->
+ lists:map(fun(OldFDI) ->
+ case lists:keyfind(OldFDI#full_doc_info.id, #full_doc_info.id, New) of
+ #full_doc_info{} = NewFDI -> {OldFDI, NewFDI};
+ false -> {OldFDI, not_found}
+ end
+ end, Old).
-make_doc_summary(#db{compression = Comp}, {Body0, Atts0}) ->
- Body = case couch_compress:is_compressed(Body0, Comp) of
- true ->
- Body0;
- false ->
- % pre 1.2 database file format
- couch_compress:compress(Body0, Comp)
- end,
- Atts = case couch_compress:is_compressed(Atts0, Comp) of
- true ->
- Atts0;
- false ->
- couch_compress:compress(Atts0, Comp)
- end,
- SummaryBin = ?term_to_bin({Body, Atts}),
- couch_file:assemble_file_chunk(SummaryBin, couch_crypto:hash(md5, SummaryBin)).
default_security_object(<<"shards/", _/binary>>) ->
case config:get("couchdb", "default_security", "everyone") of
diff --git a/src/couch/src/couch_httpd_db.erl b/src/couch/src/couch_httpd_db.erl
index 34a1539..9183be3 100644
--- a/src/couch/src/couch_httpd_db.erl
+++ b/src/couch/src/couch_httpd_db.erl
@@ -217,7 +217,13 @@ handle_design_info_req(#httpd{
create_db_req(#httpd{user_ctx=UserCtx}=Req, DbName) ->
ok = couch_httpd:verify_is_server_admin(Req),
- case couch_server:create(DbName, [{user_ctx, UserCtx}]) of
+ Engine = case couch_httpd:qs_value(Req, "engine") of
+ EngineStr when is_list(EngineStr) ->
+ [{engine, iolist_to_binary(EngineStr)}];
+ _ ->
+ []
+ end,
+ case couch_server:create(DbName, [{user_ctx, UserCtx}] ++ Engine) of
{ok, Db} ->
couch_db:close(Db),
DbUrl = absolute_uri(Req, "/" ++ couch_util:url_encode(DbName)),
diff --git a/src/couch/src/couch_httpd_misc_handlers.erl b/src/couch/src/couch_httpd_misc_handlers.erl
index eb75a94..3fc4d9a 100644
--- a/src/couch/src/couch_httpd_misc_handlers.erl
+++ b/src/couch/src/couch_httpd_misc_handlers.erl
@@ -17,8 +17,6 @@
handle_uuids_req/1,handle_config_req/1,
handle_task_status_req/1, handle_file_req/2]).
--export([increment_update_seq_req/2]).
-
-include_lib("couch/include/couch_db.hrl").
@@ -310,14 +308,3 @@ handle_approved_config_req(#httpd{method='DELETE',path_parts=[_,Section,Key]}=Re
send_json(Req, 200, list_to_binary(OldValue))
end.
-
-% httpd db handlers
-
-increment_update_seq_req(#httpd{method='POST'}=Req, Db) ->
- couch_httpd:validate_ctype(Req, "application/json"),
- {ok, NewSeq} = couch_db:increment_update_seq(Db),
- send_json(Req, {[{ok, true},
- {update_seq, NewSeq}
- ]});
-increment_update_seq_req(Req, _Db) ->
- send_method_not_allowed(Req, "POST").
diff --git a/src/couch/src/couch_server.erl b/src/couch/src/couch_server.erl
index 7f6a7f9..650b961 100644
--- a/src/couch/src/couch_server.erl
+++ b/src/couch/src/couch_server.erl
@@ -21,6 +21,8 @@
-export([handle_cast/2,code_change/3,handle_info/2,terminate/2]).
-export([dev_start/0,is_admin/2,has_admins/0,get_stats/0]).
-export([close_lru/0]).
+-export([delete_compaction_files/1]).
+-export([exists/1]).
% config_listener api
-export([handle_config_change/5, handle_config_terminate/3]).
@@ -33,6 +35,7 @@
-record(server,{
root_dir = [],
+ engines = [],
max_dbs_open=?MAX_DBS_OPEN,
dbs_open=0,
start_time="",
@@ -114,6 +117,35 @@ create(DbName, Options0) ->
delete(DbName, Options) ->
gen_server:call(couch_server, {delete, DbName, Options}, infinity).
+
+%% Returns true when at least one configured storage engine reports that
+%% a database named DbName exists under the configured "database_dir"
+%% (default "."), and false otherwise.
+%%
+%% Each {Extension, Engine} pair from get_configured_engines/0 is probed
+%% at the path built by make_filepath/3. lists:any/2 stops at the first
+%% engine that answers true, so no intermediate list of matches is built
+%% just to be tested for emptiness.
+exists(DbName) ->
+    RootDir = config:get("couchdb", "database_dir", "."),
+    lists:any(fun({Extension, Engine}) ->
+        Path = make_filepath(RootDir, DbName, Extension),
+        couch_db_engine:exists(Engine, Path)
+    end, get_configured_engines()).
+
+
+%% Deletes leftover compaction files for DbName with no extra delete
+%% options.
+delete_compaction_files(DbName) ->
+    delete_compaction_files(DbName, []).
+
+%% Asks every configured engine to delete the compaction files it may
+%% hold for DbName under the configured "database_dir" (default ".").
+%% DbName may be a binary or a string; the per-engine results are
+%% discarded and ok is returned.
+delete_compaction_files(DbName, DelOpts) when is_binary(DbName) ->
+    delete_compaction_files(?b2l(DbName), DelOpts);
+delete_compaction_files(DbName, DelOpts) when is_list(DbName) ->
+    RootDir = config:get("couchdb", "database_dir", "."),
+    Cleanup = fun({Ext, Engine}) ->
+        FPath = make_filepath(RootDir, DbName, Ext),
+        couch_db_engine:delete_compaction_files(Engine, RootDir, FPath, DelOpts)
+    end,
+    lists:foreach(Cleanup, get_configured_engines()),
+    ok.
+
maybe_add_sys_db_callbacks(DbName, Options) when is_binary(DbName) ->
maybe_add_sys_db_callbacks(?b2l(DbName), Options);
maybe_add_sys_db_callbacks(DbName, Options) ->
@@ -161,9 +193,6 @@ is_admin(User, ClearPwd) ->
has_admins() ->
config:get("admins") /= [].
-get_full_filename(Server, DbName) ->
- filename:join([Server#server.root_dir, "./" ++ DbName ++ ".couch"]).
-
hash_admin_passwords() ->
hash_admin_passwords(true).
@@ -181,6 +210,7 @@ init([]) ->
% will restart us and then we will pick up the new settings.
RootDir = config:get("couchdb", "database_dir", "."),
+ Engines = get_configured_engines(),
MaxDbsOpen = list_to_integer(
config:get("couchdb", "max_dbs_open", integer_to_list(?MAX_DBS_OPEN))),
UpdateLruOnRead =
@@ -192,6 +222,7 @@ init([]) ->
ets:new(couch_dbs_pid_to_name, [set, protected, named_table]),
process_flag(trap_exit, true),
{ok, #server{root_dir=RootDir,
+ engines = Engines,
max_dbs_open=MaxDbsOpen,
update_lru_on_read=UpdateLruOnRead,
start_time=couch_util:rfc1123_date()}}.
@@ -216,6 +247,8 @@ handle_config_change("couchdb", "max_dbs_open", Max, _, _) when is_list(Max) ->
{ok, gen_server:call(couch_server,{set_max_dbs_open,list_to_integer(Max)})};
handle_config_change("couchdb", "max_dbs_open", _, _, _) ->
{ok, gen_server:call(couch_server,{set_max_dbs_open,?MAX_DBS_OPEN})};
+handle_config_change("couchdb_engines", _, _, _, _) ->
+ {ok, gen_server:call(couch_server,reload_engines)};
handle_config_change("admins", _, _, Persist, _) ->
% spawn here so couch event manager doesn't deadlock
{ok, spawn(fun() -> hash_admin_passwords(Persist) end)};
@@ -250,11 +283,15 @@ all_databases() ->
all_databases(Fun, Acc0) ->
{ok, #server{root_dir=Root}} = gen_server:call(couch_server, get_server),
NormRoot = couch_util:normpath(Root),
- FinalAcc = try
- filelib:fold_files(Root,
+ Extensions = get_engine_extensions(),
+ ExtRegExp = "(" ++ string:join(Extensions, "|") ++ ")",
+ RegExp =
"^[a-z0-9\\_\\$()\\+\\-]*" % stock CouchDB name regex
"(\\.[0-9]{10,})?" % optional shard timestamp
- "\\.couch$", % filename extension
+ "\\." ++ ExtRegExp ++ "$", % filename extension
+ FinalAcc = try
+ couch_util:fold_files(Root,
+ RegExp,
true,
fun(Filename, AccIn) ->
NormFilename = couch_util:normpath(Filename),
@@ -262,7 +299,8 @@ all_databases(Fun, Acc0) ->
[$/ | RelativeFilename] -> ok;
RelativeFilename -> ok
end,
- case Fun(couch_util:drop_dot_couch_ext(?l2b(RelativeFilename)), AccIn) of
+ Ext = filename:extension(RelativeFilename),
+ case Fun(?l2b(filename:rootname(RelativeFilename, Ext)), AccIn) of
{ok, NewAcc} -> NewAcc;
{stop, NewAcc} -> throw({stop, Fun, NewAcc})
end
@@ -284,11 +322,11 @@ maybe_close_lru_db(#server{lru=Lru}=Server) ->
maybe_close_lru_db(db_closed(Server#server{lru = NewLru}))
end.
-open_async(Server, From, DbName, Filepath, Options) ->
+open_async(Server, From, DbName, {Module, Filepath}, Options) ->
Parent = self(),
T0 = os:timestamp(),
Opener = spawn_link(fun() ->
- Res = couch_db:start_link(DbName, Filepath, Options),
+ Res = couch_db:start_link(Module, DbName, Filepath, Options),
case {Res, lists:member(create, Options)} of
{{ok, _Db}, true} ->
couch_event:notify(DbName, created);
@@ -326,6 +364,8 @@ handle_call({set_update_lru_on_read, UpdateOnRead}, _From, Server) ->
{reply, ok, Server#server{update_lru_on_read=UpdateOnRead}};
handle_call({set_max_dbs_open, Max}, _From, Server) ->
{reply, ok, Server#server{max_dbs_open=Max}};
+handle_call(reload_engines, _From, Server) ->
+ {reply, ok, Server#server{engines = get_configured_engines()}};
handle_call(get_server, _From, Server) ->
{reply, {ok, Server}, Server};
handle_call({open_result, T0, DbName, {ok, Db}}, {FromPid, _Tag}, Server) ->
@@ -343,7 +383,7 @@ handle_call({open_result, T0, DbName, {ok, Db}}, {FromPid, _Tag}, Server) ->
[gen_server:reply(Waiter, {ok, Db}) || Waiter <- Waiters],
% Cancel the creation request if it exists.
case ReqType of
- {create, DbName, _Filepath, _Options, CrFrom} ->
+ {create, DbName, _Engine, _Options, CrFrom} ->
gen_server:reply(CrFrom, file_exists);
_ ->
ok
@@ -373,8 +413,8 @@ handle_call({open_result, _T0, DbName, Error}, {FromPid, _Tag}, Server) ->
true = ets:delete(couch_dbs, DbName),
true = ets:delete(couch_dbs_pid_to_name, FromPid),
NewServer = case ReqType of
- {create, DbName, Filepath, Options, CrFrom} ->
- open_async(Server, CrFrom, DbName, Filepath, Options);
+ {create, DbName, Engine, Options, CrFrom} ->
+ open_async(Server, CrFrom, DbName, Engine, Options);
_ ->
Server
end,
@@ -387,8 +427,8 @@ handle_call({open, DbName, Options}, From, Server) ->
case check_dbname(Server, DbNameList) of
ok ->
{ok, Server2} = maybe_close_lru_db(Server),
- Filepath = get_full_filename(Server, DbNameList),
- {noreply, open_async(Server2, From, DbName, Filepath, Options)};
+ Engine = get_engine(Server2, DbNameList),
+ {noreply, open_async(Server2, From, DbName, Engine, Options)};
Error ->
{reply, Error, Server}
end;
@@ -404,20 +444,20 @@ handle_call({open, DbName, Options}, From, Server) ->
end;
handle_call({create, DbName, Options}, From, Server) ->
DbNameList = binary_to_list(DbName),
- Filepath = get_full_filename(Server, DbNameList),
+ Engine = get_engine(Server, DbNameList, Options),
case check_dbname(Server, DbNameList) of
ok ->
case ets:lookup(couch_dbs, DbName) of
[] ->
{ok, Server2} = maybe_close_lru_db(Server),
- {noreply, open_async(Server2, From, DbName, Filepath,
+ {noreply, open_async(Server2, From, DbName, Engine,
[create | Options])};
[#entry{req_type = open} = Entry] ->
% We're trying to create a database while someone is in
% the middle of trying to open it. We allow one creator
% to wait while we figure out if it'll succeed.
CrOptions = [create | Options],
- Req = {create, DbName, Filepath, CrOptions, From},
+ Req = {create, DbName, Engine, CrOptions, From},
true = ets:insert(couch_dbs, Entry#entry{req_type = Req}),
{noreply, Server};
[_AlreadyRunningDb] ->
@@ -430,7 +470,6 @@ handle_call({delete, DbName, Options}, _From, Server) ->
DbNameList = binary_to_list(DbName),
case check_dbname(Server, DbNameList) of
ok ->
- FullFilepath = get_full_filename(Server, DbNameList),
Server2 =
case ets:lookup(couch_dbs, DbName) of
[] -> Server;
@@ -447,18 +486,16 @@ handle_call({delete, DbName, Options}, _From, Server) ->
db_closed(Server)
end,
- %% Delete any leftover compaction files. If we don't do this a
- %% subsequent request for this DB will try to open them to use
- %% as a recovery.
- lists:foreach(fun(Ext) ->
- couch_file:delete(Server#server.root_dir, FullFilepath ++ Ext)
- end, [".compact", ".compact.data", ".compact.meta"]),
- couch_file:delete(Server#server.root_dir, FullFilepath ++ ".compact"),
-
couch_db_plugin:on_delete(DbName, Options),
DelOpt = [{context, delete} | Options],
- case couch_file:delete(Server#server.root_dir, FullFilepath, DelOpt) of
+
+ % Make sure and remove all compaction data
+ delete_compaction_files(DbNameList, DelOpt),
+
+ {Engine, FilePath} = get_engine(Server, DbNameList),
+ RootDir = Server#server.root_dir,
+ case couch_db_engine:delete(Engine, RootDir, FilePath, DelOpt) of
ok ->
couch_event:notify(DbName, deleted),
{reply, ok, Server2};
@@ -528,6 +565,106 @@ db_opened(Server) ->
db_closed(Server) ->
Server#server{dbs_open=Server#server.dbs_open - 1}.
+
+%% Reads the "couchdb_engines" config section and returns a list of
+%% {Extension, Module} pairs, with the module name converted to an atom.
+%% Entries whose module string cannot be converted are dropped. When no
+%% usable entry remains, the built-in [{"couch", couch_bt_engine}] is
+%% returned.
+get_configured_engines() ->
+    ToEngine = fun({Extension, ModuleStr}) ->
+        try list_to_atom(ModuleStr) of
+            Module -> [{Extension, Module}]
+        catch _Class:_Reason ->
+            []
+        end
+    end,
+    case lists:flatmap(ToEngine, config:get("couchdb_engines")) of
+        [] -> [{"couch", couch_bt_engine}];
+        Configured -> Configured
+    end.
+
+
+%% Chooses the {EngineModule, FilePath} pair to use for DbName, honouring
+%% an explicit engine request in Options.
+%%
+%% When Options contains {engine, Ext} with a binary Ext that names one
+%% of the server's configured engine extensions, that engine is returned
+%% together with the path built from the extension. In every other case
+%% (no engine option, a non-binary value, or an extension that is not
+%% configured) the decision is delegated to get_engine/2.
+get_engine(Server, DbName, Options) ->
+    #server{
+        root_dir = RootDir,
+        engines = Engines
+    } = Server,
+    case couch_util:get_value(engine, Options) of
+        Ext when is_binary(Ext) ->
+            ExtStr = binary_to_list(Ext),
+            % lists:keyfind/3 reports a miss as false, which cannot be
+            % mistaken for a module name. A miss reported as the atom
+            % undefined would satisfy is_atom/1 and be returned as if
+            % it were an engine module.
+            case lists:keyfind(ExtStr, 1, Engines) of
+                {ExtStr, Engine} when is_atom(Engine) ->
+                    {Engine, make_filepath(RootDir, DbName, ExtStr)};
+                _ ->
+                    get_engine(Server, DbName)
+            end;
+        _ ->
+            get_engine(Server, DbName)
+    end.
+
+
+%% Finds the engine that already holds DbName on disk.
+%%
+%% Every configured {Extension, Module} pair is probed with
+%% couch_db_engine:exists/2 at the path built by make_filepath/3:
+%%   * no engine reports the database  -> get_default_engine/2 decides
+%%   * exactly one engine reports it   -> that {Module, Path} pair
+%%   * more than one engine reports it -> erlang:error(engine_conflict)
+get_engine(Server, DbName) ->
+    #server{root_dir = RootDir, engines = Engines} = Server,
+    Found = lists:filtermap(fun({Extension, Module}) ->
+        Path = make_filepath(RootDir, DbName, Extension),
+        case couch_db_engine:exists(Module, Path) of
+            true -> {true, {Module, Path}};
+            false -> false
+        end
+    end, Engines),
+    case Found of
+        [] -> get_default_engine(Server, DbName);
+        [EngineAndPath] -> EngineAndPath;
+        _ -> erlang:error(engine_conflict)
+    end.
+
+
+%% Returns the {Module, FilePath} pair used when no engine already holds
+%% DbName.
+%%
+%% The "default_engine" key of the "couchdb" config section names an
+%% extension. When it is set to a string that is one of the server's
+%% configured extensions, the matching module is used. Otherwise the
+%% result is couch_bt_engine with the "couch" extension.
+get_default_engine(Server, DbName) ->
+    #server{root_dir = RootDir, engines = Engines} = Server,
+    Configured = case config:get("couchdb", "default_engine") of
+        Extension when is_list(Extension) ->
+            lists:keyfind(Extension, 1, Engines);
+        _ ->
+            false
+    end,
+    case Configured of
+        {Ext, Module} ->
+            {Module, make_filepath(RootDir, DbName, Ext)};
+        false ->
+            {couch_bt_engine, make_filepath(RootDir, DbName, "couch")}
+    end.
+
+
+%% Builds the on-disk path for DbName with the given file extension:
+%% filename:join([RootDir, "./" ++ DbName ++ "." ++ Extension]).
+%% Each argument may be a binary or a string; binaries are converted
+%% with binary_to_list/1 before the path is assembled.
+make_filepath(RootDir, DbName, Extension) ->
+    AsList = fun
+        (Part) when is_binary(Part) -> binary_to_list(Part);
+        (Part) -> Part
+    end,
+    FileName = "./" ++ AsList(DbName) ++ "." ++ AsList(Extension),
+    filename:join([AsList(RootDir), FileName]).
+
+
+%% Returns the file extensions listed as keys in the "couchdb_engines"
+%% config section, or ["couch"] when that section is empty.
+get_engine_extensions() ->
+    Entries = config:get("couchdb_engines"),
+    if
+        Entries =:= [] -> ["couch"];
+        true -> [Ext || {Ext, _Mod} <- Entries]
+    end.
+
+
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
diff --git a/src/couch/src/couch_stream.erl b/src/couch/src/couch_stream.erl
index 913977f..1980246 100644
--- a/src/couch/src/couch_stream.erl
+++ b/src/couch/src/couch_stream.erl
@@ -14,21 +14,40 @@
-behaviour(gen_server).
-vsn(1).
-% public API
--export([open/1, open/2, close/1]).
--export([foldl/4, foldl/5, foldl_decode/6, range_foldl/6]).
--export([copy_to_new_stream/3, write/2]).
-% gen_server callbacks
--export([init/1, terminate/2, code_change/3]).
--export([handle_cast/2, handle_call/3, handle_info/2]).
+-export([
+ open/1,
+ open/2,
+ close/1,
+
+ copy/2,
+ write/2,
+ to_disk_term/1,
+
+ foldl/3,
+ foldl/4,
+ foldl_decode/5,
+ range_foldl/5
+]).
+
+-export([
+ init/1,
+ terminate/2,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ code_change/3
+]).
+
-include_lib("couch/include/couch_db.hrl").
+
-define(DEFAULT_BUFFER_SIZE, 4096).
--record(stream,
- {fd = 0,
+
+-record(stream, {
+ engine,
opener_monitor,
written_pointers=[],
buffer_list = [],
@@ -42,114 +61,94 @@
identity_len = 0,
encoding_fun,
end_encoding_fun
- }).
+}).
-%%% Interface functions %%%
+%% Opens a stream on Engine with an empty option list; see open/2.
+%% Engine must be a two-element tuple (the clause head matches
+%% {StreamEngine, StreamEngineState}).
+open({_StreamEngine, _StreamEngineState} = Engine) ->
+ open(Engine, []).
-open(Fd) ->
- open(Fd, []).
-open(Fd, Options) ->
- gen_server:start_link(couch_stream, {Fd, self(), erlang:get(io_priority), Options}, []).
+%% Starts a linked gen_server running this module for the given stream
+%% engine pair. The init argument carries the engine pair, the calling
+%% process' pid, the caller's io_priority process-dictionary value and
+%% Options. Returns whatever gen_server:start_link/3 returns.
+open({_StreamEngine, _StreamEngineState} = Engine, Options) ->
+ gen_server:start_link(?MODULE, {Engine, self(), erlang:get(io_priority), Options}, []).
+
close(Pid) ->
gen_server:call(Pid, close, infinity).
-copy_to_new_stream(Fd, PosList, DestFd) ->
- {ok, Dest} = open(DestFd),
- foldl(Fd, PosList,
- fun(Bin, _) ->
- ok = write(Dest, Bin)
- end, ok),
- close(Dest).
-
-foldl(_Fd, [], _Fun, Acc) ->
- Acc;
-foldl(Fd, [Pos|Rest], Fun, Acc) ->
- {ok, Bin} = couch_file:pread_iolist(Fd, Pos),
- foldl(Fd, Rest, Fun, Fun(Bin, Acc)).
-
-foldl(Fd, PosList, <<>>, Fun, Acc) ->
- foldl(Fd, PosList, Fun, Acc);
-foldl(Fd, PosList, Md5, Fun, Acc) ->
- foldl(Fd, PosList, Md5, couch_crypto:hash_init(md5), Fun, Acc).
-
-foldl_decode(Fd, PosList, Md5, Enc, Fun, Acc) ->
+
+%% Copies every chunk of the stream Src into Dst by folding over Src
+%% with foldl/3 and passing each chunk to write/2. Each write must
+%% return ok. Returns ok; Dst is not closed here.
+copy(Src, Dst) ->
+    CopyChunk = fun(Chunk, _Prev) -> ok = write(Dst, Chunk) end,
+    foldl(Src, CopyChunk, ok).
+
+
+%% Sends Bin to the stream process Pid as a synchronous {write, Bin}
+%% call with no timeout and returns the reply. Writing the empty binary
+%% is a no-op that returns ok without contacting the process.
+write(_Pid, <<>>) ->
+ ok;
+write(Pid, Bin) ->
+ gen_server:call(Pid, {write, Bin}, infinity).
+
+
+%% Delegates to the engine module's to_disk_term/1 with the engine
+%% state and returns its result unchanged.
+to_disk_term({Engine, EngineState}) ->
+ Engine:to_disk_term(EngineState).
+
+
+%% Folds Fun over the stream by delegating to the engine module's
+%% foldl/3 with the engine state, and returns its result unchanged.
+foldl({Engine, EngineState}, Fun, Acc) ->
+ Engine:foldl(EngineState, Fun, Acc).
+
+
+%% Folds UserFun over the stream while verifying its MD5 digest.
+%%
+%% An empty expected digest (<<>>) skips verification and behaves like
+%% foldl/3. Otherwise every chunk is fed through foldl_md5/2 and, once
+%% the fold has finished, the computed digest must equal Md5.
+foldl(Engine, <<>>, Fun, Acc) ->
+    foldl(Engine, Fun, Acc);
+foldl(Engine, Md5, UserFun, UserAcc) ->
+    Md5Ctx0 = couch_crypto:hash_init(md5),
+    {Md5Ctx, _, Result} =
+        foldl(Engine, fun foldl_md5/2, {Md5Ctx0, UserFun, UserAcc}),
+    % Assertive match: a digest different from Md5 raises badmatch.
+    Md5 = couch_crypto:hash_final(md5, Md5Ctx),
+    Result.
+
+
+foldl_decode(Engine, Md5, Enc, UserFun, UserAcc1) ->
{DecDataFun, DecEndFun} = case Enc of
- gzip ->
- ungzip_init();
- identity ->
- identity_enc_dec_funs()
+ gzip -> ungzip_init();
+ identity -> identity_enc_dec_funs()
end,
- Result = foldl_decode(
- DecDataFun, Fd, PosList, Md5, couch_crypto:hash_init(md5), Fun, Acc
- ),
+ InitAcc = {DecDataFun, UserFun, UserAcc1},
+ {_, _, UserAcc2} = foldl(Engine, Md5, fun foldl_decode/2, InitAcc),
DecEndFun(),
- Result.
+ UserAcc2.
+
+
+%% Folds UserFun over a byte range of the stream.
+%%
+%% The engine is first positioned at offset From with do_seek/2, then
+%% the stream is folded with foldl_length/2 using a byte budget of
+%% To - From. foldl_length/2 throws {finished, Acc} when a chunk is
+%% larger than the remaining budget; that throw is caught here and its
+%% accumulator returned. If the stream ends first, the accumulator of
+%% the completed fold is returned instead.
+%%
+%% Only defined for To >= From; other arguments raise function_clause.
+range_foldl(Engine, From, To, UserFun, UserAcc) when To >= From ->
+ NewEngine = do_seek(Engine, From),
+ InitAcc = {To - From, UserFun, UserAcc},
+ try
+ {_, _, UserAcc2} = foldl(NewEngine, fun foldl_length/2, InitAcc),
+ UserAcc2
+ catch
+ throw:{finished, UserAcc3} ->
+ UserAcc3
+ end.
-foldl(_Fd, [], Md5, Md5Acc, _Fun, Acc) ->
- Md5 = couch_crypto:hash_final(md5, Md5Acc),
- Acc;
-foldl(Fd, [{Pos, _Size}], Md5, Md5Acc, Fun, Acc) -> % 0110 UPGRADE CODE
- foldl(Fd, [Pos], Md5, Md5Acc, Fun, Acc);
-foldl(Fd, [Pos], Md5, Md5Acc, Fun, Acc) ->
- {ok, Bin} = couch_file:pread_iolist(Fd, Pos),
- Md5 = couch_crypto:hash_final(md5, couch_crypto:hash_update(md5, Md5Acc, Bin)),
- Fun(Bin, Acc);
-foldl(Fd, [{Pos, _Size}|Rest], Md5, Md5Acc, Fun, Acc) ->
- foldl(Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc);
-foldl(Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc) ->
- {ok, Bin} = couch_file:pread_iolist(Fd, Pos),
- foldl(Fd, Rest, Md5, couch_crypto:hash_update(md5, Md5Acc, Bin), Fun, Fun(Bin, Acc)).
-
-range_foldl(Fd, PosList, From, To, Fun, Acc) ->
- range_foldl(Fd, PosList, From, To, 0, Fun, Acc).
-
-range_foldl(_Fd, _PosList, _From, To, Off, _Fun, Acc) when Off >= To ->
- Acc;
-range_foldl(Fd, [Pos|Rest], From, To, Off, Fun, Acc) when is_integer(Pos) -> % old-style attachment
- {ok, Bin} = couch_file:pread_iolist(Fd, Pos),
- range_foldl(Fd, [{Pos, iolist_size(Bin)}] ++ Rest, From, To, Off, Fun, Acc);
-range_foldl(Fd, [{_Pos, Size}|Rest], From, To, Off, Fun, Acc) when From > Off + Size ->
- range_foldl(Fd, Rest, From, To, Off + Size, Fun, Acc);
-range_foldl(Fd, [{Pos, Size}|Rest], From, To, Off, Fun, Acc) ->
- {ok, Bin} = couch_file:pread_iolist(Fd, Pos),
- Bin1 = if
- From =< Off andalso To >= Off + Size -> Bin; %% the whole block is covered
- true ->
- PrefixLen = clip(From - Off, 0, Size),
- PostfixLen = clip(Off + Size - To, 0, Size),
- MatchLen = Size - PrefixLen - PostfixLen,
- <<_Prefix:PrefixLen/binary,Match:MatchLen/binary,_Postfix:PostfixLen/binary>> = iolist_to_binary(Bin),
- Match
- end,
- range_foldl(Fd, Rest, From, To, Off + Size, Fun, Fun(Bin1, Acc)).
-clip(Value, Lo, Hi) ->
- if
- Value < Lo -> Lo;
- Value > Hi -> Hi;
- true -> Value
+%% Fold step used by foldl/4: adds Bin to the running MD5 context and
+%% passes the same chunk on to the user's fold function.
+foldl_md5(Bin, {Md5Acc, UserFun, UserAcc}) ->
+    {couch_crypto:hash_update(md5, Md5Acc, Bin), UserFun, UserFun(Bin, UserAcc)}.
+
+
+%% Fold step that decodes one encoded chunk with DecFun and hands the
+%% decoded data to UserFun. When the decoder yields the empty binary
+%% the user function is not called and the accumulator is unchanged.
+foldl_decode(EncBin, {DecFun, UserFun, UserAcc}) ->
+    NewUserAcc = case DecFun(EncBin) of
+        <<>> -> UserAcc;
+        Decoded -> UserFun(Decoded, UserAcc)
+    end,
+    {DecFun, UserFun, NewUserAcc}.
-foldl_decode(_DecFun, _Fd, [], Md5, Md5Acc, _Fun, Acc) ->
- Md5 = couch_crypto:hash_final(md5, Md5Acc),
- Acc;
-foldl_decode(DecFun, Fd, [{Pos, _Size}], Md5, Md5Acc, Fun, Acc) ->
- foldl_decode(DecFun, Fd, [Pos], Md5, Md5Acc, Fun, Acc);
-foldl_decode(DecFun, Fd, [Pos], Md5, Md5Acc, Fun, Acc) ->
- {ok, EncBin} = couch_file:pread_iolist(Fd, Pos),
- Md5 = couch_crypto:hash_final(md5, couch_crypto:hash_update(md5, Md5Acc, EncBin)),
- Bin = DecFun(EncBin),
- Fun(Bin, Acc);
-foldl_decode(DecFun, Fd, [{Pos, _Size}|Rest], Md5, Md5Acc, Fun, Acc) ->
- foldl_decode(DecFun, Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc);
-foldl_decode(DecFun, Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc) ->
- {ok, EncBin} = couch_file:pread_iolist(Fd, Pos),
- Bin = DecFun(EncBin),
- Md5Acc2 = couch_crypto:hash_update(md5, Md5Acc, EncBin),
- foldl_decode(DecFun, Fd, Rest, Md5, Md5Acc2, Fun, Fun(Bin, Acc)).
+
+%% Fold step used by range_foldl/5 to stop after a fixed number of bytes.
+%%
+%% Length is the number of bytes still wanted. A chunk that fits in the
+%% remaining budget is passed to UserFun whole and the budget shrinks by
+%% its size. A chunk larger than the budget is cut down to exactly
+%% Length bytes, passed to UserFun, and the fold is ended by throwing
+%% {finished, Acc}.
+foldl_length(Bin, {Length, UserFun, UserAcc}) ->
+    BinSize = size(Bin),
+    case BinSize =< Length of
+        true ->
+            {Length - BinSize, UserFun, UserFun(Bin, UserAcc)};
+        false ->
+            % Keep only the bytes still owed to the caller. Matching on
+            % BinSize here would keep the whole chunk and return data
+            % past the end of the requested range.
+            <<Trunc:Length/binary, _/binary>> = Bin,
+            throw({finished, UserFun(Trunc, UserAcc)})
+    end.
gzip_init(Options) ->
case couch_util:get_value(compression_level, Options, 0) of
@@ -192,23 +191,16 @@ identity_enc_dec_funs() ->
fun() -> [] end
}.
-write(_Pid, <<>>) ->
- ok;
-write(Pid, Bin) ->
- gen_server:call(Pid, {write, Bin}, infinity).
-
-init({Fd, OpenerPid, OpenerPriority, Options}) ->
+init({Engine, OpenerPid, OpenerPriority, Options}) ->
erlang:put(io_priority, OpenerPriority),
{EncodingFun, EndEncodingFun} =
case couch_util:get_value(encoding, Options, identity) of
- identity ->
- identity_enc_dec_funs();
- gzip ->
- gzip_init(Options)
+ identity -> identity_enc_dec_funs();
+ gzip -> gzip_init(Options)
end,
{ok, #stream{
- fd=Fd,
+ engine=Engine,
opener_monitor=erlang:monitor(process, OpenerPid),
md5=couch_crypto:hash_init(md5),
identity_md5=couch_crypto:hash_init(md5),
@@ -225,9 +217,8 @@ terminate(_Reason, _Stream) ->
handle_call({write, Bin}, _From, Stream) ->
BinSize = iolist_size(Bin),
#stream{
- fd = Fd,
+ engine = Engine,
written_len = WrittenLen,
- written_pointers = Written,
buffer_len = BufferLen,
buffer_list = Buffer,
max_buffer = Max,
@@ -242,19 +233,18 @@ handle_call({write, Bin}, _From, Stream) ->
[] ->
% case where the encoder did some internal buffering
% (zlib does it for example)
+ NewEngine = Engine,
WrittenLen2 = WrittenLen,
- Md5_2 = Md5,
- Written2 = Written;
+ Md5_2 = Md5;
WriteBin2 ->
- {ok, Pos, _} = couch_file:append_binary(Fd, WriteBin2),
+ NewEngine = do_write(Engine, WriteBin2),
WrittenLen2 = WrittenLen + iolist_size(WriteBin2),
- Md5_2 = couch_crypto:hash_update(md5, Md5, WriteBin2),
- Written2 = [{Pos, iolist_size(WriteBin2)}|Written]
+ Md5_2 = couch_crypto:hash_update(md5, Md5, WriteBin2)
end,
{reply, ok, Stream#stream{
+ engine = NewEngine,
written_len=WrittenLen2,
- written_pointers=Written2,
buffer_list=[],
buffer_len=0,
md5=Md5_2,
@@ -268,10 +258,9 @@ handle_call({write, Bin}, _From, Stream) ->
end;
handle_call(close, _From, Stream) ->
#stream{
- fd = Fd,
+ engine = Engine,
opener_monitor = MonRef,
written_len = WrittenLen,
- written_pointers = Written,
buffer_list = Buffer,
md5 = Md5,
identity_md5 = IdenMd5,
@@ -285,12 +274,11 @@ handle_call(close, _From, Stream) ->
Md5Final = couch_crypto:hash_final(md5, couch_crypto:hash_update(md5, Md5, WriteBin2)),
Result = case WriteBin2 of
[] ->
- {lists:reverse(Written), WrittenLen, IdenLen, Md5Final, IdenMd5Final};
+ {do_finalize(Engine), WrittenLen, IdenLen, Md5Final, IdenMd5Final};
_ ->
- {ok, Pos, _} = couch_file:append_binary(Fd, WriteBin2),
- StreamInfo = lists:reverse(Written, [{Pos, iolist_size(WriteBin2)}]),
+ NewEngine = do_write(Engine, WriteBin2),
StreamLen = WrittenLen + iolist_size(WriteBin2),
- {StreamInfo, StreamLen, IdenLen, Md5Final, IdenMd5Final}
+ {do_finalize(NewEngine), StreamLen, IdenLen, Md5Final, IdenMd5Final}
end,
erlang:demonitor(MonRef),
{stop, normal, Result, Stream}.
@@ -305,3 +293,17 @@ handle_info({'DOWN', Ref, _, _, _}, #stream{opener_monitor=Ref} = State) ->
{stop, normal, State};
handle_info(_Info, State) ->
{noreply, State}.
+
+
+%% Calls the engine module's seek/2 with the engine state and Offset,
+%% requires an {ok, NewState} reply, and returns the engine pair with
+%% the new state.
+do_seek({Engine, EngineState}, Offset) ->
+ {ok, NewState} = Engine:seek(EngineState, Offset),
+ {Engine, NewState}.
+
+%% Calls the engine module's write/2 with the engine state and Data,
+%% requires an {ok, NewState} reply, and returns the engine pair with
+%% the new state.
+do_write({Engine, EngineState}, Data) ->
+ {ok, NewState} = Engine:write(EngineState, Data),
+ {Engine, NewState}.
+
+%% Calls the engine module's finalize/1 with the engine state, requires
+%% an {ok, NewState} reply, and returns the engine pair with the new
+%% state.
+do_finalize({Engine, EngineState}) ->
+ {ok, NewState} = Engine:finalize(EngineState),
+ {Engine, NewState}.
+
diff --git a/src/couch/src/couch_util.erl b/src/couch/src/couch_util.erl
index d688c12..dc2ef64 100644
--- a/src/couch/src/couch_util.erl
+++ b/src/couch/src/couch_util.erl
@@ -12,7 +12,7 @@
-module(couch_util).
--export([priv_dir/0, normpath/1]).
+-export([priv_dir/0, normpath/1, fold_files/5]).
-export([should_flush/0, should_flush/1, to_existing_atom/1]).
-export([rand32/0, implode/2, collate/2, collate/3]).
-export([abs_pathname/1,abs_pathname/2, trim/1, drop_dot_couch_ext/1]).
@@ -33,6 +33,7 @@
-export([find_in_binary/2]).
-export([callback_exists/3, validate_callback_exists/3]).
-export([with_proc/4]).
+-export([check_md5/2]).
-include_lib("couch/include/couch_db.hrl").
@@ -62,6 +63,37 @@ normparts(["." | RestParts], Acc) ->
normparts([Part | RestParts], Acc) ->
normparts(RestParts, [Part | Acc]).
+
+%% Folds Fun over the entries under Dir whose name matches RegExp.
+%%
+%% RegExp is compiled once with the unicode option and matched against
+%% each entry's base name (not its full path). Fun is called as
+%% Fun(FullName, Acc) for every match and must return the new
+%% accumulator. When Recursive is true, non-matching entries that are
+%% directories are descended into. Returns the final accumulator.
+fold_files(Dir, RegExp, Recursive, Fun, Acc) ->
+ {ok, Re} = re:compile(RegExp, [unicode]),
+ fold_files1(Dir, Re, Recursive, Fun, Acc).
+
+%% Lists Dir and folds over its entries. A directory that cannot be
+%% listed contributes nothing: the accumulator is returned unchanged.
+fold_files1(Dir, RegExp, Recursive, Fun, Acc) ->
+ case file:list_dir(Dir) of
+ {ok, Files} ->
+ fold_files2(Files, Dir, RegExp, Recursive, Fun, Acc);
+ {error, _} ->
+ Acc
+ end.
+
+%% Walks the entries of one directory listing.
+fold_files2([], _Dir, _RegExp, _Recursive, _Fun, Acc) ->
+ Acc;
+fold_files2([File | Rest], Dir, RegExp, Recursive, Fun, Acc0) ->
+ FullName = filename:join(Dir, File),
+ % The old-style catch turns any exception from re:run/3 into a
+ % non-match value, so such an entry falls through to the
+ % directory check below instead of aborting the fold.
+ case (catch re:run(File, RegExp, [{capture, none}])) of
+ match ->
+ % A matching entry is reported and never descended into.
+ Acc1 = Fun(FullName, Acc0),
+ fold_files2(Rest, Dir, RegExp, Recursive, Fun, Acc1);
+ _ ->
+ case Recursive andalso filelib:is_dir(FullName) of
+ true ->
+ Acc1 = fold_files1(FullName, RegExp, Recursive, Fun, Acc0),
+ fold_files2(Rest, Dir, RegExp, Recursive, Fun, Acc1);
+ false ->
+ fold_files2(Rest, Dir, RegExp, Recursive, Fun, Acc0)
+ end
+ end.
+
% works like list_to_existing_atom, except can be list or binary and it
% gives you the original value instead of an error if no existing atom.
to_existing_atom(V) when is_list(V) ->
@@ -578,6 +610,12 @@ validate_callback_exists(Module, Function, Arity) ->
{undefined_callback, CallbackStr, {Module, Function, Arity}}})
end.
+
+%% Compares a computed signature with an expected one. Returns ok when
+%% the expected signature is the empty binary (nothing to check) or the
+%% two are identical; otherwise throws md5_mismatch.
+check_md5(NewSig, OldSig) when OldSig =:= <<>>; NewSig =:= OldSig ->
+    ok;
+check_md5(_NewSig, _OldSig) ->
+    throw(md5_mismatch).
+
+
ensure_loaded(Module) when is_atom(Module) ->
case code:ensure_loaded(Module) of
{module, Module} ->
diff --git a/src/couch/src/test_util.erl b/src/couch/src/test_util.erl
index b5bb232..6bf7b46 100644
--- a/src/couch/src/test_util.erl
+++ b/src/couch/src/test_util.erl
@@ -34,6 +34,8 @@
-export([start/1, start/2, start/3, stop/1]).
+-export([fake_db/1]).
+
-record(test_context, {mocked = [], started = [], module}).
-define(DEFAULT_APPS,
diff --git a/src/couch/test/couch_db_plugin_tests.erl b/src/couch/test/couch_db_plugin_tests.erl
index 94dd3df..52533fe 100644
--- a/src/couch/test/couch_db_plugin_tests.erl
+++ b/src/couch/test/couch_db_plugin_tests.erl
@@ -43,7 +43,7 @@ data_providers() -> [].
data_subscriptions() -> [].
processes() -> [].
notify(_, _, _) -> ok.
-fake_db() -> element(2, couch_db:clustered_db(fake, totes_fake)).
+fake_db() -> test_util:fake_db([]).
setup() ->
couch_tests:setup([
diff --git a/src/couch/test/couch_stream_tests.erl b/src/couch/test/couch_stream_tests.erl
index 3d7bf09..901b4fd 100644
--- a/src/couch/test/couch_stream_tests.erl
+++ b/src/couch/test/couch_stream_tests.erl
@@ -14,10 +14,11 @@
-include_lib("couch/include/couch_eunit.hrl").
+-define(ENGINE, {couch_bt_engine_stream, {Fd, []}}).
setup() ->
{ok, Fd} = couch_file:open(?tempfile(), [create, overwrite]),
- {ok, Stream} = couch_stream:open(Fd),
+ {ok, Stream} = couch_stream:open(?ENGINE, []),
{Fd, Stream}.
teardown({Fd, _}) ->
@@ -61,7 +62,8 @@ should_write_empty_binary({_, Stream}) ->
should_return_file_pointers_on_close({_, Stream}) ->
couch_stream:write(Stream, <<"foodfoob">>),
- {Ptrs, _, _, _, _} = couch_stream:close(Stream),
+ {NewEngine, _, _, _, _} = couch_stream:close(Stream),
+ {ok, Ptrs} = couch_stream:to_disk_term(NewEngine),
?_assertEqual([{0, 8}], Ptrs).
should_return_stream_size_on_close({_, Stream}) ->
@@ -69,41 +71,43 @@ should_return_stream_size_on_close({_, Stream}) ->
{_, Length, _, _, _} = couch_stream:close(Stream),
?_assertEqual(8, Length).
-should_return_valid_pointers({Fd, Stream}) ->
+should_return_valid_pointers({_Fd, Stream}) ->
couch_stream:write(Stream, <<"foodfoob">>),
- {Ptrs, _, _, _, _} = couch_stream:close(Stream),
- ?_assertEqual(<<"foodfoob">>, read_all(Fd, Ptrs)).
+ {NewEngine, _, _, _, _} = couch_stream:close(Stream),
+ ?_assertEqual(<<"foodfoob">>, read_all(NewEngine)).
should_recall_last_pointer_position({Fd, Stream}) ->
couch_stream:write(Stream, <<"foodfoob">>),
{_, _, _, _, _} = couch_stream:close(Stream),
{ok, ExpPtr} = couch_file:bytes(Fd),
- {ok, Stream2} = couch_stream:open(Fd),
+ {ok, Stream2} = couch_stream:open(?ENGINE),
ZeroBits = <<0:(8 * 10)>>,
OneBits = <<1:(8 * 10)>>,
ok = couch_stream:write(Stream2, OneBits),
ok = couch_stream:write(Stream2, ZeroBits),
- {Ptrs, 20, _, _, _} = couch_stream:close(Stream2),
+ {NewEngine, 20, _, _, _} = couch_stream:close(Stream2),
+ {ok, Ptrs} = couch_stream:to_disk_term(NewEngine),
[{ExpPtr, 20}] = Ptrs,
AllBits = iolist_to_binary([OneBits, ZeroBits]),
- ?_assertEqual(AllBits, read_all(Fd, Ptrs)).
+ ?_assertEqual(AllBits, read_all(NewEngine)).
should_stream_more_with_4K_chunk_size({Fd, _}) ->
- {ok, Stream} = couch_stream:open(Fd, [{buffer_size, 4096}]),
+ {ok, Stream} = couch_stream:open(?ENGINE, [{buffer_size, 4096}]),
lists:foldl(
fun(_, Acc) ->
Data = <<"a1b2c">>,
couch_stream:write(Stream, Data),
[Data | Acc]
end, [], lists:seq(1, 1024)),
- ?_assertMatch({[{0, 4100}, {4106, 1020}], 5120, _, _, _},
- couch_stream:close(Stream)).
+ {NewEngine, Length, _, _, _} = couch_stream:close(Stream),
+ {ok, Ptrs} = couch_stream:to_disk_term(NewEngine),
+ ?_assertMatch({[{0, 4100}, {4106, 1020}], 5120}, {Ptrs, Length}).
should_stop_on_normal_exit_of_stream_opener({Fd, _}) ->
RunnerPid = self(),
OpenerPid = spawn(
fun() ->
- {ok, StreamPid} = couch_stream:open(Fd),
+ {ok, StreamPid} = couch_stream:open(?ENGINE),
RunnerPid ! {pid, StreamPid}
end),
StreamPid = receive
@@ -115,6 +119,6 @@ should_stop_on_normal_exit_of_stream_opener({Fd, _}) ->
?_assertNot(is_process_alive(StreamPid)).
-read_all(Fd, PosList) ->
- Data = couch_stream:foldl(Fd, PosList, fun(Bin, Acc) -> [Bin, Acc] end, []),
+read_all(Engine) ->
+ Data = couch_stream:foldl(Engine, fun(Bin, Acc) -> [Bin, Acc] end, []),
iolist_to_binary(Data).
diff --git a/src/couch/test/couchdb_compaction_daemon_tests.erl b/src/couch/test/couchdb_compaction_daemon_tests.erl
index c2920ed..f691997 100644
--- a/src/couch/test/couchdb_compaction_daemon_tests.erl
+++ b/src/couch/test/couchdb_compaction_daemon_tests.erl
@@ -242,7 +242,7 @@ spawn_compaction_monitor(DbName) ->
1,
couch_db_updater,
handle_cast,
- [{compact_done, '_'}, '_'],
+ [{compact_done, '_', '_'}, '_'],
DbPid,
?TIMEOUT
),
diff --git a/src/couch/test/couchdb_views_tests.erl b/src/couch/test/couchdb_views_tests.erl
index 02e9d72..0baa7c3 100644
--- a/src/couch/test/couchdb_views_tests.erl
+++ b/src/couch/test/couchdb_views_tests.erl
@@ -543,23 +543,27 @@ has_doc(DocId1, Rows) ->
lists:any(fun({R}) -> lists:member({<<"id">>, DocId}, R) end, Rows).
backup_db_file(DbName) ->
- DbDir = config:get("couchdb", "database_dir"),
- DbFile = filename:join([DbDir, ?b2l(DbName) ++ ".couch"]),
- {ok, _} = file:copy(DbFile, DbFile ++ ".backup"),
- ok.
+ {ok, Db} = couch_db:open_int(DbName, []),
+ try
+ SrcPath = couch_db:get_filepath(Db),
+ Src = if
+ is_list(SrcPath) -> SrcPath;
+ true -> binary_to_list(SrcPath)
+ end,
+ ok = copy_tree(Src, Src ++ ".backup")
+ after
+ couch_db:close(Db)
+ end.
restore_backup_db_file(DbName) ->
- DbDir = config:get("couchdb", "database_dir"),
-
{ok, Db} = couch_db:open_int(DbName, []),
+ Src = couch_db:get_filepath(Db),
ok = couch_db:close(Db),
DbPid = couch_db:get_pid(Db),
exit(DbPid, shutdown),
- DbFile = filename:join([DbDir, ?b2l(DbName) ++ ".couch"]),
- ok = file:delete(DbFile),
- ok = file:rename(DbFile ++ ".backup", DbFile),
- ok.
+ exit(couch_db:get_pid(Db), shutdown),
+ ok = copy_tree(Src ++ ".backup", Src).
compact_db(DbName) ->
{ok, Db} = couch_db:open_int(DbName, []),
@@ -707,3 +711,22 @@ wait_indexer(IndexerPid) ->
ok
end
end).
+
+%% Copies Src to Dst. A regular file is copied with file:copy/2 after
+%% the parent directory of Dst has been created with
+%% filelib:ensure_dir/1. A directory is copied entry by entry,
+%% recursively. Returns ok; any failing step raises badmatch.
+copy_tree(Src, Dst) ->
+    case filelib:is_dir(Src) of
+        false ->
+            ok = filelib:ensure_dir(Dst),
+            {ok, _} = file:copy(Src, Dst),
+            ok;
+        true ->
+            {ok, Files} = file:list_dir(Src),
+            copy_tree(Files, Src, Dst)
+    end.
+
+%% Copies each named entry of directory Src to the same name under Dst.
+copy_tree(Files, Src, Dst) ->
+    lists:foreach(fun(File) ->
+        ok = copy_tree(filename:join(Src, File), filename:join(Dst, File))
+    end, Files).
diff --git a/src/couch_index/src/couch_index_updater.erl b/src/couch_index/src/couch_index_updater.erl
index ad48f40..bf31460 100644
--- a/src/couch_index/src/couch_index_updater.erl
+++ b/src/couch_index/src/couch_index_updater.erl
@@ -166,7 +166,7 @@ update(Idx, Mod, IdxState) ->
end
end,
- Proc = fun(DocInfo, _, {IdxStateAcc, _}) ->
+ Proc = fun(DocInfo, {IdxStateAcc, _}) ->
case CommittedOnly and (GetSeq(DocInfo) > DbCommittedSeq) of
true ->
{stop, {IdxStateAcc, false}};
@@ -180,7 +180,7 @@ update(Idx, Mod, IdxState) ->
{ok, InitIdxState} = Mod:start_update(Idx, PurgedIdxState, NumChanges),
Acc0 = {InitIdxState, true},
- {ok, _, Acc} = couch_db:enum_docs_since(Db, CurrSeq, Proc, Acc0, []),
+ {ok, Acc} = couch_db:fold_changes(Db, CurrSeq, Proc, Acc0, []),
{ProcIdxSt, SendLast} = Acc,
% If we didn't bail due to hitting the last committed seq we need
@@ -198,7 +198,7 @@ update(Idx, Mod, IdxState) ->
purge_index(Db, Mod, IdxState) ->
- DbPurgeSeq = couch_db:get_purge_seq(Db),
+ {ok, DbPurgeSeq} = couch_db:get_purge_seq(Db),
IdxPurgeSeq = Mod:get(purge_seq, IdxState),
if
DbPurgeSeq == IdxPurgeSeq ->
diff --git a/src/couch_mrview/src/couch_mrview.erl b/src/couch_mrview/src/couch_mrview.erl
index 5f23767..49f5485 100644
--- a/src/couch_mrview/src/couch_mrview.erl
+++ b/src/couch_mrview/src/couch_mrview.erl
@@ -416,8 +416,18 @@ all_docs_fold(Db, #mrargs{keys=undefined}=Args, Callback, UAcc) ->
update_seq=UpdateSeq,
args=Args
},
- [Opts] = couch_mrview_util:all_docs_key_opts(Args),
- {ok, Offset, FinalAcc} = couch_db:enum_docs(Db, fun map_fold/3, Acc, Opts),
+ [Opts1] = couch_mrview_util:all_docs_key_opts(Args),
+ % TODO: This is a terrible hack for now. We'll probably have
+ % to rewrite _all_docs to not be part of mrview and not expect
+ % a btree. For now non-btree's will just have to pass 0 or
+ % some fake reductions to get an offset.
+ Opts2 = [include_reductions | Opts1],
+ FunName = case couch_util:get_value(namespace, Args#mrargs.extra) of
+ <<"_design">> -> fold_design_docs;
+ <<"_local">> -> fold_local_docs;
+ _ -> fold_docs
+ end,
+ {ok, Offset, FinalAcc} = couch_db:FunName(Db, fun map_fold/3, Acc, Opts2),
finish_fold(FinalAcc, [{total, Total}, {offset, Offset}]);
all_docs_fold(Db, #mrargs{direction=Dir, keys=Keys0}=Args, Callback, UAcc) ->
Total = get_total_rows(Db, Args),
@@ -531,17 +541,25 @@ map_fold({{Key, Id}, Val}, _Offset, Acc) ->
user_acc=UAcc1,
last_go=Go
}};
-map_fold({<<"_local/",_/binary>> = DocId, {Rev0, Body}}, _Offset, #mracc{} = Acc) ->
+map_fold(#doc{id = <<"_local/", _/binary>>} = Doc, _Offset, #mracc{} = Acc) ->
#mracc{
limit=Limit,
callback=Callback,
user_acc=UAcc0,
args=Args
} = Acc,
- Rev = {0, list_to_binary(integer_to_list(Rev0))},
- Value = {[{rev, couch_doc:rev_to_str(Rev)}]},
- Doc = if Args#mrargs.include_docs -> [{doc, Body}]; true -> [] end,
- Row = [{id, DocId}, {key, DocId}, {value, Value}] ++ Doc,
+ #doc{
+ id = DocId,
+ revs = {Pos, [RevId | _]}
+ } = Doc,
+ Rev = {Pos, RevId},
+ Row = [
+ {id, DocId},
+ {key, DocId},
+ {value, {[{rev, couch_doc:rev_to_str(Rev)}]}}
+ ] ++ if not Args#mrargs.include_docs -> []; true ->
+ [{doc, couch_doc:to_json_obj(Doc, Args#mrargs.doc_options)}]
+ end,
{Go, UAcc1} = Callback({row, Row}, UAcc0),
{Go, Acc#mracc{
limit=Limit-1,
@@ -653,11 +671,11 @@ make_meta(Args, UpdateSeq, Base) ->
end.
-get_total_rows(#db{local_tree = LocalTree} = Db, #mrargs{extra = Extra}) ->
+get_total_rows(Db, #mrargs{extra = Extra}) ->
case couch_util:get_value(namespace, Extra) of
<<"_local">> ->
- FoldFun = fun(_, _, Acc) -> {ok, Acc + 1} end,
- {ok, _, Total} = couch_btree:foldl(LocalTree, FoldFun, 0),
+ FoldFun = fun(_, Acc) -> {ok, Acc + 1} end,
+ {ok, Total} = couch_db:fold_local_docs(Db, FoldFun, 0, []),
Total;
_ ->
{ok, Info} = couch_db:get_db_info(Db),
diff --git a/src/couch_mrview/src/couch_mrview_util.erl b/src/couch_mrview/src/couch_mrview_util.erl
index 27f8737..558c162 100644
--- a/src/couch_mrview/src/couch_mrview_util.erl
+++ b/src/couch_mrview/src/couch_mrview_util.erl
@@ -165,7 +165,7 @@ extract_view(Lang, #mrargs{view_type=red}=Args, Name, [View | Rest]) ->
view_sig(Db, State, View, #mrargs{include_docs=true}=Args) ->
BaseSig = view_sig(Db, State, View, Args#mrargs{include_docs=false}),
UpdateSeq = couch_db:get_update_seq(Db),
- PurgeSeq = couch_db:get_purge_seq(Db),
+ {ok, PurgeSeq} = couch_db:get_purge_seq(Db),
#mrst{
seq_indexed=SeqIndexed,
keyseq_indexed=KeySeqIndexed
@@ -199,9 +199,10 @@ view_sig_term(BaseSig, UpdateSeq, PurgeSeq, KeySeqIndexed, SeqIndexed, Args) ->
init_state(Db, Fd, #mrst{views=Views}=State, nil) ->
+ {ok, PurgeSeq} = couch_db:get_purge_seq(Db),
Header = #mrheader{
seq=0,
- purge_seq=couch_db:get_purge_seq(Db),
+ purge_seq=PurgeSeq,
id_btree_state=nil,
log_btree_state=nil,
view_states=[make_view_state(#mrview{}) || _ <- Views]
@@ -236,7 +237,9 @@ init_state(Db, Fd, State, Header) ->
view_states=ViewStates
} = Header,
- IdBtOpts = [{compression, couch_db:compression(Db)}],
+ IdBtOpts = [
+ {compression, couch_compress:get_compression_method()}
+ ],
{ok, IdBtree} = couch_btree:open(IdBtreeState, Fd, IdBtOpts),
{ok, LogBtree} = case SeqIndexed orelse KeySeqIndexed of
true -> couch_btree:open(LogBtreeState, Fd, IdBtOpts);
@@ -256,10 +259,10 @@ init_state(Db, Fd, State, Header) ->
views=Views2
}.
-open_view(Db, Fd, Lang, ViewState, View) ->
+open_view(_Db, Fd, Lang, ViewState, View) ->
ReduceFun = make_reduce_fun(Lang, View#mrview.reduce_funs),
LessFun = maybe_define_less_fun(View),
- Compression = couch_db:compression(Db),
+ Compression = couch_compress:get_compression_method(),
BTState = get_key_btree_state(ViewState),
ViewBtOpts = [
{less, LessFun},
@@ -268,7 +271,7 @@ open_view(Db, Fd, Lang, ViewState, View) ->
],
{ok, Btree} = couch_btree:open(BTState, Fd, ViewBtOpts),
- BySeqReduceFun = fun couch_db_updater:btree_by_seq_reduce/2,
+ BySeqReduceFun = fun couch_bt_engine:seq_tree_reduce/2,
{ok, SeqBtree} = if View#mrview.seq_indexed ->
SeqBTState = get_seq_btree_state(ViewState),
ViewSeqBtOpts = [{reduce, BySeqReduceFun},
@@ -321,7 +324,7 @@ get_row_count(#mrview{btree=Bt}) ->
all_docs_reduce_to_count(Reductions0) ->
Reductions = maybe_convert_reductions(Reductions0),
- Reduce = fun couch_db_updater:btree_by_id_reduce/2,
+ Reduce = fun couch_bt_engine:id_tree_reduce/2,
{Count, _, _} = couch_btree:final_reduce(Reduce, Reductions),
Count.
diff --git a/src/couch_replicator/test/couch_replicator_compact_tests.erl b/src/couch_replicator/test/couch_replicator_compact_tests.erl
index 5731ff4..3ac35a1 100644
--- a/src/couch_replicator/test/couch_replicator_compact_tests.erl
+++ b/src/couch_replicator/test/couch_replicator_compact_tests.erl
@@ -221,7 +221,7 @@ should_compare_databases(Source, Target) ->
{timeout, 35, ?_test(begin
{ok, SourceDb} = couch_db:open_int(Source, []),
{ok, TargetDb} = couch_db:open_int(Target, []),
- Fun = fun(FullDocInfo, _, Acc) ->
+ Fun = fun(FullDocInfo, Acc) ->
{ok, Doc} = couch_db:open_doc(SourceDb, FullDocInfo),
{Props} = DocJson = couch_doc:to_json_obj(Doc, [attachments]),
DocId = couch_util:get_value(<<"_id">>, Props),
@@ -240,7 +240,7 @@ should_compare_databases(Source, Target) ->
?assertEqual(DocJson, DocTargetJson),
{ok, Acc}
end,
- {ok, _, _} = couch_db:enum_docs(SourceDb, Fun, [], []),
+ {ok, _} = couch_db:fold_docs(SourceDb, Fun, [], []),
ok = couch_db:close(SourceDb),
ok = couch_db:close(TargetDb)
end)}.
diff --git a/src/couch_replicator/test/couch_replicator_filtered_tests.erl b/src/couch_replicator/test/couch_replicator_filtered_tests.erl
index 03cf44c..d34e9f0 100644
--- a/src/couch_replicator/test/couch_replicator_filtered_tests.erl
+++ b/src/couch_replicator/test/couch_replicator_filtered_tests.erl
@@ -169,7 +169,7 @@ compare_dbs(Source, Target, FilterFun) ->
{ok, SourceDb} = couch_db:open_int(Source, []),
{ok, TargetDb} = couch_db:open_int(Target, []),
{ok, TargetDbInfo} = couch_db:get_db_info(TargetDb),
- Fun = fun(FullDocInfo, _, Acc) ->
+ Fun = fun(FullDocInfo, Acc) ->
{ok, DocId, SourceDoc} = read_doc(SourceDb, FullDocInfo),
TargetReply = read_doc(TargetDb, DocId),
case FilterFun(DocId, SourceDoc) of
@@ -181,7 +181,7 @@ compare_dbs(Source, Target, FilterFun) ->
{ok, [ValidReply|Acc]}
end
end,
- {ok, _, AllReplies} = couch_db:enum_docs(SourceDb, Fun, [], []),
+ {ok, AllReplies} = couch_db:fold_docs(SourceDb, Fun, [], []),
ok = couch_db:close(SourceDb),
ok = couch_db:close(TargetDb),
{ok, TargetDbInfo, AllReplies}.
diff --git a/src/couch_replicator/test/couch_replicator_missing_stubs_tests.erl b/src/couch_replicator/test/couch_replicator_missing_stubs_tests.erl
index e8ccd64..f8d231d 100644
--- a/src/couch_replicator/test/couch_replicator_missing_stubs_tests.erl
+++ b/src/couch_replicator/test/couch_replicator_missing_stubs_tests.erl
@@ -131,9 +131,9 @@ populate_db(DbName) ->
update_db_docs(DbName, Times) ->
{ok, Db} = couch_db:open_int(DbName, []),
- {ok, _, _} = couch_db:enum_docs(
+ {ok, _} = couch_db:fold_docs(
Db,
- fun(FDI, _, Acc) -> db_fold_fun(FDI, Acc) end,
+ fun(FDI, Acc) -> db_fold_fun(FDI, Acc) end,
{DbName, Times},
[]),
ok = couch_db:close(Db).
diff --git a/src/couch_replicator/test/couch_replicator_selector_tests.erl b/src/couch_replicator/test/couch_replicator_selector_tests.erl
index 98c6099..a7f4c5d 100644
--- a/src/couch_replicator/test/couch_replicator_selector_tests.erl
+++ b/src/couch_replicator/test/couch_replicator_selector_tests.erl
@@ -65,7 +65,7 @@ compare_dbs(Source, Target, FilterFun) ->
{ok, SourceDb} = couch_db:open_int(Source, []),
{ok, TargetDb} = couch_db:open_int(Target, []),
{ok, TargetDbInfo} = couch_db:get_db_info(TargetDb),
- Fun = fun(FullDocInfo, _, Acc) ->
+ Fun = fun(FullDocInfo, Acc) ->
{ok, DocId, SourceDoc} = read_doc(SourceDb, FullDocInfo),
TargetReply = read_doc(TargetDb, DocId),
case FilterFun(DocId, SourceDoc) of
@@ -77,7 +77,7 @@ compare_dbs(Source, Target, FilterFun) ->
{ok, [ValidReply|Acc]}
end
end,
- {ok, _, AllReplies} = couch_db:enum_docs(SourceDb, Fun, [], []),
+ {ok, AllReplies} = couch_db:fold_docs(SourceDb, Fun, [], []),
ok = couch_db:close(SourceDb),
ok = couch_db:close(TargetDb),
{ok, TargetDbInfo, AllReplies}.
diff --git a/src/couch_replicator/test/couch_replicator_test_helper.erl b/src/couch_replicator/test/couch_replicator_test_helper.erl
index 398b27b..ae27745 100644
--- a/src/couch_replicator/test/couch_replicator_test_helper.erl
+++ b/src/couch_replicator/test/couch_replicator_test_helper.erl
@@ -14,7 +14,7 @@ compare_dbs(Source, Target, ExceptIds) ->
{ok, SourceDb} = couch_db:open_int(Source, []),
{ok, TargetDb} = couch_db:open_int(Target, []),
- Fun = fun(FullDocInfo, _, Acc) ->
+ Fun = fun(FullDocInfo, Acc) ->
{ok, DocSource} = couch_db:open_doc(SourceDb, FullDocInfo),
Id = DocSource#doc.id,
case lists:member(Id, ExceptIds) of
@@ -27,7 +27,7 @@ compare_dbs(Source, Target, ExceptIds) ->
{ok, Acc}
end,
- {ok, _, _} = couch_db:enum_docs(SourceDb, Fun, [], []),
+ {ok, _} = couch_db:fold_docs(SourceDb, Fun, [], []),
ok = couch_db:close(SourceDb),
ok = couch_db:close(TargetDb).
diff --git a/src/couch_replicator/test/couch_replicator_use_checkpoints_tests.erl b/src/couch_replicator/test/couch_replicator_use_checkpoints_tests.erl
index e04488e..9dea8af 100644
--- a/src/couch_replicator/test/couch_replicator_use_checkpoints_tests.erl
+++ b/src/couch_replicator/test/couch_replicator_use_checkpoints_tests.erl
@@ -144,7 +144,7 @@ populate_db(DbName, DocCount) ->
compare_dbs(Source, Target) ->
{ok, SourceDb} = couch_db:open_int(Source, []),
{ok, TargetDb} = couch_db:open_int(Target, []),
- Fun = fun(FullDocInfo, _, Acc) ->
+ Fun = fun(FullDocInfo, Acc) ->
{ok, Doc} = couch_db:open_doc(SourceDb, FullDocInfo),
{Props} = DocJson = couch_doc:to_json_obj(Doc, [attachments]),
DocId = couch_util:get_value(<<"_id">>, Props),
@@ -163,7 +163,7 @@ compare_dbs(Source, Target) ->
?assertEqual(DocJson, DocTargetJson),
{ok, Acc}
end,
- {ok, _, _} = couch_db:enum_docs(SourceDb, Fun, [], []),
+ {ok, _} = couch_db:fold_docs(SourceDb, Fun, [], []),
ok = couch_db:close(SourceDb),
ok = couch_db:close(TargetDb).
diff --git a/src/fabric/src/fabric_db_create.erl b/src/fabric/src/fabric_db_create.erl
index a7f4ed9..3685398 100644
--- a/src/fabric/src/fabric_db_create.erl
+++ b/src/fabric/src/fabric_db_create.erl
@@ -28,7 +28,7 @@ go(DbName, Options) ->
{error, file_exists};
false ->
{Shards, Doc} = generate_shard_map(DbName, Options),
- CreateShardResult = create_shard_files(Shards),
+ CreateShardResult = create_shard_files(Shards, Options),
case CreateShardResult of
enametoolong ->
{error, {database_name_too_long, DbName}};
@@ -64,12 +64,12 @@ generate_shard_map(DbName, Options) ->
% the DB already exists, and may have a different Suffix
ok;
{not_found, _} ->
- Doc = make_document(Shards, Suffix)
+ Doc = make_document(Shards, Suffix, Options)
end,
{Shards, Doc}.
-create_shard_files(Shards) ->
- Workers = fabric_util:submit_jobs(Shards, create_db, []),
+create_shard_files(Shards, Options) ->
+ Workers = fabric_util:submit_jobs(Shards, create_db, [Options]),
RexiMon = fabric_util:create_monitors(Shards),
try fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Workers) of
{error, file_exists} ->
@@ -155,7 +155,7 @@ maybe_stop(W, Counters) ->
end
end.
-make_document([#shard{dbname=DbName}|_] = Shards, Suffix) ->
+make_document([#shard{dbname=DbName}|_] = Shards, Suffix, Options) ->
{RawOut, ByNodeOut, ByRangeOut} =
lists:foldl(fun(#shard{node=N, range=[B,E]}, {Raw, ByNode, ByRange}) ->
Range = ?l2b([couch_util:to_hex(<<B:32/integer>>), "-",
@@ -164,12 +164,19 @@ make_document([#shard{dbname=DbName}|_] = Shards, Suffix) ->
{[[<<"add">>, Range, Node] | Raw], orddict:append(Node, Range, ByNode),
orddict:append(Range, Node, ByRange)}
end, {[], [], []}, Shards),
- #doc{id=DbName, body = {[
- {<<"shard_suffix">>, Suffix},
- {<<"changelog">>, lists:sort(RawOut)},
- {<<"by_node">>, {[{K,lists:sort(V)} || {K,V} <- ByNodeOut]}},
- {<<"by_range">>, {[{K,lists:sort(V)} || {K,V} <- ByRangeOut]}}
- ]}}.
+ EngineProp = case couch_util:get_value(engine, Options) of
+ E when is_binary(E) -> [{<<"engine">>, E}];
+ _ -> []
+ end,
+ #doc{
+ id = DbName,
+ body = {[
+ {<<"shard_suffix">>, Suffix},
+ {<<"changelog">>, lists:sort(RawOut)},
+ {<<"by_node">>, {[{K,lists:sort(V)} || {K,V} <- ByNodeOut]}},
+ {<<"by_range">>, {[{K,lists:sort(V)} || {K,V} <- ByRangeOut]}}
+ ] ++ EngineProp}
+ }.
db_exists(DbName) -> is_list(catch mem3:shards(DbName)).
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index c57cc09..e5a9b07 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -71,7 +71,7 @@ changes(DbName, Options, StartVector, DbOptions) ->
{ok, Db} ->
StartSeq = calculate_start_seq(Db, node(), StartVector),
Enum = fun changes_enumerator/2,
- Opts = [{dir,Dir}],
+ Opts = [doc_info, {dir,Dir}],
Acc0 = #cacc{
db = Db,
seq = StartSeq,
@@ -82,7 +82,7 @@ changes(DbName, Options, StartVector, DbOptions) ->
},
try
{ok, #cacc{seq=LastSeq, pending=Pending, epochs=Epochs}} =
- couch_db:changes_since(Db, StartSeq, Enum, Opts, Acc0),
+ couch_db:fold_changes(Db, StartSeq, Enum, Acc0, Opts),
rexi:stream_last({complete, [
{seq, {LastSeq, uuid(Db), couch_db:owner_of(Epochs, LastSeq)}},
{pending, Pending}
@@ -224,7 +224,7 @@ get_missing_revs(DbName, IdRevsList, Options) ->
Ids = [Id1 || {Id1, _Revs} <- IdRevsList],
{ok, lists:zipwith(fun({Id, Revs}, FullDocInfoResult) ->
case FullDocInfoResult of
- {ok, #full_doc_info{rev_tree=RevisionTree} = FullInfo} ->
+ #full_doc_info{rev_tree=RevisionTree} = FullInfo ->
MissingRevs = couch_key_tree:find_missing(RevisionTree, Revs),
{Id, MissingRevs, possible_ancestors(FullInfo, MissingRevs)};
not_found ->
@@ -255,8 +255,7 @@ group_info(DbName, DDocId, DbOptions) ->
reset_validation_funs(DbName) ->
case get_or_create_db(DbName, []) of
{ok, Db} ->
- DbPid = couch_db:get_pid(Db),
- gen_server:cast(DbPid, {load_validation_funs, undefined});
+ couch_db:reload_validation_funs(Db);
_ ->
ok
end.
@@ -340,6 +339,8 @@ reduce_cb(complete, Acc) ->
{ok, Acc}.
+changes_enumerator(#full_doc_info{} = FDI, Acc) ->
+ changes_enumerator(couch_doc:to_doc_info(FDI), Acc);
changes_enumerator(#doc_info{id= <<"_local/", _/binary>>, high_seq=Seq}, Acc) ->
{ok, Acc#cacc{seq = Seq, pending = Acc#cacc.pending-1}};
changes_enumerator(DocInfo, Acc) ->
diff --git a/src/mem3/include/mem3.hrl b/src/mem3/include/mem3.hrl
index d6ac0be..6579210 100644
--- a/src/mem3/include/mem3.hrl
+++ b/src/mem3/include/mem3.hrl
@@ -16,7 +16,8 @@
node :: node() | '_',
dbname :: binary(),
range :: [non_neg_integer() | '$1' | '$2'] | '_',
- ref :: reference() | 'undefined' | '_'
+ ref :: reference() | 'undefined' | '_',
+ opts :: list()
}).
%% Do not reference outside of mem3.
@@ -26,7 +27,8 @@
dbname :: binary(),
range :: [non_neg_integer() | '$1' | '$2'] | '_',
ref :: reference() | 'undefined' | '_',
- order :: non_neg_integer() | 'undefined' | '_'
+ order :: non_neg_integer() | 'undefined' | '_',
+ opts :: list()
}).
%% types
diff --git a/src/mem3/src/mem3.erl b/src/mem3/src/mem3.erl
index e9c1473..5e218f7 100644
--- a/src/mem3/src/mem3.erl
+++ b/src/mem3/src/mem3.erl
@@ -23,7 +23,7 @@
-export([get_placement/1]).
%% For mem3 use only.
--export([name/1, node/1, range/1]).
+-export([name/1, node/1, range/1, engine/1]).
-include_lib("mem3/include/mem3.hrl").
-include_lib("couch/include/couch_db.hrl").
@@ -99,7 +99,8 @@ shards_int(DbName, Options) ->
name = ShardDbName,
dbname = ShardDbName,
range = [0, (2 bsl 31)-1],
- order = undefined}];
+ order = undefined,
+ opts = []}];
ShardDbName ->
%% shard_db is treated as a single sharded db to support calls to db_info
%% and view_all_docs
@@ -107,7 +108,8 @@ shards_int(DbName, Options) ->
node = node(),
name = ShardDbName,
dbname = ShardDbName,
- range = [0, (2 bsl 31)-1]}];
+ range = [0, (2 bsl 31)-1],
+ opts = []}];
_ ->
mem3_shards:for_db(DbName, Options)
end.
@@ -307,3 +309,15 @@ name(#shard{name=Name}) ->
Name;
name(#ordered_shard{name=Name}) ->
Name.
+
+engine(#shard{opts=Opts}) ->
+ engine(Opts);
+engine(#ordered_shard{opts=Opts}) ->
+ engine(Opts);
+engine(Opts) when is_list(Opts) ->
+ case couch_util:get_value(engine, Opts) of
+ Engine when is_binary(Engine) ->
+ [{engine, Engine}];
+ _ ->
+ []
+ end.
diff --git a/src/mem3/src/mem3_nodes.erl b/src/mem3/src/mem3_nodes.erl
index 555389b..019ceaf 100644
--- a/src/mem3/src/mem3_nodes.erl
+++ b/src/mem3/src/mem3_nodes.erl
@@ -102,8 +102,9 @@ initialize_nodelist() ->
Doc = #doc{id = couch_util:to_binary(node())},
{ok, _} = couch_db:update_doc(Db, Doc, [])
end,
+ Seq = couch_db:get_update_seq(Db),
couch_db:close(Db),
- couch_db:get_update_seq(Db).
+ Seq.
first_fold(#full_doc_info{id = <<"_design/", _/binary>>}, Acc) ->
{ok, Acc};
diff --git a/src/mem3/src/mem3_rep.erl b/src/mem3/src/mem3_rep.erl
index 85d46e2..44e6c5b 100644
--- a/src/mem3/src/mem3_rep.erl
+++ b/src/mem3/src/mem3_rep.erl
@@ -21,7 +21,7 @@
]).
-export([
- changes_enumerator/3
+ changes_enumerator/2
]).
@@ -173,8 +173,8 @@ find_source_seq_int(#doc{body={Props}}, SrcNode0, TgtNode0, TgtUUID, TgtSeq) ->
repl(Db, Acc0) ->
erlang:put(io_priority, {internal_repl, couch_db:name(Db)}),
#acc{seq=Seq} = Acc1 = calculate_start_seq(Acc0#acc{source = Db}),
- Fun = fun ?MODULE:changes_enumerator/3,
- {ok, _, Acc2} = couch_db:enum_docs_since(Db, Seq, Fun, Acc1, []),
+ Fun = fun ?MODULE:changes_enumerator/2,
+ {ok, Acc2} = couch_db:fold_changes(Db, Seq, Fun, Acc1),
{ok, #acc{seq = LastSeq}} = replicate_batch(Acc2),
{ok, couch_db:count_changes_since(Db, LastSeq)}.
@@ -225,11 +225,10 @@ compare_epochs(Acc) ->
Seq = mem3_rpc:find_common_seq(Node, Name, UUID, Epochs),
Acc#acc{seq = Seq, history = {[]}}.
-changes_enumerator(#doc_info{id=DocId}, Reds, #acc{db=Db}=Acc) ->
+changes_enumerator(#doc_info{id=DocId}, #acc{db=Db}=Acc) ->
{ok, FDI} = couch_db:get_full_doc_info(Db, DocId),
- changes_enumerator(FDI, Reds, Acc);
-changes_enumerator(#full_doc_info{}=FDI, _,
- #acc{revcount=C, infos=Infos}=Acc0) ->
+ changes_enumerator(FDI, Acc);
+changes_enumerator(#full_doc_info{}=FDI, #acc{revcount=C, infos=Infos}=Acc0) ->
#doc_info{
high_seq=Seq,
revs=Revs
diff --git a/src/mem3/src/mem3_shards.erl b/src/mem3/src/mem3_shards.erl
index f998552..accede9 100644
--- a/src/mem3/src/mem3_shards.erl
+++ b/src/mem3/src/mem3_shards.erl
@@ -65,19 +65,14 @@ for_docid(DbName, DocId) ->
for_docid(DbName, DocId, Options) ->
HashKey = mem3_util:hash(DocId),
ShardHead = #shard{
- name = '_',
- node = '_',
dbname = DbName,
- range = ['$1','$2'],
- ref = '_'
+ range = ['$1', '$2'],
+ _ = '_'
},
OrderedShardHead = #ordered_shard{
- name = '_',
- node = '_',
dbname = DbName,
- range = ['$1','$2'],
- ref = '_',
- order = '_'
+ range = ['$1', '$2'],
+ _ = '_'
},
Conditions = [{'=<', '$1', HashKey}, {'=<', HashKey, '$2'}],
ShardSpec = {ShardHead, Conditions, ['$_']},
@@ -103,18 +98,13 @@ for_shard_name(ShardName, Options) ->
DbName = mem3:dbname(ShardName),
ShardHead = #shard{
name = ShardName,
- node = '_',
dbname = DbName,
- range = '_',
- ref = '_'
+ _ = '_'
},
OrderedShardHead = #ordered_shard{
name = ShardName,
- node = '_',
dbname = DbName,
- range = '_',
- ref = '_',
- order = '_'
+ _ = '_'
},
ShardSpec = {ShardHead, [], ['$_']},
OrderedShardSpec = {OrderedShardHead, [], ['$_']},
@@ -156,7 +146,7 @@ fold(Fun, Acc) ->
{ok, Db} = mem3_util:ensure_exists(DbName),
FAcc = {Db, Fun, Acc},
try
- {ok, _, LastAcc} = couch_db:enum_docs(Db, fun fold_fun/3, FAcc, []),
+ {ok, LastAcc} = couch_db:fold_docs(Db, fun fold_fun/2, FAcc),
{_Db, _UFun, UAcc} = LastAcc,
UAcc
after
@@ -249,10 +239,10 @@ code_change(_OldVsn, #st{}=St, _Extra) ->
%% internal functions
-fold_fun(#full_doc_info{}=FDI, _, Acc) ->
+fold_fun(#full_doc_info{}=FDI, Acc) ->
DI = couch_doc:to_doc_info(FDI),
- fold_fun(DI, nil, Acc);
-fold_fun(#doc_info{}=DI, _, {Db, UFun, UAcc}) ->
+ fold_fun(DI, Acc);
+fold_fun(#doc_info{}=DI, {Db, UFun, UAcc}) ->
case couch_db:open_doc(Db, DI, [ejson_body, conflicts]) of
{ok, Doc} ->
{Props} = Doc#doc.body,
@@ -266,8 +256,9 @@ fold_fun(#doc_info{}=DI, _, {Db, UFun, UAcc}) ->
get_update_seq() ->
DbName = config:get("mem3", "shards_db", "_dbs"),
{ok, Db} = mem3_util:ensure_exists(DbName),
+ Seq = couch_db:get_update_seq(Db),
couch_db:close(Db),
- couch_db:get_update_seq(Db).
+ Seq.
listen_for_changes(Since) ->
DbName = config:get("mem3", "shards_db", "_dbs"),
@@ -299,7 +290,7 @@ changes_callback({change, {Change}, _}, _) ->
{Doc} ->
Shards = mem3_util:build_ordered_shards(DbName, Doc),
gen_server:cast(?MODULE, {cache_insert, DbName, Shards}),
- [create_if_missing(mem3:name(S)) || S
+ [create_if_missing(mem3:name(S), mem3:engine(S)) || S
<- Shards, mem3:node(S) =:= node()]
end
end
@@ -336,20 +327,18 @@ in_range(Shard, HashKey) ->
[B, E] = mem3:range(Shard),
B =< HashKey andalso HashKey =< E.
-create_if_missing(Name) ->
- DbDir = config:get("couchdb", "database_dir"),
- Filename = filename:join(DbDir, ?b2l(Name) ++ ".couch"),
- case filelib:is_regular(Filename) of
- true ->
- ok;
- false ->
- case couch_server:create(Name, [?ADMIN_CTX]) of
- {ok, Db} ->
- couch_db:close(Db);
- Error ->
- couch_log:error("~p tried to create ~s, got ~p",
- [?MODULE, Name, Error])
- end
+create_if_missing(Name, Options) ->
+ case couch_server:exists(Name) of
+ true ->
+ ok;
+ false ->
+ case couch_server:create(Name, [?ADMIN_CTX] ++ Options) of
+ {ok, Db} ->
+ couch_db:close(Db);
+ Error ->
+ couch_log:error("~p tried to create ~s, got ~p",
+ [?MODULE, Name, Error])
+ end
end.
cache_insert(#st{cur_size=Cur}=St, DbName, Shards) ->
diff --git a/src/mem3/src/mem3_util.erl b/src/mem3/src/mem3_util.erl
index 2cd444d..4e1b5fd 100644
--- a/src/mem3/src/mem3_util.erl
+++ b/src/mem3/src/mem3_util.erl
@@ -153,6 +153,10 @@ build_ordered_shards(DbName, DocProps) ->
build_shards_by_node(DbName, DocProps) ->
{ByNode} = couch_util:get_value(<<"by_node">>, DocProps, {[]}),
Suffix = couch_util:get_value(<<"shard_suffix">>, DocProps, ""),
+ EngineOpt = case couch_util:get_value(<<"engine">>, DocProps) of
+ Engine when is_binary(Engine) -> [{engine, Engine}];
+ _ -> []
+ end,
lists:flatmap(fun({Node, Ranges}) ->
lists:map(fun(Range) ->
[B,E] = string:tokens(?b2l(Range), "-"),
@@ -161,7 +165,8 @@ build_shards_by_node(DbName, DocProps) ->
name_shard(#shard{
dbname = DbName,
node = to_atom(Node),
- range = [Beg, End]
+ range = [Beg, End],
+ opts = EngineOpt
}, Suffix)
end, Ranges)
end, ByNode).
@@ -169,6 +174,10 @@ build_shards_by_node(DbName, DocProps) ->
build_shards_by_range(DbName, DocProps) ->
{ByRange} = couch_util:get_value(<<"by_range">>, DocProps, {[]}),
Suffix = couch_util:get_value(<<"shard_suffix">>, DocProps, ""),
+ EngineOpt = case couch_util:get_value(<<"engine">>, DocProps) of
+ Engine when is_binary(Engine) -> [{engine, Engine}];
+ _ -> []
+ end,
lists:flatmap(fun({Range, Nodes}) ->
lists:map(fun({Node, Order}) ->
[B,E] = string:tokens(?b2l(Range), "-"),
@@ -178,7 +187,8 @@ build_shards_by_range(DbName, DocProps) ->
dbname = DbName,
node = to_atom(Node),
range = [Beg, End],
- order = Order
+ order = Order,
+ opts = EngineOpt
}, Suffix)
end, lists:zip(Nodes, lists:seq(1, length(Nodes))))
end, ByRange).
@@ -247,7 +257,8 @@ downcast(#ordered_shard{}=S) ->
node = S#ordered_shard.node,
dbname = S#ordered_shard.dbname,
range = S#ordered_shard.range,
- ref = S#ordered_shard.ref
+ ref = S#ordered_shard.ref,
+ opts = S#ordered_shard.opts
};
downcast(Shards) when is_list(Shards) ->
[downcast(Shard) || Shard <- Shards].
diff --git a/src/mem3/test/mem3_util_test.erl b/src/mem3/test/mem3_util_test.erl
index 340a58a..42bc5c7 100644
--- a/src/mem3/test/mem3_util_test.erl
+++ b/src/mem3/test/mem3_util_test.erl
@@ -85,35 +85,35 @@ build_shards_test() ->
[{shard,<<"shards/00000000-1fffffff/testdb1">>,
'bigcouch@node.local',<<"testdb1">>,
[0,536870911],
- undefined},
+ undefined,[]},
{shard,<<"shards/20000000-3fffffff/testdb1">>,
'bigcouch@node.local',<<"testdb1">>,
[536870912,1073741823],
- undefined},
+ undefined,[]},
{shard,<<"shards/40000000-5fffffff/testdb1">>,
'bigcouch@node.local',<<"testdb1">>,
[1073741824,1610612735],
- undefined},
+ undefined,[]},
{shard,<<"shards/60000000-7fffffff/testdb1">>,
'bigcouch@node.local',<<"testdb1">>,
[1610612736,2147483647],
- undefined},
+ undefined,[]},
{shard,<<"shards/80000000-9fffffff/testdb1">>,
'bigcouch@node.local',<<"testdb1">>,
[2147483648,2684354559],
- undefined},
+ undefined,[]},
{shard,<<"shards/a0000000-bfffffff/testdb1">>,
'bigcouch@node.local',<<"testdb1">>,
[2684354560,3221225471],
- undefined},
+ undefined,[]},
{shard,<<"shards/c0000000-dfffffff/testdb1">>,
'bigcouch@node.local',<<"testdb1">>,
[3221225472,3758096383],
- undefined},
+ undefined,[]},
{shard,<<"shards/e0000000-ffffffff/testdb1">>,
'bigcouch@node.local',<<"testdb1">>,
[3758096384,4294967295],
- undefined}],
+ undefined,[]}],
?assertEqual(ExpectedShards1, Shards1),
ok.
--
To stop receiving notification emails like this one, please contact
"commits@couchdb.apache.org" <co...@couchdb.apache.org>.
[couchdb] 03/03: Ensure deterministic revisions for attachments
Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
davisp pushed a commit to branch COUCHDB-3287-pluggable-storage-engines
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit c340fb0805ecae7aa4b5aaf9a22120ba383cd505
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Wed Feb 8 07:25:37 2017 -0600
Ensure deterministic revisions for attachments
This re-fixes a corner case when recreating a document with an
attachment in a single multipart request. Since we don't detect that we
need a new revision until after the document has been serialized we need
to be able to deserialize the body so that we can generate the same
revisions regardless of the contents of the database. If we don't do
this then we end up including information from the position of the
attachment on disk in the revision calculation which can introduce
branches in the revision tree.
I've left this as a separate commit from the pluggable storage engine
work so that it's called out clearly for us to revisit.
COUCHDB-3255
---
src/couch/src/couch_bt_engine.erl | 10 +++++++++-
src/couch/src/couch_db.erl | 12 +-----------
src/couch/src/couch_db_updater.erl | 12 +++++++++++-
3 files changed, 21 insertions(+), 13 deletions(-)
diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl
index 374199f..0cd9d65 100644
--- a/src/couch/src/couch_bt_engine.erl
+++ b/src/couch/src/couch_bt_engine.erl
@@ -324,7 +324,15 @@ serialize_doc(#st{} = St, #doc{} = Doc) ->
SummaryBin = ?term_to_bin({Body, Atts}),
Md5 = couch_crypto:hash(md5, SummaryBin),
Data = couch_file:assemble_file_chunk(SummaryBin, Md5),
- Doc#doc{body = Data}.
+ % TODO: This is a terrible hack to get around the issues
+ % in COUCHDB-3255. We'll need to come back and figure
+ % out a better approach to handling the case when we
+ % need to generate a new revision id after the doc
+ % has been serialized.
+ Doc#doc{
+ body = Data,
+ meta = [{comp_body, Body} | Doc#doc.meta]
+ }.
write_doc_body(St, #doc{} = Doc) ->
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index 7fc22b2..478bd2e 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -944,7 +944,7 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI
-new_revid(#doc{body=Body0, revs={OldStart,OldRevs}, atts=Atts, deleted=Deleted}) ->
+new_revid(#doc{body=Body, revs={OldStart,OldRevs}, atts=Atts, deleted=Deleted}) ->
DigestedAtts = lists:foldl(fun(Att, Acc) ->
[N, T, M] = couch_att:fetch([name, type, md5], Att),
case M == <<>> of
@@ -952,16 +952,6 @@ new_revid(#doc{body=Body0, revs={OldStart,OldRevs}, atts=Atts, deleted=Deleted})
false -> [{N, T, M} | Acc]
end
end, [], Atts),
- Body = case Body0 of
- {summary, [_Len, _Md5, BodyAtts], _SizeInfo, _AttsFd} ->
- {CompBody, _CompAtts} = binary_to_term(BodyAtts),
- couch_compress:decompress(CompBody);
- {summary, [_Len, BodyAtts], _SizeInfo, _AttsFd} ->
- {CompBody, _CompAtts} = binary_to_term(BodyAtts),
- couch_compress:decompress(CompBody);
- Else ->
- Else
- end,
case DigestedAtts of
Atts2 when length(Atts) =/= length(Atts2) ->
% We must have old style non-md5 attachments
diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl
index d765458..9121a71 100644
--- a/src/couch/src/couch_db_updater.erl
+++ b/src/couch/src/couch_db_updater.erl
@@ -520,7 +520,17 @@ merge_rev_tree(OldInfo, NewDoc, Client, Limit, false)
% Update the new doc based on revisions in OldInfo
#doc_info{revs=[WinningRev | _]} = couch_doc:to_doc_info(OldInfo),
#rev_info{rev={OldPos, OldRev}} = WinningRev,
- NewRevId = couch_db:new_revid(NewDoc#doc{revs={OldPos, [OldRev]}}),
+ Body = case couch_util:get_value(comp_body, NewDoc#doc.meta) of
+ CompBody when is_binary(CompBody) ->
+ couch_compress:decompress(CompBody);
+ _ ->
+ NewDoc#doc.body
+ end,
+ RevIdDoc = NewDoc#doc{
+ revs = {OldPos, [OldRev]},
+ body = Body
+ },
+ NewRevId = couch_db:new_revid(RevIdDoc),
NewDoc2 = NewDoc#doc{revs={OldPos + 1, [NewRevId, OldRev]}},
% Merge our modified new doc into the tree
--
To stop receiving notification emails like this one, please contact
"commits@couchdb.apache.org" <co...@couchdb.apache.org>.