Posted to commits@couchdb.apache.org by da...@apache.org on 2014/02/05 00:44:24 UTC
[11/44] Remove src/couch
http://git-wip-us.apache.org/repos/asf/couchdb/blob/ed98610c/src/couch/src/couch_db_update_notifier_sup.erl
----------------------------------------------------------------------
diff --git a/src/couch/src/couch_db_update_notifier_sup.erl b/src/couch/src/couch_db_update_notifier_sup.erl
deleted file mode 100644
index 9eb943a..0000000
--- a/src/couch/src/couch_db_update_notifier_sup.erl
+++ /dev/null
@@ -1,68 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
-%
-% This causes an OS process to be spawned, and it is notified every time a
-% database is updated.
-%
-% The notifications are in the form of the database name sent as a line of
-% text to the OS process's stdout.
-%
-
--module(couch_db_update_notifier_sup).
-
--behaviour(supervisor).
--behaviour(config_listener).
-
--export([start_link/0, init/1]).
-
-% config_listener api
--export([handle_config_change/5]).
-
-
-start_link() ->
- supervisor:start_link({local, couch_db_update_notifier_sup},
- couch_db_update_notifier_sup, []).
-
-init([]) ->
- ok = config:listen_for_changes(?MODULE, nil),
-
- UpdateNotifierExes = config:get("update_notification"),
-
- {ok,
- {{one_for_one, 10, 3600},
- lists:map(fun({Name, UpdateNotifierExe}) ->
- {Name,
- {couch_db_update_notifier, start_link, [UpdateNotifierExe]},
- permanent,
- 1000,
- supervisor,
- [couch_db_update_notifier]}
- end, UpdateNotifierExes)}}.
-
-%% @doc When the update_notification configuration changes, terminate the process
-%% for that notifier and start a new one with the updated config
-handle_config_change("update_notification", Id, Exe, _, _) ->
- ChildSpec = {
- Id,
- {couch_db_update_notifier, start_link, [Exe]},
- permanent,
- 1000,
- supervisor,
- [couch_db_update_notifier]
- },
- supervisor:terminate_child(couch_db_update_notifier_sup, Id),
- supervisor:delete_child(couch_db_update_notifier_sup, Id),
- supervisor:start_child(couch_db_update_notifier_sup, ChildSpec),
- {ok, nil};
-handle_config_change(_, _, _, _, _) ->
- {ok, nil}.
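
For context on the module above: each child wraps an external notifier
executable listed in the "update_notification" section of the ini files.
A minimal sketch of such a section (the notifier name and path here are
invented for illustration):

    [update_notification]
    ; Name = path to a notifier executable, one entry per notifier
    index_notifier = /usr/local/bin/couch-index-notify

init/1 turns each {Name, Exe} pair into a one_for_one child running
couch_db_update_notifier:start_link(Exe), and handle_config_change/5
restarts the matching child whenever such an entry changes.
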
http://git-wip-us.apache.org/repos/asf/couchdb/blob/ed98610c/src/couch/src/couch_db_updater.erl
----------------------------------------------------------------------
diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl
deleted file mode 100644
index 649826a..0000000
--- a/src/couch/src/couch_db_updater.erl
+++ /dev/null
@@ -1,1264 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_db_updater).
--behaviour(gen_server).
-
--export([btree_by_id_split/1, btree_by_id_join/2, btree_by_id_reduce/2]).
--export([btree_by_seq_split/1, btree_by_seq_join/2, btree_by_seq_reduce/2]).
--export([make_doc_summary/2]).
--export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
-
--include_lib("couch/include/couch_db.hrl").
-
--record(comp_header, {
- db_header,
- meta_state
-}).
-
--record(merge_st, {
- id_tree,
- seq_tree,
- curr,
- rem_seqs,
- infos
-}).
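-
-% #merge_st{} is the accumulator used by copy_meta_data/1 and merge_docids/2
-% below while compaction rebuilds the id_tree from the sorted meta data.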
-
-init({DbName, Filepath, Fd, Options}) ->
- case lists:member(create, Options) of
- true ->
- % create a new header and write it to the file
- Header = #db_header{},
- ok = couch_file:write_header(Fd, Header),
- % delete any old compaction files that might be hanging around
- RootDir = config:get("couchdb", "database_dir", "."),
- couch_file:delete(RootDir, Filepath ++ ".compact"),
- couch_file:delete(RootDir, Filepath ++ ".compact.data"),
- couch_file:delete(RootDir, Filepath ++ ".compact.meta");
- false ->
- case couch_file:read_header(Fd) of
- {ok, Header} ->
- ok;
- no_valid_header ->
- % create a new header and write it to the file
- Header = #db_header{},
- ok = couch_file:write_header(Fd, Header),
- % delete any old compaction files that might be hanging around
- file:delete(Filepath ++ ".compact"),
- file:delete(Filepath ++ ".compact.data"),
- file:delete(Filepath ++ ".compact.meta")
- end
- end,
- Db = init_db(DbName, Filepath, Fd, Header, Options),
- % we don't load validation funs here because the fabric query is liable to
- % race conditions. Instead see couch_db:validate_doc_update, which loads
- % them lazily
- {ok, Db#db{main_pid = self()}}.
-
-
-terminate(_Reason, Db) ->
- % If the reason we died is because our fd disappeared
- % then we don't need to try closing it again.
- case Db#db.fd of
- Pid when is_pid(Pid) ->
- ok = couch_file:close(Db#db.fd);
- _ ->
- ok
- end,
- couch_util:shutdown_sync(Db#db.compactor_pid),
- couch_util:shutdown_sync(Db#db.fd),
- ok.
-
-handle_call(get_db, _From, Db) ->
- {reply, {ok, Db}, Db};
-handle_call(full_commit, _From, #db{waiting_delayed_commit=nil}=Db) ->
- {reply, ok, Db}; % no data waiting, return ok immediately
-handle_call(full_commit, _From, Db) ->
- {reply, ok, commit_data(Db)};
-handle_call({full_commit, RequiredSeq}, _From, Db)
- when RequiredSeq =< Db#db.committed_update_seq ->
- {reply, ok, Db};
-handle_call({full_commit, _}, _, Db) ->
- {reply, ok, commit_data(Db)}; % commit the data and return ok
-handle_call(start_compact, _From, Db) ->
- {noreply, NewDb} = handle_cast(start_compact, Db),
- {reply, {ok, NewDb#db.compactor_pid}, NewDb};
-handle_call(compactor_pid, _From, #db{compactor_pid = Pid} = Db) ->
- {reply, Pid, Db};
-handle_call(cancel_compact, _From, #db{compactor_pid = nil} = Db) ->
- {reply, ok, Db};
-handle_call(cancel_compact, _From, #db{compactor_pid = Pid} = Db) ->
- unlink(Pid),
- exit(Pid, kill),
- RootDir = config:get("couchdb", "database_dir", "."),
- ok = couch_file:delete(RootDir, Db#db.filepath ++ ".compact"),
- Db2 = Db#db{compactor_pid = nil},
- ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
- {reply, ok, Db2};
-handle_call(increment_update_seq, _From, Db) ->
- Db2 = commit_data(Db#db{update_seq=Db#db.update_seq+1}),
- ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
- couch_db_update_notifier:notify({updated, Db#db.name}),
- {reply, {ok, Db2#db.update_seq}, Db2};
-
-handle_call({set_security, NewSec}, _From, #db{compression = Comp} = Db) ->
- {ok, Ptr, _} = couch_file:append_term(
- Db#db.fd, NewSec, [{compression, Comp}]),
- Db2 = commit_data(Db#db{security=NewSec, security_ptr=Ptr,
- update_seq=Db#db.update_seq+1}),
- ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
- {reply, ok, Db2};
-
-handle_call({set_revs_limit, Limit}, _From, Db) ->
- Db2 = commit_data(Db#db{revs_limit=Limit,
- update_seq=Db#db.update_seq+1}),
- ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
- {reply, ok, Db2};
-
-handle_call({purge_docs, _IdRevs}, _From,
- #db{compactor_pid=Pid}=Db) when Pid /= nil ->
- {reply, {error, purge_during_compaction}, Db};
-handle_call({purge_docs, IdRevs}, _From, Db) ->
- #db{
- fd = Fd,
- id_tree = DocInfoByIdBTree,
- seq_tree = DocInfoBySeqBTree,
- update_seq = LastSeq,
- header = Header = #db_header{purge_seq=PurgeSeq},
- compression = Comp
- } = Db,
- DocLookups = couch_btree:lookup(DocInfoByIdBTree,
- [Id || {Id, _Revs} <- IdRevs]),
-
- NewDocInfos = lists:zipwith(
- fun({_Id, Revs}, {ok, #full_doc_info{rev_tree=Tree}=FullDocInfo}) ->
- case couch_key_tree:remove_leafs(Tree, Revs) of
- {_, []=_RemovedRevs} -> % no change
- nil;
- {NewTree, RemovedRevs} ->
- {FullDocInfo#full_doc_info{rev_tree=NewTree},RemovedRevs}
- end;
- (_, not_found) ->
- nil
- end,
- IdRevs, DocLookups),
-
- SeqsToRemove = [Seq
- || {#full_doc_info{update_seq=Seq},_} <- NewDocInfos],
-
- FullDocInfoToUpdate = [FullInfo
- || {#full_doc_info{rev_tree=Tree}=FullInfo,_}
- <- NewDocInfos, Tree /= []],
-
- IdRevsPurged = [{Id, Revs}
- || {#full_doc_info{id=Id}, Revs} <- NewDocInfos],
-
- {DocInfoToUpdate, NewSeq} = lists:mapfoldl(
- fun(#full_doc_info{rev_tree=Tree}=FullInfo, SeqAcc) ->
- Tree2 = couch_key_tree:map_leafs(
- fun(_RevId, Leaf) ->
- Leaf#leaf{seq=SeqAcc+1}
- end, Tree),
- {FullInfo#full_doc_info{rev_tree=Tree2}, SeqAcc + 1}
- end, LastSeq, FullDocInfoToUpdate),
-
- IdsToRemove = [Id || {#full_doc_info{id=Id,rev_tree=[]},_}
- <- NewDocInfos],
-
- {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree,
- DocInfoToUpdate, SeqsToRemove),
- {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree,
- FullDocInfoToUpdate, IdsToRemove),
- {ok, Pointer, _} = couch_file:append_term(
- Fd, IdRevsPurged, [{compression, Comp}]),
-
- Db2 = commit_data(
- Db#db{
- id_tree = DocInfoByIdBTree2,
- seq_tree = DocInfoBySeqBTree2,
- update_seq = NewSeq + 1,
- header=Header#db_header{purge_seq=PurgeSeq+1, purged_docs=Pointer}}),
-
- ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
- couch_db_update_notifier:notify({updated, Db#db.name}),
- {reply, {ok, (Db2#db.header)#db_header.purge_seq, IdRevsPurged}, Db2}.
-
-
-handle_cast({load_validation_funs, ValidationFuns}, Db) ->
- Db2 = Db#db{validate_doc_funs = ValidationFuns},
- ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
- {noreply, Db2};
-handle_cast(start_compact, Db) ->
- case Db#db.compactor_pid of
- nil ->
- ?LOG_INFO("Starting compaction for db \"~s\"", [Db#db.name]),
- Pid = spawn_link(fun() -> start_copy_compact(Db) end),
- Db2 = Db#db{compactor_pid=Pid},
- ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
- {noreply, Db2};
- _ ->
- % compaction is already running; this is a no-op
- {noreply, Db}
- end;
-handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) ->
- {ok, NewFd} = couch_file:open(CompactFilepath),
- {ok, NewHeader} = couch_file:read_header(NewFd),
- #db{update_seq=NewSeq} = NewDb =
- init_db(Db#db.name, Filepath, NewFd, NewHeader, Db#db.options),
- unlink(NewFd),
- case Db#db.update_seq == NewSeq of
- true ->
- % suck up all the local docs into memory and write them to the new db
- {ok, _, LocalDocs} = couch_btree:foldl(Db#db.local_tree,
- fun(Value, _Offset, Acc) -> {ok, [Value | Acc]} end, []),
- {ok, NewLocalBtree} = couch_btree:add(NewDb#db.local_tree, LocalDocs),
-
- NewDb2 = commit_data(NewDb#db{
- local_tree = NewLocalBtree,
- main_pid = self(),
- filepath = Filepath,
- instance_start_time = Db#db.instance_start_time,
- revs_limit = Db#db.revs_limit
- }),
-
- ?LOG_DEBUG("CouchDB swapping files ~s and ~s.",
- [Filepath, CompactFilepath]),
- ok = file:rename(CompactFilepath, Filepath ++ ".compact"),
- RootDir = config:get("couchdb", "database_dir", "."),
- couch_file:delete(RootDir, Filepath),
- ok = file:rename(Filepath ++ ".compact", Filepath),
- % Delete the old meta compaction file after promoting
- % the compaction file.
- couch_file:delete(RootDir, Filepath ++ ".compact.meta"),
- close_db(Db),
- NewDb3 = refresh_validate_doc_funs(NewDb2),
- ok = gen_server:call(couch_server, {db_updated, NewDb3}, infinity),
- couch_db_update_notifier:notify({compacted, NewDb3#db.name}),
- ?LOG_INFO("Compaction for db \"~s\" completed.", [Db#db.name]),
- {noreply, NewDb3#db{compactor_pid=nil}};
- false ->
- ?LOG_INFO("Compaction file still behind main file "
- "(update seq=~p. compact update seq=~p). Retrying.",
- [Db#db.update_seq, NewSeq]),
- close_db(NewDb),
- Pid = spawn_link(fun() -> start_copy_compact(Db) end),
- Db2 = Db#db{compactor_pid=Pid},
- ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
- {noreply, Db2}
- end;
-
-handle_cast(Msg, #db{name = Name} = Db) ->
- ?LOG_ERROR("Database `~s` updater received unexpected cast: ~p", [Name, Msg]),
- {stop, Msg, Db}.
-
-
-handle_info({update_docs, Client, GroupedDocs, NonRepDocs, MergeConflicts,
- FullCommit}, Db) ->
- GroupedDocs2 = [[{Client, D} || D <- DocGroup] || DocGroup <- GroupedDocs],
- if NonRepDocs == [] ->
- {GroupedDocs3, Clients, FullCommit2} = collect_updates(GroupedDocs2,
- [Client], MergeConflicts, FullCommit);
- true ->
- GroupedDocs3 = GroupedDocs2,
- FullCommit2 = FullCommit,
- Clients = [Client]
- end,
- NonRepDocs2 = [{Client, NRDoc} || NRDoc <- NonRepDocs],
- try update_docs_int(Db, GroupedDocs3, NonRepDocs2, MergeConflicts,
- FullCommit2) of
- {ok, Db2, UpdatedDDocIds} ->
- ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
- if Db2#db.update_seq /= Db#db.update_seq ->
- couch_db_update_notifier:notify({updated, Db2#db.name});
- true -> ok
- end,
- [catch(ClientPid ! {done, self()}) || ClientPid <- Clients],
- lists:foreach(fun(DDocId) ->
- couch_db_update_notifier:notify({ddoc_updated, {Db#db.name, DDocId}})
- end, UpdatedDDocIds),
- {noreply, Db2, hibernate}
- catch
- throw: retry ->
- [catch(ClientPid ! {retry, self()}) || ClientPid <- Clients],
- {noreply, Db, hibernate}
- end;
-handle_info(delayed_commit, #db{waiting_delayed_commit=nil}=Db) ->
- %no outstanding delayed commits, ignore
- {noreply, Db};
-handle_info(delayed_commit, Db) ->
- case commit_data(Db) of
- Db ->
- {noreply, Db};
- Db2 ->
- ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
- {noreply, Db2}
- end;
-handle_info({'EXIT', _Pid, normal}, Db) ->
- {noreply, Db};
-handle_info({'EXIT', _Pid, Reason}, Db) ->
- {stop, Reason, Db};
-handle_info({'DOWN', Ref, _, _, Reason}, #db{fd_monitor=Ref, name=Name} = Db) ->
- ?LOG_ERROR("DB ~s shutting down - Fd ~p", [Name, Reason]),
- {stop, normal, Db#db{fd=undefined, fd_monitor=undefined}}.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
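-% merge_updates/2 zips two lists of update batches, each sorted by doc id,
-% concatenating the batches that target the same document.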
-merge_updates([[{_,{#doc{id=X},_}}|_]=A|RestA], [[{_,{#doc{id=X},_}}|_]=B|RestB]) ->
- [A++B | merge_updates(RestA, RestB)];
-merge_updates([[{_,{#doc{id=X},_}}|_]|_]=A, [[{_,{#doc{id=Y},_}}|_]|_]=B) when X < Y ->
- [hd(A) | merge_updates(tl(A), B)];
-merge_updates([[{_,{#doc{id=X},_}}|_]|_]=A, [[{_,{#doc{id=Y},_}}|_]|_]=B) when X > Y ->
- [hd(B) | merge_updates(A, tl(B))];
-merge_updates([], RestB) ->
- RestB;
-merge_updates(RestA, []) ->
- RestA.
-
-collect_updates(GroupedDocsAcc, ClientsAcc, MergeConflicts, FullCommit) ->
- receive
- % Only collect updates with the same MergeConflicts flag and without
- % local docs. It's easier to just avoid multiple _local doc
- % updaters than deal with their possible conflicts, and local doc
- % writes are relatively rare. Can be optimized later if really needed.
- {update_docs, Client, GroupedDocs, [], MergeConflicts, FullCommit2} ->
- GroupedDocs2 = [[{Client, Doc} || Doc <- DocGroup]
- || DocGroup <- GroupedDocs],
- GroupedDocsAcc2 =
- merge_updates(GroupedDocsAcc, GroupedDocs2),
- collect_updates(GroupedDocsAcc2, [Client | ClientsAcc],
- MergeConflicts, (FullCommit or FullCommit2))
- after 0 ->
- {GroupedDocsAcc, ClientsAcc, FullCommit}
- end.
-
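-% rev_tree/1 and disk_tree/1 convert between the on-disk leaf tuples and the
-% in-memory #leaf{} records, accumulating leaf sizes where they are known.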
-rev_tree(DiskTree) ->
- couch_key_tree:mapfold(fun
- (_RevId, {IsDeleted, BodyPointer, UpdateSeq}, leaf, _Acc) ->
- % pre 1.2 format, will be upgraded on compaction
- {#leaf{deleted=?i2b(IsDeleted), ptr=BodyPointer, seq=UpdateSeq}, nil};
- (_RevId, {IsDeleted, BodyPointer, UpdateSeq}, branch, Acc) ->
- {#leaf{deleted=?i2b(IsDeleted), ptr=BodyPointer, seq=UpdateSeq}, Acc};
- (_RevId, {IsDeleted, BodyPointer, UpdateSeq, Size}, leaf, Acc) ->
- Acc2 = sum_leaf_sizes(Acc, Size),
- {#leaf{deleted=?i2b(IsDeleted), ptr=BodyPointer, seq=UpdateSeq, size=Size}, Acc2};
- (_RevId, {IsDeleted, BodyPointer, UpdateSeq, Size}, branch, Acc) ->
- {#leaf{deleted=?i2b(IsDeleted), ptr=BodyPointer, seq=UpdateSeq, size=Size}, Acc};
- (_RevId, ?REV_MISSING, _Type, Acc) ->
- {?REV_MISSING, Acc}
- end, 0, DiskTree).
-
-disk_tree(RevTree) ->
- couch_key_tree:map(fun
- (_RevId, ?REV_MISSING) ->
- ?REV_MISSING;
- (_RevId, #leaf{deleted=IsDeleted, ptr=BodyPointer, seq=UpdateSeq, size=Size}) ->
- {?b2i(IsDeleted), BodyPointer, UpdateSeq, Size}
- end, RevTree).
-
-btree_by_seq_split(#full_doc_info{id=Id, update_seq=Seq, deleted=Del, rev_tree=T}) ->
- {Seq, {Id, ?b2i(Del), disk_tree(T)}}.
-
-btree_by_seq_join(Seq, {Id, Del, DiskTree}) when is_integer(Del) ->
- {RevTree, LeafsSize} = rev_tree(DiskTree),
- #full_doc_info{
- id = Id,
- update_seq = Seq,
- deleted = ?i2b(Del),
- rev_tree = RevTree,
- leafs_size = LeafsSize
- };
-btree_by_seq_join(KeySeq, {Id, RevInfos, DeletedRevInfos}) ->
- % Older versions stored #doc_info records in the seq_tree.
- % Compact to upgrade.
- #doc_info{
- id = Id,
- high_seq=KeySeq,
- revs =
- [#rev_info{rev=Rev,seq=Seq,deleted=false,body_sp = Bp} ||
- {Rev, Seq, Bp} <- RevInfos] ++
- [#rev_info{rev=Rev,seq=Seq,deleted=true,body_sp = Bp} ||
- {Rev, Seq, Bp} <- DeletedRevInfos]}.
-
-btree_by_id_split(#full_doc_info{id=Id, update_seq=Seq,
- deleted=Deleted, rev_tree=Tree}) ->
- {Id, {Seq, ?b2i(Deleted), disk_tree(Tree)}}.
-
-btree_by_id_join(Id, {HighSeq, Deleted, DiskTree}) ->
- {Tree, LeafsSize} = rev_tree(DiskTree),
- #full_doc_info{
- id = Id,
- update_seq = HighSeq,
- deleted = ?i2b(Deleted),
- rev_tree = Tree,
- leafs_size = LeafsSize
- }.
-
-btree_by_id_reduce(reduce, FullDocInfos) ->
- lists:foldl(
- fun(Info, {NotDeleted, Deleted, Size}) ->
- Size2 = sum_leaf_sizes(Size, Info#full_doc_info.leafs_size),
- case Info#full_doc_info.deleted of
- true ->
- {NotDeleted, Deleted + 1, Size2};
- false ->
- {NotDeleted + 1, Deleted, Size2}
- end
- end,
- {0, 0, 0}, FullDocInfos);
-btree_by_id_reduce(rereduce, Reds) ->
- lists:foldl(
- fun({NotDeleted, Deleted}, {AccNotDeleted, AccDeleted, _AccSize}) ->
- % pre 1.2 format, will be upgraded on compaction
- {AccNotDeleted + NotDeleted, AccDeleted + Deleted, nil};
- ({NotDeleted, Deleted, Size}, {AccNotDeleted, AccDeleted, AccSize}) ->
- AccSize2 = sum_leaf_sizes(AccSize, Size),
- {AccNotDeleted + NotDeleted, AccDeleted + Deleted, AccSize2}
- end,
- {0, 0, 0}, Reds).
-
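-% a nil size marks a pre-1.2 file where leaf sizes weren't tracked; once any
-% operand is nil the sum stays nil until compaction upgrades the file.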
-sum_leaf_sizes(nil, _) ->
- nil;
-sum_leaf_sizes(_, nil) ->
- nil;
-sum_leaf_sizes(Size1, Size2) ->
- Size1 + Size2.
-
-btree_by_seq_reduce(reduce, DocInfos) ->
- % count the number of documents
- length(DocInfos);
-btree_by_seq_reduce(rereduce, Reds) ->
- lists:sum(Reds).
-
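-% pad a record written by an older release out to the current definition by
-% appending the default values of any fields added since.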
-simple_upgrade_record(Old, New) when tuple_size(Old) < tuple_size(New) ->
- OldSz = tuple_size(Old),
- NewValuesTail =
- lists:sublist(tuple_to_list(New), OldSz + 1, tuple_size(New) - OldSz),
- list_to_tuple(tuple_to_list(Old) ++ NewValuesTail);
-simple_upgrade_record(Old, _New) ->
- Old.
-
--define(OLD_DISK_VERSION_ERROR,
- "Database files from versions smaller than 0.10.0 are no longer supported").
-
-init_db(DbName, Filepath, Fd, Header0, Options) ->
- Header1 = simple_upgrade_record(Header0, #db_header{}),
- Header =
- case element(2, Header1) of
- 1 -> throw({database_disk_version_error, ?OLD_DISK_VERSION_ERROR});
- 2 -> throw({database_disk_version_error, ?OLD_DISK_VERSION_ERROR});
- 3 -> throw({database_disk_version_error, ?OLD_DISK_VERSION_ERROR});
- 4 -> Header1#db_header{security_ptr = nil}; % 0.10 and pre 0.11
- 5 -> Header1; % pre 1.2
- ?LATEST_DISK_VERSION -> Header1;
- _ -> throw({database_disk_version_error, "Incorrect disk header version"})
- end,
-
- {ok, FsyncOptions} = couch_util:parse_term(
- config:get("couchdb", "fsync_options",
- "[before_header, after_header, on_file_open]")),
-
- case lists:member(on_file_open, FsyncOptions) of
- true -> ok = couch_file:sync(Fd);
- _ -> ok
- end,
-
- Compression = couch_compress:get_compression_method(),
-
- {ok, IdBtree} = couch_btree:open(Header#db_header.id_tree_state, Fd,
- [{split, fun ?MODULE:btree_by_id_split/1},
- {join, fun ?MODULE:btree_by_id_join/2},
- {reduce, fun ?MODULE:btree_by_id_reduce/2},
- {compression, Compression}]),
- {ok, SeqBtree} = couch_btree:open(Header#db_header.seq_tree_state, Fd,
- [{split, fun ?MODULE:btree_by_seq_split/1},
- {join, fun ?MODULE:btree_by_seq_join/2},
- {reduce, fun ?MODULE:btree_by_seq_reduce/2},
- {compression, Compression}]),
- {ok, LocalDocsBtree} = couch_btree:open(Header#db_header.local_tree_state, Fd,
- [{compression, Compression}]),
- case Header#db_header.security_ptr of
- nil ->
- Security = [],
- SecurityPtr = nil;
- SecurityPtr ->
- {ok, Security} = couch_file:pread_term(Fd, SecurityPtr)
- end,
- % convert start time tuple to microsecs and store as a binary string
- {MegaSecs, Secs, MicroSecs} = now(),
- StartTime = ?l2b(io_lib:format("~p",
- [(MegaSecs*1000000*1000000) + (Secs*1000000) + MicroSecs])),
- ok = couch_file:set_db_pid(Fd, self()),
- #db{
- fd=Fd,
- fd_monitor = erlang:monitor(process, Fd),
- header=Header,
- id_tree = IdBtree,
- seq_tree = SeqBtree,
- local_tree = LocalDocsBtree,
- committed_update_seq = Header#db_header.update_seq,
- update_seq = Header#db_header.update_seq,
- name = DbName,
- filepath = Filepath,
- security = Security,
- security_ptr = SecurityPtr,
- instance_start_time = StartTime,
- revs_limit = Header#db_header.revs_limit,
- fsync_options = FsyncOptions,
- options = Options,
- compression = Compression,
- before_doc_update = couch_util:get_value(before_doc_update, Options, nil),
- after_doc_read = couch_util:get_value(after_doc_read, Options, nil)
- }.
-
-
-close_db(#db{fd_monitor = Ref}) ->
- erlang:demonitor(Ref).
-
-
-refresh_validate_doc_funs(#db{name = <<"shards/", _/binary>> = Name} = Db) ->
- spawn(fabric, reset_validation_funs, [mem3:dbname(Name)]),
- Db#db{validate_doc_funs = undefined};
-refresh_validate_doc_funs(Db0) ->
- Db = Db0#db{user_ctx = #user_ctx{roles=[<<"_admin">>]}},
- {ok, DesignDocs} = couch_db:get_design_docs(Db),
- ProcessDocFuns = lists:flatmap(
- fun(DesignDocInfo) ->
- {ok, DesignDoc} = couch_db:open_doc_int(
- Db, DesignDocInfo, [ejson_body]),
- case couch_doc:get_validate_doc_fun(DesignDoc) of
- nil -> [];
- Fun -> [Fun]
- end
- end, DesignDocs),
- Db#db{validate_doc_funs=ProcessDocFuns}.
-
-% rev tree functions
-
-flush_trees(_Db, [], AccFlushedTrees) ->
- {ok, lists:reverse(AccFlushedTrees)};
-flush_trees(#db{fd = Fd} = Db,
- [InfoUnflushed | RestUnflushed], AccFlushed) ->
- #full_doc_info{update_seq=UpdateSeq, rev_tree=Unflushed} = InfoUnflushed,
- {Flushed, LeafsSize} = couch_key_tree:mapfold(
- fun(_Rev, Value, Type, Acc) ->
- case Value of
- #doc{deleted = IsDeleted, body = {summary, Summary, AttsFd}} ->
- % this node value is actually an unwritten document summary;
- % write it to disk.
- % make sure the Fd in the written bins is the same Fd we are using,
- % and convert the bins, removing the Fd.
- % All bins should have been written to disk already.
- case {AttsFd, Fd} of
- {nil, _} ->
- ok;
- {SameFd, SameFd} ->
- ok;
- _ ->
- % The Fd the attachments were written to is not the same
- % as our Fd. This can happen when a database is being
- % switched out during a compaction.
- ?LOG_DEBUG("File where the attachments are written has"
- " changed. Possibly retrying.", []),
- throw(retry)
- end,
- {ok, NewSummaryPointer, SummarySize} =
- couch_file:append_raw_chunk(Fd, Summary),
- TotalSize = lists:foldl(
- fun(#att{att_len = L}, A) -> A + L end,
- SummarySize, Value#doc.atts),
- NewValue = #leaf{deleted=IsDeleted, ptr=NewSummaryPointer,
- seq=UpdateSeq, size=TotalSize},
- case Type of
- leaf ->
- {NewValue, Acc + TotalSize};
- branch ->
- {NewValue, Acc}
- end;
- {_, _, _, LeafSize} when Type =:= leaf, LeafSize =/= nil ->
- {Value, Acc + LeafSize};
- _ ->
- {Value, Acc}
- end
- end, 0, Unflushed),
- InfoFlushed = InfoUnflushed#full_doc_info{
- rev_tree = Flushed,
- leafs_size = LeafsSize
- },
- flush_trees(Db, RestUnflushed, [InfoFlushed | AccFlushed]).
-
-
-send_result(Client, Ref, NewResult) ->
- % used to send a result to the client
- catch(Client ! {result, self(), {Ref, NewResult}}).
-
-merge_rev_trees(_Limit, _Merge, [], [], AccNewInfos, AccRemoveSeqs, AccSeq) ->
- {ok, lists:reverse(AccNewInfos), AccRemoveSeqs, AccSeq};
-merge_rev_trees(Limit, MergeConflicts, [NewDocs|RestDocsList],
- [OldDocInfo|RestOldInfo], AccNewInfos, AccRemoveSeqs, AccSeq) ->
- #full_doc_info{id=Id,rev_tree=OldTree,deleted=OldDeleted0,update_seq=OldSeq}
- = OldDocInfo,
- {NewRevTree, _} = lists:foldl(
- fun({Client, {#doc{revs={Pos,[_Rev|PrevRevs]}}=NewDoc, Ref}}, {AccTree, OldDeleted}) ->
- if not MergeConflicts ->
- case couch_key_tree:merge(AccTree, couch_doc:to_path(NewDoc),
- Limit) of
- {_NewTree, conflicts} when (not OldDeleted) ->
- send_result(Client, Ref, conflict),
- {AccTree, OldDeleted};
- {NewTree, conflicts} when PrevRevs /= [] ->
- % Check that, if a previous revision was specified, it is
- % a leaf node in the tree
- Leafs = couch_key_tree:get_all_leafs(AccTree),
- IsPrevLeaf = lists:any(fun({_, {LeafPos, [LeafRevId|_]}}) ->
- {LeafPos, LeafRevId} == {Pos-1, hd(PrevRevs)}
- end, Leafs),
- if IsPrevLeaf ->
- {NewTree, OldDeleted};
- true ->
- send_result(Client, Ref, conflict),
- {AccTree, OldDeleted}
- end;
- {NewTree, no_conflicts} when AccTree == NewTree ->
- % the tree didn't change at all
- % meaning we are saving a rev that's already
- % been edited again.
- if (Pos == 1) and OldDeleted ->
- % this means we are recreating a brand new document
- % into a state that already existed before.
- % put the rev into a subsequent edit of the deletion
- #doc_info{revs=[#rev_info{rev={OldPos,OldRev}}|_]} =
- couch_doc:to_doc_info(OldDocInfo),
- NewRevId = couch_db:new_revid(
- NewDoc#doc{revs={OldPos, [OldRev]}}),
- NewDoc2 = NewDoc#doc{revs={OldPos + 1, [NewRevId, OldRev]}},
- {NewTree2, _} = couch_key_tree:merge(AccTree,
- couch_doc:to_path(NewDoc2), Limit),
- % we changed the rev id; this tells the caller we did
- send_result(Client, Ref, {ok, {OldPos + 1, NewRevId}}),
- {NewTree2, OldDeleted};
- true ->
- send_result(Client, Ref, conflict),
- {AccTree, OldDeleted}
- end;
- {NewTree, _} ->
- {NewTree, NewDoc#doc.deleted}
- end;
- true ->
- {NewTree, _} = couch_key_tree:merge(AccTree,
- couch_doc:to_path(NewDoc), Limit),
- {NewTree, OldDeleted}
- end
- end,
- {OldTree, OldDeleted0}, NewDocs),
- if NewRevTree == OldTree ->
- % nothing changed
- merge_rev_trees(Limit, MergeConflicts, RestDocsList, RestOldInfo,
- AccNewInfos, AccRemoveSeqs, AccSeq);
- true ->
- % we have updated the document; give it a new seq #
- NewInfo = #full_doc_info{id=Id,update_seq=AccSeq+1,rev_tree=NewRevTree},
- RemoveSeqs = case OldSeq of
- 0 -> AccRemoveSeqs;
- _ -> [OldSeq | AccRemoveSeqs]
- end,
- merge_rev_trees(Limit, MergeConflicts, RestDocsList, RestOldInfo,
- [NewInfo|AccNewInfos], RemoveSeqs, AccSeq+1)
- end.
-
-
-
-new_index_entries([], AccById, AccDDocIds) ->
- {AccById, AccDDocIds};
-new_index_entries([#full_doc_info{id=Id}=Info | Rest], AccById, AccDDocIds) ->
- #doc_info{revs=[#rev_info{deleted=Del}|_]} = couch_doc:to_doc_info(Info),
- AccById2 = [Info#full_doc_info{deleted=Del} | AccById],
- AccDDocIds2 = case Id of
- <<?DESIGN_DOC_PREFIX, _/binary>> -> [Id | AccDDocIds];
- _ -> AccDDocIds
- end,
- new_index_entries(Rest, AccById2, AccDDocIds2).
-
-
-stem_full_doc_infos(#db{revs_limit=Limit}, DocInfos) ->
- [Info#full_doc_info{rev_tree=couch_key_tree:stem(Tree, Limit)} ||
- #full_doc_info{rev_tree=Tree}=Info <- DocInfos].
-
-update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) ->
- #db{
- id_tree = DocInfoByIdBTree,
- seq_tree = DocInfoBySeqBTree,
- update_seq = LastSeq,
- revs_limit = RevsLimit
- } = Db,
- Ids = [Id || [{_Client, {#doc{id=Id}, _Ref}}|_] <- DocsList],
- % look up the old documents, if they exist.
- OldDocLookups = couch_btree:lookup(DocInfoByIdBTree, Ids),
- OldDocInfos = lists:zipwith(
- fun(_Id, {ok, FullDocInfo}) ->
- FullDocInfo;
- (Id, not_found) ->
- #full_doc_info{id=Id}
- end,
- Ids, OldDocLookups),
- % Merge the new docs into the revision trees.
- {ok, NewFullDocInfos, RemoveSeqs, NewSeq} = merge_rev_trees(RevsLimit,
- MergeConflicts, DocsList, OldDocInfos, [], [], LastSeq),
-
- % All documents are now ready to write.
-
- {ok, Db2} = update_local_docs(Db, NonRepDocs),
-
- % Write out the document summaries (the bodies are stored in the nodes of
- % the trees, the attachments are already written to disk)
- {ok, FlushedFullDocInfos} = flush_trees(Db2, NewFullDocInfos, []),
-
- {IndexFullDocInfos, UpdatedDDocIds} =
- new_index_entries(FlushedFullDocInfos, [], []),
-
- % and the indexes
- {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree, IndexFullDocInfos, []),
- {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, IndexFullDocInfos, RemoveSeqs),
-
- Db3 = Db2#db{
- id_tree = DocInfoByIdBTree2,
- seq_tree = DocInfoBySeqBTree2,
- update_seq = NewSeq},
-
- % Check if we just updated any design documents, and update the validation
- % funs if we did.
- Db4 = case length(UpdatedDDocIds) > 0 of
- true ->
- ddoc_cache:evict(Db3#db.name, UpdatedDDocIds),
- refresh_validate_doc_funs(Db3);
- false ->
- Db3
- end,
-
- {ok, commit_data(Db4, not FullCommit), UpdatedDDocIds}.
-
-update_local_docs(Db, []) ->
- {ok, Db};
-update_local_docs(#db{local_tree=Btree}=Db, Docs) ->
- Ids = [Id || {_Client, {#doc{id=Id}, _Ref}} <- Docs],
- OldDocLookups = couch_btree:lookup(Btree, Ids),
- BtreeEntries = lists:zipwith(
- fun({Client, {#doc{id=Id,deleted=Delete,revs={0,PrevRevs},body=Body}, Ref}}, _OldDocLookup) ->
- case PrevRevs of
- [RevStr|_] ->
- PrevRev = list_to_integer(?b2l(RevStr));
- [] ->
- PrevRev = 0
- end,
- %% disabled conflict checking for local docs -- APK 16 June 2010
- % OldRev =
- % case OldDocLookup of
- % {ok, {_, {OldRev0, _}}} -> OldRev0;
- % not_found -> 0
- % end,
- % case OldRev == PrevRev of
- % true ->
- case Delete of
- false ->
- send_result(Client, Ref, {ok,
- {0, ?l2b(integer_to_list(PrevRev + 1))}}),
- {update, {Id, {PrevRev + 1, Body}}};
- true ->
- send_result(Client, Ref,
- {ok, {0, <<"0">>}}),
- {remove, Id}
- end%;
- % false ->
- % send_result(Client, Ref, conflict),
- % ignore
- % end
- end, Docs, OldDocLookups),
-
- BtreeIdsRemove = [Id || {remove, Id} <- BtreeEntries],
- BtreeIdsUpdate = [{Key, Val} || {update, {Key, Val}} <- BtreeEntries],
-
- {ok, Btree2} =
- couch_btree:add_remove(Btree, BtreeIdsUpdate, BtreeIdsRemove),
-
- {ok, Db#db{local_tree = Btree2}}.
-
-db_to_header(Db, Header) ->
- Header#db_header{
- update_seq = Db#db.update_seq,
- seq_tree_state = couch_btree:get_state(Db#db.seq_tree),
- id_tree_state = couch_btree:get_state(Db#db.id_tree),
- local_tree_state = couch_btree:get_state(Db#db.local_tree),
- security_ptr = Db#db.security_ptr,
- revs_limit = Db#db.revs_limit}.
-
-commit_data(Db) ->
- commit_data(Db, false).
-
-commit_data(#db{waiting_delayed_commit=nil} = Db, true) ->
- TRef = erlang:send_after(1000,self(),delayed_commit),
- Db#db{waiting_delayed_commit=TRef};
-commit_data(Db, true) ->
- Db;
-commit_data(Db, _) ->
- #db{
- header = OldHeader,
- waiting_delayed_commit = Timer
- } = Db,
- if is_reference(Timer) -> erlang:cancel_timer(Timer); true -> ok end,
- case db_to_header(Db, OldHeader) of
- OldHeader -> Db#db{waiting_delayed_commit=nil};
- NewHeader -> sync_header(Db, NewHeader)
- end.
-
-sync_header(Db, NewHeader) ->
- #db{
- fd = Fd,
- filepath = FilePath,
- fsync_options = FsyncOptions,
- waiting_delayed_commit = Timer
- } = Db,
-
- if is_reference(Timer) -> erlang:cancel_timer(Timer); true -> ok end,
-
- Before = lists:member(before_header, FsyncOptions),
- After = lists:member(after_header, FsyncOptions),
-
- if Before -> couch_file:sync(FilePath); true -> ok end,
- ok = couch_file:write_header(Fd, NewHeader),
- if After -> couch_file:sync(FilePath); true -> ok end,
-
- Db#db{
- header=NewHeader,
- committed_update_seq=Db#db.update_seq,
- waiting_delayed_commit=nil
- }.
-
-copy_doc_attachments(#db{fd = SrcFd} = SrcDb, SrcSp, DestFd) ->
- {ok, {BodyData, BinInfos0}} = couch_db:read_doc(SrcDb, SrcSp),
- BinInfos = case BinInfos0 of
- _ when is_binary(BinInfos0) ->
- couch_compress:decompress(BinInfos0);
- _ when is_list(BinInfos0) ->
- % pre 1.2 file format
- BinInfos0
- end,
- % copy the bin values
- NewBinInfos = lists:map(
- fun({Name, Type, BinSp, AttLen, RevPos, Md5}) ->
- % 010 UPGRADE CODE
- {NewBinSp, AttLen, AttLen, Md5, _IdentityMd5} =
- couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
- {Name, Type, NewBinSp, AttLen, AttLen, RevPos, Md5, identity};
- ({Name, Type, BinSp, AttLen, DiskLen, RevPos, Md5, Enc1}) ->
- {NewBinSp, AttLen, _, Md5, _IdentityMd5} =
- couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
- Enc = case Enc1 of
- true ->
- % 0110 UPGRADE CODE
- gzip;
- false ->
- % 0110 UPGRADE CODE
- identity;
- _ ->
- Enc1
- end,
- {Name, Type, NewBinSp, AttLen, DiskLen, RevPos, Md5, Enc}
- end, BinInfos),
- {BodyData, NewBinInfos}.
-
-merge_lookups(Infos, []) ->
- Infos;
-merge_lookups([], _) ->
- [];
-merge_lookups([#doc_info{}=DI | RestInfos], [{ok, FDI} | RestLookups]) ->
- % Assert we've matched our lookups
- if DI#doc_info.id == FDI#full_doc_info.id -> ok; true ->
- erlang:error({mismatched_doc_infos, DI#doc_info.id})
- end,
- [FDI | merge_lookups(RestInfos, RestLookups)];
-merge_lookups([FDI | RestInfos], Lookups) ->
- [FDI | merge_lookups(RestInfos, Lookups)].
-
-copy_docs(Db, #db{fd = DestFd} = NewDb, MixedInfos, Retry) ->
- DocInfoIds = [Id || #doc_info{id=Id} <- MixedInfos],
- LookupResults = couch_btree:lookup(Db#db.id_tree, DocInfoIds),
- % COUCHDB-968, make sure we prune duplicates during compaction
- NewInfos0 = lists:usort(fun(#full_doc_info{id=A}, #full_doc_info{id=B}) ->
- A =< B
- end, merge_lookups(MixedInfos, LookupResults)),
-
- NewInfos1 = lists:map(
- fun(#full_doc_info{rev_tree=RevTree}=Info) ->
- Info#full_doc_info{rev_tree=couch_key_tree:map(
- fun(_, _, branch) ->
- ?REV_MISSING;
- (_Rev, #leaf{ptr=Sp}=Leaf, leaf) ->
- {_Body, AttsInfo} = Summary = copy_doc_attachments(
- Db, Sp, DestFd),
- SummaryChunk = make_doc_summary(NewDb, Summary),
- {ok, Pos, SummarySize} = couch_file:append_raw_chunk(
- DestFd, SummaryChunk),
- TotalLeafSize = lists:foldl(
- fun({_, _, _, AttLen, _, _, _, _}, S) -> S + AttLen end,
- SummarySize, AttsInfo),
- Leaf#leaf{ptr=Pos, size=TotalLeafSize}
- end, RevTree)}
- end, NewInfos0),
-
- NewInfos = stem_full_doc_infos(Db, NewInfos1),
- RemoveSeqs =
- case Retry of
- nil ->
- [];
- OldDocIdTree ->
- % Compaction is being rerun to catch up to writes during the
- % first pass. This means we may have docs that already exist
- % in the seq_tree in the .data file. Here we look up any old
- % update_seqs so that they can be removed.
- Ids = [Id || #full_doc_info{id=Id} <- NewInfos],
- Existing = couch_btree:lookup(OldDocIdTree, Ids),
- [Seq || {ok, #full_doc_info{update_seq=Seq}} <- Existing]
- end,
-
- {ok, SeqTree} = couch_btree:add_remove(
- NewDb#db.seq_tree, NewInfos, RemoveSeqs),
-
- FDIKVs = lists:map(fun(#full_doc_info{id=Id, update_seq=Seq}=FDI) ->
- {{Id, Seq}, FDI}
- end, NewInfos),
- {ok, IdEms} = couch_emsort:add(NewDb#db.id_tree, FDIKVs),
- update_compact_task(length(NewInfos)),
- NewDb#db{id_tree=IdEms, seq_tree=SeqTree}.
-
-
-copy_compact(Db, NewDb0, Retry) ->
- Compression = couch_compress:get_compression_method(),
- NewDb = NewDb0#db{compression=Compression},
- TotalChanges = couch_db:count_changes_since(Db, NewDb#db.update_seq),
- BufferSize = list_to_integer(
- config:get("database_compaction", "doc_buffer_size", "524288")),
- CheckpointAfter = couch_util:to_integer(
- config:get("database_compaction", "checkpoint_after",
- BufferSize * 10)),
-
- EnumBySeqFun =
- fun(DocInfo, _Offset,
- {AccNewDb, AccUncopied, AccUncopiedSize, AccCopiedSize}) ->
-
- Seq = case DocInfo of
- #full_doc_info{} -> DocInfo#full_doc_info.update_seq;
- #doc_info{} -> DocInfo#doc_info.high_seq
- end,
-
- AccUncopiedSize2 = AccUncopiedSize + ?term_size(DocInfo),
- if AccUncopiedSize2 >= BufferSize ->
- NewDb2 = copy_docs(
- Db, AccNewDb, lists:reverse([DocInfo | AccUncopied]), Retry),
- AccCopiedSize2 = AccCopiedSize + AccUncopiedSize2,
- if AccCopiedSize2 >= CheckpointAfter ->
- CommNewDb2 = commit_compaction_data(NewDb2#db{update_seq=Seq}),
- {ok, {CommNewDb2, [], 0, 0}};
- true ->
- {ok, {NewDb2#db{update_seq = Seq}, [], 0, AccCopiedSize2}}
- end;
- true ->
- {ok, {AccNewDb, [DocInfo | AccUncopied], AccUncopiedSize2,
- AccCopiedSize}}
- end
- end,
-
- TaskProps0 = [
- {type, database_compaction},
- {database, Db#db.name},
- {progress, 0},
- {changes_done, 0},
- {total_changes, TotalChanges}
- ],
- case (Retry =/= nil) and couch_task_status:is_task_added() of
- true ->
- couch_task_status:update([
- {retry, true},
- {progress, 0},
- {changes_done, 0},
- {total_changes, TotalChanges}
- ]);
- false ->
- couch_task_status:add_task(TaskProps0),
- couch_task_status:set_update_frequency(500)
- end,
-
- {ok, _, {NewDb2, Uncopied, _, _}} =
- couch_btree:foldl(Db#db.seq_tree, EnumBySeqFun,
- {NewDb, [], 0, 0},
- [{start_key, NewDb#db.update_seq + 1}]),
-
- NewDb3 = copy_docs(Db, NewDb2, lists:reverse(Uncopied), Retry),
-
- % copy misc header values
- if NewDb3#db.security /= Db#db.security ->
- {ok, Ptr, _} = couch_file:append_term(
- NewDb3#db.fd, Db#db.security,
- [{compression, NewDb3#db.compression}]),
- NewDb4 = NewDb3#db{security=Db#db.security, security_ptr=Ptr};
- true ->
- NewDb4 = NewDb3
- end,
-
- commit_compaction_data(NewDb4#db{update_seq=Db#db.update_seq}).
-
-
-start_copy_compact(#db{}=Db) ->
- #db{name=Name, filepath=Filepath, options=Options} = Db,
- ?LOG_DEBUG("Compaction process spawned for db \"~s\"", [Name]),
-
- {ok, NewDb, DName, DFd, MFd, Retry} =
- open_compaction_files(Name, Filepath, Options),
- erlang:monitor(process, MFd),
-
- % This is a bit worrisome. init_db/5 will monitor the data fd
- % but it doesn't know about the meta fd. For now I'll maintain
- % that the data fd is the old normal fd and meta fd is special
- % and hope everything works out for the best.
- unlink(DFd),
-
- NewDb1 = copy_purge_info(Db, NewDb),
- NewDb2 = copy_compact(Db, NewDb1, Retry),
- NewDb3 = sort_meta_data(NewDb2),
- NewDb4 = commit_compaction_data(NewDb3),
- NewDb5 = copy_meta_data(NewDb4),
- NewDb6 = sync_header(NewDb5, db_to_header(NewDb5, NewDb5#db.header)),
- close_db(NewDb6),
-
- ok = couch_file:close(MFd),
- gen_server:cast(Db#db.main_pid, {compact_done, DName}).
-
-
-open_compaction_files(DbName, DbFilePath, Options) ->
- DataFile = DbFilePath ++ ".compact.data",
- MetaFile = DbFilePath ++ ".compact.meta",
- {ok, DataFd, DataHdr} = open_compaction_file(DataFile),
- {ok, MetaFd, MetaHdr} = open_compaction_file(MetaFile),
- case {DataHdr, MetaHdr} of
- {#comp_header{}=A, #comp_header{}=A} ->
- DbHeader = A#comp_header.db_header,
- Db0 = init_db(DbName, DataFile, DataFd, DbHeader, Options),
- Db1 = bind_emsort(Db0, MetaFd, A#comp_header.meta_state),
- {ok, Db1, DataFile, DataFd, MetaFd, Db0#db.id_tree};
- {#db_header{}, _} ->
- ok = reset_compaction_file(MetaFd, #db_header{}),
- Db0 = init_db(DbName, DataFile, DataFd, DataHdr, Options),
- Db1 = bind_emsort(Db0, MetaFd, nil),
- {ok, Db1, DataFile, DataFd, MetaFd, Db0#db.id_tree};
- _ ->
- Header = #db_header{},
- ok = reset_compaction_file(DataFd, Header),
- ok = reset_compaction_file(MetaFd, Header),
- Db0 = init_db(DbName, DataFile, DataFd, Header, Options),
- Db1 = bind_emsort(Db0, MetaFd, nil),
- {ok, Db1, DataFile, DataFd, MetaFd, nil}
- end.
-
-
-open_compaction_file(FilePath) ->
- case couch_file:open(FilePath) of
- {ok, Fd} ->
- case couch_file:read_header(Fd) of
- {ok, Header} -> {ok, Fd, Header};
- no_valid_header -> {ok, Fd, nil}
- end;
- {error, enoent} ->
- {ok, Fd} = couch_file:open(FilePath, [create]),
- {ok, Fd, nil}
- end.
-
-
-reset_compaction_file(Fd, Header) ->
- ok = couch_file:truncate(Fd, 0),
- ok = couch_file:write_header(Fd, Header).
-
-
-copy_purge_info(OldDb, NewDb) ->
- OldHdr = OldDb#db.header,
- NewHdr = NewDb#db.header,
- if OldHdr#db_header.purge_seq > 0 ->
- {ok, PurgedIdsRevs} = couch_db:get_last_purged(OldDb),
- Opts = [{compression, NewDb#db.compression}],
- {ok, Ptr, _} = couch_file:append_term(NewDb#db.fd, PurgedIdsRevs, Opts),
- NewDb#db{
- header=NewHdr#db_header{
- purge_seq=OldHdr#db_header.purge_seq,
- purged_docs=Ptr
- }
- };
- true ->
- NewDb
- end.
-
-
-commit_compaction_data(#db{}=Db) ->
- % Compaction needs to write headers to both the data file
- % and the meta file so if we need to restart we can pick
- % back up from where we left off.
- commit_compaction_data(Db, couch_emsort:get_fd(Db#db.id_tree)),
- commit_compaction_data(Db, Db#db.fd).
-
-
-commit_compaction_data(#db{header=OldHeader}=Db0, Fd) ->
- % Mostly copied from commit_data/2 but I have to
- % replace the logic to commit and fsync to a specific
- % fd instead of the Filepath stuff that commit_data/2
- % does.
- DataState = OldHeader#db_header.id_tree_state,
- MetaFd = couch_emsort:get_fd(Db0#db.id_tree),
- MetaState = couch_emsort:get_state(Db0#db.id_tree),
- Db1 = bind_id_tree(Db0, Db0#db.fd, DataState),
- Header = db_to_header(Db1, OldHeader),
- CompHeader = #comp_header{
- db_header = Header,
- meta_state = MetaState
- },
- ok = couch_file:sync(Fd),
- ok = couch_file:write_header(Fd, CompHeader),
- Db2 = Db1#db{
- waiting_delayed_commit=nil,
- header=Header,
- committed_update_seq=Db1#db.update_seq
- },
- bind_emsort(Db2, MetaFd, MetaState).
-
-
-bind_emsort(Db, Fd, nil) ->
- {ok, Ems} = couch_emsort:open(Fd),
- Db#db{id_tree=Ems};
-bind_emsort(Db, Fd, State) ->
- {ok, Ems} = couch_emsort:open(Fd, [{root, State}]),
- Db#db{id_tree=Ems}.
-
-
-bind_id_tree(Db, Fd, State) ->
- {ok, IdBtree} = couch_btree:open(State, Fd, [
- {split, fun ?MODULE:btree_by_id_split/1},
- {join, fun ?MODULE:btree_by_id_join/2},
- {reduce, fun ?MODULE:btree_by_id_reduce/2}
- ]),
- Db#db{id_tree=IdBtree}.
-
-
-sort_meta_data(Db0) ->
- {ok, Ems} = couch_emsort:merge(Db0#db.id_tree),
- Db0#db{id_tree=Ems}.
-
-
-copy_meta_data(#db{fd=Fd, header=Header}=Db) ->
- Src = Db#db.id_tree,
- DstState = Header#db_header.id_tree_state,
- {ok, IdTree0} = couch_btree:open(DstState, Fd, [
- {split, fun ?MODULE:btree_by_id_split/1},
- {join, fun ?MODULE:btree_by_id_join/2},
- {reduce, fun ?MODULE:btree_by_id_reduce/2}
- ]),
- {ok, Iter} = couch_emsort:iter(Src),
- Acc0 = #merge_st{
- id_tree=IdTree0,
- seq_tree=Db#db.seq_tree,
- rem_seqs=[],
- infos=[]
- },
- Acc = merge_docids(Iter, Acc0),
- {ok, IdTree} = couch_btree:add(Acc#merge_st.id_tree, Acc#merge_st.infos),
- {ok, SeqTree} = couch_btree:add_remove(
- Acc#merge_st.seq_tree, [], Acc#merge_st.rem_seqs
- ),
- Db#db{id_tree=IdTree, seq_tree=SeqTree}.
-
-
-merge_docids(Iter, #merge_st{infos=Infos}=Acc) when length(Infos) > 1000 ->
- #merge_st{
- id_tree=IdTree0,
- seq_tree=SeqTree0,
- rem_seqs=RemSeqs
- } = Acc,
- {ok, IdTree1} = couch_btree:add(IdTree0, Infos),
- {ok, SeqTree1} = couch_btree:add_remove(SeqTree0, [], RemSeqs),
- Acc1 = Acc#merge_st{
- id_tree=IdTree1,
- seq_tree=SeqTree1,
- rem_seqs=[],
- infos=[]
- },
- merge_docids(Iter, Acc1);
-merge_docids(Iter, #merge_st{curr=Curr}=Acc) ->
- case next_info(Iter, Curr, []) of
- {NextIter, NewCurr, FDI, Seqs} ->
- Acc1 = Acc#merge_st{
- infos = [FDI | Acc#merge_st.infos],
- rem_seqs = Seqs ++ Acc#merge_st.rem_seqs,
- curr = NewCurr
- },
- merge_docids(NextIter, Acc1);
- {finished, FDI, Seqs} ->
- Acc#merge_st{
- infos = [FDI | Acc#merge_st.infos],
- rem_seqs = Seqs ++ Acc#merge_st.rem_seqs,
- curr = undefined
- };
- empty ->
- Acc
- end.
-
-
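-% next_info/3 walks the emsort iterator, which yields {{Id, Seq}, FDI} pairs
-% ordered by {Id, Seq}; for each id it keeps the highest-seq #full_doc_info{}
-% and collects the superseded seqs for removal from the seq_tree.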
-next_info(Iter, undefined, []) ->
- case couch_emsort:next(Iter) of
- {ok, {{Id, Seq}, FDI}, NextIter} ->
- next_info(NextIter, {Id, Seq, FDI}, []);
- finished ->
- empty
- end;
-next_info(Iter, {Id, Seq, FDI}, Seqs) ->
- case couch_emsort:next(Iter) of
- {ok, {{Id, NSeq}, NFDI}, NextIter} ->
- next_info(NextIter, {Id, NSeq, NFDI}, [Seq | Seqs]);
- {ok, {{NId, NSeq}, NFDI}, NextIter} ->
- {NextIter, {NId, NSeq, NFDI}, FDI, Seqs};
- finished ->
- {finished, FDI, Seqs}
- end.
-
-
-update_compact_task(NumChanges) ->
- [Changes, Total] = couch_task_status:get([changes_done, total_changes]),
- Changes2 = Changes + NumChanges,
- Progress = case Total of
- 0 ->
- 0;
- _ ->
- (Changes2 * 100) div Total
- end,
- couch_task_status:update([{changes_done, Changes2}, {progress, Progress}]).
-
-
-make_doc_summary(#db{compression = Comp}, {Body0, Atts0}) ->
- Body = case couch_compress:is_compressed(Body0, Comp) of
- true ->
- Body0;
- false ->
- % pre 1.2 database file format
- couch_compress:compress(Body0, Comp)
- end,
- Atts = case couch_compress:is_compressed(Atts0, Comp) of
- true ->
- Atts0;
- false ->
- couch_compress:compress(Atts0, Comp)
- end,
- SummaryBin = ?term_to_bin({Body, Atts}),
- couch_file:assemble_file_chunk(SummaryBin, couch_util:md5(SummaryBin)).
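
As a reference for how the updater above is driven: callers go through
gen_server calls on the db's main_pid. A rough sketch of the call pattern,
matching the handle_call clauses above (Db is a #db{} record):

    %% flush any delayed commit to disk
    ok = gen_server:call(Db#db.main_pid, full_commit, infinity),
    %% start compaction and get the compactor pid back
    {ok, CompactorPid} = gen_server:call(Db#db.main_pid, start_compact),
    %% and cancel it again
    ok = gen_server:call(Db#db.main_pid, cancel_compact).
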
http://git-wip-us.apache.org/repos/asf/couchdb/blob/ed98610c/src/couch/src/couch_doc.erl
----------------------------------------------------------------------
diff --git a/src/couch/src/couch_doc.erl b/src/couch/src/couch_doc.erl
deleted file mode 100644
index 6f2ca9b..0000000
--- a/src/couch/src/couch_doc.erl
+++ /dev/null
@@ -1,784 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_doc).
-
--export([to_doc_info/1,to_doc_info_path/1,parse_rev/1,parse_revs/1,rev_to_str/1,revs_to_strs/1]).
--export([att_foldl/3,range_att_foldl/5,att_foldl_decode/3,get_validate_doc_fun/1]).
--export([from_json_obj/1,to_json_obj/2,has_stubs/1, merge_stubs/2]).
--export([validate_docid/1]).
--export([doc_from_multi_part_stream/2, doc_from_multi_part_stream/3]).
--export([doc_to_multi_part_stream/5, len_doc_to_multi_part_stream/4]).
--export([abort_multi_part_stream/1, restart_open_doc_revs/3]).
--export([to_path/1]).
--export([mp_parse_doc/2]).
--export([with_ejson_body/1]).
-
--include_lib("couch/include/couch_db.hrl").
-
--spec to_path(#doc{}) -> path().
-to_path(#doc{revs={Start, RevIds}}=Doc) ->
- [Branch] = to_branch(Doc, lists:reverse(RevIds)),
- {Start - length(RevIds) + 1, Branch}.
-
--spec to_branch(#doc{}, [RevId::binary()]) -> [branch()].
-to_branch(Doc, [RevId]) ->
- [{RevId, Doc, []}];
-to_branch(Doc, [RevId | Rest]) ->
- [{RevId, ?REV_MISSING, to_branch(Doc, Rest)}].
-
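-% to_path/1 and to_branch/2 unroll a doc's revision history into a
-% single-branch rev tree path: older revs become ?REV_MISSING placeholders
-% and only the newest rev carries the #doc{}, ready for couch_key_tree:merge/3.
-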
-% helpers used by to_json_obj
-to_json_rev(0, []) ->
- [];
-to_json_rev(Start, [FirstRevId|_]) ->
- [{<<"_rev">>, ?l2b([integer_to_list(Start),"-",revid_to_str(FirstRevId)])}].
-
-to_json_body(true, {Body}) ->
- Body ++ [{<<"_deleted">>, true}];
-to_json_body(false, {Body}) ->
- Body.
-
-to_json_revisions(Options, Start, RevIds) ->
- case lists:member(revs, Options) of
- false -> [];
- true ->
- [{<<"_revisions">>, {[{<<"start">>, Start},
- {<<"ids">>, [revid_to_str(R) ||R <- RevIds]}]}}]
- end.
-
-revid_to_str(RevId) when size(RevId) =:= 16 ->
- ?l2b(couch_util:to_hex(RevId));
-revid_to_str(RevId) ->
- RevId.
-
-rev_to_str({Pos, RevId}) ->
- ?l2b([integer_to_list(Pos),"-",revid_to_str(RevId)]).
-
-
-revs_to_strs([]) ->
- [];
-revs_to_strs([{Pos, RevId}| Rest]) ->
- [rev_to_str({Pos, RevId}) | revs_to_strs(Rest)].
-
-to_json_meta(Meta) ->
- lists:map(
- fun({revs_info, Start, RevsInfo}) ->
- {JsonRevsInfo, _Pos} = lists:mapfoldl(
- fun({RevId, Status}, PosAcc) ->
- JsonObj = {[{<<"rev">>, rev_to_str({PosAcc, RevId})},
- {<<"status">>, ?l2b(atom_to_list(Status))}]},
- {JsonObj, PosAcc - 1}
- end, Start, RevsInfo),
- {<<"_revs_info">>, JsonRevsInfo};
- ({local_seq, Seq}) ->
- {<<"_local_seq">>, Seq};
- ({conflicts, Conflicts}) ->
- {<<"_conflicts">>, revs_to_strs(Conflicts)};
- ({deleted_conflicts, DConflicts}) ->
- {<<"_deleted_conflicts">>, revs_to_strs(DConflicts)}
- end, Meta).
-
-to_json_attachments(Attachments, Options) ->
- to_json_attachments(
- Attachments,
- lists:member(attachments, Options),
- lists:member(follows, Options),
- lists:member(att_encoding_info, Options)
- ).
-
-to_json_attachments([], _OutputData, _DataToFollow, _ShowEncInfo) ->
- [];
-to_json_attachments(Atts, OutputData, DataToFollow, ShowEncInfo) ->
- AttProps = lists:map(
- fun(#att{disk_len=DiskLen, att_len=AttLen, encoding=Enc}=Att) ->
- {Att#att.name, {[
- {<<"content_type">>, Att#att.type},
- {<<"revpos">>, Att#att.revpos}] ++
- case Att#att.md5 of
- <<>> ->
- [];
- Md5 ->
- EncodedMd5 = base64:encode(Md5),
- [{<<"digest">>, <<"md5-",EncodedMd5/binary>>}]
- end ++
- if not OutputData orelse Att#att.data == stub ->
- [{<<"length">>, DiskLen}, {<<"stub">>, true}];
- true ->
- if DataToFollow ->
- [{<<"length">>, DiskLen}, {<<"follows">>, true}];
- true ->
- AttData = case Enc of
- gzip ->
- zlib:gunzip(att_to_bin(Att));
- identity ->
- att_to_bin(Att)
- end,
- [{<<"data">>, base64:encode(AttData)}]
- end
- end ++
- case {ShowEncInfo, Enc} of
- {false, _} ->
- [];
- {true, identity} ->
- [];
- {true, _} ->
- [
- {<<"encoding">>, couch_util:to_binary(Enc)},
- {<<"encoded_length">>, AttLen}
- ]
- end
- }}
- end, Atts),
- [{<<"_attachments">>, {AttProps}}].
-
-to_json_obj(Doc, Options) ->
- doc_to_json_obj(with_ejson_body(Doc), Options).
-
-doc_to_json_obj(#doc{id=Id,deleted=Del,body=Body,revs={Start, RevIds},
- meta=Meta}=Doc,Options)->
- {[{<<"_id">>, Id}]
- ++ to_json_rev(Start, RevIds)
- ++ to_json_body(Del, Body)
- ++ to_json_revisions(Options, Start, RevIds)
- ++ to_json_meta(Meta)
- ++ to_json_attachments(Doc#doc.atts, Options)
- }.
-
-from_json_obj({Props}) ->
- transfer_fields(Props, #doc{body=[]});
-
-from_json_obj(_Other) ->
- throw({bad_request, "Document must be a JSON object"}).
-
-parse_revid(RevId) when size(RevId) =:= 32 ->
- RevInt = erlang:list_to_integer(?b2l(RevId), 16),
- <<RevInt:128>>;
-parse_revid(RevId) when length(RevId) =:= 32 ->
- RevInt = erlang:list_to_integer(RevId, 16),
- <<RevInt:128>>;
-parse_revid(RevId) when is_binary(RevId) ->
- RevId;
-parse_revid(RevId) when is_list(RevId) ->
- ?l2b(RevId).
-
-
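-% rev strings have the form "Pos-RevId", where RevId is typically 32 hex
-% characters; parse_rev/1 splits on the first dash.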
-parse_rev(Rev) when is_binary(Rev) ->
- parse_rev(?b2l(Rev));
-parse_rev(Rev) when is_list(Rev) ->
- SplitRev = lists:splitwith(fun($-) -> false; (_) -> true end, Rev),
- case SplitRev of
- {Pos, [$- | RevId]} -> {list_to_integer(Pos), parse_revid(RevId)};
- _Else -> throw({bad_request, <<"Invalid rev format">>})
- end;
-parse_rev(_BadRev) ->
- throw({bad_request, <<"Invalid rev format">>}).
-
-parse_revs([]) ->
- [];
-parse_revs([Rev | Rest]) ->
- [parse_rev(Rev) | parse_revs(Rest)].
-
-
-validate_docid(<<"">>) ->
- throw({bad_request, <<"Document id must not be empty">>});
-validate_docid(Id) when is_binary(Id) ->
- case couch_util:validate_utf8(Id) of
- false -> throw({bad_request, <<"Document id must be valid UTF-8">>});
- true -> ok
- end,
- case Id of
- <<"_design/", _/binary>> -> ok;
- <<"_local/", _/binary>> -> ok;
- <<"_", _/binary>> ->
- throw({bad_request, <<"Only reserved document ids may start with underscore.">>});
- _Else -> ok
- end;
-validate_docid(Id) ->
- ?LOG_DEBUG("Document id is not a string: ~p", [Id]),
- throw({bad_request, <<"Document id must be a string">>}).
-
-transfer_fields([], #doc{body=Fields}=Doc) ->
- % convert fields back to json object
- Doc#doc{body={lists:reverse(Fields)}};
-
-transfer_fields([{<<"_id">>, Id} | Rest], Doc) ->
- validate_docid(Id),
- transfer_fields(Rest, Doc#doc{id=Id});
-
-transfer_fields([{<<"_rev">>, Rev} | Rest], #doc{revs={0, []}}=Doc) ->
- {Pos, RevId} = parse_rev(Rev),
- transfer_fields(Rest,
- Doc#doc{revs={Pos, [RevId]}});
-
-transfer_fields([{<<"_rev">>, _Rev} | Rest], Doc) ->
- % we already got the rev from the _revisions
- transfer_fields(Rest,Doc);
-
-transfer_fields([{<<"_attachments">>, {JsonBins}} | Rest], Doc) ->
- Atts = lists:map(fun({Name, {BinProps}}) ->
- Md5 = case couch_util:get_value(<<"digest">>, BinProps) of
- <<"md5-",EncodedMd5/binary>> ->
- base64:decode(EncodedMd5);
- _ ->
- <<>>
- end,
- case couch_util:get_value(<<"stub">>, BinProps) of
- true ->
- Type = couch_util:get_value(<<"content_type">>, BinProps),
- RevPos = couch_util:get_value(<<"revpos">>, BinProps, nil),
- DiskLen = couch_util:get_value(<<"length">>, BinProps),
- {Enc, EncLen} = att_encoding_info(BinProps),
- #att{name=Name, data=stub, type=Type, att_len=EncLen,
- disk_len=DiskLen, encoding=Enc, revpos=RevPos, md5=Md5};
- _ ->
- Type = couch_util:get_value(<<"content_type">>, BinProps,
- ?DEFAULT_ATTACHMENT_CONTENT_TYPE),
- RevPos = couch_util:get_value(<<"revpos">>, BinProps, 0),
- case couch_util:get_value(<<"follows">>, BinProps) of
- true ->
- DiskLen = couch_util:get_value(<<"length">>, BinProps),
- {Enc, EncLen} = att_encoding_info(BinProps),
- #att{name=Name, data=follows, type=Type, encoding=Enc,
- att_len=EncLen, disk_len=DiskLen, revpos=RevPos, md5=Md5};
- _ ->
- Value = couch_util:get_value(<<"data">>, BinProps),
- Bin = base64:decode(Value),
- LenBin = size(Bin),
- #att{name=Name, data=Bin, type=Type, att_len=LenBin,
- disk_len=LenBin, revpos=RevPos}
- end
- end
- end, JsonBins),
- transfer_fields(Rest, Doc#doc{atts=Atts});
-
-transfer_fields([{<<"_revisions">>, {Props}} | Rest], Doc) ->
- RevIds = couch_util:get_value(<<"ids">>, Props),
- Start = couch_util:get_value(<<"start">>, Props),
- if not is_integer(Start) ->
- throw({doc_validation, "_revisions.start isn't an integer."});
- not is_list(RevIds) ->
- throw({doc_validation, "_revisions.ids isn't an array."});
- true ->
- ok
- end,
- [throw({doc_validation, "RevId isn't a string"}) ||
- RevId <- RevIds, not is_binary(RevId)],
- RevIds2 = [parse_revid(RevId) || RevId <- RevIds],
- transfer_fields(Rest, Doc#doc{revs={Start, RevIds2}});
-
-transfer_fields([{<<"_deleted">>, B} | Rest], Doc) when is_boolean(B) ->
- transfer_fields(Rest, Doc#doc{deleted=B});
-
-% ignored fields
-transfer_fields([{<<"_revs_info">>, _} | Rest], Doc) ->
- transfer_fields(Rest, Doc);
-transfer_fields([{<<"_local_seq">>, _} | Rest], Doc) ->
- transfer_fields(Rest, Doc);
-transfer_fields([{<<"_conflicts">>, _} | Rest], Doc) ->
- transfer_fields(Rest, Doc);
-transfer_fields([{<<"_deleted_conflicts">>, _} | Rest], Doc) ->
- transfer_fields(Rest, Doc);
-
-% special fields for replication documents
-transfer_fields([{<<"_replication_state">>, _} = Field | Rest],
- #doc{body=Fields} = Doc) ->
- transfer_fields(Rest, Doc#doc{body=[Field|Fields]});
-transfer_fields([{<<"_replication_state_time">>, _} = Field | Rest],
- #doc{body=Fields} = Doc) ->
- transfer_fields(Rest, Doc#doc{body=[Field|Fields]});
-transfer_fields([{<<"_replication_id">>, _} = Field | Rest],
- #doc{body=Fields} = Doc) ->
- transfer_fields(Rest, Doc#doc{body=[Field|Fields]});
-transfer_fields([{<<"_replication_stats">>, _} = Field | Rest],
- #doc{body=Fields} = Doc) ->
- transfer_fields(Rest, Doc#doc{body=[Field|Fields]});
-
-% unknown special field
-transfer_fields([{<<"_",Name/binary>>, _} | _], _) ->
- throw({doc_validation,
- ?l2b(io_lib:format("Bad special document member: _~s", [Name]))});
-
-transfer_fields([Field | Rest], #doc{body=Fields}=Doc) ->
- transfer_fields(Rest, Doc#doc{body=[Field|Fields]}).
-
-att_encoding_info(BinProps) ->
- DiskLen = couch_util:get_value(<<"length">>, BinProps),
- case couch_util:get_value(<<"encoding">>, BinProps) of
- undefined ->
- {identity, DiskLen};
- Enc ->
- EncodedLen = couch_util:get_value(<<"encoded_length">>, BinProps, DiskLen),
- {list_to_existing_atom(?b2l(Enc)), EncodedLen}
- end.
-
-to_doc_info(FullDocInfo) ->
- {DocInfo, _Path} = to_doc_info_path(FullDocInfo),
- DocInfo.
-
-max_seq(Tree, UpdateSeq) ->
- FoldFun = fun({_Pos, _Key}, Value, _Type, MaxOldSeq) ->
- case Value of
- {_Deleted, _DiskPos, OldTreeSeq} ->
- % Older versions didn't track data sizes.
- erlang:max(MaxOldSeq, OldTreeSeq);
- {_Deleted, _DiskPos, OldTreeSeq, _Size} -> % necessary clause?
- % Older versions didn't store #leaf records.
- erlang:max(MaxOldSeq, OldTreeSeq);
- #leaf{seq=OldTreeSeq} ->
- erlang:max(MaxOldSeq, OldTreeSeq);
- _ ->
- MaxOldSeq
- end
- end,
- couch_key_tree:fold(FoldFun, UpdateSeq, Tree).
-
-to_doc_info_path(#full_doc_info{id=Id,rev_tree=Tree,update_seq=FDISeq}) ->
- RevInfosAndPath = [
- {#rev_info{
- deleted = Leaf#leaf.deleted,
- body_sp = Leaf#leaf.ptr,
- seq = Leaf#leaf.seq,
- rev = {Pos, RevId}
- }, Path} || {Leaf, {Pos, [RevId | _]} = Path} <-
- couch_key_tree:get_all_leafs(Tree)
- ],
- SortedRevInfosAndPath = lists:sort(
- fun({#rev_info{deleted=DeletedA,rev=RevA}, _PathA},
- {#rev_info{deleted=DeletedB,rev=RevB}, _PathB}) ->
- % sort descending by {not deleted, rev}
- {not DeletedA, RevA} > {not DeletedB, RevB}
- end, RevInfosAndPath),
- [{_RevInfo, WinPath}|_] = SortedRevInfosAndPath,
- RevInfos = [RevInfo || {RevInfo, _Path} <- SortedRevInfosAndPath],
- {#doc_info{id=Id, high_seq=max_seq(Tree, FDISeq), revs=RevInfos}, WinPath}.
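The winner selection above leans on Erlang's term order: true > false,
so keying each revision on {not Deleted, Rev} and sorting descending
puts live revisions ahead of deleted ones and breaks ties by the larger
{Pos, RevId}. A standalone illustration with made-up revisions:

    Revs = [{true,  {2, <<"b">>}},   % deleted revision at pos 2
            {false, {1, <<"a">>}},   % live revision at pos 1
            {false, {3, <<"c">>}}],  % live revision at pos 3
    lists:sort(fun({DelA, RevA}, {DelB, RevB}) ->
            {not DelA, RevA} > {not DelB, RevB}
        end, Revs).
    %% => [{false,{3,<<"c">>}}, {false,{1,<<"a">>}}, {true,{2,<<"b">>}}]
    %% i.e. the live revision with the highest {Pos, RevId} wins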
-
-att_foldl(#att{data=Bin}, Fun, Acc) when is_binary(Bin) ->
- Fun(Bin, Acc);
-att_foldl(#att{data={Fd,Sp},md5=Md5}, Fun, Acc) ->
- couch_stream:foldl(Fd, Sp, Md5, Fun, Acc);
-att_foldl(#att{data=DataFun,att_len=Len}, Fun, Acc) when is_function(DataFun) ->
- fold_streamed_data(DataFun, Len, Fun, Acc);
-att_foldl(#att{data={follows, Parser, Ref}}=Att, Fun, Acc) ->
- ParserRef = erlang:monitor(process, Parser),
- DataFun = fun() ->
- Parser ! {get_bytes, Ref, self()},
- receive
- {started_open_doc_revs, NewRef} ->
- couch_doc:restart_open_doc_revs(Parser, Ref, NewRef);
- {bytes, Ref, Bytes} ->
- Bytes;
- {'DOWN', ParserRef, _, _, Reason} ->
- throw({mp_parser_died, Reason})
- end
- end,
- try
- att_foldl(Att#att{data=DataFun}, Fun, Acc)
- after
- erlang:demonitor(ParserRef, [flush])
- end.
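The data={follows, Parser, Ref} clause turns a remote multipart parser
into an ordinary pull-style data fun: the consumer sends
{get_bytes, Ref, self()} and the parser answers {bytes, Ref, Chunk}. A
self-contained sketch of that protocol (hypothetical module, not couch
code; the fold mirrors fold_streamed_data/4 defined later in this
module):

    -module(bytes_proto_sketch).
    -export([demo/0]).

    %% Producer: serve chunks one {get_bytes, ...} request at a time.
    producer(_Ref, []) ->
        ok;
    producer(Ref, [Chunk | Rest]) ->
        receive
            {get_bytes, Ref, From} ->
                From ! {bytes, Ref, Chunk},
                producer(Ref, Rest)
        end.

    %% Consumer: same shape as fold_streamed_data/4.
    fold(_RcvFun, 0, _Fun, Acc) ->
        Acc;
    fold(RcvFun, LenLeft, Fun, Acc) when LenLeft > 0 ->
        Bin = RcvFun(),
        fold(RcvFun, LenLeft - byte_size(Bin), Fun, Fun(Bin, Acc)).

    demo() ->
        Ref = make_ref(),
        Pid = spawn(fun() -> producer(Ref, [<<"he">>, <<"llo">>]) end),
        RcvFun = fun() ->
            Pid ! {get_bytes, Ref, self()},
            receive {bytes, Ref, Bin} -> Bin end
        end,
        %% folds exactly 5 bytes; returns <<"hello">>
        fold(RcvFun, 5, fun(B, Acc) -> <<Acc/binary, B/binary>> end, <<>>).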
-
-range_att_foldl(#att{data={Fd,Sp}}, From, To, Fun, Acc) ->
- couch_stream:range_foldl(Fd, Sp, From, To, Fun, Acc).
-
-att_foldl_decode(#att{data={Fd,Sp},md5=Md5,encoding=Enc}, Fun, Acc) ->
- couch_stream:foldl_decode(Fd, Sp, Md5, Enc, Fun, Acc);
-att_foldl_decode(#att{data=Fun2,att_len=Len, encoding=identity}, Fun, Acc) ->
- fold_streamed_data(Fun2, Len, Fun, Acc).
-
-att_to_bin(#att{data=Bin}) when is_binary(Bin) ->
- Bin;
-att_to_bin(#att{data=Iolist}) when is_list(Iolist) ->
- iolist_to_binary(Iolist);
-att_to_bin(#att{data={_Fd,_Sp}}=Att) ->
- iolist_to_binary(
- lists:reverse(att_foldl(
- Att,
- fun(Bin,Acc) -> [Bin|Acc] end,
- []
- ))
- );
-att_to_bin(#att{data=DataFun, att_len=Len}) when is_function(DataFun) ->
- iolist_to_binary(
- lists:reverse(fold_streamed_data(
- DataFun,
- Len,
- fun(Data, Acc) -> [Data | Acc] end,
- []
- ))
- ).
-
-get_validate_doc_fun({Props}) ->
- get_validate_doc_fun(couch_doc:from_json_obj({Props}));
-get_validate_doc_fun(#doc{body={Props}}=DDoc) ->
- case couch_util:get_value(<<"validate_doc_update">>, Props) of
- undefined ->
- nil;
- _Else ->
- fun(EditDoc, DiskDoc, Ctx, SecObj) ->
- couch_query_servers:validate_doc_update(DDoc, EditDoc, DiskDoc, Ctx, SecObj)
- end
- end.
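get_validate_doc_fun/1 returns either nil, when the design doc defines
no validate_doc_update function, or a closure that captures the design
doc itself. Usage shape, with hypothetical variable names:

    %% case get_validate_doc_fun(DDoc) of
    %%     nil -> ok;                            % no validation defined
    %%     Fun -> Fun(EditDoc, DiskDoc, Ctx, SecObj)
    %% end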
-
-
-has_stubs(#doc{atts=Atts}) ->
- has_stubs(Atts);
-has_stubs([]) ->
- false;
-has_stubs([#att{data=stub}|_]) ->
- true;
-has_stubs([_Att|Rest]) ->
- has_stubs(Rest).
-
-merge_stubs(#doc{id = Id}, nil) ->
- throw({missing_stub, <<"Previous revision missing for document ", Id/binary>>});
-merge_stubs(#doc{id=Id,atts=MemBins}=StubsDoc, #doc{atts=DiskBins}) ->
- BinDict = dict:from_list([{Name, Att} || #att{name=Name}=Att <- DiskBins]),
- MergedBins = lists:map(
- fun(#att{name=Name, data=stub, revpos=StubRevPos}) ->
- case dict:find(Name, BinDict) of
- {ok, #att{revpos=DiskRevPos}=DiskAtt}
- when DiskRevPos == StubRevPos orelse StubRevPos == nil ->
- DiskAtt;
- _ ->
- throw({missing_stub,
- <<"id:", Id/binary, ", name:", Name/binary>>})
- end;
- (Att) ->
- Att
- end, MemBins),
- StubsDoc#doc{atts=MergedBins}.
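For example (hypothetical attachments): a stub for <<"a.txt">> carrying
revpos 1 is replaced by the on-disk #att of the same name at revpos 1
(or by any disk copy when the stub's revpos is nil), while a stub whose
revpos disagrees with the disk copy throws {missing_stub, ...}.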
-
-fold_streamed_data(_RcvFun, 0, _Fun, Acc) ->
- Acc;
-fold_streamed_data(RcvFun, LenLeft, Fun, Acc) when LenLeft > 0 ->
- Bin = RcvFun(),
- ResultAcc = Fun(Bin, Acc),
- fold_streamed_data(RcvFun, LenLeft - size(Bin), Fun, ResultAcc).
-
-len_doc_to_multi_part_stream(Boundary, JsonBytes, Atts, SendEncodedAtts) ->
- AttsSize = lists:foldl(fun(Att, AccAttsSize) ->
- #att{
- data=Data,
- name=Name,
- att_len=AttLen,
- disk_len=DiskLen,
- type=Type,
- encoding=Encoding
- } = Att,
- case Data of
- stub ->
- AccAttsSize;
- _ ->
- AccAttsSize +
- 4 + % "\r\n\r\n"
- case SendEncodedAtts of
- true ->
- % header
- length(integer_to_list(AttLen)) +
- AttLen;
- _ ->
- % header
- length(integer_to_list(DiskLen)) +
- DiskLen
- end +
- 4 + % "\r\n--"
- size(Boundary) +
-
- % attachment headers
- % (the length of the Content-Length has already been set)
- size(Name) +
- size(Type) +
- length("\r\nContent-Disposition: attachment; filename=\"\"") +
- length("\r\nContent-Type: ") +
- length("\r\nContent-Length: ") +
- case Encoding of
- identity ->
- 0;
- _ ->
- length(atom_to_list(Encoding)) +
- length("\r\nContent-Encoding: ")
- end
- end
- end, 0, Atts),
- if AttsSize == 0 ->
- {<<"application/json">>, iolist_size(JsonBytes)};
- true ->
- {<<"multipart/related; boundary=\"", Boundary/binary, "\"">>,
- 2 + % "--"
- size(Boundary) +
- 36 + % "\r\ncontent-type: application/json\r\n\r\n"
- iolist_size(JsonBytes) +
- 4 + % "\r\n--"
- size(Boundary) +
- AttsSize +
- 2 % "--"
- }
- end.
-
-doc_to_multi_part_stream(Boundary, JsonBytes, Atts, WriteFun,
- SendEncodedAtts) ->
- case lists:any(fun(#att{data=Data}) -> Data /= stub end, Atts) of
- true ->
- WriteFun([<<"--", Boundary/binary,
- "\r\nContent-Type: application/json\r\n\r\n">>,
- JsonBytes, <<"\r\n--", Boundary/binary>>]),
- atts_to_mp(Atts, Boundary, WriteFun, SendEncodedAtts);
- false ->
- WriteFun(JsonBytes)
- end.
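len_doc_to_multi_part_stream/4 must predict, byte for byte, what
doc_to_multi_part_stream/5 will emit, since HTTP clients receive the
Content-Length before the body is streamed. One way to sanity-check the
bookkeeping is to sum the iodata handed to WriteFun and compare; a
hedged sketch, assuming both functions are in scope, with
SendEncodedAtts fixed to true and the process dictionary used only for
brevity:

    check_len(Boundary, JsonBytes, Atts) ->
        {_ContentType, Len} =
            len_doc_to_multi_part_stream(Boundary, JsonBytes, Atts, true),
        put(mp_len_check, 0),
        WriteFun = fun(Data) ->
            put(mp_len_check, get(mp_len_check) + iolist_size(Data))
        end,
        doc_to_multi_part_stream(Boundary, JsonBytes, Atts, WriteFun, true),
        %% true when the precomputed length matches the bytes written
        erase(mp_len_check) =:= Len.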
-
-atts_to_mp([], _Boundary, WriteFun, _SendEncAtts) ->
- WriteFun(<<"--">>);
-atts_to_mp([#att{data=stub} | RestAtts], Boundary, WriteFun,
- SendEncodedAtts) ->
- atts_to_mp(RestAtts, Boundary, WriteFun, SendEncodedAtts);
-atts_to_mp([Att | RestAtts], Boundary, WriteFun,
- SendEncodedAtts) ->
- #att{
- name=Name,
- att_len=AttLen,
- disk_len=DiskLen,
- type=Type,
- encoding=Encoding
- } = Att,
-
- % write headers
- LengthBin = case SendEncodedAtts of
- true -> list_to_binary(integer_to_list(AttLen));
- false -> list_to_binary(integer_to_list(DiskLen))
- end,
- WriteFun(<<"\r\nContent-Disposition: attachment; filename=\"", Name/binary, "\"">>),
- WriteFun(<<"\r\nContent-Type: ", Type/binary>>),
- WriteFun(<<"\r\nContent-Length: ", LengthBin/binary>>),
- case Encoding of
- identity ->
- ok;
- _ ->
- EncodingBin = atom_to_binary(Encoding, latin1),
- WriteFun(<<"\r\nContent-Encoding: ", EncodingBin/binary>>)
- end,
-
- % write data
- WriteFun(<<"\r\n\r\n">>),
- AttFun = case SendEncodedAtts of
- false ->
- fun att_foldl_decode/3;
- true ->
- fun att_foldl/3
- end,
- AttFun(Att, fun(Data, _) -> WriteFun(Data) end, ok),
- WriteFun(<<"\r\n--", Boundary/binary>>),
- atts_to_mp(RestAtts, Boundary, WriteFun, SendEncodedAtts).
-
-
-doc_from_multi_part_stream(ContentType, DataFun) ->
- doc_from_multi_part_stream(ContentType, DataFun, make_ref()).
-
-
-doc_from_multi_part_stream(ContentType, DataFun, Ref) ->
- Parent = self(),
- NumMpWriters = num_mp_writers(),
- Parser = spawn_link(fun() ->
- ParentRef = erlang:monitor(process, Parent),
- put(mp_parent_ref, ParentRef),
- put(num_mp_writers, NumMpWriters),
- {<<"--",_/binary>>, _, _} = couch_httpd:parse_multipart_request(
- ContentType, DataFun,
- fun(Next) -> mp_parse_doc(Next, []) end),
- unlink(Parent)
- end),
- ParserRef = erlang:monitor(process, Parser),
- Parser ! {get_doc_bytes, Ref, self()},
- receive
- {started_open_doc_revs, NewRef} ->
- restart_open_doc_revs(Parser, Ref, NewRef);
- {doc_bytes, Ref, DocBytes} ->
- Doc = from_json_obj(?JSON_DECODE(DocBytes)),
- % we'll send the Parser process ID to the remote nodes so they can
- % retrieve their own copies of the attachment data
- Atts2 = lists:map(
- fun(#att{data=follows}=A) ->
- A#att{data={follows, Parser, Ref}};
- (A) ->
- A
- end, Doc#doc.atts),
- WaitFun = fun() ->
- receive {'DOWN', ParserRef, _, _, _} -> ok end,
- erlang:put(mochiweb_request_recv, true)
- end,
- {ok, Doc#doc{atts=Atts2}, WaitFun, Parser}
- end.
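In short: the caller gets back the parsed #doc, a WaitFun that blocks
until the parser exits, and the parser pid itself; every data=follows
attachment is rewritten to {follows, Parser, Ref} so that writers, even
on remote nodes, can pull the attachment bytes straight from the parser
using the same {get_bytes, Ref, self()} protocol sketched earlier.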
-
-
-mp_parse_doc({headers, H}, []) ->
- case couch_util:get_value("content-type", H) of
- {"application/json", _} ->
- fun (Next) ->
- mp_parse_doc(Next, [])
- end
- end;
-mp_parse_doc({body, Bytes}, AccBytes) ->
- fun (Next) ->
- mp_parse_doc(Next, [Bytes | AccBytes])
- end;
-mp_parse_doc(body_end, AccBytes) ->
- receive {get_doc_bytes, Ref, From} ->
- From ! {doc_bytes, Ref, lists:reverse(AccBytes)}
- end,
- fun(Next) ->
- mp_parse_atts(Next, {Ref, [], 0, orddict:new(), []})
- end.
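mp_parse_doc/2, and mp_parse_atts/2 below, are written in a
continuation-passing style: each callback handles one parser event and
returns the fun that will handle the next event, so all parsing state
lives in closures. A standalone sketch of the same pattern over a
made-up event stream:

    -module(cps_sketch).
    -export([run/0]).

    collect({body, Bin}, Acc) ->
        fun(Next) -> collect(Next, [Bin | Acc]) end;
    collect(body_end, Acc) ->
        lists:reverse(Acc).

    run() ->
        Events = [{body, <<"a">>}, {body, <<"b">>}, body_end],
        %% feed each event to the handler returned by the previous one
        lists:foldl(fun(Event, Handler) -> Handler(Event) end,
                    fun(Event) -> collect(Event, []) end,
                    Events).
    %% run() => [<<"a">>, <<"b">>]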
-
-mp_parse_atts({headers, _}, Acc) ->
- fun(Next) -> mp_parse_atts(Next, Acc) end;
-mp_parse_atts(body_end, Acc) ->
- fun(Next) -> mp_parse_atts(Next, Acc) end;
-mp_parse_atts({body, Bytes}, {Ref, Chunks, Offset, Counters, Waiting}) ->
- case maybe_send_data({Ref, Chunks++[Bytes], Offset, Counters, Waiting}) of
- abort_parsing ->
- fun(Next) -> mp_abort_parse_atts(Next, nil) end;
- NewAcc ->
- fun(Next) -> mp_parse_atts(Next, NewAcc) end
- end;
-mp_parse_atts(eof, {Ref, Chunks, Offset, Counters, Waiting}) ->
- N = num_mp_writers(),
- M = length(Counters),
- case (M == N) andalso Chunks == [] of
- true ->
- ok;
- false ->
- ParentRef = get(mp_parent_ref),
- receive
- abort_parsing ->
- ok;
- {get_bytes, Ref, From} ->
- C2 = orddict:update_counter(From, 1, Counters),
- NewAcc = maybe_send_data({Ref, Chunks, Offset, C2, [From|Waiting]}),
- mp_parse_atts(eof, NewAcc);
- {'DOWN', ParentRef, _, _, _} ->
- exit(mp_reader_coordinator_died)
- after 3600000 ->
- ok
- end
- end.
-
-mp_abort_parse_atts(eof, _) ->
- ok;
-mp_abort_parse_atts(_, _) ->
- fun(Next) -> mp_abort_parse_atts(Next, nil) end.
-
-maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) ->
- receive {get_bytes, Ref, From} ->
- NewCounters = orddict:update_counter(From, 1, Counters),
- maybe_send_data({Ref, Chunks, Offset, NewCounters, [From|Waiting]})
- after 0 ->
- % reply to as many writers as possible
- NewWaiting = lists:filter(fun(Writer) ->
- WhichChunk = orddict:fetch(Writer, Counters),
- ListIndex = WhichChunk - Offset,
- if ListIndex =< length(Chunks) ->
- Writer ! {bytes, Ref, lists:nth(ListIndex, Chunks)},
- false;
- true ->
- true
- end
- end, Waiting),
-
- % check if we can drop a chunk from the head of the list
- SmallestIndex = case Counters of
- [] -> 0;
- _ -> lists:min(element(2, lists:unzip(Counters)))
- end,
- Size = length(Counters),
- N = num_mp_writers(),
- if Size == N andalso SmallestIndex == (Offset+1) ->
- NewChunks = tl(Chunks),
- NewOffset = Offset+1;
- true ->
- NewChunks = Chunks,
- NewOffset = Offset
- end,
-
- % Flow control: only resume parsing once some writer has asked for
- % the newest buffered chunk; until then, block on writer requests.
- LargestIndex = lists:max([0|element(2, lists:unzip(Counters))]),
- if LargestIndex >= (Offset + length(Chunks)) ->
- % the fastest writer has consumed every buffered chunk; keep parsing
- {Ref, NewChunks, NewOffset, Counters, NewWaiting};
- true ->
- ParentRef = get(mp_parent_ref),
- receive
- abort_parsing ->
- abort_parsing;
- {'DOWN', ParentRef, _, _, _} ->
- exit(mp_reader_coordinator_died);
- {get_bytes, Ref, X} ->
- C2 = orddict:update_counter(X, 1, Counters),
- maybe_send_data({Ref, NewChunks, NewOffset, C2, [X|NewWaiting]})
- end
- end
- end.
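The state threaded through maybe_send_data/1 is a sliding window over
the attachment chunks: Offset counts chunks already dropped, and a
writer's entry in Counters is the index of the chunk it most recently
requested. A worked trace with hypothetical values:

    %% N = 2 writers; Offset = 2 (chunks 1 and 2 already dropped);
    %% Chunks = [C3, C4]; Counters = [{W1, 3}, {W2, 2}].
    %% W1's request for chunk 3 is served from
    %% lists:nth(3 - Offset, Chunks), i.e. C3. Once W2 also asks for
    %% chunk 3, SmallestIndex == Offset + 1 with all N writers
    %% registered, so C3 is dropped from the head and Offset becomes 3.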
-
-
-num_mp_writers() ->
- case erlang:get(mp_att_writers) of
- undefined -> 1;
- Count -> Count
- end.
-
-
-abort_multi_part_stream(Parser) ->
- MonRef = erlang:monitor(process, Parser),
- Parser ! abort_parsing,
- receive
- {'DOWN', MonRef, _, _, _} -> ok
- after 60000 ->
- % The one minute timeout is deliberate. We want to keep reading
- % data so the socket stays open when possible, but not so long
- % that a client must wait an age to find out about an error such
- % as a validate_doc_update failure.
- throw(multi_part_abort_timeout)
- end.
-
-
-restart_open_doc_revs(Parser, Ref, NewRef) ->
- unlink(Parser),
- exit(Parser, kill),
- flush_parser_messages(Ref),
- erlang:error({restart_open_doc_revs, NewRef}).
-
-
-flush_parser_messages(Ref) ->
- receive
- {headers, Ref, _} ->
- flush_parser_messages(Ref);
- {body_bytes, Ref, _} ->
- flush_parser_messages(Ref);
- {body_done, Ref} ->
- flush_parser_messages(Ref);
- {done, Ref} ->
- flush_parser_messages(Ref)
- after 0 ->
- ok
- end.
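flush_parser_messages/1 is the standard mailbox-drain idiom: selectively
receive every message tagged with Ref and rely on 'after 0' to return,
rather than block, once no matching message remains. The same pattern,
generalized to a first-element tag (sketch):

    flush_tagged(Tag) ->
        receive
            Msg when element(1, Msg) =:= Tag ->
                flush_tagged(Tag)
        after 0 ->
            ok
        end.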
-
-
-with_ejson_body(#doc{body = Body} = Doc) when is_binary(Body) ->
- Doc#doc{body = couch_compress:decompress(Body)};
-with_ejson_body(#doc{body = {_}} = Doc) ->
- Doc.
http://git-wip-us.apache.org/repos/asf/couchdb/blob/ed98610c/src/couch/src/couch_drv.erl
----------------------------------------------------------------------
diff --git a/src/couch/src/couch_drv.erl b/src/couch/src/couch_drv.erl
deleted file mode 100644
index 7fe119a..0000000
--- a/src/couch/src/couch_drv.erl
+++ /dev/null
@@ -1,62 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_drv).
--behaviour(gen_server).
--export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
- code_change/3]).
-
--export([start_link/0]).
-
--include_lib("couch/include/couch_db.hrl").
-
-start_link() ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-
-init([]) ->
- LibDir = util_driver_dir(),
- case erl_ddll:load(LibDir, "couch_icu_driver") of
- ok ->
- {ok, nil};
- {error, already_loaded} ->
- ?LOG_INFO("~p reloading couch_icu_driver", [?MODULE]),
- ok = erl_ddll:reload(LibDir, "couch_icu_driver"),
- {ok, nil};
- {error, Error} ->
- {stop, erl_ddll:format_error(Error)}
- end.
-
-handle_call(_Request, _From, State) ->
- {reply, ok, State}.
-
-handle_cast(_Request, State) ->
- {noreply, State}.
-
-handle_info(_Info, State) ->
- {noreply, State}.
-
-terminate(_Reason, _State) ->
- ok.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-
-% private API
-util_driver_dir() ->
- case config:get("couchdb", "util_driver_dir", null) of
- null ->
- couch_util:priv_dir();
- LibDir0 ->
- LibDir0
- end.
http://git-wip-us.apache.org/repos/asf/couchdb/blob/ed98610c/src/couch/src/couch_ejson_compare.erl
----------------------------------------------------------------------
diff --git a/src/couch/src/couch_ejson_compare.erl b/src/couch/src/couch_ejson_compare.erl
deleted file mode 100644
index 7b000fc..0000000
--- a/src/couch/src/couch_ejson_compare.erl
+++ /dev/null
@@ -1,113 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_ejson_compare).
-
--export([less/2, less_json_ids/2, less_json/2]).
-
--on_load(init/0).
-
-
-init() ->
- LibDir = case config:get("couchdb", "util_driver_dir") of
- undefined ->
- filename:join(couch_util:priv_dir(), "lib");
- LibDir0 ->
- LibDir0
- end,
- NumScheds = erlang:system_info(schedulers),
- (catch erlang:load_nif(filename:join([LibDir, ?MODULE]), NumScheds)),
- case erlang:system_info(otp_release) of
- % on_load was experimental in R13B03 and expected the handler to
- % return 'true'; later releases expect 'ok'
- "R13B03" -> true;
- _ -> ok
- end.
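The essentials of the on_load/NIF pattern used here, reduced to a
hypothetical module: erlang:load_nif/2 replaces the Erlang stubs when
it succeeds, and a failure is swallowed so the pure Erlang fallback
keeps working:

    -module(nif_fallback_sketch).
    -export([cmp/2]).
    -on_load(init/0).

    init() ->
        %% tolerate load failure: the Erlang clause below then stays
        %% in effect; a non-ok return would abort loading the module
        _ = (catch erlang:load_nif("./nif_fallback_sketch", 0)),
        ok.

    %% Replaced by the NIF when init/0 manages to load it.
    cmp(A, B) when A < B -> -1;
    cmp(A, A) -> 0;
    cmp(_, _) -> 1.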
-
-
-less(A, B) ->
- try
- less_nif(A, B)
- catch
- error:badarg ->
- % Maybe the EJSON structure is too deep, fallback to Erlang land.
- less_erl(A, B)
- end.
-
-less_json_ids({JsonA, IdA}, {JsonB, IdB}) ->
- case less(JsonA, JsonB) of
- 0 ->
- IdA < IdB;
- Result ->
- Result < 0
- end.
-
-less_json(A,B) ->
- less(A, B) < 0.
-
-
-less_nif(A, B) ->
- % Pure Erlang stub, replaced by the NIF implementation whenever
- % erlang:load_nif/2 succeeds in init/0 above.
- less_erl(A, B).
-
-
-less_erl(A,A) -> 0;
-
-less_erl(A,B) when is_atom(A), is_atom(B) -> atom_sort(A) - atom_sort(B);
-less_erl(A,_) when is_atom(A) -> -1;
-less_erl(_,B) when is_atom(B) -> 1;
-
-less_erl(A,B) when is_number(A), is_number(B) -> A - B;
-less_erl(A,_) when is_number(A) -> -1;
-less_erl(_,B) when is_number(B) -> 1;
-
-less_erl(A,B) when is_binary(A), is_binary(B) -> couch_util:collate(A,B);
-less_erl(A,_) when is_binary(A) -> -1;
-less_erl(_,B) when is_binary(B) -> 1;
-
-less_erl(A,B) when is_list(A), is_list(B) -> less_list(A,B);
-less_erl(A,_) when is_list(A) -> -1;
-less_erl(_,B) when is_list(B) -> 1;
-
-less_erl({A},{B}) when is_list(A), is_list(B) -> less_props(A,B);
-less_erl({A},_) when is_list(A) -> -1;
-less_erl(_,{B}) when is_list(B) -> 1.
-
-atom_sort(null) -> 1;
-atom_sort(false) -> 2;
-atom_sort(true) -> 3.
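Taken together, the clause order of less_erl/2 and atom_sort/1 encodes
CouchDB's view collation: null < false < true < numbers < strings (ICU
collation) < arrays < objects. For instance:

    %% less_erl(null, false)              =:= -1   (null sorts first)
    %% less_erl(1000, <<"a">>)            =:= -1   (numbers before strings)
    %% less_erl([1, 2], {[{<<"k">>, 1}]}) =:= -1   (arrays before objects)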
-
-less_props([], [_|_]) ->
- -1;
-less_props(_, []) ->
- 1;
-less_props([{AKey, AValue}|RestA], [{BKey, BValue}|RestB]) ->
- case couch_util:collate(AKey, BKey) of
- 0 ->
- case less_erl(AValue, BValue) of
- 0 ->
- less_props(RestA, RestB);
- Result ->
- Result
- end;
- Result ->
- Result
- end.
-
-less_list([], [_|_]) ->
- -1;
-less_list(_, []) ->
- 1;
-less_list([A|RestA], [B|RestB]) ->
- case less_erl(A,B) of
- 0 ->
- less_list(RestA, RestB);
- Result ->
- Result
- end.
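less_props/2 and less_list/2 are the usual lexicographic extension: an
exhausted prefix sorts first, otherwise the first differing key or
element decides. For instance:

    %% less_list([], [1])                         =:= -1  (prefix first)
    %% less_list([1, 0], [1, 2])                   <  0   (0 - 2 decides)
    %% less_props([{<<"a">>, 1}], [{<<"a">>, 2}])  <  0   (values decide)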