You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2017/03/01 19:06:39 UTC
[2/5] couch commit: updated
refs/heads/COUCHDB-3287-pluggable-storage-engines to 09a4de6
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/01eaca3a/src/couch_db_updater.erl
----------------------------------------------------------------------
diff --git a/src/couch_db_updater.erl b/src/couch_db_updater.erl
index 1970b78..cbd2e79 100644
--- a/src/couch_db_updater.erl
+++ b/src/couch_db_updater.erl
@@ -14,74 +14,37 @@
-behaviour(gen_server).
-vsn(1).
--export([btree_by_id_split/1, btree_by_id_join/2, btree_by_id_reduce/2]).
--export([btree_by_seq_split/1, btree_by_seq_join/2, btree_by_seq_reduce/2]).
--export([make_doc_summary/2]).
+-export([add_sizes/3, upgrade_sizes/1]).
-export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
-include_lib("couch/include/couch_db.hrl").
-include("couch_db_int.hrl").
--record(comp_header, {
- db_header,
- meta_state
-}).
--record(merge_st, {
- id_tree,
- seq_tree,
- curr,
- rem_seqs,
- infos
-}).
-init({DbName, Filepath, Fd, Options}) ->
+init({Engine, DbName, FilePath, Options0}) ->
erlang:put(io_priority, {db_update, DbName}),
- case lists:member(create, Options) of
- true ->
- % create a new header and writes it to the file
- Header = couch_db_header:new(),
- ok = couch_file:write_header(Fd, Header),
- % delete any old compaction files that might be hanging around
- RootDir = config:get("couchdb", "database_dir", "."),
- couch_file:delete(RootDir, Filepath ++ ".compact"),
- couch_file:delete(RootDir, Filepath ++ ".compact.data"),
- couch_file:delete(RootDir, Filepath ++ ".compact.meta");
- false ->
- case couch_file:read_header(Fd) of
- {ok, Header} ->
- ok;
- no_valid_header ->
- % create a new header and writes it to the file
- Header = couch_db_header:new(),
- ok = couch_file:write_header(Fd, Header),
- % delete any old compaction files that might be hanging around
- file:delete(Filepath ++ ".compact"),
- file:delete(Filepath ++ ".compact.data"),
- file:delete(Filepath ++ ".compact.meta")
- end
- end,
- Db = init_db(DbName, Filepath, Fd, Header, Options),
- case lists:member(sys_db, Options) of
- false ->
- couch_stats_process_tracker:track([couchdb, open_databases]);
- true ->
- ok
- end,
- % we don't load validation funs here because the fabric query is liable to
- % race conditions. Instead see couch_db:validate_doc_update, which loads
- % them lazily
- {ok, Db#db{main_pid = self()}}.
+ DefaultSecObj = default_security_object(DbName),
+ Options = [{default_security_object, DefaultSecObj} | Options0],
+ try
+ {ok, EngineState} = couch_db_engine:init(Engine, FilePath, Options),
+ Db = init_db(DbName, FilePath, EngineState, Options),
+ maybe_track_db(Db),
+ % we don't load validation funs here because the fabric query is liable to
+ % race conditions. Instead see couch_db:validate_doc_update, which loads
+ % them lazily
+ NewDb = Db#db{main_pid = self()},
+ proc_lib:init_ack({ok, NewDb}),
+ gen_server:enter_loop(?MODULE, [], NewDb)
+ catch
+ throw:InitError ->
+ proc_lib:init_ack(InitError)
+ end.
-terminate(_Reason, Db) ->
- % If the reason we died is because our fd disappeared
- % then we don't need to try closing it again.
- if Db#db.fd_monitor == closed -> ok; true ->
- ok = couch_file:close(Db#db.fd)
- end,
+terminate(Reason, Db) ->
couch_util:shutdown_sync(Db#db.compactor_pid),
- couch_util:shutdown_sync(Db#db.fd),
+ couch_db_engine:terminate(Reason, Db),
ok.
handle_call(get_db, _From, Db) ->
@@ -105,28 +68,21 @@ handle_call(cancel_compact, _From, #db{compactor_pid = nil} = Db) ->
handle_call(cancel_compact, _From, #db{compactor_pid = Pid} = Db) ->
unlink(Pid),
exit(Pid, kill),
- RootDir = config:get("couchdb", "database_dir", "."),
- ok = couch_file:delete(RootDir, Db#db.filepath ++ ".compact"),
+ couch_server:delete_compaction_files(Db#db.name),
Db2 = Db#db{compactor_pid = nil},
ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
{reply, ok, Db2};
-handle_call(increment_update_seq, _From, Db) ->
- Db2 = commit_data(Db#db{update_seq=Db#db.update_seq+1}),
- ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
- couch_event:notify(Db#db.name, updated),
- {reply, {ok, Db2#db.update_seq}, Db2};
-handle_call({set_security, NewSec}, _From, #db{compression = Comp} = Db) ->
- {ok, Ptr, _} = couch_file:append_term(
- Db#db.fd, NewSec, [{compression, Comp}]),
- Db2 = commit_data(Db#db{security=NewSec, security_ptr=Ptr,
- update_seq=Db#db.update_seq+1}),
- ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
- {reply, ok, Db2};
+handle_call({set_security, NewSec}, _From, #db{} = Db) ->
+ {ok, NewDb} = couch_db_engine:set_security(Db, NewSec),
+ NewSecDb = NewDb#db{
+ security = NewSec
+ },
+ ok = gen_server:call(couch_server, {db_updated, NewSecDb}, infinity),
+ {reply, ok, NewSecDb};
handle_call({set_revs_limit, Limit}, _From, Db) ->
- Db2 = commit_data(Db#db{revs_limit=Limit,
- update_seq=Db#db.update_seq+1}),
+ {ok, Db2} = couch_db_engine:set_revs_limit(Db, Limit),
ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
{reply, ok, Db2};
@@ -134,73 +90,78 @@ handle_call({purge_docs, _IdRevs}, _From,
#db{compactor_pid=Pid}=Db) when Pid /= nil ->
{reply, {error, purge_during_compaction}, Db};
handle_call({purge_docs, IdRevs}, _From, Db) ->
- #db{
- fd = Fd,
- id_tree = DocInfoByIdBTree,
- seq_tree = DocInfoBySeqBTree,
- update_seq = LastSeq,
- header = Header,
- compression = Comp
- } = Db,
- DocLookups = couch_btree:lookup(DocInfoByIdBTree,
- [Id || {Id, _Revs} <- IdRevs]),
-
- NewDocInfos = lists:zipwith(
- fun({_Id, Revs}, {ok, #full_doc_info{rev_tree=Tree}=FullDocInfo}) ->
+ DocIds = [Id || {Id, _Revs} <- IdRevs],
+ OldDocInfos = couch_db_engine:open_docs(Db, DocIds),
+
+ NewDocInfos = lists:flatmap(fun
+ ({{Id, Revs}, #full_doc_info{id = Id, rev_tree = Tree} = FDI}) ->
case couch_key_tree:remove_leafs(Tree, Revs) of
- {_, []=_RemovedRevs} -> % no change
- nil;
- {NewTree, RemovedRevs} ->
- {FullDocInfo#full_doc_info{rev_tree=NewTree},RemovedRevs}
+ {_, [] = _RemovedRevs} -> % no change
+ [];
+ {NewTree, RemovedRevs} ->
+ NewFDI = FDI#full_doc_info{rev_tree = NewTree},
+ [{FDI, NewFDI, RemovedRevs}]
end;
- (_, not_found) ->
- nil
+ ({_, not_found}) ->
+ []
+ end, lists:zip(IdRevs, OldDocInfos)),
+
+ InitUpdateSeq = couch_db_engine:get_update_seq(Db),
+ InitAcc = {InitUpdateSeq, [], []},
+ FinalAcc = lists:foldl(fun({_, #full_doc_info{} = OldFDI, RemRevs}, Acc) ->
+ #full_doc_info{
+ id = Id,
+ rev_tree = OldTree
+ } = OldFDI,
+ {SeqAcc0, FDIAcc, IdRevsAcc} = Acc,
+
+ {NewFDIAcc, NewSeqAcc} = case OldTree of
+ [] ->
+ % If we purged every #leaf{} in the doc record
+ % then we're removing it completely from the
+ % database.
+ FDIAcc;
+ _ ->
+ % Its possible to purge the #leaf{} that contains
+ % the update_seq where this doc sits in the update_seq
+ % sequence. Rather than do a bunch of complicated checks
+ % we just re-label every #leaf{} and reinsert it into
+ % the update_seq sequence.
+ {NewTree, SeqAcc1} = couch_key_tree:mapfold(fun
+ (_RevId, Leaf, leaf, InnerSeqAcc) ->
+ {Leaf#leaf{seq = InnerSeqAcc + 1}, InnerSeqAcc + 1};
+ (_RevId, Value, _Type, InnerSeqAcc) ->
+ {Value, InnerSeqAcc}
+ end, SeqAcc0, OldTree),
+
+ NewFDI = OldFDI#full_doc_info{
+ update_seq = SeqAcc1,
+ rev_tree = NewTree
+ },
+
+ {[NewFDI | FDIAcc], SeqAcc1}
end,
- IdRevs, DocLookups),
-
- SeqsToRemove = [Seq
- || {#full_doc_info{update_seq=Seq},_} <- NewDocInfos],
-
- FullDocInfoToUpdate = [FullInfo
- || {#full_doc_info{rev_tree=Tree}=FullInfo,_}
- <- NewDocInfos, Tree /= []],
-
- IdRevsPurged = [{Id, Revs}
- || {#full_doc_info{id=Id}, Revs} <- NewDocInfos],
-
- {DocInfoToUpdate, NewSeq} = lists:mapfoldl(
- fun(#full_doc_info{rev_tree=Tree}=FullInfo, SeqAcc) ->
- Tree2 = couch_key_tree:map_leafs(
- fun(_RevId, Leaf) ->
- Leaf#leaf{seq=SeqAcc+1}
- end, Tree),
- {FullInfo#full_doc_info{rev_tree=Tree2}, SeqAcc + 1}
- end, LastSeq, FullDocInfoToUpdate),
-
- IdsToRemove = [Id || {#full_doc_info{id=Id,rev_tree=[]},_}
- <- NewDocInfos],
-
- {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree,
- DocInfoToUpdate, SeqsToRemove),
- {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree,
- FullDocInfoToUpdate, IdsToRemove),
- {ok, Pointer, _} = couch_file:append_term(
- Fd, IdRevsPurged, [{compression, Comp}]),
-
- NewHeader = couch_db_header:set(Header, [
- {purge_seq, couch_db_header:purge_seq(Header) + 1},
- {purged_docs, Pointer}
- ]),
- Db2 = commit_data(
- Db#db{
- id_tree = DocInfoByIdBTree2,
- seq_tree = DocInfoBySeqBTree2,
- update_seq = NewSeq + 1,
- header=NewHeader}),
+ NewIdRevsAcc = [{Id, RemRevs} | IdRevsAcc],
+ {NewSeqAcc, NewFDIAcc, NewIdRevsAcc}
+ end, InitAcc, NewDocInfos),
+
+ {_FinalSeq, FDIs, PurgedIdRevs} = FinalAcc,
+
+ % We need to only use the list of #full_doc_info{} records
+ % that we have actually changed due to a purge.
+ PreviousFDIs = [PrevFDI || {PrevFDI, _, _} <- NewDocInfos],
+ Pairs = pair_purge_info(PreviousFDIs, FDIs),
+
+ {ok, Db2} = couch_db_engine:write_doc_infos(Db, Pairs, [], PurgedIdRevs),
ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
couch_event:notify(Db#db.name, updated),
- {reply, {ok, couch_db_header:purge_seq(NewHeader), IdRevsPurged}, Db2}.
+
+ PurgeSeq = couch_db_engine:get_purge_seq(Db2),
+ {reply, {ok, PurgeSeq, PurgedIdRevs}, Db2};
+
+handle_call(Msg, From, Db) ->
+ couch_db_engine:handle_call(Msg, From, Db).
handle_cast({load_validation_funs, ValidationFuns}, Db) ->
@@ -209,65 +170,29 @@ handle_cast({load_validation_funs, ValidationFuns}, Db) ->
{noreply, Db2};
handle_cast(start_compact, Db) ->
case Db#db.compactor_pid of
- nil ->
- couch_log:info("Starting compaction for db \"~s\"", [Db#db.name]),
- Pid = spawn_link(fun() -> start_copy_compact(Db) end),
- Db2 = Db#db{compactor_pid=Pid},
- ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
- {noreply, Db2};
- _ ->
- % compact currently running, this is a no-op
- {noreply, Db}
- end;
-handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) ->
- {ok, NewFd} = couch_file:open(CompactFilepath),
- {ok, NewHeader0} = couch_file:read_header(NewFd),
- NewHeader = couch_db_header:set(NewHeader0, [
- {compacted_seq, Db#db.update_seq}
- ]),
- #db{update_seq=NewSeq} = NewDb =
- init_db(Db#db.name, Filepath, NewFd, NewHeader, Db#db.options),
- unlink(NewFd),
- case Db#db.update_seq == NewSeq of
- true ->
- % suck up all the local docs into memory and write them to the new db
- {ok, _, LocalDocs} = couch_btree:foldl(Db#db.local_tree,
- fun(Value, _Offset, Acc) -> {ok, [Value | Acc]} end, []),
- {ok, NewLocalBtree} = couch_btree:add(NewDb#db.local_tree, LocalDocs),
-
- NewDb2 = commit_data(NewDb#db{
- local_tree = NewLocalBtree,
- main_pid = self(),
- filepath = Filepath,
- instance_start_time = Db#db.instance_start_time,
- revs_limit = Db#db.revs_limit
- }),
-
- couch_log:debug("CouchDB swapping files ~s and ~s.",
- [Filepath, CompactFilepath]),
- ok = file:rename(CompactFilepath, Filepath ++ ".compact"),
- RootDir = config:get("couchdb", "database_dir", "."),
- couch_file:delete(RootDir, Filepath),
- ok = file:rename(Filepath ++ ".compact", Filepath),
- % Delete the old meta compaction file after promoting
- % the compaction file.
- couch_file:delete(RootDir, Filepath ++ ".compact.meta"),
- close_db(Db),
- NewDb3 = refresh_validate_doc_funs(NewDb2),
- ok = gen_server:call(couch_server, {db_updated, NewDb3}, infinity),
- couch_event:notify(NewDb3#db.name, compacted),
- couch_log:info("Compaction for db \"~s\" completed.", [Db#db.name]),
- {noreply, NewDb3#db{compactor_pid=nil}};
- false ->
- couch_log:info("Compaction file still behind main file "
- "(update seq=~p. compact update seq=~p). Retrying.",
- [Db#db.update_seq, NewSeq]),
- close_db(NewDb),
- Pid = spawn_link(fun() -> start_copy_compact(Db) end),
- Db2 = Db#db{compactor_pid=Pid},
- ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
- {noreply, Db2}
+ nil ->
+ % For now we only support compacting to the same
+ % storage engine. After the first round of patches
+ % we'll add a field that sets the target engine
+ % type to compact to with a new copy compactor.
+ UpdateSeq = couch_db_engine:get_update_seq(Db),
+ Args = [Db#db.name, UpdateSeq],
+ couch_log:info("Starting compaction for db \"~s\" at ~p", Args),
+ {ok, Db2} = couch_db_engine:start_compaction(Db),
+ ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
+ {noreply, Db2};
+ _ ->
+ % compact currently running, this is a no-op
+ {noreply, Db}
end;
+handle_cast({compact_done, CompactEngine, CompactInfo}, #db{} = OldDb) ->
+ {ok, NewDb} = case couch_db_engine:get_engine(OldDb) of
+ CompactEngine ->
+ couch_db_engine:finish_compaction(OldDb, CompactInfo);
+ _ ->
+ finish_engine_swap(OldDb, CompactEngine, CompactInfo)
+ end,
+ {noreply, NewDb};
handle_cast(Msg, #db{name = Name} = Db) ->
couch_log:error("Database `~s` updater received unexpected cast: ~p",
@@ -291,9 +216,9 @@ handle_info({update_docs, Client, GroupedDocs, NonRepDocs, MergeConflicts,
FullCommit2) of
{ok, Db2, UpdatedDDocIds} ->
ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
- if Db2#db.update_seq /= Db#db.update_seq ->
- couch_event:notify(Db2#db.name, updated);
- true -> ok
+ case {couch_db:get_update_seq(Db), couch_db:get_update_seq(Db2)} of
+ {Seq, Seq} -> ok;
+ _ -> couch_event:notify(Db2#db.name, updated)
end,
if NonRepDocs2 /= [] ->
couch_event:notify(Db2#db.name, local_updated);
@@ -336,9 +261,8 @@ handle_info({'EXIT', _Pid, normal}, Db) ->
{noreply, Db};
handle_info({'EXIT', _Pid, Reason}, Db) ->
{stop, Reason, Db};
-handle_info({'DOWN', Ref, _, _, Reason}, #db{fd_monitor=Ref, name=Name} = Db) ->
- couch_log:error("DB ~s shutting down - Fd ~p", [Name, Reason]),
- {stop, normal, Db#db{fd=undefined, fd_monitor=closed}}.
+handle_info(Msg, Db) ->
+ couch_db_engine:handle_info(Msg, Db).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -389,235 +313,32 @@ collect_updates(GroupedDocsAcc, ClientsAcc, MergeConflicts, FullCommit) ->
{GroupedDocsAcc, ClientsAcc, FullCommit}
end.
-rev_tree(DiskTree) ->
- couch_key_tree:map(fun
- (_RevId, {Del, Ptr, Seq}) ->
- #leaf{
- deleted = ?i2b(Del),
- ptr = Ptr,
- seq = Seq
- };
- (_RevId, {Del, Ptr, Seq, Size}) ->
- #leaf{
- deleted = ?i2b(Del),
- ptr = Ptr,
- seq = Seq,
- sizes = upgrade_sizes(Size)
- };
- (_RevId, {Del, Ptr, Seq, Sizes, Atts}) ->
- #leaf{
- deleted = ?i2b(Del),
- ptr = Ptr,
- seq = Seq,
- sizes = upgrade_sizes(Sizes),
- atts = Atts
- };
- (_RevId, ?REV_MISSING) ->
- ?REV_MISSING
- end, DiskTree).
-
-disk_tree(RevTree) ->
- couch_key_tree:map(fun
- (_RevId, ?REV_MISSING) ->
- ?REV_MISSING;
- (_RevId, #leaf{} = Leaf) ->
- #leaf{
- deleted = Del,
- ptr = Ptr,
- seq = Seq,
- sizes = Sizes,
- atts = Atts
- } = Leaf,
- {?b2i(Del), Ptr, Seq, split_sizes(Sizes), Atts}
- end, RevTree).
-upgrade_sizes(#size_info{}=SI) ->
- SI;
-upgrade_sizes({D, E}) ->
- #size_info{active=D, external=E};
-upgrade_sizes(S) when is_integer(S) ->
- #size_info{active=S, external=0}.
-
-split_sizes(#size_info{}=SI) ->
- {SI#size_info.active, SI#size_info.external}.
-
-join_sizes({Active, External}) when is_integer(Active), is_integer(External) ->
- #size_info{active=Active, external=External}.
-
-btree_by_seq_split(#full_doc_info{}=Info) ->
- #full_doc_info{
- id = Id,
- update_seq = Seq,
- deleted = Del,
- sizes = SizeInfo,
- rev_tree = Tree
- } = Info,
- {Seq, {Id, ?b2i(Del), split_sizes(SizeInfo), disk_tree(Tree)}}.
-
-btree_by_seq_join(Seq, {Id, Del, DiskTree}) when is_integer(Del) ->
- btree_by_seq_join(Seq, {Id, Del, {0, 0}, DiskTree});
-btree_by_seq_join(Seq, {Id, Del, Sizes, DiskTree}) when is_integer(Del) ->
- #full_doc_info{
- id = Id,
- update_seq = Seq,
- deleted = ?i2b(Del),
- sizes = join_sizes(Sizes),
- rev_tree = rev_tree(DiskTree)
- };
-btree_by_seq_join(KeySeq, {Id, RevInfos, DeletedRevInfos}) ->
- % Older versions stored #doc_info records in the seq_tree.
- % Compact to upgrade.
- #doc_info{
- id = Id,
- high_seq=KeySeq,
- revs =
- [#rev_info{rev=Rev,seq=Seq,deleted=false,body_sp = Bp} ||
- {Rev, Seq, Bp} <- RevInfos] ++
- [#rev_info{rev=Rev,seq=Seq,deleted=true,body_sp = Bp} ||
- {Rev, Seq, Bp} <- DeletedRevInfos]}.
-
-btree_by_id_split(#full_doc_info{}=Info) ->
- #full_doc_info{
- id = Id,
- update_seq = Seq,
- deleted = Deleted,
- sizes = SizeInfo,
- rev_tree = Tree
- } = Info,
- {Id, {Seq, ?b2i(Deleted), split_sizes(SizeInfo), disk_tree(Tree)}}.
-
-% Handle old formats before data_size was added
-btree_by_id_join(Id, {HighSeq, Deleted, DiskTree}) ->
- btree_by_id_join(Id, {HighSeq, Deleted, #size_info{}, DiskTree});
-
-btree_by_id_join(Id, {HighSeq, Deleted, Sizes, DiskTree}) ->
- #full_doc_info{
- id = Id,
- update_seq = HighSeq,
- deleted = ?i2b(Deleted),
- sizes = upgrade_sizes(Sizes),
- rev_tree = rev_tree(DiskTree)
- }.
-
-btree_by_id_reduce(reduce, FullDocInfos) ->
- lists:foldl(
- fun(Info, {NotDeleted, Deleted, Sizes}) ->
- Sizes2 = reduce_sizes(Sizes, Info#full_doc_info.sizes),
- case Info#full_doc_info.deleted of
- true ->
- {NotDeleted, Deleted + 1, Sizes2};
- false ->
- {NotDeleted + 1, Deleted, Sizes2}
- end
- end,
- {0, 0, #size_info{}}, FullDocInfos);
-btree_by_id_reduce(rereduce, Reds) ->
- lists:foldl(
- fun({NotDeleted, Deleted}, {AccNotDeleted, AccDeleted, _AccSizes}) ->
- % pre 1.2 format, will be upgraded on compaction
- {AccNotDeleted + NotDeleted, AccDeleted + Deleted, nil};
- ({NotDeleted, Deleted, Sizes}, {AccNotDeleted, AccDeleted, AccSizes}) ->
- AccSizes2 = reduce_sizes(AccSizes, Sizes),
- {AccNotDeleted + NotDeleted, AccDeleted + Deleted, AccSizes2}
- end,
- {0, 0, #size_info{}}, Reds).
-
-reduce_sizes(nil, _) ->
- nil;
-reduce_sizes(_, nil) ->
- nil;
-reduce_sizes(#size_info{}=S1, #size_info{}=S2) ->
- #size_info{
- active = S1#size_info.active + S2#size_info.active,
- external = S1#size_info.external + S2#size_info.external
- };
-reduce_sizes(S1, S2) ->
- reduce_sizes(upgrade_sizes(S1), upgrade_sizes(S2)).
-
-btree_by_seq_reduce(reduce, DocInfos) ->
- % count the number of documents
- length(DocInfos);
-btree_by_seq_reduce(rereduce, Reds) ->
- lists:sum(Reds).
-
-init_db(DbName, Filepath, Fd, Header0, Options) ->
- Header = couch_db_header:upgrade(Header0),
-
- {ok, FsyncOptions} = couch_util:parse_term(
- config:get("couchdb", "fsync_options",
- "[before_header, after_header, on_file_open]")),
-
- case lists:member(on_file_open, FsyncOptions) of
- true -> ok = couch_file:sync(Fd);
- _ -> ok
- end,
-
- Compression = couch_compress:get_compression_method(),
-
- IdTreeState = couch_db_header:id_tree_state(Header),
- SeqTreeState = couch_db_header:seq_tree_state(Header),
- LocalTreeState = couch_db_header:local_tree_state(Header),
- {ok, IdBtree} = couch_btree:open(IdTreeState, Fd,
- [{split, fun ?MODULE:btree_by_id_split/1},
- {join, fun ?MODULE:btree_by_id_join/2},
- {reduce, fun ?MODULE:btree_by_id_reduce/2},
- {compression, Compression}]),
- {ok, SeqBtree} = couch_btree:open(SeqTreeState, Fd,
- [{split, fun ?MODULE:btree_by_seq_split/1},
- {join, fun ?MODULE:btree_by_seq_join/2},
- {reduce, fun ?MODULE:btree_by_seq_reduce/2},
- {compression, Compression}]),
- {ok, LocalDocsBtree} = couch_btree:open(LocalTreeState, Fd,
- [{compression, Compression}]),
- case couch_db_header:security_ptr(Header) of
- nil ->
- Security = default_security_object(DbName),
- SecurityPtr = nil;
- SecurityPtr ->
- {ok, Security} = couch_file:pread_term(Fd, SecurityPtr)
- end,
+init_db(DbName, FilePath, EngineState, Options) ->
% convert start time tuple to microsecs and store as a binary string
{MegaSecs, Secs, MicroSecs} = os:timestamp(),
StartTime = ?l2b(io_lib:format("~p",
[(MegaSecs*1000000*1000000) + (Secs*1000000) + MicroSecs])),
- ok = couch_file:set_db_pid(Fd, self()),
- Db = #db{
- fd=Fd,
- fd_monitor = erlang:monitor(process, Fd),
- header=Header,
- id_tree = IdBtree,
- seq_tree = SeqBtree,
- local_tree = LocalDocsBtree,
- committed_update_seq = couch_db_header:update_seq(Header),
- update_seq = couch_db_header:update_seq(Header),
+
+ BDU = couch_util:get_value(before_doc_update, Options, nil),
+ ADR = couch_util:get_value(after_doc_read, Options, nil),
+
+ CleanedOpts = [Opt || Opt <- Options, Opt /= create],
+
+ InitDb = #db{
name = DbName,
- filepath = Filepath,
- security = Security,
- security_ptr = SecurityPtr,
+ filepath = FilePath,
+ engine = EngineState,
instance_start_time = StartTime,
- revs_limit = couch_db_header:revs_limit(Header),
- fsync_options = FsyncOptions,
- options = Options,
- compression = Compression,
- before_doc_update = couch_util:get_value(before_doc_update, Options, nil),
- after_doc_read = couch_util:get_value(after_doc_read, Options, nil)
+ options = CleanedOpts,
+ before_doc_update = BDU,
+ after_doc_read = ADR
},
- % If we just created a new UUID while upgrading a
- % database then we want to flush that to disk or
- % we risk sending out the uuid and having the db
- % crash which would result in it generating a new
- % uuid each time it was reopened.
- case Header /= Header0 of
- true ->
- sync_header(Db, Header);
- false ->
- Db
- end.
-
-
-close_db(#db{fd_monitor = Ref}) ->
- erlang:demonitor(Ref).
+ InitDb#db{
+ committed_update_seq = couch_db_engine:get_update_seq(InitDb),
+ security = couch_db_engine:get_security(InitDb)
+ }.
refresh_validate_doc_funs(#db{name = <<"shards/", _/binary>> = Name} = Db) ->
@@ -641,50 +362,36 @@ refresh_validate_doc_funs(Db0) ->
flush_trees(_Db, [], AccFlushedTrees) ->
{ok, lists:reverse(AccFlushedTrees)};
-flush_trees(#db{fd = Fd} = Db,
+flush_trees(#db{} = Db,
[InfoUnflushed | RestUnflushed], AccFlushed) ->
#full_doc_info{update_seq=UpdateSeq, rev_tree=Unflushed} = InfoUnflushed,
{Flushed, FinalAcc} = couch_key_tree:mapfold(
fun(_Rev, Value, Type, SizesAcc) ->
case Value of
- #doc{deleted = IsDeleted, body = {summary, _, _, _} = DocSummary} ->
- {summary, Summary, AttSizeInfo, AttsFd} = DocSummary,
- % this node value is actually an unwritten document summary,
- % write to disk.
- % make sure the Fd in the written bins is the same Fd we are
- % and convert bins, removing the FD.
- % All bins should have been written to disk already.
- case {AttsFd, Fd} of
- {nil, _} ->
- ok;
- {SameFd, SameFd} ->
- ok;
- _ ->
- % Fd where the attachments were written to is not the same
- % as our Fd. This can happen when a database is being
- % switched out during a compaction.
- couch_log:debug("File where the attachments are written has"
- " changed. Possibly retrying.", []),
- throw(retry)
- end,
- ExternalSize = ?term_size(Summary),
- {ok, NewSummaryPointer, SummarySize} =
- couch_file:append_raw_chunk(Fd, Summary),
- Leaf = #leaf{
- deleted = IsDeleted,
- ptr = NewSummaryPointer,
- seq = UpdateSeq,
- sizes = #size_info{
- active = SummarySize,
- external = ExternalSize
+ % This node is a document summary that needs to be
+ % flushed to disk.
+ #doc{} = Doc ->
+ check_doc_atts(Db, Doc),
+ ExternalSize = ?term_size(Doc#doc.body),
+ {size_info, AttSizeInfo} =
+ lists:keyfind(size_info, 1, Doc#doc.meta),
+ {ok, NewDoc, WrittenSize} =
+ couch_db_engine:write_doc_body(Db, Doc),
+ Leaf = #leaf{
+ deleted = Doc#doc.deleted,
+ ptr = NewDoc#doc.body,
+ seq = UpdateSeq,
+ sizes = #size_info{
+ active = WrittenSize,
+ external = ExternalSize
+ },
+ atts = AttSizeInfo
},
- atts = AttSizeInfo
- },
- {Leaf, add_sizes(Type, Leaf, SizesAcc)};
- #leaf{} ->
- {Value, add_sizes(Type, Value, SizesAcc)};
- _ ->
- {Value, SizesAcc}
+ {Leaf, add_sizes(Type, Leaf, SizesAcc)};
+ #leaf{} ->
+ {Value, add_sizes(Type, Value, SizesAcc)};
+ _ ->
+ {Value, SizesAcc}
end
end, {0, 0, []}, Unflushed),
{FinalAS, FinalES, FinalAtts} = FinalAcc,
@@ -698,6 +405,29 @@ flush_trees(#db{fd = Fd} = Db,
},
flush_trees(Db, RestUnflushed, [NewInfo | AccFlushed]).
+
+check_doc_atts(Db, Doc) ->
+ {atts_stream, Stream} = lists:keyfind(atts_stream, 1, Doc#doc.meta),
+ % Make sure that the attachments were written to the currently
+ % active attachment stream. If compaction swaps during a write
+ % request we may have to rewrite our attachment bodies.
+ if Stream == nil -> ok; true ->
+ case couch_db:is_active_stream(Db, Stream) of
+ true ->
+ ok;
+ false ->
+ % Stream where the attachments were written to is
+ % no longer the current attachment stream. This
+ % can happen when a database is switched at
+ % compaction time.
+ couch_log:debug("Stream where the attachments were"
+ " written has changed."
+ " Possibly retrying.", []),
+ throw(retry)
+ end
+ end.
+
+
add_sizes(Type, #leaf{sizes=Sizes, atts=AttSizes}, Acc) ->
% Maybe upgrade from disk_size only
#size_info{
@@ -710,6 +440,15 @@ add_sizes(Type, #leaf{sizes=Sizes, atts=AttSizes}, Acc) ->
NewAttsAcc = lists:umerge(AttSizes, AttsAcc),
{NewASAcc, NewESAcc, NewAttsAcc}.
+
+upgrade_sizes(#size_info{}=SI) ->
+ SI;
+upgrade_sizes({D, E}) ->
+ #size_info{active=D, external=E};
+upgrade_sizes(S) when is_integer(S) ->
+ #size_info{active=S, external=0}.
+
+
send_result(Client, Doc, NewResult) ->
% used to send a result to the client
catch(Client ! {result, self(), {doc_tag(Doc), NewResult}}).
@@ -836,58 +575,40 @@ merge_rev_tree(OldInfo, NewDoc, _Client, Limit, true) ->
{NewTree, _} = couch_key_tree:merge(OldTree, NewTree0, Limit),
OldInfo#full_doc_info{rev_tree = NewTree}.
-stem_full_doc_infos(#db{revs_limit=Limit}, DocInfos) ->
- [Info#full_doc_info{rev_tree=couch_key_tree:stem(Tree, Limit)} ||
- #full_doc_info{rev_tree=Tree}=Info <- DocInfos].
+update_docs_int(Db, DocsList, LocalDocs, MergeConflicts, FullCommit) ->
+ UpdateSeq = couch_db_engine:get_update_seq(Db),
+ RevsLimit = couch_db_engine:get_revs_limit(Db),
-update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) ->
- #db{
- id_tree = DocInfoByIdBTree,
- seq_tree = DocInfoBySeqBTree,
- update_seq = LastSeq,
- revs_limit = RevsLimit
- } = Db,
Ids = [Id || [{_Client, #doc{id=Id}}|_] <- DocsList],
% lookup up the old documents, if they exist.
- OldDocLookups = couch_btree:lookup(DocInfoByIdBTree, Ids),
- OldDocInfos = lists:zipwith(
- fun(_Id, {ok, FullDocInfo}) ->
- FullDocInfo;
+ OldDocLookups = couch_db_engine:open_docs(Db, Ids),
+ OldDocInfos = lists:zipwith(fun
+ (_Id, #full_doc_info{} = FDI) ->
+ FDI;
(Id, not_found) ->
#full_doc_info{id=Id}
- end,
- Ids, OldDocLookups),
+ end, Ids, OldDocLookups),
% Merge the new docs into the revision trees.
- {ok, NewFullDocInfos, RemoveSeqs, NewSeq} = merge_rev_trees(RevsLimit,
- MergeConflicts, DocsList, OldDocInfos, [], [], LastSeq),
-
- % All documents are now ready to write.
-
- {ok, Db2} = update_local_docs(Db, NonRepDocs),
+ {ok, NewFullDocInfos, RemSeqs, _} = merge_rev_trees(RevsLimit,
+ MergeConflicts, DocsList, OldDocInfos, [], [], UpdateSeq),
% Write out the document summaries (the bodies are stored in the nodes of
% the trees, the attachments are already written to disk)
- {ok, IndexFullDocInfos} = flush_trees(Db2, NewFullDocInfos, []),
-
- % and the indexes
- {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree, IndexFullDocInfos, []),
- {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, IndexFullDocInfos, RemoveSeqs),
+ {ok, IndexFDIs} = flush_trees(Db, NewFullDocInfos, []),
+ Pairs = pair_write_info(OldDocLookups, IndexFDIs),
+ LocalDocs2 = update_local_doc_revs(LocalDocs),
+ {ok, Db1} = couch_db_engine:write_doc_infos(Db, Pairs, LocalDocs2, []),
- WriteCount = length(IndexFullDocInfos),
+ WriteCount = length(IndexFDIs),
couch_stats:increment_counter([couchdb, document_inserts],
- WriteCount - length(RemoveSeqs)),
+ WriteCount - length(RemSeqs)),
couch_stats:increment_counter([couchdb, document_writes], WriteCount),
couch_stats:increment_counter(
[couchdb, local_document_writes],
- length(NonRepDocs)
+ length(LocalDocs2)
),
- Db3 = Db2#db{
- id_tree = DocInfoByIdBTree2,
- seq_tree = DocInfoBySeqBTree2,
- update_seq = NewSeq},
-
% Check if we just updated any design documents, and update the validation
% funs if we did.
UpdatedDDocIds = lists:flatmap(fun
@@ -895,549 +616,92 @@ update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) ->
(_) -> []
end, Ids),
- Db4 = case length(UpdatedDDocIds) > 0 of
+ Db2 = case length(UpdatedDDocIds) > 0 of
true ->
- couch_event:notify(Db3#db.name, ddoc_updated),
- ddoc_cache:evict(Db3#db.name, UpdatedDDocIds),
- refresh_validate_doc_funs(Db3);
+ ddoc_cache:evict(Db1#db.name, UpdatedDDocIds),
+ refresh_validate_doc_funs(Db1);
false ->
- Db3
+ Db1
end,
- {ok, commit_data(Db4, not FullCommit), UpdatedDDocIds}.
-
-update_local_docs(Db, []) ->
- {ok, Db};
-update_local_docs(#db{local_tree=Btree}=Db, Docs) ->
- BtreeEntries = lists:map(
- fun({Client, NewDoc}) ->
- #doc{
- id = Id,
- deleted = Delete,
- revs = {0, PrevRevs},
- body = Body
- } = NewDoc,
- case PrevRevs of
- [RevStr|_] ->
+ {ok, commit_data(Db2, not FullCommit), UpdatedDDocIds}.
+
+
+update_local_doc_revs(Docs) ->
+ lists:map(fun({Client, NewDoc}) ->
+ #doc{
+ deleted = Delete,
+ revs = {0, PrevRevs}
+ } = NewDoc,
+ case PrevRevs of
+ [RevStr | _] ->
PrevRev = list_to_integer(?b2l(RevStr));
[] ->
PrevRev = 0
- end,
- case Delete of
- false ->
- send_result(Client, NewDoc, {ok,
- {0, ?l2b(integer_to_list(PrevRev + 1))}}),
- {update, {Id, {PrevRev + 1, Body}}};
- true ->
- send_result(Client, NewDoc,
- {ok, {0, <<"0">>}}),
- {remove, Id}
- end
- end, Docs),
-
- BtreeIdsRemove = [Id || {remove, Id} <- BtreeEntries],
- BtreeIdsUpdate = [{Key, Val} || {update, {Key, Val}} <- BtreeEntries],
-
- {ok, Btree2} =
- couch_btree:add_remove(Btree, BtreeIdsUpdate, BtreeIdsRemove),
-
- {ok, Db#db{local_tree = Btree2}}.
+ end,
+ NewRev = case Delete of
+ false ->
+ ?l2b(integer_to_list(PrevRev + 1));
+ true ->
+ <<"0">>
+ end,
+ send_result(Client, NewDoc, {ok, {0, NewRev}}),
+ NewDoc#doc{
+ revs = {0, [NewRev]}
+ }
+ end, Docs).
-db_to_header(Db, Header) ->
- couch_db_header:set(Header, [
- {update_seq, Db#db.update_seq},
- {seq_tree_state, couch_btree:get_state(Db#db.seq_tree)},
- {id_tree_state, couch_btree:get_state(Db#db.id_tree)},
- {local_tree_state, couch_btree:get_state(Db#db.local_tree)},
- {security_ptr, Db#db.security_ptr},
- {revs_limit, Db#db.revs_limit}
- ]).
commit_data(Db) ->
commit_data(Db, false).
-commit_data(#db{waiting_delayed_commit=nil} = Db, true) ->
- TRef = erlang:send_after(1000,self(),delayed_commit),
- Db#db{waiting_delayed_commit=TRef};
+commit_data(#db{waiting_delayed_commit = nil} = Db, true) ->
+ TRef = erlang:send_after(1000, self(), delayed_commit),
+ Db#db{waiting_delayed_commit = TRef};
commit_data(Db, true) ->
Db;
commit_data(Db, _) ->
#db{
- header = OldHeader,
- waiting_delayed_commit = Timer
- } = Db,
- if is_reference(Timer) -> erlang:cancel_timer(Timer); true -> ok end,
- case db_to_header(Db, OldHeader) of
- OldHeader -> Db#db{waiting_delayed_commit=nil};
- NewHeader -> sync_header(Db, NewHeader)
- end.
-
-sync_header(Db, NewHeader) ->
- #db{
- fd = Fd,
- filepath = FilePath,
- fsync_options = FsyncOptions,
waiting_delayed_commit = Timer
} = Db,
-
if is_reference(Timer) -> erlang:cancel_timer(Timer); true -> ok end,
-
- Before = lists:member(before_header, FsyncOptions),
- After = lists:member(after_header, FsyncOptions),
-
- if Before -> couch_file:sync(FilePath); true -> ok end,
- ok = couch_file:write_header(Fd, NewHeader),
- if After -> couch_file:sync(FilePath); true -> ok end,
-
- Db#db{
- header=NewHeader,
- committed_update_seq=Db#db.update_seq,
- waiting_delayed_commit=nil
+ {ok, Db1} = couch_db_engine:commit_data(Db),
+ Db1#db{
+ waiting_delayed_commit = nil,
+ committed_update_seq = couch_db_engine:get_update_seq(Db)
}.
-copy_doc_attachments(#db{fd = SrcFd} = SrcDb, SrcSp, DestFd) ->
- {ok, {BodyData, BinInfos0}} = couch_db:read_doc(SrcDb, SrcSp),
- BinInfos = case BinInfos0 of
- _ when is_binary(BinInfos0) ->
- couch_compress:decompress(BinInfos0);
- _ when is_list(BinInfos0) ->
- % pre 1.2 file format
- BinInfos0
- end,
- % copy the bin values
- NewBinInfos = lists:map(
- fun({Name, Type, BinSp, AttLen, RevPos, ExpectedMd5}) ->
- % 010 UPGRADE CODE
- {NewBinSp, AttLen, AttLen, ActualMd5, _IdentityMd5} =
- couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
- check_md5(ExpectedMd5, ActualMd5),
- {Name, Type, NewBinSp, AttLen, AttLen, RevPos, ExpectedMd5, identity};
- ({Name, Type, BinSp, AttLen, DiskLen, RevPos, ExpectedMd5, Enc1}) ->
- {NewBinSp, AttLen, _, ActualMd5, _IdentityMd5} =
- couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
- check_md5(ExpectedMd5, ActualMd5),
- Enc = case Enc1 of
- true ->
- % 0110 UPGRADE CODE
- gzip;
- false ->
- % 0110 UPGRADE CODE
- identity;
- _ ->
- Enc1
- end,
- {Name, Type, NewBinSp, AttLen, DiskLen, RevPos, ExpectedMd5, Enc}
- end, BinInfos),
- {BodyData, NewBinInfos}.
-
-merge_lookups(Infos, []) ->
- Infos;
-merge_lookups([], _) ->
- [];
-merge_lookups([#doc_info{}=DI | RestInfos], [{ok, FDI} | RestLookups]) ->
- % Assert we've matched our lookups
- if DI#doc_info.id == FDI#full_doc_info.id -> ok; true ->
- erlang:error({mismatched_doc_infos, DI#doc_info.id})
- end,
- [FDI | merge_lookups(RestInfos, RestLookups)];
-merge_lookups([FDI | RestInfos], Lookups) ->
- [FDI | merge_lookups(RestInfos, Lookups)].
-
-check_md5(Md5, Md5) -> ok;
-check_md5(_, _) -> throw(md5_mismatch).
-
-copy_docs(Db, #db{fd = DestFd} = NewDb, MixedInfos, Retry) ->
- DocInfoIds = [Id || #doc_info{id=Id} <- MixedInfos],
- LookupResults = couch_btree:lookup(Db#db.id_tree, DocInfoIds),
- % COUCHDB-968, make sure we prune duplicates during compaction
- NewInfos0 = lists:usort(fun(#full_doc_info{id=A}, #full_doc_info{id=B}) ->
- A =< B
- end, merge_lookups(MixedInfos, LookupResults)),
-
- NewInfos1 = lists:map(fun(Info) ->
- {NewRevTree, FinalAcc} = couch_key_tree:mapfold(fun
- (_Rev, #leaf{ptr=Sp}=Leaf, leaf, SizesAcc) ->
- {Body, AttInfos} = copy_doc_attachments(Db, Sp, DestFd),
- SummaryChunk = make_doc_summary(NewDb, {Body, AttInfos}),
- ExternalSize = ?term_size(SummaryChunk),
- {ok, Pos, SummarySize} = couch_file:append_raw_chunk(
- DestFd, SummaryChunk),
- AttSizes = [{element(3,A), element(4,A)} || A <- AttInfos],
- NewLeaf = Leaf#leaf{
- ptr = Pos,
- sizes = #size_info{
- active = SummarySize,
- external = ExternalSize
- },
- atts = AttSizes
- },
- {NewLeaf, add_sizes(leaf, NewLeaf, SizesAcc)};
- (_Rev, _Leaf, branch, SizesAcc) ->
- {?REV_MISSING, SizesAcc}
- end, {0, 0, []}, Info#full_doc_info.rev_tree),
- {FinalAS, FinalES, FinalAtts} = FinalAcc,
- TotalAttSize = lists:foldl(fun({_, S}, A) -> S + A end, 0, FinalAtts),
- NewActiveSize = FinalAS + TotalAttSize,
- NewExternalSize = FinalES + TotalAttSize,
- Info#full_doc_info{
- rev_tree = NewRevTree,
- sizes = #size_info{
- active = NewActiveSize,
- external = NewExternalSize
- }
- }
- end, NewInfos0),
-
- NewInfos = stem_full_doc_infos(Db, NewInfos1),
- RemoveSeqs =
- case Retry of
- nil ->
- [];
- OldDocIdTree ->
- % Compaction is being rerun to catch up to writes during the
- % first pass. This means we may have docs that already exist
- % in the seq_tree in the .data file. Here we lookup any old
- % update_seqs so that they can be removed.
- Ids = [Id || #full_doc_info{id=Id} <- NewInfos],
- Existing = couch_btree:lookup(OldDocIdTree, Ids),
- [Seq || {ok, #full_doc_info{update_seq=Seq}} <- Existing]
- end,
-
- {ok, SeqTree} = couch_btree:add_remove(
- NewDb#db.seq_tree, NewInfos, RemoveSeqs),
-
- FDIKVs = lists:map(fun(#full_doc_info{id=Id, update_seq=Seq}=FDI) ->
- {{Id, Seq}, FDI}
- end, NewInfos),
- {ok, IdEms} = couch_emsort:add(NewDb#db.id_tree, FDIKVs),
- update_compact_task(length(NewInfos)),
- NewDb#db{id_tree=IdEms, seq_tree=SeqTree}.
-
-
-copy_compact(Db, NewDb0, Retry) ->
- Compression = couch_compress:get_compression_method(),
- NewDb = NewDb0#db{compression=Compression},
- TotalChanges = couch_db:count_changes_since(Db, NewDb#db.update_seq),
- BufferSize = list_to_integer(
- config:get("database_compaction", "doc_buffer_size", "524288")),
- CheckpointAfter = couch_util:to_integer(
- config:get("database_compaction", "checkpoint_after",
- BufferSize * 10)),
-
- EnumBySeqFun =
- fun(DocInfo, _Offset,
- {AccNewDb, AccUncopied, AccUncopiedSize, AccCopiedSize}) ->
-
- Seq = case DocInfo of
- #full_doc_info{} -> DocInfo#full_doc_info.update_seq;
- #doc_info{} -> DocInfo#doc_info.high_seq
- end,
- AccUncopiedSize2 = AccUncopiedSize + ?term_size(DocInfo),
- if AccUncopiedSize2 >= BufferSize ->
- NewDb2 = copy_docs(
- Db, AccNewDb, lists:reverse([DocInfo | AccUncopied]), Retry),
- AccCopiedSize2 = AccCopiedSize + AccUncopiedSize2,
- if AccCopiedSize2 >= CheckpointAfter ->
- CommNewDb2 = commit_compaction_data(NewDb2#db{update_seq=Seq}),
- {ok, {CommNewDb2, [], 0, 0}};
- true ->
- {ok, {NewDb2#db{update_seq = Seq}, [], 0, AccCopiedSize2}}
- end;
+maybe_track_db(#db{options = Options}) ->
+ case lists:member(sys_db, Options) of
true ->
- {ok, {AccNewDb, [DocInfo | AccUncopied], AccUncopiedSize2,
- AccCopiedSize}}
- end
- end,
-
- TaskProps0 = [
- {type, database_compaction},
- {database, Db#db.name},
- {progress, 0},
- {changes_done, 0},
- {total_changes, TotalChanges}
- ],
- case (Retry =/= nil) and couch_task_status:is_task_added() of
- true ->
- couch_task_status:update([
- {retry, true},
- {progress, 0},
- {changes_done, 0},
- {total_changes, TotalChanges}
- ]);
- false ->
- couch_task_status:add_task(TaskProps0),
- couch_task_status:set_update_frequency(500)
- end,
-
- {ok, _, {NewDb2, Uncopied, _, _}} =
- couch_btree:foldl(Db#db.seq_tree, EnumBySeqFun,
- {NewDb, [], 0, 0},
- [{start_key, NewDb#db.update_seq + 1}]),
-
- NewDb3 = copy_docs(Db, NewDb2, lists:reverse(Uncopied), Retry),
-
- % copy misc header values
- if NewDb3#db.security /= Db#db.security ->
- {ok, Ptr, _} = couch_file:append_term(
- NewDb3#db.fd, Db#db.security,
- [{compression, NewDb3#db.compression}]),
- NewDb4 = NewDb3#db{security=Db#db.security, security_ptr=Ptr};
- true ->
- NewDb4 = NewDb3
- end,
-
- commit_compaction_data(NewDb4#db{update_seq=Db#db.update_seq}).
-
-
-start_copy_compact(#db{}=Db) ->
- erlang:put(io_priority, {db_compact, Db#db.name}),
- #db{name=Name, filepath=Filepath, options=Options, header=Header} = Db,
- couch_log:debug("Compaction process spawned for db \"~s\"", [Name]),
-
- {ok, NewDb, DName, DFd, MFd, Retry} =
- open_compaction_files(Name, Header, Filepath, Options),
- erlang:monitor(process, MFd),
-
- % This is a bit worrisome. init_db/4 will monitor the data fd
- % but it doesn't know about the meta fd. For now I'll maintain
- % that the data fd is the old normal fd and meta fd is special
- % and hope everything works out for the best.
- unlink(DFd),
-
- NewDb1 = copy_purge_info(Db, NewDb),
- NewDb2 = copy_compact(Db, NewDb1, Retry),
- NewDb3 = sort_meta_data(NewDb2),
- NewDb4 = commit_compaction_data(NewDb3),
- NewDb5 = copy_meta_data(NewDb4),
- NewDb6 = sync_header(NewDb5, db_to_header(NewDb5, NewDb5#db.header)),
- close_db(NewDb6),
-
- ok = couch_file:close(MFd),
- gen_server:cast(Db#db.main_pid, {compact_done, DName}).
-
-
-open_compaction_files(DbName, SrcHdr, DbFilePath, Options) ->
- DataFile = DbFilePath ++ ".compact.data",
- MetaFile = DbFilePath ++ ".compact.meta",
- {ok, DataFd, DataHdr} = open_compaction_file(DataFile),
- {ok, MetaFd, MetaHdr} = open_compaction_file(MetaFile),
- DataHdrIsDbHdr = couch_db_header:is_header(DataHdr),
- case {DataHdr, MetaHdr} of
- {#comp_header{}=A, #comp_header{}=A} ->
- DbHeader = A#comp_header.db_header,
- Db0 = init_db(DbName, DataFile, DataFd, DbHeader, Options),
- Db1 = bind_emsort(Db0, MetaFd, A#comp_header.meta_state),
- {ok, Db1, DataFile, DataFd, MetaFd, Db0#db.id_tree};
- _ when DataHdrIsDbHdr ->
- ok = reset_compaction_file(MetaFd, couch_db_header:from(SrcHdr)),
- Db0 = init_db(DbName, DataFile, DataFd, DataHdr, Options),
- Db1 = bind_emsort(Db0, MetaFd, nil),
- {ok, Db1, DataFile, DataFd, MetaFd, Db0#db.id_tree};
- _ ->
- Header = couch_db_header:from(SrcHdr),
- ok = reset_compaction_file(DataFd, Header),
- ok = reset_compaction_file(MetaFd, Header),
- Db0 = init_db(DbName, DataFile, DataFd, Header, Options),
- Db1 = bind_emsort(Db0, MetaFd, nil),
- {ok, Db1, DataFile, DataFd, MetaFd, nil}
- end.
-
-
-open_compaction_file(FilePath) ->
- case couch_file:open(FilePath, [nologifmissing]) of
- {ok, Fd} ->
- case couch_file:read_header(Fd) of
- {ok, Header} -> {ok, Fd, Header};
- no_valid_header -> {ok, Fd, nil}
- end;
- {error, enoent} ->
- {ok, Fd} = couch_file:open(FilePath, [create]),
- {ok, Fd, nil}
- end.
-
-
-reset_compaction_file(Fd, Header) ->
- ok = couch_file:truncate(Fd, 0),
- ok = couch_file:write_header(Fd, Header).
-
-
-copy_purge_info(OldDb, NewDb) ->
- OldHdr = OldDb#db.header,
- NewHdr = NewDb#db.header,
- OldPurgeSeq = couch_db_header:purge_seq(OldHdr),
- if OldPurgeSeq > 0 ->
- {ok, PurgedIdsRevs} = couch_db:get_last_purged(OldDb),
- Opts = [{compression, NewDb#db.compression}],
- {ok, Ptr, _} = couch_file:append_term(NewDb#db.fd, PurgedIdsRevs, Opts),
- NewNewHdr = couch_db_header:set(NewHdr, [
- {purge_seq, OldPurgeSeq},
- {purged_docs, Ptr}
- ]),
- NewDb#db{header = NewNewHdr};
- true ->
- NewDb
+ ok;
+ false ->
+ couch_stats_process_tracker:track([couchdb, open_databases])
end.
-commit_compaction_data(#db{}=Db) ->
- % Compaction needs to write headers to both the data file
- % and the meta file so if we need to restart we can pick
- % back up from where we left off.
- commit_compaction_data(Db, couch_emsort:get_fd(Db#db.id_tree)),
- commit_compaction_data(Db, Db#db.fd).
-
-
-commit_compaction_data(#db{header=OldHeader}=Db0, Fd) ->
- % Mostly copied from commit_data/2 but I have to
- % replace the logic to commit and fsync to a specific
- % fd instead of the Filepath stuff that commit_data/2
- % does.
- DataState = couch_db_header:id_tree_state(OldHeader),
- MetaFd = couch_emsort:get_fd(Db0#db.id_tree),
- MetaState = couch_emsort:get_state(Db0#db.id_tree),
- Db1 = bind_id_tree(Db0, Db0#db.fd, DataState),
- Header = db_to_header(Db1, OldHeader),
- CompHeader = #comp_header{
- db_header = Header,
- meta_state = MetaState
- },
- ok = couch_file:sync(Fd),
- ok = couch_file:write_header(Fd, CompHeader),
- Db2 = Db1#db{
- waiting_delayed_commit=nil,
- header=Header,
- committed_update_seq=Db1#db.update_seq
- },
- bind_emsort(Db2, MetaFd, MetaState).
-
-
-bind_emsort(Db, Fd, nil) ->
- {ok, Ems} = couch_emsort:open(Fd),
- Db#db{id_tree=Ems};
-bind_emsort(Db, Fd, State) ->
- {ok, Ems} = couch_emsort:open(Fd, [{root, State}]),
- Db#db{id_tree=Ems}.
-
-
-bind_id_tree(Db, Fd, State) ->
- {ok, IdBtree} = couch_btree:open(State, Fd, [
- {split, fun ?MODULE:btree_by_id_split/1},
- {join, fun ?MODULE:btree_by_id_join/2},
- {reduce, fun ?MODULE:btree_by_id_reduce/2}
- ]),
- Db#db{id_tree=IdBtree}.
-
-
-sort_meta_data(Db0) ->
- {ok, Ems} = couch_emsort:merge(Db0#db.id_tree),
- Db0#db{id_tree=Ems}.
-
-
-copy_meta_data(#db{fd=Fd, header=Header}=Db) ->
- Src = Db#db.id_tree,
- DstState = couch_db_header:id_tree_state(Header),
- {ok, IdTree0} = couch_btree:open(DstState, Fd, [
- {split, fun ?MODULE:btree_by_id_split/1},
- {join, fun ?MODULE:btree_by_id_join/2},
- {reduce, fun ?MODULE:btree_by_id_reduce/2}
- ]),
- {ok, Iter} = couch_emsort:iter(Src),
- Acc0 = #merge_st{
- id_tree=IdTree0,
- seq_tree=Db#db.seq_tree,
- rem_seqs=[],
- infos=[]
- },
- Acc = merge_docids(Iter, Acc0),
- {ok, IdTree} = couch_btree:add(Acc#merge_st.id_tree, Acc#merge_st.infos),
- {ok, SeqTree} = couch_btree:add_remove(
- Acc#merge_st.seq_tree, [], Acc#merge_st.rem_seqs
- ),
- Db#db{id_tree=IdTree, seq_tree=SeqTree}.
-
-
-merge_docids(Iter, #merge_st{infos=Infos}=Acc) when length(Infos) > 1000 ->
- #merge_st{
- id_tree=IdTree0,
- seq_tree=SeqTree0,
- rem_seqs=RemSeqs
- } = Acc,
- {ok, IdTree1} = couch_btree:add(IdTree0, Infos),
- {ok, SeqTree1} = couch_btree:add_remove(SeqTree0, [], RemSeqs),
- Acc1 = Acc#merge_st{
- id_tree=IdTree1,
- seq_tree=SeqTree1,
- rem_seqs=[],
- infos=[]
- },
- merge_docids(Iter, Acc1);
-merge_docids(Iter, #merge_st{curr=Curr}=Acc) ->
- case next_info(Iter, Curr, []) of
- {NextIter, NewCurr, FDI, Seqs} ->
- Acc1 = Acc#merge_st{
- infos = [FDI | Acc#merge_st.infos],
- rem_seqs = Seqs ++ Acc#merge_st.rem_seqs,
- curr = NewCurr
- },
- merge_docids(NextIter, Acc1);
- {finished, FDI, Seqs} ->
- Acc#merge_st{
- infos = [FDI | Acc#merge_st.infos],
- rem_seqs = Seqs ++ Acc#merge_st.rem_seqs,
- curr = undefined
- };
- empty ->
- Acc
- end.
+finish_engine_swap(_OldDb, _NewEngine, _CompactFilePath) ->
+ erlang:error(explode).
-next_info(Iter, undefined, []) ->
- case couch_emsort:next(Iter) of
- {ok, {{Id, Seq}, FDI}, NextIter} ->
- next_info(NextIter, {Id, Seq, FDI}, []);
- finished ->
- empty
- end;
-next_info(Iter, {Id, Seq, FDI}, Seqs) ->
- case couch_emsort:next(Iter) of
- {ok, {{Id, NSeq}, NFDI}, NextIter} ->
- next_info(NextIter, {Id, NSeq, NFDI}, [Seq | Seqs]);
- {ok, {{NId, NSeq}, NFDI}, NextIter} ->
- {NextIter, {NId, NSeq, NFDI}, FDI, Seqs};
- finished ->
- {finished, FDI, Seqs}
- end.
-
+pair_write_info(Old, New) ->
+ lists:map(fun(FDI) ->
+ case lists:keyfind(FDI#full_doc_info.id, #full_doc_info.id, Old) of
+ #full_doc_info{} = OldFDI -> {OldFDI, FDI};
+ false -> {not_found, FDI}
+ end
+ end, New).
-update_compact_task(NumChanges) ->
- [Changes, Total] = couch_task_status:get([changes_done, total_changes]),
- Changes2 = Changes + NumChanges,
- Progress = case Total of
- 0 ->
- 0;
- _ ->
- (Changes2 * 100) div Total
- end,
- couch_task_status:update([{changes_done, Changes2}, {progress, Progress}]).
+pair_purge_info(Old, New) ->
+ lists:map(fun(OldFDI) ->
+ case lists:keyfind(OldFDI#full_doc_info.id, #full_doc_info.id, New) of
+ #full_doc_info{} = NewFDI -> {OldFDI, NewFDI};
+ false -> {OldFDI, not_found}
+ end
+ end, Old).
-make_doc_summary(#db{compression = Comp}, {Body0, Atts0}) ->
- Body = case couch_compress:is_compressed(Body0, Comp) of
- true ->
- Body0;
- false ->
- % pre 1.2 database file format
- couch_compress:compress(Body0, Comp)
- end,
- Atts = case couch_compress:is_compressed(Atts0, Comp) of
- true ->
- Atts0;
- false ->
- couch_compress:compress(Atts0, Comp)
- end,
- SummaryBin = ?term_to_bin({Body, Atts}),
- couch_file:assemble_file_chunk(SummaryBin, couch_crypto:hash(md5, SummaryBin)).
default_security_object(<<"shards/", _/binary>>) ->
case config:get("couchdb", "default_security", "everyone") of
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/01eaca3a/src/couch_httpd_db.erl
----------------------------------------------------------------------
diff --git a/src/couch_httpd_db.erl b/src/couch_httpd_db.erl
index fe42dfe..4cf6857 100644
--- a/src/couch_httpd_db.erl
+++ b/src/couch_httpd_db.erl
@@ -217,7 +217,13 @@ handle_design_info_req(#httpd{
create_db_req(#httpd{user_ctx=UserCtx}=Req, DbName) ->
ok = couch_httpd:verify_is_server_admin(Req),
- case couch_server:create(DbName, [{user_ctx, UserCtx}]) of
+ Engine = case couch_httpd:qs_value(Req, "engine") of
+ EngineStr when is_list(EngineStr) ->
+ [{engine, iolist_to_binary(EngineStr)}];
+ _ ->
+ []
+ end,
+ case couch_server:create(DbName, [{user_ctx, UserCtx}] ++ Engine) of
{ok, Db} ->
couch_db:close(Db),
DbUrl = absolute_uri(Req, "/" ++ couch_util:url_encode(DbName)),
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/01eaca3a/src/couch_httpd_misc_handlers.erl
----------------------------------------------------------------------
diff --git a/src/couch_httpd_misc_handlers.erl b/src/couch_httpd_misc_handlers.erl
index eb75a94..3fc4d9a 100644
--- a/src/couch_httpd_misc_handlers.erl
+++ b/src/couch_httpd_misc_handlers.erl
@@ -17,8 +17,6 @@
handle_uuids_req/1,handle_config_req/1,
handle_task_status_req/1, handle_file_req/2]).
--export([increment_update_seq_req/2]).
-
-include_lib("couch/include/couch_db.hrl").
@@ -310,14 +308,3 @@ handle_approved_config_req(#httpd{method='DELETE',path_parts=[_,Section,Key]}=Re
send_json(Req, 200, list_to_binary(OldValue))
end.
-
-% httpd db handlers
-
-increment_update_seq_req(#httpd{method='POST'}=Req, Db) ->
- couch_httpd:validate_ctype(Req, "application/json"),
- {ok, NewSeq} = couch_db:increment_update_seq(Db),
- send_json(Req, {[{ok, true},
- {update_seq, NewSeq}
- ]});
-increment_update_seq_req(Req, _Db) ->
- send_method_not_allowed(Req, "POST").
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/01eaca3a/src/couch_server.erl
----------------------------------------------------------------------
diff --git a/src/couch_server.erl b/src/couch_server.erl
index f365718..2c77e98 100644
--- a/src/couch_server.erl
+++ b/src/couch_server.erl
@@ -21,6 +21,8 @@
-export([handle_cast/2,code_change/3,handle_info/2,terminate/2]).
-export([dev_start/0,is_admin/2,has_admins/0,get_stats/0]).
-export([close_lru/0]).
+-export([delete_compaction_files/1]).
+-export([exists/1]).
% config_listener api
-export([handle_config_change/5, handle_config_terminate/3]).
@@ -33,6 +35,7 @@
-record(server,{
root_dir = [],
+ engines = [],
max_dbs_open=?MAX_DBS_OPEN,
dbs_open=0,
start_time="",
@@ -118,6 +121,35 @@ create(DbName, Options0) ->
delete(DbName, Options) ->
gen_server:call(couch_server, {delete, DbName, Options}, infinity).
+
+exists(DbName) ->
+ RootDir = config:get("couchdb", "database_dir", "."),
+ Engines = get_configured_engines(),
+ Possible = lists:foldl(fun({Extension, Engine}, Acc) ->
+ Path = make_filepath(RootDir, DbName, Extension),
+ case couch_db_engine:exists(Engine, Path) of
+ true ->
+ [{Engine, Path} | Acc];
+ false ->
+ Acc
+ end
+ end, [], Engines),
+ Possible /= [].
+
+
+delete_compaction_files(DbName) ->
+ delete_compaction_files(DbName, []).
+
+delete_compaction_files(DbName, DelOpts) when is_list(DbName) ->
+ RootDir = config:get("couchdb", "database_dir", "."),
+ lists:foreach(fun({Ext, Engine}) ->
+ FPath = make_filepath(RootDir, DbName, Ext),
+ couch_db_engine:delete_compaction_files(Engine, RootDir, FPath, DelOpts)
+ end, get_configured_engines()),
+ ok;
+delete_compaction_files(DbName, DelOpts) when is_binary(DbName) ->
+ delete_compaction_files(?b2l(DbName), DelOpts).
+
maybe_add_sys_db_callbacks(DbName, Options) when is_binary(DbName) ->
maybe_add_sys_db_callbacks(?b2l(DbName), Options);
maybe_add_sys_db_callbacks(DbName, Options) ->
@@ -165,9 +197,6 @@ is_admin(User, ClearPwd) ->
has_admins() ->
config:get("admins") /= [].
-get_full_filename(Server, DbName) ->
- filename:join([Server#server.root_dir, "./" ++ DbName ++ ".couch"]).
-
hash_admin_passwords() ->
hash_admin_passwords(true).
@@ -185,6 +214,7 @@ init([]) ->
% will restart us and then we will pick up the new settings.
RootDir = config:get("couchdb", "database_dir", "."),
+ Engines = get_configured_engines(),
MaxDbsOpen = list_to_integer(
config:get("couchdb", "max_dbs_open", integer_to_list(?MAX_DBS_OPEN))),
UpdateLruOnRead =
@@ -196,6 +226,7 @@ init([]) ->
ets:new(couch_dbs_pid_to_name, [set, protected, named_table]),
process_flag(trap_exit, true),
{ok, #server{root_dir=RootDir,
+ engines = Engines,
max_dbs_open=MaxDbsOpen,
update_lru_on_read=UpdateLruOnRead,
start_time=couch_util:rfc1123_date()}}.
@@ -220,6 +251,8 @@ handle_config_change("couchdb", "max_dbs_open", Max, _, _) when is_list(Max) ->
{ok, gen_server:call(couch_server,{set_max_dbs_open,list_to_integer(Max)})};
handle_config_change("couchdb", "max_dbs_open", _, _, _) ->
{ok, gen_server:call(couch_server,{set_max_dbs_open,?MAX_DBS_OPEN})};
+handle_config_change("couchdb_engines", _, _, _, _) ->
+ {ok, gen_server:call(couch_server,reload_engines)};
handle_config_change("admins", _, _, Persist, _) ->
% spawn here so couch event manager doesn't deadlock
{ok, spawn(fun() -> hash_admin_passwords(Persist) end)};
@@ -254,11 +287,15 @@ all_databases() ->
all_databases(Fun, Acc0) ->
{ok, #server{root_dir=Root}} = gen_server:call(couch_server, get_server),
NormRoot = couch_util:normpath(Root),
- FinalAcc = try
- filelib:fold_files(Root,
+ Extensions = get_engine_extensions(),
+ ExtRegExp = "(" ++ string:join(Extensions, "|") ++ ")",
+ RegExp =
"^[a-z0-9\\_\\$()\\+\\-]*" % stock CouchDB name regex
"(\\.[0-9]{10,})?" % optional shard timestamp
- "\\.couch$", % filename extension
+ "\\." ++ ExtRegExp ++ "$", % filename extension
+ FinalAcc = try
+ couch_util:fold_files(Root,
+ RegExp,
true,
fun(Filename, AccIn) ->
NormFilename = couch_util:normpath(Filename),
@@ -266,7 +303,8 @@ all_databases(Fun, Acc0) ->
[$/ | RelativeFilename] -> ok;
RelativeFilename -> ok
end,
- case Fun(couch_util:drop_dot_couch_ext(?l2b(RelativeFilename)), AccIn) of
+ Ext = filename:extension(RelativeFilename),
+ case Fun(?l2b(filename:rootname(RelativeFilename, Ext)), AccIn) of
{ok, NewAcc} -> NewAcc;
{stop, NewAcc} -> throw({stop, Fun, NewAcc})
end
@@ -293,11 +331,11 @@ maybe_close_lru_db(#server{lru=Lru}=Server) ->
{error, all_dbs_active}
end.
-open_async(Server, From, DbName, Filepath, Options) ->
+open_async(Server, From, DbName, {Module, Filepath}, Options) ->
Parent = self(),
T0 = os:timestamp(),
Opener = spawn_link(fun() ->
- Res = couch_db:start_link(DbName, Filepath, Options),
+ Res = couch_db:start_link(Module, DbName, Filepath, Options),
case {Res, lists:member(create, Options)} of
{{ok, _Db}, true} ->
couch_event:notify(DbName, created);
@@ -334,6 +372,8 @@ handle_call({set_update_lru_on_read, UpdateOnRead}, _From, Server) ->
{reply, ok, Server#server{update_lru_on_read=UpdateOnRead}};
handle_call({set_max_dbs_open, Max}, _From, Server) ->
{reply, ok, Server#server{max_dbs_open=Max}};
+handle_call(reload_engines, _From, Server) ->
+ {reply, ok, Server#server{engines = get_configured_engines()}};
handle_call(get_server, _From, Server) ->
{reply, {ok, Server}, Server};
handle_call({open_result, T0, DbName, {ok, Db}}, {FromPid, _Tag}, Server) ->
@@ -351,7 +391,7 @@ handle_call({open_result, T0, DbName, {ok, Db}}, {FromPid, _Tag}, Server) ->
[gen_server:reply(Waiter, {ok, Db}) || Waiter <- Waiters],
% Cancel the creation request if it exists.
case ReqType of
- {create, DbName, _Filepath, _Options, CrFrom} ->
+ {create, DbName, _Engine, _Options, CrFrom} ->
gen_server:reply(CrFrom, file_exists);
_ ->
ok
@@ -386,8 +426,8 @@ handle_call({open_result, _T0, DbName, Error}, {FromPid, _Tag}, Server) ->
true = ets:delete(couch_dbs, DbName),
true = ets:delete(couch_dbs_pid_to_name, FromPid),
NewServer = case ReqType of
- {create, DbName, Filepath, Options, CrFrom} ->
- open_async(Server, CrFrom, DbName, Filepath, Options);
+ {create, DbName, Engine, Options, CrFrom} ->
+ open_async(Server, CrFrom, DbName, Engine, Options);
_ ->
Server
end,
@@ -401,8 +441,8 @@ handle_call({open, DbName, Options}, From, Server) ->
ok ->
case make_room(Server, Options) of
{ok, Server2} ->
- Filepath = get_full_filename(Server, DbNameList),
- {noreply, open_async(Server2, From, DbName, Filepath, Options)};
+ Engine = get_engine(Server, DbNameList),
+ {noreply, open_async(Server2, From, DbName, Engine, Options)};
CloseError ->
{reply, CloseError, Server}
end;
@@ -421,14 +461,14 @@ handle_call({open, DbName, Options}, From, Server) ->
end;
handle_call({create, DbName, Options}, From, Server) ->
DbNameList = binary_to_list(DbName),
- Filepath = get_full_filename(Server, DbNameList),
+ Engine = get_engine(Server, DbNameList, Options),
case check_dbname(Server, DbNameList) of
ok ->
case ets:lookup(couch_dbs, DbName) of
[] ->
case make_room(Server, Options) of
{ok, Server2} ->
- {noreply, open_async(Server2, From, DbName, Filepath,
+ {noreply, open_async(Server2, From, DbName, Engine,
[create | Options])};
CloseError ->
{reply, CloseError, Server}
@@ -438,7 +478,7 @@ handle_call({create, DbName, Options}, From, Server) ->
% the middle of trying to open it. We allow one creator
% to wait while we figure out if it'll succeed.
CrOptions = [create | Options],
- Req = {create, DbName, Filepath, CrOptions, From},
+ Req = {create, DbName, Engine, CrOptions, From},
true = ets:insert(couch_dbs, Entry#entry{req_type = Req}),
{noreply, Server};
[_AlreadyRunningDb] ->
@@ -451,7 +491,6 @@ handle_call({delete, DbName, Options}, _From, Server) ->
DbNameList = binary_to_list(DbName),
case check_dbname(Server, DbNameList) of
ok ->
- FullFilepath = get_full_filename(Server, DbNameList),
Server2 =
case ets:lookup(couch_dbs, DbName) of
[] -> Server;
@@ -468,18 +507,16 @@ handle_call({delete, DbName, Options}, _From, Server) ->
db_closed(Server, Entry#entry.db_options)
end,
- %% Delete any leftover compaction files. If we don't do this a
- %% subsequent request for this DB will try to open them to use
- %% as a recovery.
- lists:foreach(fun(Ext) ->
- couch_file:delete(Server#server.root_dir, FullFilepath ++ Ext)
- end, [".compact", ".compact.data", ".compact.meta"]),
- couch_file:delete(Server#server.root_dir, FullFilepath ++ ".compact"),
-
couch_db_plugin:on_delete(DbName, Options),
DelOpt = [{context, delete} | Options],
- case couch_file:delete(Server#server.root_dir, FullFilepath, DelOpt) of
+
+ % Make sure and remove all compaction data
+ delete_compaction_files(DbNameList, DelOpt),
+
+ {Engine, FilePath} = get_engine(Server, DbNameList),
+ RootDir = Server#server.root_dir,
+ case couch_db_engine:delete(Engine, RootDir, FilePath, DelOpt) of
ok ->
couch_event:notify(DbName, deleted),
{reply, ok, Server2};
@@ -558,6 +595,106 @@ db_closed(Server, Options) ->
true -> Server
end.
+
+get_configured_engines() ->
+ ConfigEntries = config:get("couchdb_engines"),
+ Engines = lists:flatmap(fun({Extension, ModuleStr}) ->
+ try
+ [{Extension, list_to_atom(ModuleStr)}]
+ catch _T:_R ->
+ []
+ end
+ end, ConfigEntries),
+ case Engines of
+ [] ->
+ [{"couch", couch_bt_engine}];
+ Else ->
+ Else
+ end.
+
+
+get_engine(Server, DbName, Options) ->
+ #server{
+ root_dir = RootDir,
+ engines = Engines
+ } = Server,
+ case couch_util:get_value(engine, Options) of
+ Ext when is_binary(Ext) ->
+ ExtStr = binary_to_list(Ext),
+ case couch_util:get_value(ExtStr, Engines) of
+ Engine when is_atom(Engine) ->
+ Path = make_filepath(RootDir, DbName, ExtStr),
+ {Engine, Path};
+ _ ->
+ get_engine(Server, DbName)
+ end;
+ _ ->
+ get_engine(Server, DbName)
+ end.
+
+
+get_engine(Server, DbName) ->
+ #server{
+ root_dir = RootDir,
+ engines = Engines
+ } = Server,
+ Possible = lists:foldl(fun({Extension, Engine}, Acc) ->
+ Path = make_filepath(RootDir, DbName, Extension),
+ case couch_db_engine:exists(Engine, Path) of
+ true ->
+ [{Engine, Path} | Acc];
+ false ->
+ Acc
+ end
+ end, [], Engines),
+ case Possible of
+ [] ->
+ get_default_engine(Server, DbName);
+ [Engine] ->
+ Engine;
+ _ ->
+ erlang:error(engine_conflict)
+ end.
+
+
+get_default_engine(Server, DbName) ->
+ #server{
+ root_dir = RootDir,
+ engines = Engines
+ } = Server,
+ Default = {couch_bt_engine, make_filepath(RootDir, DbName, "couch")},
+ case config:get("couchdb", "default_engine") of
+ Extension when is_list(Extension) ->
+ case lists:keyfind(Extension, 1, Engines) of
+ {Extension, Module} ->
+ {Module, make_filepath(RootDir, DbName, Extension)};
+ false ->
+ Default
+ end;
+ _ ->
+ Default
+ end.
+
+
+make_filepath(RootDir, DbName, Extension) when is_binary(RootDir) ->
+ make_filepath(binary_to_list(RootDir), DbName, Extension);
+make_filepath(RootDir, DbName, Extension) when is_binary(DbName) ->
+ make_filepath(RootDir, binary_to_list(DbName), Extension);
+make_filepath(RootDir, DbName, Extension) when is_binary(Extension) ->
+ make_filepath(RootDir, DbName, binary_to_list(Extension));
+make_filepath(RootDir, DbName, Extension) ->
+ filename:join([RootDir, "./" ++ DbName ++ "." ++ Extension]).
+
+
+get_engine_extensions() ->
+ case config:get("couchdb_engines") of
+ [] ->
+ ["couch"];
+ Entries ->
+ [Ext || {Ext, _Mod} <- Entries]
+ end.
+
+
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/01eaca3a/src/couch_stream.erl
----------------------------------------------------------------------
diff --git a/src/couch_stream.erl b/src/couch_stream.erl
index 913977f..1980246 100644
--- a/src/couch_stream.erl
+++ b/src/couch_stream.erl
@@ -14,21 +14,40 @@
-behaviour(gen_server).
-vsn(1).
-% public API
--export([open/1, open/2, close/1]).
--export([foldl/4, foldl/5, foldl_decode/6, range_foldl/6]).
--export([copy_to_new_stream/3, write/2]).
-% gen_server callbacks
--export([init/1, terminate/2, code_change/3]).
--export([handle_cast/2, handle_call/3, handle_info/2]).
+-export([
+ open/1,
+ open/2,
+ close/1,
+
+ copy/2,
+ write/2,
+ to_disk_term/1,
+
+ foldl/3,
+ foldl/4,
+ foldl_decode/5,
+ range_foldl/5
+]).
+
+-export([
+ init/1,
+ terminate/2,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ code_change/3
+]).
+
-include_lib("couch/include/couch_db.hrl").
+
-define(DEFAULT_BUFFER_SIZE, 4096).
--record(stream,
- {fd = 0,
+
+-record(stream, {
+ engine,
opener_monitor,
written_pointers=[],
buffer_list = [],
@@ -42,114 +61,94 @@
identity_len = 0,
encoding_fun,
end_encoding_fun
- }).
+}).
-%%% Interface functions %%%
+open({_StreamEngine, _StreamEngineState} = Engine) ->
+ open(Engine, []).
-open(Fd) ->
- open(Fd, []).
-open(Fd, Options) ->
- gen_server:start_link(couch_stream, {Fd, self(), erlang:get(io_priority), Options}, []).
+open({_StreamEngine, _StreamEngineState} = Engine, Options) ->
+ gen_server:start_link(?MODULE, {Engine, self(), erlang:get(io_priority), Options}, []).
+
close(Pid) ->
gen_server:call(Pid, close, infinity).
-copy_to_new_stream(Fd, PosList, DestFd) ->
- {ok, Dest} = open(DestFd),
- foldl(Fd, PosList,
- fun(Bin, _) ->
- ok = write(Dest, Bin)
- end, ok),
- close(Dest).
-
-foldl(_Fd, [], _Fun, Acc) ->
- Acc;
-foldl(Fd, [Pos|Rest], Fun, Acc) ->
- {ok, Bin} = couch_file:pread_iolist(Fd, Pos),
- foldl(Fd, Rest, Fun, Fun(Bin, Acc)).
-
-foldl(Fd, PosList, <<>>, Fun, Acc) ->
- foldl(Fd, PosList, Fun, Acc);
-foldl(Fd, PosList, Md5, Fun, Acc) ->
- foldl(Fd, PosList, Md5, couch_crypto:hash_init(md5), Fun, Acc).
-
-foldl_decode(Fd, PosList, Md5, Enc, Fun, Acc) ->
+
+copy(Src, Dst) ->
+ foldl(Src, fun(Bin, _) ->
+ ok = write(Dst, Bin)
+ end, ok).
+
+
+write(_Pid, <<>>) ->
+ ok;
+write(Pid, Bin) ->
+ gen_server:call(Pid, {write, Bin}, infinity).
+
+
+to_disk_term({Engine, EngineState}) ->
+ Engine:to_disk_term(EngineState).
+
+
+foldl({Engine, EngineState}, Fun, Acc) ->
+ Engine:foldl(EngineState, Fun, Acc).
+
+
+foldl(Engine, <<>>, Fun, Acc) ->
+ foldl(Engine, Fun, Acc);
+foldl(Engine, Md5, UserFun, UserAcc) ->
+ InitAcc = {couch_crypto:hash_init(md5), UserFun, UserAcc},
+ {Md5Acc, _, OutAcc} = foldl(Engine, fun foldl_md5/2, InitAcc),
+ Md5 = couch_crypto:hash_final(md5, Md5Acc),
+ OutAcc.
+
+
+foldl_decode(Engine, Md5, Enc, UserFun, UserAcc1) ->
{DecDataFun, DecEndFun} = case Enc of
- gzip ->
- ungzip_init();
- identity ->
- identity_enc_dec_funs()
+ gzip -> ungzip_init();
+ identity -> identity_enc_dec_funs()
end,
- Result = foldl_decode(
- DecDataFun, Fd, PosList, Md5, couch_crypto:hash_init(md5), Fun, Acc
- ),
+ InitAcc = {DecDataFun, UserFun, UserAcc1},
+ {_, _, UserAcc2} = foldl(Engine, Md5, fun foldl_decode/2, InitAcc),
DecEndFun(),
- Result.
+ UserAcc2.
+
+
+range_foldl(Engine, From, To, UserFun, UserAcc) when To >= From ->
+ NewEngine = do_seek(Engine, From),
+ InitAcc = {To - From, UserFun, UserAcc},
+ try
+ {_, _, UserAcc2} = foldl(NewEngine, fun foldl_length/2, InitAcc),
+ UserAcc2
+ catch
+ throw:{finished, UserAcc3} ->
+ UserAcc3
+ end.
-foldl(_Fd, [], Md5, Md5Acc, _Fun, Acc) ->
- Md5 = couch_crypto:hash_final(md5, Md5Acc),
- Acc;
-foldl(Fd, [{Pos, _Size}], Md5, Md5Acc, Fun, Acc) -> % 0110 UPGRADE CODE
- foldl(Fd, [Pos], Md5, Md5Acc, Fun, Acc);
-foldl(Fd, [Pos], Md5, Md5Acc, Fun, Acc) ->
- {ok, Bin} = couch_file:pread_iolist(Fd, Pos),
- Md5 = couch_crypto:hash_final(md5, couch_crypto:hash_update(md5, Md5Acc, Bin)),
- Fun(Bin, Acc);
-foldl(Fd, [{Pos, _Size}|Rest], Md5, Md5Acc, Fun, Acc) ->
- foldl(Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc);
-foldl(Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc) ->
- {ok, Bin} = couch_file:pread_iolist(Fd, Pos),
- foldl(Fd, Rest, Md5, couch_crypto:hash_update(md5, Md5Acc, Bin), Fun, Fun(Bin, Acc)).
-
-range_foldl(Fd, PosList, From, To, Fun, Acc) ->
- range_foldl(Fd, PosList, From, To, 0, Fun, Acc).
-
-range_foldl(_Fd, _PosList, _From, To, Off, _Fun, Acc) when Off >= To ->
- Acc;
-range_foldl(Fd, [Pos|Rest], From, To, Off, Fun, Acc) when is_integer(Pos) -> % old-style attachment
- {ok, Bin} = couch_file:pread_iolist(Fd, Pos),
- range_foldl(Fd, [{Pos, iolist_size(Bin)}] ++ Rest, From, To, Off, Fun, Acc);
-range_foldl(Fd, [{_Pos, Size}|Rest], From, To, Off, Fun, Acc) when From > Off + Size ->
- range_foldl(Fd, Rest, From, To, Off + Size, Fun, Acc);
-range_foldl(Fd, [{Pos, Size}|Rest], From, To, Off, Fun, Acc) ->
- {ok, Bin} = couch_file:pread_iolist(Fd, Pos),
- Bin1 = if
- From =< Off andalso To >= Off + Size -> Bin; %% the whole block is covered
- true ->
- PrefixLen = clip(From - Off, 0, Size),
- PostfixLen = clip(Off + Size - To, 0, Size),
- MatchLen = Size - PrefixLen - PostfixLen,
- <<_Prefix:PrefixLen/binary,Match:MatchLen/binary,_Postfix:PostfixLen/binary>> = iolist_to_binary(Bin),
- Match
- end,
- range_foldl(Fd, Rest, From, To, Off + Size, Fun, Fun(Bin1, Acc)).
-clip(Value, Lo, Hi) ->
- if
- Value < Lo -> Lo;
- Value > Hi -> Hi;
- true -> Value
+foldl_md5(Bin, {Md5Acc, UserFun, UserAcc}) ->
+ NewMd5Acc = couch_crypto:hash_update(md5, Md5Acc, Bin),
+ {NewMd5Acc, UserFun, UserFun(Bin, UserAcc)}.
+
+
+foldl_decode(EncBin, {DecFun, UserFun, UserAcc}) ->
+ case DecFun(EncBin) of
+ <<>> -> {DecFun, UserFun, UserAcc};
+ Dec -> {DecFun, UserFun, UserFun(Dec, UserAcc)}
end.
-foldl_decode(_DecFun, _Fd, [], Md5, Md5Acc, _Fun, Acc) ->
- Md5 = couch_crypto:hash_final(md5, Md5Acc),
- Acc;
-foldl_decode(DecFun, Fd, [{Pos, _Size}], Md5, Md5Acc, Fun, Acc) ->
- foldl_decode(DecFun, Fd, [Pos], Md5, Md5Acc, Fun, Acc);
-foldl_decode(DecFun, Fd, [Pos], Md5, Md5Acc, Fun, Acc) ->
- {ok, EncBin} = couch_file:pread_iolist(Fd, Pos),
- Md5 = couch_crypto:hash_final(md5, couch_crypto:hash_update(md5, Md5Acc, EncBin)),
- Bin = DecFun(EncBin),
- Fun(Bin, Acc);
-foldl_decode(DecFun, Fd, [{Pos, _Size}|Rest], Md5, Md5Acc, Fun, Acc) ->
- foldl_decode(DecFun, Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc);
-foldl_decode(DecFun, Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc) ->
- {ok, EncBin} = couch_file:pread_iolist(Fd, Pos),
- Bin = DecFun(EncBin),
- Md5Acc2 = couch_crypto:hash_update(md5, Md5Acc, EncBin),
- foldl_decode(DecFun, Fd, Rest, Md5, Md5Acc2, Fun, Fun(Bin, Acc)).
+
+foldl_length(Bin, {Length, UserFun, UserAcc}) ->
+ BinSize = size(Bin),
+ case BinSize =< Length of
+ true ->
+ {Length - BinSize, UserFun, UserFun(Bin, UserAcc)};
+ false ->
+ <<Trunc:BinSize/binary, _/binary>> = Bin,
+ throw({finished, UserFun(Trunc, UserAcc)})
+ end.
gzip_init(Options) ->
case couch_util:get_value(compression_level, Options, 0) of
@@ -192,23 +191,16 @@ identity_enc_dec_funs() ->
fun() -> [] end
}.
-write(_Pid, <<>>) ->
- ok;
-write(Pid, Bin) ->
- gen_server:call(Pid, {write, Bin}, infinity).
-
-init({Fd, OpenerPid, OpenerPriority, Options}) ->
+init({Engine, OpenerPid, OpenerPriority, Options}) ->
erlang:put(io_priority, OpenerPriority),
{EncodingFun, EndEncodingFun} =
case couch_util:get_value(encoding, Options, identity) of
- identity ->
- identity_enc_dec_funs();
- gzip ->
- gzip_init(Options)
+ identity -> identity_enc_dec_funs();
+ gzip -> gzip_init(Options)
end,
{ok, #stream{
- fd=Fd,
+ engine=Engine,
opener_monitor=erlang:monitor(process, OpenerPid),
md5=couch_crypto:hash_init(md5),
identity_md5=couch_crypto:hash_init(md5),
@@ -225,9 +217,8 @@ terminate(_Reason, _Stream) ->
handle_call({write, Bin}, _From, Stream) ->
BinSize = iolist_size(Bin),
#stream{
- fd = Fd,
+ engine = Engine,
written_len = WrittenLen,
- written_pointers = Written,
buffer_len = BufferLen,
buffer_list = Buffer,
max_buffer = Max,
@@ -242,19 +233,18 @@ handle_call({write, Bin}, _From, Stream) ->
[] ->
% case where the encoder did some internal buffering
% (zlib does it for example)
+ NewEngine = Engine,
WrittenLen2 = WrittenLen,
- Md5_2 = Md5,
- Written2 = Written;
+ Md5_2 = Md5;
WriteBin2 ->
- {ok, Pos, _} = couch_file:append_binary(Fd, WriteBin2),
+ NewEngine = do_write(Engine, WriteBin2),
WrittenLen2 = WrittenLen + iolist_size(WriteBin2),
- Md5_2 = couch_crypto:hash_update(md5, Md5, WriteBin2),
- Written2 = [{Pos, iolist_size(WriteBin2)}|Written]
+ Md5_2 = couch_crypto:hash_update(md5, Md5, WriteBin2)
end,
{reply, ok, Stream#stream{
+ engine = NewEngine,
written_len=WrittenLen2,
- written_pointers=Written2,
buffer_list=[],
buffer_len=0,
md5=Md5_2,
@@ -268,10 +258,9 @@ handle_call({write, Bin}, _From, Stream) ->
end;
handle_call(close, _From, Stream) ->
#stream{
- fd = Fd,
+ engine = Engine,
opener_monitor = MonRef,
written_len = WrittenLen,
- written_pointers = Written,
buffer_list = Buffer,
md5 = Md5,
identity_md5 = IdenMd5,
@@ -285,12 +274,11 @@ handle_call(close, _From, Stream) ->
Md5Final = couch_crypto:hash_final(md5, couch_crypto:hash_update(md5, Md5, WriteBin2)),
Result = case WriteBin2 of
[] ->
- {lists:reverse(Written), WrittenLen, IdenLen, Md5Final, IdenMd5Final};
+ {do_finalize(Engine), WrittenLen, IdenLen, Md5Final, IdenMd5Final};
_ ->
- {ok, Pos, _} = couch_file:append_binary(Fd, WriteBin2),
- StreamInfo = lists:reverse(Written, [{Pos, iolist_size(WriteBin2)}]),
+ NewEngine = do_write(Engine, WriteBin2),
StreamLen = WrittenLen + iolist_size(WriteBin2),
- {StreamInfo, StreamLen, IdenLen, Md5Final, IdenMd5Final}
+ {do_finalize(NewEngine), StreamLen, IdenLen, Md5Final, IdenMd5Final}
end,
erlang:demonitor(MonRef),
{stop, normal, Result, Stream}.
@@ -305,3 +293,17 @@ handle_info({'DOWN', Ref, _, _, _}, #stream{opener_monitor=Ref} = State) ->
{stop, normal, State};
handle_info(_Info, State) ->
{noreply, State}.
+
+
+do_seek({Engine, EngineState}, Offset) ->
+ {ok, NewState} = Engine:seek(EngineState, Offset),
+ {Engine, NewState}.
+
+do_write({Engine, EngineState}, Data) ->
+ {ok, NewState} = Engine:write(EngineState, Data),
+ {Engine, NewState}.
+
+do_finalize({Engine, EngineState}) ->
+ {ok, NewState} = Engine:finalize(EngineState),
+ {Engine, NewState}.
+
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/01eaca3a/src/couch_util.erl
----------------------------------------------------------------------
diff --git a/src/couch_util.erl b/src/couch_util.erl
index d688c12..dc2ef64 100644
--- a/src/couch_util.erl
+++ b/src/couch_util.erl
@@ -12,7 +12,7 @@
-module(couch_util).
--export([priv_dir/0, normpath/1]).
+-export([priv_dir/0, normpath/1, fold_files/5]).
-export([should_flush/0, should_flush/1, to_existing_atom/1]).
-export([rand32/0, implode/2, collate/2, collate/3]).
-export([abs_pathname/1,abs_pathname/2, trim/1, drop_dot_couch_ext/1]).
@@ -33,6 +33,7 @@
-export([find_in_binary/2]).
-export([callback_exists/3, validate_callback_exists/3]).
-export([with_proc/4]).
+-export([check_md5/2]).
-include_lib("couch/include/couch_db.hrl").
@@ -62,6 +63,37 @@ normparts(["." | RestParts], Acc) ->
normparts([Part | RestParts], Acc) ->
normparts(RestParts, [Part | Acc]).
+
+fold_files(Dir, RegExp, Recursive, Fun, Acc) ->
+ {ok, Re} = re:compile(RegExp, [unicode]),
+ fold_files1(Dir, Re, Recursive, Fun, Acc).
+
+fold_files1(Dir, RegExp, Recursive, Fun, Acc) ->
+ case file:list_dir(Dir) of
+ {ok, Files} ->
+ fold_files2(Files, Dir, RegExp, Recursive, Fun, Acc);
+ {error, _} ->
+ Acc
+ end.
+
+fold_files2([], _Dir, _RegExp, _Recursive, _Fun, Acc) ->
+ Acc;
+fold_files2([File | Rest], Dir, RegExp, Recursive, Fun, Acc0) ->
+ FullName = filename:join(Dir, File),
+ case (catch re:run(File, RegExp, [{capture, none}])) of
+ match ->
+ Acc1 = Fun(FullName, Acc0),
+ fold_files2(Rest, Dir, RegExp, Recursive, Fun, Acc1);
+ _ ->
+ case Recursive andalso filelib:is_dir(FullName) of
+ true ->
+ Acc1 = fold_files1(FullName, RegExp, Recursive, Fun, Acc0),
+ fold_files2(Rest, Dir, RegExp, Recursive, Fun, Acc1);
+ false ->
+ fold_files2(Rest, Dir, RegExp, Recursive, Fun, Acc0)
+ end
+ end.
+
% works like list_to_existing_atom, except can be list or binary and it
% gives you the original value instead of an error if no existing atom.
to_existing_atom(V) when is_list(V) ->
@@ -578,6 +610,12 @@ validate_callback_exists(Module, Function, Arity) ->
{undefined_callback, CallbackStr, {Module, Function, Arity}}})
end.
+
+check_md5(_NewSig, <<>>) -> ok;
+check_md5(Sig, Sig) -> ok;
+check_md5(_, _) -> throw(md5_mismatch).
+
+
ensure_loaded(Module) when is_atom(Module) ->
case code:ensure_loaded(Module) of
{module, Module} ->
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/01eaca3a/src/test_util.erl
----------------------------------------------------------------------
diff --git a/src/test_util.erl b/src/test_util.erl
index b5bb232..6bf7b46 100644
--- a/src/test_util.erl
+++ b/src/test_util.erl
@@ -34,6 +34,8 @@
-export([start/1, start/2, start/3, stop/1]).
+-export([fake_db/1]).
+
-record(test_context, {mocked = [], started = [], module}).
-define(DEFAULT_APPS,
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/01eaca3a/test/couch_db_plugin_tests.erl
----------------------------------------------------------------------
diff --git a/test/couch_db_plugin_tests.erl b/test/couch_db_plugin_tests.erl
index 94dd3df..52533fe 100644
--- a/test/couch_db_plugin_tests.erl
+++ b/test/couch_db_plugin_tests.erl
@@ -43,7 +43,7 @@ data_providers() -> [].
data_subscriptions() -> [].
processes() -> [].
notify(_, _, _) -> ok.
-fake_db() -> element(2, couch_db:clustered_db(fake, totes_fake)).
+fake_db() -> test_util:fake_db([]).
setup() ->
couch_tests:setup([