Posted to commits@couchdb.apache.org by da...@apache.org on 2017/02/01 17:46:43 UTC
[5/6] couch commit: updated
refs/heads/COUCHDB-3287-pluggable-storage-engines to 7f90b57
Implement pluggable storage engines
This change moves the main work of storage engines to run through the
new couch_db_engine behavior. This allows us to replace the storage
engine with different implementations that can be tailored to specific
workloads and environments.
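
The couch_db_engine behavior is the new dispatch layer: couch_db stops
touching couch_file and couch_btree directly and instead calls through the
engine term stored in the #db record. As a hypothetical sketch of the
caller-facing shape of an engine (function names mirror the couch_db_engine
dispatch calls visible in this diff; the authoritative callback specs live
in couch_db_engine.erl):

    %% Hypothetical sketch, not the real behavior spec.
    -module(couch_demo_engine).
    -export([get/2, open_docs/2, count_changes_since/2]).

    %% Engine state is opaque to couch_db; a map stands in here.
    %% couch_db:get_update_seq/1 etc. reduce to get(St, Key).
    get(#{update_seq := Seq}, update_seq) -> Seq;
    get(#{revs_limit := Limit}, revs_limit) -> Limit;
    get(_St, purge_seq) -> 0.

    %% One #full_doc_info{} or not_found per requested Id, the
    %% contract couch_db:open_docs/3 and get_full_doc_infos/2 rely on.
    open_docs(#{docs := Docs}, Ids) ->
        [maps:get(Id, Docs, not_found) || Id <- Ids].

    count_changes_since(#{update_seq := Seq}, SinceSeq) ->
        max(Seq - SinceSeq, 0).

The #db record below gains an engine field defaulting to
{couch_bt_engine, undefined}, the B-tree implementation of this behavior.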
Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch/commit/a7c6713d
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch/tree/a7c6713d
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch/diff/a7c6713d
Branch: refs/heads/COUCHDB-3287-pluggable-storage-engines
Commit: a7c6713d2f4fa763e132bf13b35417cca66b655b
Parents: 60633f5
Author: Paul J. Davis <pa...@gmail.com>
Authored: Fri Feb 5 12:04:20 2016 -0600
Committer: Paul J. Davis <pa...@gmail.com>
Committed: Wed Feb 1 11:42:50 2017 -0600
----------------------------------------------------------------------
include/couch_db.hrl | 38 +-
src/couch_att.erl | 132 ++-
src/couch_auth_cache.erl | 17 +-
src/couch_changes.erl | 25 +-
src/couch_compaction_daemon.erl | 32 +-
src/couch_db.erl | 759 ++++++++-------
src/couch_db_engine.erl | 3 -
src/couch_db_updater.erl | 1275 ++++++-------------------
src/couch_httpd_db.erl | 8 +-
src/couch_httpd_misc_handlers.erl | 13 -
src/couch_lru.erl | 10 +-
src/couch_server.erl | 239 ++++-
src/couch_stream.erl | 256 ++---
src/couch_util.erl | 40 +-
test/couch_stream_tests.erl | 30 +-
test/couchdb_compaction_daemon_tests.erl | 4 +-
test/couchdb_views_tests.erl | 45 +-
17 files changed, 1250 insertions(+), 1676 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/a7c6713d/include/couch_db.hrl
----------------------------------------------------------------------
diff --git a/include/couch_db.hrl b/include/couch_db.hrl
index e7cd85d..03d7cc4 100644
--- a/include/couch_db.hrl
+++ b/include/couch_db.hrl
@@ -78,7 +78,8 @@
update_seq = 0,
deleted = false,
rev_tree = [],
- sizes = #size_info{}
+ sizes = #size_info{},
+ meta = []
}).
-record(httpd, {
@@ -129,30 +130,29 @@
}).
-record(db, {
+ name,
+ filepath,
+
+ engine = {couch_bt_engine, undefined},
+
main_pid = nil,
compactor_pid = nil,
- instance_start_time, % number of microsecs since jan 1 1970 as a binary string
- fd,
- fd_monitor,
- header = couch_db_header:new(),
+
committed_update_seq,
- id_tree,
- seq_tree,
- local_tree,
- update_seq,
- name,
- filepath,
- validate_doc_funs = undefined,
- security = [],
- security_ptr = nil,
+
+ instance_start_time, % number of microsecs since jan 1 1970 as a binary string
+
user_ctx = #user_ctx{},
+ security = [],
+ validate_doc_funs = undefined,
+
+ before_doc_update = nil, % nil | fun(Doc, Db) -> NewDoc
+ after_doc_read = nil, % nil | fun(Doc, Db) -> NewDoc
+
waiting_delayed_commit = nil,
- revs_limit = 1000,
- fsync_options = [],
+
options = [],
- compression,
- before_doc_update = nil, % nil | fun(Doc, Db) -> NewDoc
- after_doc_read = nil % nil | fun(Doc, Db) -> NewDoc
+ compression
}).
-record(view_fold_helper_funs, {
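
Worth noting in this hunk is what left the #db record: fd, fd_monitor,
header, id_tree, seq_tree, local_tree, update_seq, security_ptr, revs_limit
and fsync_options are gone, hidden behind the new engine field. Code that
read those fields directly must use accessors instead; a small caller-side
sketch using functions this patch adds to couch_db:

    %% Sketch: record-field reads become couch_db accessor calls.
    -module(db_meta_example).
    -export([summary/1]).

    summary(Db) ->
        #{update_seq => couch_db:get_update_seq(Db),
          revs_limit => couch_db:get_revs_limit(Db),
          user_ctx   => couch_db:get_user_ctx(Db)}.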
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/a7c6713d/src/couch_att.erl
----------------------------------------------------------------------
diff --git a/src/couch_att.erl b/src/couch_att.erl
index 9d38cfa..8ea1e45 100644
--- a/src/couch_att.erl
+++ b/src/couch_att.erl
@@ -18,7 +18,8 @@
fetch/2,
store/2,
store/3,
- transform/3
+ transform/3,
+ copy/2
]).
-export([
@@ -233,6 +234,14 @@ transform(Field, Fun, Att) ->
store(Field, NewValue, Att).
+copy(Att, DstStream) ->
+ [{stream, SrcStream}, AttLen, OldMd5] = fetch([data, att_len, md5], Att),
+ ok = couch_stream:copy(SrcStream, DstStream),
+ {NewStream, AttLen, _, NewMd5, _} = couch_stream:close(DstStream),
+ couch_util:check_md5(OldMd5, NewMd5),
+ store(data, {stream, NewStream}, Att).
+
+
is_stub(Att) ->
stub == fetch(data, Att).
@@ -292,11 +301,12 @@ size_info(Atts) ->
%% as safe as possible, avoiding the need for complicated disk versioning
%% schemes.
to_disk_term(#att{} = Att) ->
- {_, StreamIndex} = fetch(data, Att),
+ {stream, StreamEngine} = fetch(data, Att),
+ {ok, Sp} = couch_stream:to_disk_term(StreamEngine),
{
fetch(name, Att),
fetch(type, Att),
- StreamIndex,
+ Sp,
fetch(att_len, Att),
fetch(disk_len, Att),
fetch(revpos, Att),
@@ -309,9 +319,13 @@ to_disk_term(Att) ->
fun
(data, {Props, Values}) ->
case lists:keytake(data, 1, Props) of
- {value, {_, {_Fd, Sp}}, Other} -> {Other, [Sp | Values]};
- {value, {_, Value}, Other} -> {Other, [Value | Values]};
- false -> {Props, [undefined |Values ]}
+ {value, {_, {stream, StreamEngine}}, Other} ->
+ {ok, Sp} = couch_stream:to_disk_term(StreamEngine),
+ {Other, [Sp | Values]};
+ {value, {_, Value}, Other} ->
+ {Other, [Value | Values]};
+ false ->
+ {Props, [undefined |Values ]}
end;
(Key, {Props, Values}) ->
case lists:keytake(Key, 1, Props) of
@@ -332,9 +346,11 @@ to_disk_term(Att) ->
%% compression to remove these sorts of common bits (block level compression
%% with something like a shared dictionary that is checkpointed every now and
%% then).
-from_disk_term(Fd, {Base, Extended}) when is_tuple(Base), is_list(Extended) ->
- store(Extended, from_disk_term(Fd, Base));
-from_disk_term(Fd, {Name,Type,Sp,AttLen,DiskLen,RevPos,Md5,Enc}) ->
+from_disk_term(StreamSrc, {Base, Extended})
+ when is_tuple(Base), is_list(Extended) ->
+ store(Extended, from_disk_term(StreamSrc, Base));
+from_disk_term(StreamSrc, {Name,Type,Sp,AttLen,DiskLen,RevPos,Md5,Enc}) ->
+ {ok, Stream} = open_stream(StreamSrc, Sp),
#att{
name=Name,
type=Type,
@@ -342,10 +358,11 @@ from_disk_term(Fd, {Name,Type,Sp,AttLen,DiskLen,RevPos,Md5,Enc}) ->
disk_len=DiskLen,
md5=Md5,
revpos=RevPos,
- data={Fd,Sp},
+ data={stream, Stream},
encoding=upgrade_encoding(Enc)
};
-from_disk_term(Fd, {Name,Type,Sp,AttLen,RevPos,Md5}) ->
+from_disk_term(StreamSrc, {Name,Type,Sp,AttLen,RevPos,Md5}) ->
+ {ok, Stream} = open_stream(StreamSrc, Sp),
#att{
name=Name,
type=Type,
@@ -353,9 +370,10 @@ from_disk_term(Fd, {Name,Type,Sp,AttLen,RevPos,Md5}) ->
disk_len=AttLen,
md5=Md5,
revpos=RevPos,
- data={Fd,Sp}
+ data={stream, Stream}
};
-from_disk_term(Fd, {Name,{Type,Sp,AttLen}}) ->
+from_disk_term(StreamSrc, {Name,{Type,Sp,AttLen}}) ->
+ {ok, Stream} = open_stream(StreamSrc, Sp),
#att{
name=Name,
type=Type,
@@ -363,7 +381,7 @@ from_disk_term(Fd, {Name,{Type,Sp,AttLen}}) ->
disk_len=AttLen,
md5= <<>>,
revpos=0,
- data={Fd,Sp}
+ data={stream, Stream}
}.
@@ -477,32 +495,18 @@ to_json(Att, OutputData, DataToFollow, ShowEncoding) ->
{Name, {Props ++ DigestProp ++ DataProps ++ EncodingProps ++ HeadersProp}}.
-flush(Fd, Att) ->
- flush_data(Fd, fetch(data, Att), Att).
+flush(Db, Att) ->
+ flush_data(Db, fetch(data, Att), Att).
-flush_data(Fd, {Fd0, _}, Att) when Fd0 == Fd ->
- % already written to our file, nothing to write
- Att;
-flush_data(Fd, {OtherFd, StreamPointer}, Att) ->
- [InMd5, InDiskLen] = fetch([md5, disk_len], Att),
- {NewStreamData, Len, _IdentityLen, Md5, IdentityMd5} =
- couch_stream:copy_to_new_stream(OtherFd, StreamPointer, Fd),
- couch_db:check_md5(IdentityMd5, InMd5),
- store([
- {data, {Fd, NewStreamData}},
- {md5, Md5},
- {att_len, Len},
- {disk_len, InDiskLen}
- ], Att);
-flush_data(Fd, Data, Att) when is_binary(Data) ->
- couch_db:with_stream(Fd, Att, fun(OutputStream) ->
+flush_data(Db, Data, Att) when is_binary(Data) ->
+ couch_db:with_stream(Db, Att, fun(OutputStream) ->
couch_stream:write(OutputStream, Data)
end);
-flush_data(Fd, Fun, Att) when is_function(Fun) ->
+flush_data(Db, Fun, Att) when is_function(Fun) ->
case fetch(att_len, Att) of
undefined ->
- couch_db:with_stream(Fd, Att, fun(OutputStream) ->
+ couch_db:with_stream(Db, Att, fun(OutputStream) ->
% Fun(MaxChunkSize, WriterFun) must call WriterFun
% once for each chunk of the attachment,
Fun(4096,
@@ -523,11 +527,11 @@ flush_data(Fd, Fun, Att) when is_function(Fun) ->
end, ok)
end);
AttLen ->
- couch_db:with_stream(Fd, Att, fun(OutputStream) ->
+ couch_db:with_stream(Db, Att, fun(OutputStream) ->
write_streamed_attachment(OutputStream, Fun, AttLen)
end)
end;
-flush_data(Fd, {follows, Parser, Ref}, Att) ->
+flush_data(Db, {follows, Parser, Ref}, Att) ->
ParserRef = erlang:monitor(process, Parser),
Fun = fun() ->
Parser ! {get_bytes, Ref, self()},
@@ -541,9 +545,23 @@ flush_data(Fd, {follows, Parser, Ref}, Att) ->
end
end,
try
- flush_data(Fd, Fun, store(data, Fun, Att))
+ flush_data(Db, Fun, store(data, Fun, Att))
after
erlang:demonitor(ParserRef, [flush])
+ end;
+flush_data(Db, {stream, StreamEngine}, Att) ->
+ case couch_db:is_active_stream(Db, StreamEngine) of
+ true ->
+ % Already written
+ Att;
+ false ->
+ NewAtt = couch_db:with_stream(Db, Att, fun(OutputStream) ->
+ couch_stream:copy(StreamEngine, OutputStream)
+ end),
+ InMd5 = fetch(md5, Att),
+ OutMd5 = fetch(md5, NewAtt),
+ couch_util:check_md5(OutMd5, InMd5),
+ NewAtt
end.
@@ -572,9 +590,9 @@ foldl(Att, Fun, Acc) ->
foldl(Bin, _Att, Fun, Acc) when is_binary(Bin) ->
Fun(Bin, Acc);
-foldl({Fd, Sp}, Att, Fun, Acc) ->
+foldl({stream, StreamEngine}, Att, Fun, Acc) ->
Md5 = fetch(md5, Att),
- couch_stream:foldl(Fd, Sp, Md5, Fun, Acc);
+ couch_stream:foldl(StreamEngine, Md5, Fun, Acc);
foldl(DataFun, Att, Fun, Acc) when is_function(DataFun) ->
Len = fetch(att_len, Att),
fold_streamed_data(DataFun, Len, Fun, Acc);
@@ -599,14 +617,15 @@ foldl({follows, Parser, Ref}, Att, Fun, Acc) ->
range_foldl(Att, From, To, Fun, Acc) ->
- {Fd, Sp} = fetch(data, Att),
- couch_stream:range_foldl(Fd, Sp, From, To, Fun, Acc).
+ {stream, StreamEngine} = fetch(data, Att),
+ couch_stream:range_foldl(StreamEngine, From, To, Fun, Acc).
foldl_decode(Att, Fun, Acc) ->
case fetch([data, encoding], Att) of
- [{Fd, Sp}, Enc] ->
- couch_stream:foldl_decode(Fd, Sp, fetch(md5, Att), Enc, Fun, Acc);
+ [{stream, StreamEngine}, Enc] ->
+ couch_stream:foldl_decode(
+ StreamEngine, fetch(md5, Att), Enc, Fun, Acc);
[Fun2, identity] ->
fold_streamed_data(Fun2, fetch(att_len, Att), Fun, Acc)
end.
@@ -620,7 +639,7 @@ to_binary(Bin, _Att) when is_binary(Bin) ->
Bin;
to_binary(Iolist, _Att) when is_list(Iolist) ->
iolist_to_binary(Iolist);
-to_binary({_Fd,_Sp}, Att) ->
+to_binary({stream, _StreamEngine}, Att) ->
iolist_to_binary(
lists:reverse(foldl(Att, fun(Bin,Acc) -> [Bin|Acc] end, []))
);
@@ -680,9 +699,25 @@ upgrade_encoding(false) -> identity;
upgrade_encoding(Encoding) -> Encoding.
+open_stream(StreamSrc, Data) ->
+ case couch_db:is_db(StreamSrc) of
+ true ->
+ couch_db:open_read_stream(StreamSrc, Data);
+ false ->
+ case is_function(StreamSrc, 1) of
+ true ->
+ StreamSrc(Data);
+ false ->
+ erlang:error({invalid_stream_source, StreamSrc})
+ end
+ end.
+
+
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
+% Eww...
+-include("couch_bt_engine.hrl").
%% Test utilities
@@ -737,7 +772,7 @@ attachment_disk_term_test_() ->
{disk_len, 0},
{md5, <<212,29,140,217,143,0,178,4,233,128,9,152,236,248,66,126>>},
{revpos, 4},
- {data, {fake_fd, fake_sp}},
+ {data, {stream, {couch_bt_engine_stream, {fake_fd, fake_sp}}}},
{encoding, identity}
]),
BaseDiskTerm = {
@@ -751,11 +786,14 @@ attachment_disk_term_test_() ->
Headers = [{<<"X-Foo">>, <<"bar">>}],
ExtendedAttachment = store(headers, Headers, BaseAttachment),
ExtendedDiskTerm = {BaseDiskTerm, [{headers, Headers}]},
+ FakeDb = #db{
+ engine = {couch_bt_engine, #st{fd = fake_fd}}
+ },
{"Disk term tests", [
?_assertEqual(BaseDiskTerm, to_disk_term(BaseAttachment)),
- ?_assertEqual(BaseAttachment, from_disk_term(fake_fd, BaseDiskTerm)),
+ ?_assertEqual(BaseAttachment, from_disk_term(FakeDb, BaseDiskTerm)),
?_assertEqual(ExtendedDiskTerm, to_disk_term(ExtendedAttachment)),
- ?_assertEqual(ExtendedAttachment, from_disk_term(fake_fd, ExtendedDiskTerm))
+ ?_assertEqual(ExtendedAttachment, from_disk_term(FakeDb, ExtendedDiskTerm))
]}.
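
The pattern running through the couch_att.erl changes: attachment data is
now tagged {stream, StreamEngine} instead of a bare {Fd, Sp} pair, so
couch_stream calls take the engine term and no longer need a file
descriptor. Restating the new copy/2 as a caller-side sketch:

    %% Sketch of the copy/2 flow above; assumes Att already holds
    %% {stream, _} data.
    copy_att(Att, DstStream) ->
        [{stream, Src}, AttLen, OldMd5] =
            couch_att:fetch([data, att_len, md5], Att),
        ok = couch_stream:copy(Src, DstStream),
        %% close/1 yields {Engine, Len, IdentityLen, Md5, IdentityMd5};
        %% matching the bound AttLen plus the md5 check asserts a
        %% faithful copy.
        {NewStream, AttLen, _, NewMd5, _} = couch_stream:close(DstStream),
        couch_util:check_md5(OldMd5, NewMd5),
        couch_att:store(data, {stream, NewStream}, Att).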
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/a7c6713d/src/couch_auth_cache.erl
----------------------------------------------------------------------
diff --git a/src/couch_auth_cache.erl b/src/couch_auth_cache.erl
index 9b00a9d..6f25665 100644
--- a/src/couch_auth_cache.erl
+++ b/src/couch_auth_cache.erl
@@ -331,15 +331,12 @@ refresh_entries(AuthDb) ->
nil ->
ok;
AuthDb2 ->
- case AuthDb2#db.update_seq > AuthDb#db.update_seq of
+ OldSeq = couch_db:get_update_seq(AuthDb),
+ NewSeq = couch_db:get_update_seq(AuthDb2),
+ case NewSeq > OldSeq of
true ->
- {ok, _, _} = couch_db:enum_docs_since(
- AuthDb2,
- AuthDb#db.update_seq,
- fun(DocInfo, _, _) -> refresh_entry(AuthDb2, DocInfo) end,
- AuthDb#db.update_seq,
- []
- ),
+ Fun = fun(DocInfo, _) -> refresh_entry(AuthDb2, DocInfo) end,
+ {ok, _} = couch_db:fold_changes(AuthDb2, OldSeq, Fun, nil),
true = ets:insert(?STATE, {auth_db, AuthDb2});
false ->
ok
@@ -395,7 +392,9 @@ cache_needs_refresh() ->
nil ->
false;
AuthDb2 ->
- AuthDb2#db.update_seq > AuthDb#db.update_seq
+ OldSeq = couch_db:get_update_seq(AuthDb),
+ NewSeq = couch_db:get_update_seq(AuthDb2),
+ NewSeq > OldSeq
end
end,
false
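
These couch_auth_cache.erl hunks show the mechanical conversion this patch
applies everywhere: Db#db.update_seq becomes couch_db:get_update_seq(Db),
and enum_docs_since/5, whose callback took (DocInfo, Offset, Acc), becomes
fold_changes/4 with a two-argument callback returning {Go, Acc}. A compact
sketch of the new idiom:

    %% Sketch: counting changes since a sequence via fold_changes/4.
    %% The callback returns {ok | stop, Acc}; by default it receives
    %% #full_doc_info{} records (the doc_info option converts them).
    count_new(Db, SinceSeq) ->
        Fun = fun(_FDI, N) -> {ok, N + 1} end,
        {ok, N} = couch_db:fold_changes(Db, SinceSeq, Fun, 0),
        N.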
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/a7c6713d/src/couch_changes.erl
----------------------------------------------------------------------
diff --git a/src/couch_changes.erl b/src/couch_changes.erl
index 52ff39d..5779c12 100644
--- a/src/couch_changes.erl
+++ b/src/couch_changes.erl
@@ -219,7 +219,7 @@ configure_filter("_view", Style, Req, Db) ->
catch _:_ ->
view
end,
- case Db#db.id_tree of
+ case Db#db.main_pid of
undefined ->
DIR = fabric_util:doc_id_and_rev(DDoc),
{fetch, FilterType, Style, DIR, VName};
@@ -242,7 +242,7 @@ configure_filter(FilterName, Style, Req, Db) ->
[DName, FName] ->
{ok, DDoc} = open_ddoc(Db, <<"_design/", DName/binary>>),
check_member_exists(DDoc, [<<"filters">>, FName]),
- case Db#db.id_tree of
+ case Db#db.main_pid of
undefined ->
DIR = fabric_util:doc_id_and_rev(DDoc),
{fetch, custom, Style, Req, DIR, FName};
@@ -395,7 +395,7 @@ check_fields(_Fields) ->
throw({bad_request, "Selector error: fields must be JSON array"}).
-open_ddoc(#db{name=DbName, id_tree=undefined}, DDocId) ->
+open_ddoc(#db{name=DbName, main_pid=undefined}, DDocId) ->
case ddoc_cache:open_doc(mem3:dbname(DbName), DDocId) of
{ok, _} = Resp -> Resp;
Else -> throw(Else)
@@ -531,7 +531,8 @@ send_changes(Acc, Dir, FirstRound) ->
{#mrview{}, {fast_view, _, _, _}} ->
couch_mrview:view_changes_since(View, StartSeq, DbEnumFun, [{dir, Dir}], Acc);
{undefined, _} ->
- couch_db:changes_since(Db, StartSeq, DbEnumFun, [{dir, Dir}], Acc);
+ Opts = [doc_info, {dir, Dir}],
+ couch_db:fold_changes(Db, StartSeq, DbEnumFun, Acc, Opts);
{#mrview{}, _} ->
ViewEnumFun = fun view_changes_enumerator/2,
{Go, Acc0} = couch_mrview:view_changes_since(View, StartSeq, ViewEnumFun, [{dir, Dir}], Acc),
@@ -566,20 +567,24 @@ can_optimize(_, _) ->
send_changes_doc_ids(Db, StartSeq, Dir, Fun, Acc0, {doc_ids, _Style, DocIds}) ->
- Lookups = couch_btree:lookup(Db#db.id_tree, DocIds),
+ Results = couch_db:open_docs(Db, DocIds, [full_doc_info]),
FullInfos = lists:foldl(fun
- ({ok, FDI}, Acc) -> [FDI | Acc];
+ (#full_doc_info{}=FDI, Acc) -> [FDI | Acc];
(not_found, Acc) -> Acc
- end, [], Lookups),
+ end, [], Results),
send_lookup_changes(FullInfos, StartSeq, Dir, Db, Fun, Acc0).
send_changes_design_docs(Db, StartSeq, Dir, Fun, Acc0, {design_docs, _Style}) ->
- FoldFun = fun(FullDocInfo, _, Acc) ->
+ FoldFun = fun(FullDocInfo, Acc) ->
{ok, [FullDocInfo | Acc]}
end,
- KeyOpts = [{start_key, <<"_design/">>}, {end_key_gt, <<"_design0">>}],
- {ok, _, FullInfos} = couch_btree:fold(Db#db.id_tree, FoldFun, [], KeyOpts),
+ KeyOpts = [
+ include_deleted,
+ {start_key, <<"_design/">>},
+ {end_key_gt, <<"_design0">>}
+ ],
+ {ok, FullInfos} = couch_db:fold_docs(Db, FoldFun, [], KeyOpts),
send_lookup_changes(FullInfos, StartSeq, Dir, Db, Fun, Acc0).
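
Both couch_changes.erl call sites above follow the same substitution:
couch_btree:lookup and couch_btree:fold on Db#db.id_tree become
couch_db:open_docs/3 and couch_db:fold_docs/4, again with two-argument fold
callbacks. A sketch mirroring send_changes_design_docs:

    %% Sketch: enumerating design docs through the engine API.
    design_doc_infos(Db) ->
        FoldFun = fun(FDI, Acc) -> {ok, [FDI | Acc]} end,
        Opts = [{start_key, <<"_design/">>}, {end_key_gt, <<"_design0">>}],
        {ok, Infos} = couch_db:fold_docs(Db, FoldFun, [], Opts),
        lists:reverse(Infos).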
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/a7c6713d/src/couch_compaction_daemon.erl
----------------------------------------------------------------------
diff --git a/src/couch_compaction_daemon.erl b/src/couch_compaction_daemon.erl
index 8f95eb2..77e3f54 100644
--- a/src/couch_compaction_daemon.erl
+++ b/src/couch_compaction_daemon.erl
@@ -236,17 +236,18 @@ maybe_compact_views(DbName, [DDocName | Rest], Config) ->
db_ddoc_names(Db) ->
- {ok, _, DDocNames} = couch_db:enum_docs(
- Db,
- fun(#full_doc_info{id = <<"_design/", _/binary>>, deleted = true}, _, Acc) ->
- {ok, Acc};
- (#full_doc_info{id = <<"_design/", Id/binary>>}, _, Acc) ->
- {ok, [Id | Acc]};
- (_, _, Acc) ->
- {stop, Acc}
- end, [], [{start_key, <<"_design/">>}, {end_key_gt, <<"_design0">>}]),
+ FoldFun = fun ddoc_name/2,
+ Opts = [{start_key, <<"_design/">>}],
+ {ok, DDocNames} = couch_db:fold_docs(Db, FoldFun, [], Opts),
DDocNames.
+ddoc_name(#full_doc_info{id = <<"_design/", _/binary>>, deleted = true}, Acc) ->
+ {ok, Acc};
+ddoc_name(#full_doc_info{id = <<"_design/", Id/binary>>}, Acc) ->
+ {ok, [Id | Acc]};
+ddoc_name(_, Acc) ->
+ {stop, Acc}.
+
maybe_compact_view(DbName, GroupId, Config) ->
DDocId = <<"_design/", GroupId/binary>>,
@@ -391,21 +392,22 @@ check_frag(Threshold, Frag) ->
frag(Props) ->
- FileSize = couch_util:get_value(disk_size, Props),
+ {Sizes} = couch_util:get_value(sizes, Props),
+ FileSize = couch_util:get_value(file, Sizes),
MinFileSize = list_to_integer(
config:get("compaction_daemon", "min_file_size", "131072")),
case FileSize < MinFileSize of
true ->
{0, FileSize};
false ->
- case couch_util:get_value(data_size, Props) of
- null ->
- {100, FileSize};
+ case couch_util:get_value(active, Sizes) of
0 ->
{0, FileSize};
- DataSize ->
+ DataSize when is_integer(DataSize), DataSize > 0 ->
Frag = round(((FileSize - DataSize) / FileSize * 100)),
- {Frag, space_required(DataSize)}
+ {Frag, space_required(DataSize)};
+ _ ->
+ {100, FileSize}
end
end.
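
frag/1 now reads the file and active sizes from the nested sizes object
that get_db_info emits in the couch_db.erl changes below, instead of the
legacy top-level disk_size/data_size fields, and guards against non-integer
active sizes. The arithmetic itself is unchanged; for example, a 1 MiB file
with 256 KiB of active data is round((1048576 - 262144) / 1048576 * 100)
= 75% fragmented:

    %% Sketch of the fragmentation math used above.
    frag_pct(FileSize, ActiveSize)
            when is_integer(ActiveSize), ActiveSize > 0 ->
        round((FileSize - ActiveSize) / FileSize * 100).
    %% frag_pct(1048576, 262144) =:= 75.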
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/a7c6713d/src/couch_db.erl
----------------------------------------------------------------------
diff --git a/src/couch_db.erl b/src/couch_db.erl
index 8005e6d..25843d8 100644
--- a/src/couch_db.erl
+++ b/src/couch_db.erl
@@ -13,30 +13,40 @@
-module(couch_db).
-export([open/2,open_int/2,close/1,create/2,get_db_info/1,get_design_docs/1]).
--export([start_compact/1, cancel_compact/1]).
+-export([get_path/1]).
+-export([shutdown/1]).
+-export([incref/1]).
+-export([start_compact/1, start_compact/2, cancel_compact/1]).
-export([wait_for_compaction/1, wait_for_compaction/2]).
--export([is_idle/1,monitor/1,count_changes_since/2]).
+-export([is_idle/1,monitor/1,pid/1,compactor_pid/1]).
-export([update_doc/3,update_doc/4,update_docs/4,update_docs/2,update_docs/3,delete_doc/3]).
-export([get_doc_info/2,get_full_doc_info/2,get_full_doc_infos/2]).
--export([open_doc/2,open_doc/3,open_doc_revs/4]).
+-export([open_doc/2,open_doc/3,open_doc_revs/4, open_docs/2, open_docs/3]).
-export([set_revs_limit/2,get_revs_limit/1]).
+-export([get_user_ctx/1, set_user_ctx/2]).
-export([get_missing_revs/2,name/1,get_update_seq/1,get_committed_update_seq/1]).
-export([get_uuid/1, get_epochs/1, get_compacted_seq/1]).
--export([enum_docs/4,enum_docs_since/5]).
--export([enum_docs_since_reduce_to_count/1,enum_docs_reduce_to_count/1]).
--export([increment_update_seq/1,get_purge_seq/1,purge_docs/2,get_last_purged/1]).
--export([start_link/3,open_doc_int/3,ensure_full_commit/1,ensure_full_commit/2]).
+-export([get_instance_start_time/1]).
+-export([get_purge_seq/1,purge_docs/2,get_last_purged/1]).
+-export([start_link/4,open_doc_int/3,ensure_full_commit/1,ensure_full_commit/2]).
+-export([fold_docs/3, fold_docs/4]).
+-export([fold_changes/4, fold_changes/5, count_changes_since/2]).
-export([set_security/2,get_security/1]).
--export([changes_since/4,changes_since/5,read_doc/2,new_revid/1]).
+-export([read_doc/2,new_revid/1]).
-export([check_is_admin/1, is_admin/1, check_is_member/1, get_doc_count/1]).
--export([reopen/1, is_system_db/1, compression/1, make_doc/5]).
--export([load_validation_funs/1]).
--export([check_md5/2, with_stream/3]).
+-export([reopen/1, is_system_db/1, make_doc/5]).
+-export([load_validation_funs/1, reload_validation_funs/1]).
+-export([with_stream/3]).
-export([monitored_by/1]).
-export([normalize_dbname/1]).
-export([validate_dbname/1]).
-export([dbname_suffix/1]).
+
+-export([is_db/1]).
+-export([open_write_stream/2, open_read_stream/2, is_active_stream/2]).
+-export([get_before_doc_update/1, get_after_doc_read/1]).
+
-include_lib("couch/include/couch_db.hrl").
-define(DBNAME_REGEX,
@@ -44,38 +54,9 @@
"(\\.[0-9]{10,})?$" % but allow an optional shard timestamp at the end
).
-start_link(DbName, Filepath, Options) ->
- case open_db_file(Filepath, Options) of
- {ok, Fd} ->
- {ok, UpdaterPid} = gen_server:start_link(couch_db_updater, {DbName,
- Filepath, Fd, Options}, []),
- unlink(Fd),
- gen_server:call(UpdaterPid, get_db);
- Else ->
- Else
- end.
-
-open_db_file(Filepath, Options) ->
- case couch_file:open(Filepath, Options) of
- {ok, Fd} ->
- {ok, Fd};
- {error, enoent} ->
- % couldn't find file. is there a compact version? This can happen if
- % crashed during the file switch.
- case couch_file:open(Filepath ++ ".compact", [nologifmissing]) of
- {ok, Fd} ->
- couch_log:info("Found ~s~s compaction file, using as primary"
- " storage.", [Filepath, ".compact"]),
- ok = file:rename(Filepath ++ ".compact", Filepath),
- ok = couch_file:sync(Fd),
- {ok, Fd};
- {error, enoent} ->
- {not_found, no_db_file}
- end;
- Error ->
- Error
- end.
-
+start_link(Engine, DbName, Filepath, Options) ->
+ Arg = {Engine, DbName, Filepath, Options},
+ proc_lib:start_link(couch_db_updater, init, [Arg]).
create(DbName, Options) ->
couch_server:create(DbName, Options).
@@ -87,7 +68,7 @@ open_int(DbName, Options) ->
% this should be called anytime an http request opens the database.
% it ensures that the http userCtx is a valid reader
-open(DbName, Options) ->
+open(DbName, Options) when is_binary(DbName) ->
case couch_server:open(DbName, Options) of
{ok, Db} ->
try
@@ -98,23 +79,49 @@ open(DbName, Options) ->
close(Db),
throw(Error)
end;
- Else -> Else
- end.
+ Else ->
+ Else
+ end;
+open(DbName, Options) ->
+ open(iolist_to_binary(DbName), Options).
-reopen(#db{main_pid = Pid, fd = Fd, fd_monitor = OldRef, user_ctx = UserCtx}) ->
- {ok, #db{fd = NewFd} = NewDb} = gen_server:call(Pid, get_db, infinity),
- case NewFd =:= Fd of
- true ->
- {ok, NewDb#db{user_ctx = UserCtx}};
- false ->
- erlang:demonitor(OldRef, [flush]),
- NewRef = erlang:monitor(process, NewFd),
- {ok, NewDb#db{user_ctx = UserCtx, fd_monitor = NewRef}}
- end.
+
+reopen(#db{} = Db) ->
+ % We could have just swapped out the storage engine
+ % for this database during a compaction so we just
+ % reimplement this as a close/open pair now.
+ close(Db),
+ open(Db#db.name, [{user_ctx, Db#db.user_ctx} | Db#db.options]).
+
+
+% You shouldn't call this. Its part of the ref counting between
+% couch_server and couch_db instances.
+incref(#db{} = Db) ->
+ couch_db_engine:incref(Db).
+
+
+close(#db{} = Db) ->
+ ok = couch_db_engine:decref(Db).
+
+shutdown(#db{} = Db) ->
+ couch_util:shutdown_sync(Db#db.main_pid).
+
+is_db(#db{}) -> true;
+is_db(_Else) -> false.
+
+
+pid(#db{} = Db) ->
+ Db#db.main_pid.
+
+compactor_pid(#db{} = Db) ->
+ Db#db.compactor_pid.
is_system_db(#db{options = Options}) ->
lists:member(sys_db, Options).
+get_path(#db{filepath = Path}) ->
+ Path.
+
ensure_full_commit(#db{main_pid=Pid, instance_start_time=StartTime}) ->
ok = gen_server:call(Pid, full_commit, infinity),
{ok, StartTime}.
@@ -124,30 +131,35 @@ ensure_full_commit(Db, RequiredSeq) ->
ok = gen_server:call(Pid, {full_commit, RequiredSeq}, infinity),
{ok, StartTime}.
-close(#db{fd_monitor=Ref}) ->
- erlang:demonitor(Ref, [flush]),
- ok.
-
is_idle(#db{compactor_pid=nil, waiting_delayed_commit=nil} = Db) ->
monitored_by(Db) == [];
is_idle(_Db) ->
false.
monitored_by(Db) ->
- case erlang:process_info(Db#db.fd, monitored_by) of
- undefined ->
- [];
- {monitored_by, Pids} ->
- PidTracker = whereis(couch_stats_process_tracker),
- Pids -- [Db#db.main_pid, PidTracker]
+ case couch_db_engine:monitored_by(Db) of
+ Pids when is_list(Pids) ->
+ PidTracker = whereis(couch_stats_process_tracker),
+ Pids -- [Db#db.main_pid, PidTracker];
+ undefined ->
+ []
end.
monitor(#db{main_pid=MainPid}) ->
erlang:monitor(process, MainPid).
-start_compact(#db{main_pid=Pid}) ->
- gen_server:call(Pid, start_compact).
+start_compact(#db{} = Db) ->
+ start_compact(Db, []).
+
+start_compact(#db{} = Db, Opts) ->
+ case lists:keyfind(notify, 1, Opts) of
+ {notify, Pid, Term} ->
+ Db#db.main_pid ! {'$gen_call', {Pid, Term}, start_compact},
+ ok;
+ _ ->
+ gen_server:call(Db#db.main_pid, start_compact)
+ end.
cancel_compact(#db{main_pid=Pid}) ->
gen_server:call(Pid, cancel_compact).
@@ -198,6 +210,14 @@ open_doc(Db, Id, Options) ->
apply_open_options(Else,Options)
end.
+open_docs(Db, Id) ->
+ open_docs(Db, Id, []).
+
+open_docs(Db, Ids, _Options) ->
+ % TODO: Add support for returning other
+ % types of docs beyond #full_doc_info{}
+ couch_db_engine:open_docs(Db, Ids).
+
apply_open_options({ok, Doc},Options) ->
apply_open_options2(Doc,Options);
apply_open_options(Else,_Options) ->
@@ -247,7 +267,8 @@ get_missing_revs(Db, IdRevsList) ->
find_missing([], []) ->
[];
-find_missing([{Id, Revs}|RestIdRevs], [{ok, FullInfo} | RestLookupInfo]) ->
+find_missing([{Id, Revs}|RestIdRevs], [FullInfo | RestLookupInfo])
+ when is_record(FullInfo, full_doc_info) ->
case couch_key_tree:find_missing(FullInfo#full_doc_info.rev_tree, Revs) of
[] ->
find_missing(RestIdRevs, RestLookupInfo);
@@ -275,8 +296,8 @@ find_missing([{Id, Revs}|RestIdRevs], [not_found | RestLookupInfo]) ->
get_doc_info(Db, Id) ->
case get_full_doc_info(Db, Id) of
- {ok, DocInfo} ->
- {ok, couch_doc:to_doc_info(DocInfo)};
+ #full_doc_info{} = FDI ->
+ {ok, couch_doc:to_doc_info(FDI)};
Else ->
Else
end.
@@ -287,10 +308,7 @@ get_full_doc_info(Db, Id) ->
Result.
get_full_doc_infos(Db, Ids) ->
- couch_btree:lookup(Db#db.id_tree, Ids).
-
-increment_update_seq(#db{main_pid=Pid}) ->
- gen_server:call(Pid, increment_update_seq).
+ couch_db_engine:open_docs(Db, Ids).
purge_docs(#db{main_pid=Pid}, IdsRevs) ->
gen_server:call(Pid, {purge_docs, IdsRevs}).
@@ -298,56 +316,56 @@ purge_docs(#db{main_pid=Pid}, IdsRevs) ->
get_committed_update_seq(#db{committed_update_seq=Seq}) ->
Seq.
-get_update_seq(#db{update_seq=Seq})->
- Seq.
+get_user_ctx(#db{} = Db) ->
+ Db#db.user_ctx.
+
+set_user_ctx(#db{} = Db, #user_ctx{} = UserCtx) ->
+ {ok, Db#db{user_ctx = UserCtx}}.
+
+get_update_seq(#db{} = Db)->
+ couch_db_engine:get(Db, update_seq).
get_purge_seq(#db{}=Db) ->
- couch_db_header:purge_seq(Db#db.header).
+ {ok, couch_db_engine:get(Db, purge_seq)}.
get_last_purged(#db{}=Db) ->
- case couch_db_header:purged_docs(Db#db.header) of
- nil ->
- {ok, []};
- Pointer ->
- couch_file:pread_term(Db#db.fd, Pointer)
- end.
+ {ok, couch_db_engine:get(Db, last_purged)}.
get_doc_count(Db) ->
- {ok, {Count, _, _}} = couch_btree:full_reduce(Db#db.id_tree),
- {ok, Count}.
+ {ok, couch_db_engine:get(Db, doc_count)}.
+
+get_del_doc_count(Db) ->
+ {ok, couch_db_engine:get(Db, del_doc_count)}.
get_uuid(#db{}=Db) ->
- couch_db_header:uuid(Db#db.header).
+ couch_db_engine:get(Db, uuid).
get_epochs(#db{}=Db) ->
- couch_db_header:epochs(Db#db.header).
+ couch_db_engine:get(Db, epochs).
get_compacted_seq(#db{}=Db) ->
- couch_db_header:compacted_seq(Db#db.header).
+ couch_db_engine:get(Db, compacted_seq).
+
+get_instance_start_time(#db{}=Db) ->
+ Db#db.instance_start_time.
+
+get_before_doc_update(#db{} = Db) ->
+ Db#db.before_doc_update.
+
+get_after_doc_read(#db{} = Db) ->
+ Db#db.after_doc_read.
get_db_info(Db) ->
- #db{fd=Fd,
- header=Header,
- compactor_pid=Compactor,
- update_seq=SeqNum,
- name=Name,
- instance_start_time=StartTime,
- committed_update_seq=CommittedUpdateSeq,
- id_tree = IdBtree
+ #db{
+ name = Name,
+ compactor_pid = Compactor,
+ instance_start_time = StartTime,
+ committed_update_seq=CommittedUpdateSeq
} = Db,
- {ok, FileSize} = couch_file:bytes(Fd),
- {ok, DbReduction} = couch_btree:full_reduce(IdBtree),
- SizeInfo0 = element(3, DbReduction),
- SizeInfo = case SizeInfo0 of
- SI when is_record(SI, size_info) ->
- SI;
- {AS, ES} ->
- #size_info{active=AS, external=ES};
- AS ->
- #size_info{active=AS}
- end,
- ActiveSize = active_size(Db, SizeInfo),
- DiskVersion = couch_db_header:disk_version(Header),
+ {ok, DocCount} = get_doc_count(Db),
+ {ok, DelDocCount} = get_del_doc_count(Db),
+ SizeInfo = couch_db_engine:get(Db, size_info),
+ DiskVersion = couch_db_engine:get(Db, disk_version),
Uuid = case get_uuid(Db) of
undefined -> null;
Uuid0 -> Uuid0
@@ -358,63 +376,33 @@ get_db_info(Db) ->
end,
InfoList = [
{db_name, Name},
- {doc_count, element(1, DbReduction)},
- {doc_del_count, element(2, DbReduction)},
- {update_seq, SeqNum},
- {purge_seq, couch_db:get_purge_seq(Db)},
- {compact_running, Compactor/=nil},
- {disk_size, FileSize}, % legacy
- {other, {[{data_size, SizeInfo#size_info.external}]}}, % legacy
- {data_size, ActiveSize}, % legacy
- {sizes, {[
- {file, FileSize},
- {active, ActiveSize},
- {external, SizeInfo#size_info.external}
- ]}},
+ {engine, couch_db_engine:get(Db, engine)},
+ {doc_count, DocCount},
+ {doc_del_count, DelDocCount},
+ {update_seq, get_update_seq(Db)},
+ {purge_seq, couch_db_engine:get(Db, purge_seq)},
+ {compact_running, Compactor /= nil},
+ {sizes, {SizeInfo}},
{instance_start_time, StartTime},
{disk_format_version, DiskVersion},
{committed_update_seq, CommittedUpdateSeq},
{compacted_seq, CompactedSeq},
{uuid, Uuid}
- ],
+ ],
{ok, InfoList}.
-active_size(#db{}=Db, Size) when is_integer(Size) ->
- active_size(Db, #size_info{active=Size});
-active_size(#db{}=Db, #size_info{}=SI) ->
- Trees = [
- Db#db.id_tree,
- Db#db.seq_tree,
- Db#db.local_tree
- ],
- lists:foldl(fun(T, Acc) ->
- case couch_btree:size(T) of
- _ when Acc == null ->
- null;
- undefined ->
- null;
- Size ->
- Acc + Size
- end
- end, SI#size_info.active, Trees).
+
get_design_docs(#db{name = <<"shards/", _:18/binary, DbName/binary>>}) ->
{_, Ref} = spawn_monitor(fun() -> exit(fabric:design_docs(DbName)) end),
receive {'DOWN', Ref, _, _, Response} ->
Response
end;
-get_design_docs(#db{id_tree = IdBtree}) ->
- FoldFun = pipe([fun skip_deleted/4], fun
- (#full_doc_info{deleted = true}, _Reds, Acc) ->
- {ok, Acc};
- (#full_doc_info{id= <<"_design/",_/binary>>}=FullDocInfo, _Reds, Acc) ->
- {ok, [FullDocInfo | Acc]};
- (_, _Reds, Acc) ->
- {stop, Acc}
- end),
- KeyOpts = [{start_key, <<"_design/">>}, {end_key_gt, <<"_design0">>}],
- {ok, _, Docs} = couch_btree:fold(IdBtree, FoldFun, [], KeyOpts),
- {ok, Docs}.
+get_design_docs(#db{} = Db) ->
+ FoldFun = fun(FDI, Acc) -> {ok, [FDI | Acc]} end,
+ {ok, Docs} = fold_design_docs(Db, FoldFun, [], []),
+ {ok, lists:reverse(Docs)}.
+
check_is_admin(#db{user_ctx=UserCtx}=Db) ->
@@ -539,8 +527,8 @@ validate_names_and_roles({Props}) when is_list(Props) ->
end,
ok.
-get_revs_limit(#db{revs_limit=Limit}) ->
- Limit.
+get_revs_limit(#db{} = Db) ->
+ couch_db_engine:get(Db, revs_limit).
set_revs_limit(#db{main_pid=Pid}=Db, Limit) when Limit > 0 ->
check_is_admin(Db),
@@ -548,12 +536,10 @@ set_revs_limit(#db{main_pid=Pid}=Db, Limit) when Limit > 0 ->
set_revs_limit(_Db, _Limit) ->
throw(invalid_revs_limit).
+
name(#db{name=Name}) ->
Name.
-compression(#db{compression=Compression}) ->
- Compression.
-
update_doc(Db, Doc, Options) ->
update_doc(Db, Doc, Options, interactive_edit).
@@ -683,6 +669,9 @@ load_validation_funs(#db{main_pid=Pid}=Db) ->
gen_server:cast(Pid, {load_validation_funs, Funs}),
Funs.
+reload_validation_funs(#db{} = Db) ->
+ gen_server:cast(Db#db.main_pid, {load_validation_funs, undefined}).
+
prep_and_validate_update(Db, #doc{id=Id,revs={RevStart, Revs}}=Doc,
OldFullDocInfo, LeafRevsDict, AllowConflict) ->
case Revs of
@@ -749,7 +738,7 @@ prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups],
prep_and_validate_updates(Db, RestBuckets, RestLookups, AllowConflict,
[PreppedBucket | AccPrepped], AccErrors3);
prep_and_validate_updates(Db, [DocBucket|RestBuckets],
- [{ok, #full_doc_info{rev_tree=OldRevTree}=OldFullDocInfo}|RestLookups],
+ [#full_doc_info{rev_tree=OldRevTree}=OldFullDocInfo|RestLookups],
AllowConflict, AccPrepped, AccErrors) ->
Leafs = couch_key_tree:get_all_leafs(OldRevTree),
LeafRevsDict = dict:from_list([
@@ -800,13 +789,14 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI
end,
{[], AccErrors}, Bucket),
prep_and_validate_replicated_updates(Db, RestBuckets, RestOldInfo, [ValidatedBucket | AccPrepped], AccErrors3);
- {ok, #full_doc_info{rev_tree=OldTree}} ->
+ #full_doc_info{rev_tree=OldTree} ->
+ RevsLimit = get_revs_limit(Db),
OldLeafs = couch_key_tree:get_all_leafs_full(OldTree),
OldLeafsLU = [{Start, RevId} || {Start, [{RevId, _}|_]} <- OldLeafs],
NewRevTree = lists:foldl(
fun(NewDoc, AccTree) ->
{NewTree, _} = couch_key_tree:merge(AccTree,
- couch_doc:to_path(NewDoc), Db#db.revs_limit),
+ couch_doc:to_path(NewDoc), RevsLimit),
NewTree
end,
OldTree, Bucket),
@@ -942,7 +932,7 @@ update_docs(Db, Docs0, Options, replicated_changes) ->
DocErrors = [],
DocBuckets3 = DocBuckets
end,
- DocBuckets4 = [[doc_flush_atts(check_dup_atts(Doc), Db#db.fd)
+ DocBuckets4 = [[doc_flush_atts(Db, check_dup_atts(Doc))
|| Doc <- Bucket] || Bucket <- DocBuckets3],
{ok, []} = write_and_commit(Db, DocBuckets4, [], [merge_conflicts | Options]),
{ok, DocErrors};
@@ -996,8 +986,8 @@ update_docs(Db, Docs0, Options, interactive_edit) ->
Options2 = if AllOrNothing -> [merge_conflicts];
true -> [] end ++ Options,
DocBuckets3 = [[
- doc_flush_atts(set_new_att_revpos(
- check_dup_atts(Doc)), Db#db.fd)
+ doc_flush_atts(Db, set_new_att_revpos(
+ check_dup_atts(Doc)))
|| Doc <- B] || B <- DocBuckets2],
{DocBuckets4, IdRevs} = new_revs(DocBuckets3, [], []),
@@ -1081,7 +1071,7 @@ write_and_commit(#db{main_pid=Pid, user_ctx=Ctx}=Db, DocBuckets1,
% compaction. Retry by reopening the db and writing to the current file
{ok, Db2} = open(Db#db.name, [{user_ctx, Ctx}]),
DocBuckets2 = [
- [doc_flush_atts(Doc, Db2#db.fd) || Doc <- Bucket] ||
+ [doc_flush_atts(Db2, Doc) || Doc <- Bucket] ||
Bucket <- DocBuckets1
],
% We only retry once
@@ -1100,18 +1090,24 @@ write_and_commit(#db{main_pid=Pid, user_ctx=Ctx}=Db, DocBuckets1,
prepare_doc_summaries(Db, BucketList) ->
[lists:map(
- fun(#doc{body = Body, atts = Atts} = Doc) ->
+ fun(#doc{atts = Atts} = Doc0) ->
DiskAtts = [couch_att:to_disk_term(Att) || Att <- Atts],
{ok, SizeInfo} = couch_att:size_info(Atts),
- AttsFd = case Atts of
- [Att | _] ->
- {Fd, _} = couch_att:fetch(data, Att),
- Fd;
- [] ->
- nil
+ AttsStream = case Atts of
+ [Att | _] ->
+ {stream, StreamEngine} = couch_att:fetch(data, Att),
+ StreamEngine;
+ [] ->
+ nil
end,
- SummaryChunk = couch_db_updater:make_doc_summary(Db, {Body, DiskAtts}),
- Doc#doc{body = {summary, SummaryChunk, SizeInfo, AttsFd}}
+ Doc1 = Doc0#doc{
+ atts = DiskAtts,
+ meta = [
+ {size_info, SizeInfo},
+ {atts_stream, AttsStream}
+ ] ++ Doc0#doc.meta
+ },
+ couch_db_engine:serialize_doc(Db, Doc1)
end,
Bucket) || Bucket <- BucketList].
@@ -1136,12 +1132,8 @@ set_new_att_revpos(#doc{revs={RevPos,_Revs},atts=Atts0}=Doc) ->
Doc#doc{atts = Atts}.
-doc_flush_atts(Doc, Fd) ->
- Doc#doc{atts=[couch_att:flush(Fd, Att) || Att <- Doc#doc.atts]}.
-
-check_md5(_NewSig, <<>>) -> ok;
-check_md5(Sig, Sig) -> ok;
-check_md5(_, _) -> throw(md5_mismatch).
+doc_flush_atts(Db, Doc) ->
+ Doc#doc{atts=[couch_att:flush(Db, Att) || Att <- Doc#doc.atts]}.
compressible_att_type(MimeType) when is_binary(MimeType) ->
@@ -1171,21 +1163,24 @@ compressible_att_type(MimeType) ->
% is present in the request, but there is no Content-MD5
% trailer, we're free to ignore this inconsistency and
% pretend that no Content-MD5 exists.
-with_stream(Fd, Att, Fun) ->
+with_stream(Db, Att, Fun) ->
[InMd5, Type, Enc] = couch_att:fetch([md5, type, encoding], Att),
BufferSize = list_to_integer(
config:get("couchdb", "attachment_stream_buffer_size", "4096")),
- {ok, OutputStream} = case (Enc =:= identity) andalso
- compressible_att_type(Type) of
- true ->
- CompLevel = list_to_integer(
- config:get("attachments", "compression_level", "0")
- ),
- couch_stream:open(Fd, [{buffer_size, BufferSize},
- {encoding, gzip}, {compression_level, CompLevel}]);
- _ ->
- couch_stream:open(Fd, [{buffer_size, BufferSize}])
+ Options = case (Enc =:= identity) andalso compressible_att_type(Type) of
+ true ->
+ CompLevel = list_to_integer(
+ config:get("attachments", "compression_level", "0")
+ ),
+ [
+ {buffer_size, BufferSize},
+ {encoding, gzip},
+ {compression_level, CompLevel}
+ ];
+ _ ->
+ [{buffer_size, BufferSize}]
end,
+ {ok, OutputStream} = open_write_stream(Db, Options),
ReqMd5 = case Fun(OutputStream) of
{md5, FooterMd5} ->
case InMd5 of
@@ -1195,9 +1190,9 @@ with_stream(Fd, Att, Fun) ->
_ ->
InMd5
end,
- {StreamInfo, Len, IdentityLen, Md5, IdentityMd5} =
+ {StreamEngine, Len, IdentityLen, Md5, IdentityMd5} =
couch_stream:close(OutputStream),
- check_md5(IdentityMd5, ReqMd5),
+ couch_util:check_md5(IdentityMd5, ReqMd5),
{AttLen, DiskLen, NewEnc} = case Enc of
identity ->
case {Md5, IdentityMd5} of
@@ -1219,7 +1214,7 @@ with_stream(Fd, Att, Fun) ->
end
end,
couch_att:store([
- {data, {Fd,StreamInfo}},
+ {data, {stream, StreamEngine}},
{att_len, AttLen},
{disk_len, DiskLen},
{md5, Md5},
@@ -1227,89 +1222,101 @@ with_stream(Fd, Att, Fun) ->
], Att).
-enum_docs_since_reduce_to_count(Reds) ->
- couch_btree:final_reduce(
- fun couch_db_updater:btree_by_seq_reduce/2, Reds).
+open_write_stream(Db, Options) ->
+ couch_db_engine:open_write_stream(Db, Options).
-enum_docs_reduce_to_count(Reds) ->
- FinalRed = couch_btree:final_reduce(
- fun couch_db_updater:btree_by_id_reduce/2, Reds),
- element(1, FinalRed).
-changes_since(Db, StartSeq, Fun, Acc) ->
- changes_since(Db, StartSeq, Fun, [], Acc).
+open_read_stream(Db, AttState) ->
+ couch_db_engine:open_read_stream(Db, AttState).
-changes_since(Db, StartSeq, Fun, Options, Acc) when is_record(Db, db) ->
- changes_since(Db#db.seq_tree, StartSeq, Fun, Options, Acc);
-changes_since(SeqTree, StartSeq, Fun, Options, Acc) ->
- Wrapper = fun(FullDocInfo, _Offset, Acc2) ->
- DocInfo = case FullDocInfo of
- #full_doc_info{} ->
- couch_doc:to_doc_info(FullDocInfo);
- #doc_info{} ->
- FullDocInfo
- end,
- Fun(DocInfo, Acc2)
- end,
- {ok, _LastReduction, AccOut} = couch_btree:fold(SeqTree,
- Wrapper, Acc, [{start_key, StartSeq + 1}] ++ Options),
- {ok, AccOut}.
-count_changes_since(Db, SinceSeq) ->
- BTree = Db#db.seq_tree,
- {ok, Changes} =
- couch_btree:fold_reduce(BTree,
- fun(_SeqStart, PartialReds, 0) ->
- {ok, couch_btree:final_reduce(BTree, PartialReds)}
- end,
- 0, [{start_key, SinceSeq + 1}]),
- Changes.
-
-enum_docs_since(Db, SinceSeq, InFun, Acc, Options) ->
- {ok, LastReduction, AccOut} = couch_btree:fold(
- Db#db.seq_tree, InFun, Acc,
- [{start_key, SinceSeq + 1} | Options]),
- {ok, enum_docs_since_reduce_to_count(LastReduction), AccOut}.
-
-enum_docs(Db, InFun, InAcc, Options0) ->
- {NS, Options} = extract_namespace(Options0),
- enum_docs(Db, NS, InFun, InAcc, Options).
-
-enum_docs(Db, undefined, InFun, InAcc, Options) ->
- FoldFun = pipe([fun skip_deleted/4], InFun),
- {ok, LastReduce, OutAcc} = couch_btree:fold(
- Db#db.id_tree, FoldFun, InAcc, Options),
- {ok, enum_docs_reduce_to_count(LastReduce), OutAcc};
-enum_docs(Db, <<"_local">>, InFun, InAcc, Options) ->
- FoldFun = pipe([fun skip_deleted/4], InFun),
- {ok, _LastReduce, OutAcc} = couch_btree:fold(
- Db#db.local_tree, FoldFun, InAcc, Options),
- {ok, 0, OutAcc};
-enum_docs(Db, NS, InFun, InAcc, Options0) ->
- FoldFun = pipe([
- fun skip_deleted/4,
- stop_on_leaving_namespace(NS)], InFun),
- Options = set_namespace_range(Options0, NS),
- {ok, LastReduce, OutAcc} = couch_btree:fold(
- Db#db.id_tree, FoldFun, InAcc, Options),
- {ok, enum_docs_reduce_to_count(LastReduce), OutAcc}.
-
-extract_namespace(Options0) ->
- case proplists:split(Options0, [namespace]) of
- {[[{namespace, NS}]], Options} ->
- {NS, Options};
- {_, Options} ->
- {undefined, Options}
+is_active_stream(Db, StreamEngine) ->
+ couch_db_engine:is_active_stream(Db, StreamEngine).
+
+
+fold_docs(Db, Fun, Acc) ->
+ fold_docs(Db, Fun, Acc, []).
+
+
+fold_docs(Db, UserFun, UserAcc, Options) ->
+ case lists:keyfind(namespace, 1, Options) of
+ {namespace, <<"_design">>} ->
+ fold_design_docs(Db, UserFun, UserAcc, Options);
+ {namespace, <<"_local">>} ->
+ fold_local_docs(Db, UserFun, UserAcc, Options);
+ _Else ->
+ fold_all_docs(Db, UserFun, UserAcc, Options)
end.
+
+fold_changes(Db, StartSeq, Fun, Acc) ->
+ fold_changes(Db, StartSeq, Fun, Acc, []).
+
+
+fold_changes(Db, StartSeq, UserFun, UserAcc, Opts) ->
+ Fun = get_doc_type_conv(Opts),
+ Acc1 = {Db, UserFun, UserAcc},
+ {ok, Acc2} = couch_db_engine:fold_changes(Db, StartSeq, Fun, Acc1, Opts),
+ {_, _, FinalUserAcc} = Acc2,
+ {ok, FinalUserAcc}.
+
+
+count_changes_since(Db, SinceSeq) ->
+ couch_db_engine:count_changes_since(Db, SinceSeq).
+
+
%%% Internal function %%%
+
+fold_all_docs(Db, UserFun, UserAcc, Options) ->
+ % FIXME: THIS IS A HUGE HACK
+ % We'll have to figure out a different implementation
+ % for the _all_docs handler which is the only thing that
+ % uses include_reductions.
+ case lists:member(include_reductions, Options) of
+ true ->
+ couch_db_engine:fold_docs(Db, UserFun, UserAcc, Options);
+ false ->
+ Fun = get_doc_type_conv(Options),
+ Acc1 = {Db, UserFun, UserAcc},
+ {ok, Acc2} = couch_db_engine:fold_docs(Db, Fun, Acc1, Options),
+ {_, _, FinalUserAcc} = Acc2,
+ {ok, FinalUserAcc}
+ end.
+
+
+fold_design_docs(Db, UserFun, UserAcc, Options1) ->
+ Options2 = set_design_doc_keys(Options1),
+
+ % FIXME: Same as above. couch_mrview is doing
+ % terribleness here.
+ case lists:member(include_reductions, Options2) of
+ true ->
+ couch_db_engine:fold_docs(Db, UserFun, UserAcc, Options2);
+ false ->
+ Fun1 = get_doc_type_conv(Options1),
+ Fun2 = fun only_ddoc_fold/2,
+ Acc1 = {Fun1, {Db, UserFun, UserAcc}},
+ {ok, Acc2} = couch_db_engine:fold_docs(Db, Fun2, Acc1, Options2),
+ {_, {_, _, FinalUserAcc}} = Acc2,
+ {ok, FinalUserAcc}
+ end.
+
+
+fold_local_docs(Db, UserFun, UserAcc, Options) ->
+ Fun = get_doc_type_conv(Options),
+ Acc1 = {Fun, {Db, UserFun, UserAcc}},
+ {ok, Acc2} = couch_db_engine:fold_local_docs(Db, Fun, Acc1, Options),
+ {_, {_, _, FinalUserAcc}} = Acc2,
+ {ok, FinalUserAcc}.
+
+
open_doc_revs_int(Db, IdRevs, Options) ->
Ids = [Id || {Id, _Revs} <- IdRevs],
LookupResults = get_full_doc_infos(Db, Ids),
lists:zipwith(
fun({Id, Revs}, Lookup) ->
case Lookup of
- {ok, #full_doc_info{rev_tree=RevTree}} ->
+ #full_doc_info{rev_tree=RevTree} ->
{FoundRevs, MissingRevs} =
case Revs of
all ->
@@ -1343,9 +1350,8 @@ open_doc_revs_int(Db, IdRevs, Options) ->
IdRevs, LookupResults).
open_doc_int(Db, <<?LOCAL_DOC_PREFIX, _/binary>> = Id, Options) ->
- case couch_btree:lookup(Db#db.local_tree, [Id]) of
- [{ok, {_, {Rev, BodyData}}}] ->
- Doc = #doc{id=Id, revs={0, [?l2b(integer_to_list(Rev))]}, body=BodyData},
+ case couch_db_engine:open_local_docs(Db, [Id]) of
+ [#doc{} = Doc] ->
apply_open_options({ok, Doc}, Options);
[not_found] ->
{not_found, missing}
@@ -1364,7 +1370,7 @@ open_doc_int(Db, #full_doc_info{id=Id,rev_tree=RevTree}=FullDocInfo, Options) ->
{ok, Doc#doc{meta=doc_meta_info(DocInfo, RevTree, Options)}}, Options);
open_doc_int(Db, Id, Options) ->
case get_full_doc_info(Db, Id) of
- {ok, FullDocInfo} ->
+ #full_doc_info{} = FullDocInfo ->
open_doc_int(Db, FullDocInfo, Options);
not_found ->
{not_found, missing}
@@ -1410,8 +1416,8 @@ doc_meta_info(#doc_info{high_seq=Seq,revs=[#rev_info{rev=Rev}|RestInfo]}, RevTre
true -> [{local_seq, Seq}]
end.
-read_doc(#db{fd=Fd}, Pos) ->
- couch_file:pread_term(Fd, Pos).
+read_doc(#db{} = Db, Ptr) ->
+ couch_db_engine:read_doc(Db, Ptr).
make_doc(_Db, Id, Deleted, nil = _Bp, RevisionPath) ->
@@ -1422,34 +1428,31 @@ make_doc(_Db, Id, Deleted, nil = _Bp, RevisionPath) ->
atts = [],
deleted = Deleted
};
-make_doc(#db{fd=Fd, revs_limit=RevsLimit}=Db, Id, Deleted, Bp, {Pos, Revs}) ->
- {BodyData, Atts0} = case Bp of
- nil ->
- {[], []};
- _ ->
- case read_doc(Db, Bp) of
- {ok, {BodyData0, Atts1}} when is_binary(Atts1) ->
- {BodyData0, couch_compress:decompress(Atts1)};
- {ok, {BodyData0, Atts1}} when is_list(Atts1) ->
- % pre 1.2 format
- {BodyData0, Atts1}
- end
- end,
- Atts = [couch_att:from_disk_term(Fd, T) || T <- Atts0],
- Doc = #doc{
+make_doc(#db{} = Db, Id, Deleted, Bp, {Pos, Revs}) ->
+ Doc0 = couch_db_engine:read_doc_body(Db, #doc{
id = Id,
revs = {Pos, lists:sublist(Revs, 1, RevsLimit)},
- body = BodyData,
- atts = Atts,
+ body = Bp,
deleted = Deleted
- },
- after_doc_read(Db, Doc).
+ }),
+ Doc1 = case Doc0#doc.atts of
+ BinAtts when is_binary(BinAtts) ->
+ Doc0#doc{
+ atts = couch_compress:decompress(BinAtts)
+ };
+ ListAtts when is_list(ListAtts) ->
+ Doc0
+ end,
+ after_doc_read(Db, Doc1#doc{
+ atts = [couch_att:from_disk_term(Db, T) || T <- Doc1#doc.atts]
+ }).
after_doc_read(#db{} = Db, Doc) ->
DocWithBody = couch_doc:with_ejson_body(Doc),
couch_db_plugin:after_doc_read(Db, DocWithBody).
+
increment_stat(#db{options = Options}, Stat) ->
case lists:member(sys_db, Options) of
true ->
@@ -1458,71 +1461,6 @@ increment_stat(#db{options = Options}, Stat) ->
couch_stats:increment_counter(Stat)
end.
-skip_deleted(traverse, LK, {Undeleted, _, _} = Reds, Acc) when Undeleted == 0 ->
- {skip, LK, Reds, Acc};
-skip_deleted(Case, A, B, C) ->
- {Case, A, B, C}.
-
-stop_on_leaving_namespace(NS) ->
- fun
- (visit, #full_doc_info{id = Key} = FullInfo, Reds, Acc) ->
- case has_prefix(Key, NS) of
- true ->
- {visit, FullInfo, Reds, Acc};
- false ->
- {stop, FullInfo, Reds, Acc}
- end;
- (Case, KV, Reds, Acc) ->
- {Case, KV, Reds, Acc}
- end.
-
-has_prefix(Bin, Prefix) ->
- S = byte_size(Prefix),
- case Bin of
- <<Prefix:S/binary, "/", _/binary>> ->
- true;
- _Else ->
- false
- end.
-
-pipe(Filters, Final) ->
- Wrap =
- fun
- (visit, KV, Reds, Acc) ->
- Final(KV, Reds, Acc);
- (skip, _KV, _Reds, Acc) ->
- {skip, Acc};
- (stop, _KV, _Reds, Acc) ->
- {stop, Acc};
- (traverse, _, _, Acc) ->
- {ok, Acc}
- end,
- do_pipe(Filters, Wrap).
-
-do_pipe([], Fun) -> Fun;
-do_pipe([Filter|Rest], F0) ->
- F1 = fun(C0, KV0, Reds0, Acc0) ->
- {C, KV, Reds, Acc} = Filter(C0, KV0, Reds0, Acc0),
- F0(C, KV, Reds, Acc)
- end,
- do_pipe(Rest, F1).
-
-set_namespace_range(Options, undefined) -> Options;
-set_namespace_range(Options, NS) ->
- %% FIXME depending on order we might need to swap keys
- SK = select_gt(
- proplists:get_value(start_key, Options, <<"">>),
- <<NS/binary, "/">>),
- EK = select_lt(
- proplists:get_value(end_key, Options, <<NS/binary, "0">>),
- <<NS/binary, "0">>),
- [{start_key, SK}, {end_key_gt, EK}].
-
-select_gt(V1, V2) when V1 < V2 -> V2;
-select_gt(V1, _V2) -> V1.
-
-select_lt(V1, V2) when V1 > V2 -> V2;
-select_lt(V1, _V2) -> V1.
-spec normalize_dbname(list() | binary()) -> binary().
@@ -1562,6 +1500,117 @@ is_systemdb(DbName) when is_list(DbName) ->
is_systemdb(DbName) when is_binary(DbName) ->
lists:member(dbname_suffix(DbName), ?SYSTEM_DATABASES).
+
+get_doc_type_conv(Options) ->
+ lists:foldl(fun(Opt, Acc) ->
+ case Opt of
+ doc ->
+ fun conv_to_doc/2;
+ doc_info ->
+ fun conv_to_doc_info/2;
+ full_doc_info ->
+ fun conv_to_full_doc_info/2;
+ _ ->
+ Acc
+ end
+ end, fun conv_to_full_doc_info/2, Options).
+
+
+conv_to_doc(#full_doc_info{}=FDI, {Db, UserFun, UserAcc}) ->
+ #full_doc_info{
+ id = Id,
+ rev_tree = RevTree
+ } = FDI,
+ #doc_info{
+ revs = [#rev_info{deleted = IsDeleted, rev = Rev, body_sp = Bp} | _]
+ } = couch_doc:to_doc_info(FDI),
+ {[{_, RevPath}], []} = couch_key_tree:get(RevTree, [Rev]),
+ Doc = make_doc(Db, Id, IsDeleted, Bp, RevPath),
+ {Go, NewUserAcc} = UserFun(Doc, UserAcc),
+ {Go, {Db, UserFun, NewUserAcc}}.
+
+
+conv_to_doc_info(#full_doc_info{} = FDI, {Db, UserFun, UserAcc}) ->
+ DocInfo = couch_doc:to_doc_info(FDI),
+ {Go, NewUserAcc} = UserFun(DocInfo, UserAcc),
+ {Go, {Db, UserFun, NewUserAcc}}.
+
+
+conv_to_full_doc_info(#full_doc_info{} = FDI, {Db, UserFun, UserAcc}) ->
+ {Go, NewUserAcc} = UserFun(FDI, UserAcc),
+ {Go, {Db, UserFun, NewUserAcc}}.
+
+
+set_design_doc_keys(Options1) ->
+ Dir = case lists:keyfind(dir, 1, Options1) of
+ {dir, D0} -> D0;
+ _ -> fwd
+ end,
+ Options2 = set_design_doc_start_key(Options1, Dir),
+ set_design_doc_end_key(Options2, Dir).
+
+
+-define(FIRST_DDOC_KEY, <<"_design/">>).
+-define(LAST_DDOC_KEY, <<"_design0">>).
+
+
+set_design_doc_start_key(Options, fwd) ->
+ Key1 = couch_util:get_value(start_key, Options, ?FIRST_DDOC_KEY),
+ Key2 = case Key1 < ?FIRST_DDOC_KEY of
+ true -> ?FIRST_DDOC_KEY;
+ false -> Key1
+ end,
+ lists:keystore(start_key, 1, Options, {start_key, Key2});
+set_design_doc_start_key(Options, rev) ->
+ Key1 = couch_util:get_value(start_key, Options, ?LAST_DDOC_KEY),
+ Key2 = case Key1 > ?LAST_DDOC_KEY of
+ true -> ?LAST_DDOC_KEY;
+ false -> Key1
+ end,
+ lists:keystore(start_key, 1, Options, {start_key, Key2}).
+
+
+set_design_doc_end_key(Options, fwd) ->
+ case couch_util:get_value(end_key_gt, Options) of
+ undefined ->
+ Key1 = couch_util:get_value(end_key, Options, ?LAST_DDOC_KEY),
+ Key2 = case Key1 > ?LAST_DDOC_KEY of
+ true -> ?LAST_DDOC_KEY;
+ false -> Key1
+ end,
+ lists:keystore(end_key, 1, Options, {end_key, Key2});
+ EKeyGT ->
+ Key2 = case EKeyGT > ?LAST_DDOC_KEY of
+ true -> ?LAST_DDOC_KEY;
+ false -> EKeyGT
+ end,
+ lists:keystore(end_key_gt, 1, Options, {end_key_gt, Key2})
+ end;
+set_design_doc_end_key(Options, rev) ->
+ case couch_util:get_value(end_key_gt, Options) of
+ undefined ->
+ Key1 = couch_util:get_value(end_key, Options, ?LAST_DDOC_KEY),
+ Key2 = case Key1 < ?FIRST_DDOC_KEY of
+ true -> ?FIRST_DDOC_KEY;
+ false -> Key1
+ end,
+ lists:keystore(end_key, 1, Options, {end_key, Key2});
+ EKeyGT ->
+ Key2 = case EKeyGT < ?FIRST_DDOC_KEY of
+ true -> ?FIRST_DDOC_KEY;
+ false -> EKeyGT
+ end,
+ lists:keystore(end_key_gt, 1, Options, {end_key_gt, Key2})
+ end.
+
+
+only_ddoc_fold(#full_doc_info{id = <<"_design/", _/binary>>}=FDI, {Fun, Acc}) ->
+ {Go, NewAcc} = Fun(FDI, Acc),
+ {Go, {Fun, NewAcc}};
+only_ddoc_fold(_, _) ->
+ erlang:error(invalid_design_doc_fold).
+
+
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
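
Taken together, the couch_db.erl rewrite funnels every read path through
couch_db_engine and lets fold callers pick the record shape they receive:
the doc, doc_info and full_doc_info options are translated by
get_doc_type_conv/1 above. A final caller-side sketch (assumes
couch_db.hrl is included for the record definitions):

    %% Sketch: collecting update sequences as #doc_info{} records.
    -include_lib("couch/include/couch_db.hrl").

    latest_seqs(Db) ->
        Fun = fun(#doc_info{high_seq = Seq}, Acc) -> {ok, [Seq | Acc]} end,
        {ok, Seqs} = couch_db:fold_changes(Db, 0, Fun, [], [doc_info]),
        lists:reverse(Seqs).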
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/a7c6713d/src/couch_db_engine.erl
----------------------------------------------------------------------
diff --git a/src/couch_db_engine.erl b/src/couch_db_engine.erl
index d9ccd18..f22c66f 100644
--- a/src/couch_db_engine.erl
+++ b/src/couch_db_engine.erl
@@ -475,9 +475,6 @@
{ok, CompactedDbHandle::db_handle()}.
--include("couch_db_int.hrl").
-
-
-export([
exists/2,
delete/4,