Posted to commits@couchdb.apache.org by da...@apache.org on 2014/02/06 18:40:06 UTC
[24/50] [abbrv] initial move to rebar compilation
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/a6816bff/src/couch_db_updater.erl
----------------------------------------------------------------------
diff --git a/src/couch_db_updater.erl b/src/couch_db_updater.erl
new file mode 100644
index 0000000..af7578e
--- /dev/null
+++ b/src/couch_db_updater.erl
@@ -0,0 +1,1035 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_db_updater).
+-behaviour(gen_server).
+
+-export([btree_by_id_reduce/2,btree_by_seq_reduce/2]).
+-export([make_doc_summary/2]).
+-export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
+
+-include("couch_db.hrl").
+
+
+init({MainPid, DbName, Filepath, Fd, Options}) ->
+ process_flag(trap_exit, true),
+ case lists:member(create, Options) of
+ true ->
+ % create a new header and write it to the file
+ Header = #db_header{},
+ ok = couch_file:write_header(Fd, Header),
+ % delete any old compaction files that might be hanging around
+ RootDir = couch_config:get("couchdb", "database_dir", "."),
+ couch_file:delete(RootDir, Filepath ++ ".compact");
+ false ->
+ case couch_file:read_header(Fd) of
+ {ok, Header} ->
+ ok;
+ no_valid_header ->
+ % create a new header and write it to the file
+ Header = #db_header{},
+ ok = couch_file:write_header(Fd, Header),
+ % delete any old compaction files that might be hanging around
+ file:delete(Filepath ++ ".compact")
+ end
+ end,
+ ReaderFd = open_reader_fd(Filepath, Options),
+ Db = init_db(DbName, Filepath, Fd, ReaderFd, Header, Options),
+ Db2 = refresh_validate_doc_funs(Db),
+ {ok, Db2#db{main_pid = MainPid}}.
+
+
+terminate(_Reason, Db) ->
+ ok = couch_file:close(Db#db.updater_fd),
+ ok = couch_file:close(Db#db.fd),
+ couch_util:shutdown_sync(Db#db.compactor_pid),
+ couch_util:shutdown_sync(Db#db.fd_ref_counter),
+ ok.
+
+handle_call(get_db, _From, Db) ->
+ {reply, {ok, Db}, Db};
+handle_call(full_commit, _From, #db{waiting_delayed_commit=nil}=Db) ->
+ {reply, ok, Db}; % no data waiting, return ok immediately
+handle_call(full_commit, _From, Db) ->
+ {reply, ok, commit_data(Db)}; % commit the data and return ok
+handle_call(increment_update_seq, _From, Db) ->
+ Db2 = commit_data(Db#db{update_seq=Db#db.update_seq+1}),
+ ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
+ couch_db_update_notifier:notify({updated, Db#db.name}),
+ {reply, {ok, Db2#db.update_seq}, Db2};
+
+handle_call({set_security, NewSec}, _From, #db{compression = Comp} = Db) ->
+ {ok, Ptr, _} = couch_file:append_term(
+ Db#db.updater_fd, NewSec, [{compression, Comp}]),
+ Db2 = commit_data(Db#db{security=NewSec, security_ptr=Ptr,
+ update_seq=Db#db.update_seq+1}),
+ ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
+ {reply, ok, Db2};
+
+handle_call({set_revs_limit, Limit}, _From, Db) ->
+ Db2 = commit_data(Db#db{revs_limit=Limit,
+ update_seq=Db#db.update_seq+1}),
+ ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
+ {reply, ok, Db2};
+
+handle_call({purge_docs, _IdRevs}, _From,
+ #db{compactor_pid=Pid}=Db) when Pid /= nil ->
+ {reply, {error, purge_during_compaction}, Db};
+handle_call({purge_docs, IdRevs}, _From, Db) ->
+ #db{
+ updater_fd = Fd,
+ fulldocinfo_by_id_btree = DocInfoByIdBTree,
+ docinfo_by_seq_btree = DocInfoBySeqBTree,
+ update_seq = LastSeq,
+ header = Header = #db_header{purge_seq=PurgeSeq},
+ compression = Comp
+ } = Db,
+ DocLookups = couch_btree:lookup(DocInfoByIdBTree,
+ [Id || {Id, _Revs} <- IdRevs]),
+
+ NewDocInfos = lists:zipwith(
+ fun({_Id, Revs}, {ok, #full_doc_info{rev_tree=Tree}=FullDocInfo}) ->
+ case couch_key_tree:remove_leafs(Tree, Revs) of
+ {_, []=_RemovedRevs} -> % no change
+ nil;
+ {NewTree, RemovedRevs} ->
+ {FullDocInfo#full_doc_info{rev_tree=NewTree},RemovedRevs}
+ end;
+ (_, not_found) ->
+ nil
+ end,
+ IdRevs, DocLookups),
+
+ SeqsToRemove = [Seq
+ || {#full_doc_info{update_seq=Seq},_} <- NewDocInfos],
+
+ FullDocInfoToUpdate = [FullInfo
+ || {#full_doc_info{rev_tree=Tree}=FullInfo,_}
+ <- NewDocInfos, Tree /= []],
+
+ IdRevsPurged = [{Id, Revs}
+ || {#full_doc_info{id=Id}, Revs} <- NewDocInfos],
+
+ {DocInfoToUpdate, NewSeq} = lists:mapfoldl(
+ fun(#full_doc_info{rev_tree=Tree}=FullInfo, SeqAcc) ->
+ Tree2 = couch_key_tree:map_leafs(
+ fun(_RevId, LeafVal) ->
+ IsDeleted = element(1, LeafVal),
+ BodyPointer = element(2, LeafVal),
+ {IsDeleted, BodyPointer, SeqAcc + 1}
+ end, Tree),
+ {couch_doc:to_doc_info(FullInfo#full_doc_info{rev_tree=Tree2}),
+ SeqAcc + 1}
+ end, LastSeq, FullDocInfoToUpdate),
+
+ IdsToRemove = [Id || {#full_doc_info{id=Id,rev_tree=[]},_}
+ <- NewDocInfos],
+
+ {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree,
+ DocInfoToUpdate, SeqsToRemove),
+ {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree,
+ FullDocInfoToUpdate, IdsToRemove),
+ {ok, Pointer, _} = couch_file:append_term(
+ Fd, IdRevsPurged, [{compression, Comp}]),
+
+ Db2 = commit_data(
+ Db#db{
+ fulldocinfo_by_id_btree = DocInfoByIdBTree2,
+ docinfo_by_seq_btree = DocInfoBySeqBTree2,
+ update_seq = NewSeq + 1,
+ header=Header#db_header{purge_seq=PurgeSeq+1, purged_docs=Pointer}}),
+
+ ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
+ couch_db_update_notifier:notify({updated, Db#db.name}),
+ {reply, {ok, (Db2#db.header)#db_header.purge_seq, IdRevsPurged}, Db2};
+handle_call(start_compact, _From, Db) ->
+ case Db#db.compactor_pid of
+ nil ->
+ ?LOG_INFO("Starting compaction for db \"~s\"", [Db#db.name]),
+ Pid = spawn_link(fun() -> start_copy_compact(Db) end),
+ Db2 = Db#db{compactor_pid=Pid},
+ ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}),
+ {reply, {ok, Pid}, Db2};
+ _ ->
+ % compact currently running, this is a no-op
+ {reply, {ok, Db#db.compactor_pid}, Db}
+ end;
+handle_call(cancel_compact, _From, #db{compactor_pid = nil} = Db) ->
+ {reply, ok, Db};
+handle_call(cancel_compact, _From, #db{compactor_pid = Pid} = Db) ->
+ unlink(Pid),
+ exit(Pid, kill),
+ RootDir = couch_config:get("couchdb", "database_dir", "."),
+ ok = couch_file:delete(RootDir, Db#db.filepath ++ ".compact"),
+ {reply, ok, Db#db{compactor_pid = nil}};
+
+
+handle_call({compact_done, CompactFilepath}, _From, #db{filepath=Filepath}=Db) ->
+ {ok, NewFd} = couch_file:open(CompactFilepath),
+ ReaderFd = open_reader_fd(CompactFilepath, Db#db.options),
+ {ok, NewHeader} = couch_file:read_header(NewFd),
+ #db{update_seq=NewSeq} = NewDb =
+ init_db(Db#db.name, Filepath, NewFd, ReaderFd, NewHeader, Db#db.options),
+ unlink(NewFd),
+ case Db#db.update_seq == NewSeq of
+ true ->
+ % suck up all the local docs into memory and write them to the new db
+ {ok, _, LocalDocs} = couch_btree:foldl(Db#db.local_docs_btree,
+ fun(Value, _Offset, Acc) -> {ok, [Value | Acc]} end, []),
+ {ok, NewLocalBtree} = couch_btree:add(NewDb#db.local_docs_btree, LocalDocs),
+
+ NewDb2 = commit_data(NewDb#db{
+ local_docs_btree = NewLocalBtree,
+ main_pid = Db#db.main_pid,
+ filepath = Filepath,
+ instance_start_time = Db#db.instance_start_time,
+ revs_limit = Db#db.revs_limit
+ }),
+
+ ?LOG_DEBUG("CouchDB swapping files ~s and ~s.",
+ [Filepath, CompactFilepath]),
+ RootDir = couch_config:get("couchdb", "database_dir", "."),
+ couch_file:delete(RootDir, Filepath),
+ ok = file:rename(CompactFilepath, Filepath),
+ close_db(Db),
+ NewDb3 = refresh_validate_doc_funs(NewDb2),
+ ok = gen_server:call(Db#db.main_pid, {db_updated, NewDb3}, infinity),
+ couch_db_update_notifier:notify({compacted, NewDb3#db.name}),
+ ?LOG_INFO("Compaction for db \"~s\" completed.", [Db#db.name]),
+ {reply, ok, NewDb3#db{compactor_pid=nil}};
+ false ->
+ ?LOG_INFO("Compaction file still behind main file "
+ "(update seq=~p. compact update seq=~p). Retrying.",
+ [Db#db.update_seq, NewSeq]),
+ close_db(NewDb),
+ {reply, {retry, Db}, Db}
+ end.
+
+
+handle_cast(Msg, #db{name = Name} = Db) ->
+ ?LOG_ERROR("Database `~s` updater received unexpected cast: ~p", [Name, Msg]),
+ {stop, Msg, Db}.
+
+
+handle_info({update_docs, Client, GroupedDocs, NonRepDocs, MergeConflicts,
+ FullCommit}, Db) ->
+ GroupedDocs2 = [[{Client, D} || D <- DocGroup] || DocGroup <- GroupedDocs],
+ if NonRepDocs == [] ->
+ {GroupedDocs3, Clients, FullCommit2} = collect_updates(GroupedDocs2,
+ [Client], MergeConflicts, FullCommit);
+ true ->
+ GroupedDocs3 = GroupedDocs2,
+ FullCommit2 = FullCommit,
+ Clients = [Client]
+ end,
+ NonRepDocs2 = [{Client, NRDoc} || NRDoc <- NonRepDocs],
+ try update_docs_int(Db, GroupedDocs3, NonRepDocs2, MergeConflicts,
+ FullCommit2) of
+ {ok, Db2, UpdatedDDocIds} ->
+ ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}),
+ if Db2#db.update_seq /= Db#db.update_seq ->
+ couch_db_update_notifier:notify({updated, Db2#db.name});
+ true -> ok
+ end,
+ [catch(ClientPid ! {done, self()}) || ClientPid <- Clients],
+ lists:foreach(fun(DDocId) ->
+ couch_db_update_notifier:notify({ddoc_updated, {Db#db.name, DDocId}})
+ end, UpdatedDDocIds),
+ {noreply, Db2}
+ catch
+ throw: retry ->
+ [catch(ClientPid ! {retry, self()}) || ClientPid <- Clients],
+ {noreply, Db}
+ end;
+handle_info(delayed_commit, #db{waiting_delayed_commit=nil}=Db) ->
+ %no outstanding delayed commits, ignore
+ {noreply, Db};
+handle_info(delayed_commit, Db) ->
+ case commit_data(Db) of
+ Db ->
+ {noreply, Db};
+ Db2 ->
+ ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
+ {noreply, Db2}
+ end;
+handle_info({'EXIT', _Pid, normal}, Db) ->
+ {noreply, Db};
+handle_info({'EXIT', _Pid, Reason}, Db) ->
+ {stop, Reason, Db}.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+
+merge_updates([], RestB, AccOutGroups) ->
+ lists:reverse(AccOutGroups, RestB);
+merge_updates(RestA, [], AccOutGroups) ->
+ lists:reverse(AccOutGroups, RestA);
+merge_updates([[{_, {#doc{id=IdA}, _}}|_]=GroupA | RestA],
+ [[{_, {#doc{id=IdB}, _}}|_]=GroupB | RestB], AccOutGroups) ->
+ if IdA == IdB ->
+ merge_updates(RestA, RestB, [GroupA ++ GroupB | AccOutGroups]);
+ IdA < IdB ->
+ merge_updates(RestA, [GroupB | RestB], [GroupA | AccOutGroups]);
+ true ->
+ merge_updates([GroupA | RestA], RestB, [GroupB | AccOutGroups])
+ end.
+
+collect_updates(GroupedDocsAcc, ClientsAcc, MergeConflicts, FullCommit) ->
+ receive
+ % Only collect updates with the same MergeConflicts flag and without
+ % local docs. It's easier to just avoid multiple _local doc
+ % updaters than deal with their possible conflicts, and local doc
+ % writes are relatively rare. Can be optimized later if really needed.
+ {update_docs, Client, GroupedDocs, [], MergeConflicts, FullCommit2} ->
+ GroupedDocs2 = [[{Client, Doc} || Doc <- DocGroup]
+ || DocGroup <- GroupedDocs],
+ GroupedDocsAcc2 =
+ merge_updates(GroupedDocsAcc, GroupedDocs2, []),
+ collect_updates(GroupedDocsAcc2, [Client | ClientsAcc],
+ MergeConflicts, (FullCommit or FullCommit2))
+ after 0 ->
+ {GroupedDocsAcc, ClientsAcc, FullCommit}
+ end.
+
+
+btree_by_seq_split(#doc_info{id=Id, high_seq=KeySeq, revs=Revs}) ->
+ {RevInfos, DeletedRevInfos} = lists:foldl(
+ fun(#rev_info{deleted = false, seq = Seq} = Ri, {Acc, AccDel}) ->
+ {[{Ri#rev_info.rev, Seq, Ri#rev_info.body_sp} | Acc], AccDel};
+ (#rev_info{deleted = true, seq = Seq} = Ri, {Acc, AccDel}) ->
+ {Acc, [{Ri#rev_info.rev, Seq, Ri#rev_info.body_sp} | AccDel]}
+ end,
+ {[], []}, Revs),
+ {KeySeq, {Id, lists:reverse(RevInfos), lists:reverse(DeletedRevInfos)}}.
+
+btree_by_seq_join(KeySeq, {Id, RevInfos, DeletedRevInfos}) ->
+ #doc_info{
+ id = Id,
+ high_seq=KeySeq,
+ revs =
+ [#rev_info{rev=Rev,seq=Seq,deleted=false,body_sp = Bp} ||
+ {Rev, Seq, Bp} <- RevInfos] ++
+ [#rev_info{rev=Rev,seq=Seq,deleted=true,body_sp = Bp} ||
+ {Rev, Seq, Bp} <- DeletedRevInfos]}.
+
+btree_by_id_split(#full_doc_info{id=Id, update_seq=Seq,
+ deleted=Deleted, rev_tree=Tree}) ->
+ DiskTree =
+ couch_key_tree:map(
+ fun(_RevId, ?REV_MISSING) ->
+ ?REV_MISSING;
+ (_RevId, RevValue) ->
+ IsDeleted = element(1, RevValue),
+ BodyPointer = element(2, RevValue),
+ UpdateSeq = element(3, RevValue),
+ Size = case tuple_size(RevValue) of
+ 4 ->
+ element(4, RevValue);
+ 3 ->
+ % pre 1.2 format, will be upgraded on compaction
+ nil
+ end,
+ {if IsDeleted -> 1; true -> 0 end, BodyPointer, UpdateSeq, Size}
+ end, Tree),
+ {Id, {Seq, if Deleted -> 1; true -> 0 end, DiskTree}}.
+
+btree_by_id_join(Id, {HighSeq, Deleted, DiskTree}) ->
+ {Tree, LeafsSize} =
+ couch_key_tree:mapfold(
+ fun(_RevId, {IsDeleted, BodyPointer, UpdateSeq}, leaf, _Acc) ->
+ % pre 1.2 format, will be upgraded on compaction
+ {{IsDeleted == 1, BodyPointer, UpdateSeq, nil}, nil};
+ (_RevId, {IsDeleted, BodyPointer, UpdateSeq}, branch, Acc) ->
+ {{IsDeleted == 1, BodyPointer, UpdateSeq, nil}, Acc};
+ (_RevId, {IsDeleted, BodyPointer, UpdateSeq, Size}, leaf, Acc) ->
+ Acc2 = sum_leaf_sizes(Acc, Size),
+ {{IsDeleted == 1, BodyPointer, UpdateSeq, Size}, Acc2};
+ (_RevId, {IsDeleted, BodyPointer, UpdateSeq, Size}, branch, Acc) ->
+ {{IsDeleted == 1, BodyPointer, UpdateSeq, Size}, Acc};
+ (_RevId, ?REV_MISSING, _Type, Acc) ->
+ {?REV_MISSING, Acc}
+ end, 0, DiskTree),
+ #full_doc_info{
+ id = Id,
+ update_seq = HighSeq,
+ deleted = (Deleted == 1),
+ rev_tree = Tree,
+ leafs_size = LeafsSize
+ }.
+
+btree_by_id_reduce(reduce, FullDocInfos) ->
+ lists:foldl(
+ fun(Info, {NotDeleted, Deleted, Size}) ->
+ Size2 = sum_leaf_sizes(Size, Info#full_doc_info.leafs_size),
+ case Info#full_doc_info.deleted of
+ true ->
+ {NotDeleted, Deleted + 1, Size2};
+ false ->
+ {NotDeleted + 1, Deleted, Size2}
+ end
+ end,
+ {0, 0, 0}, FullDocInfos);
+btree_by_id_reduce(rereduce, Reds) ->
+ lists:foldl(
+ fun({NotDeleted, Deleted}, {AccNotDeleted, AccDeleted, _AccSize}) ->
+ % pre 1.2 format, will be upgraded on compaction
+ {AccNotDeleted + NotDeleted, AccDeleted + Deleted, nil};
+ ({NotDeleted, Deleted, Size}, {AccNotDeleted, AccDeleted, AccSize}) ->
+ AccSize2 = sum_leaf_sizes(AccSize, Size),
+ {AccNotDeleted + NotDeleted, AccDeleted + Deleted, AccSize2}
+ end,
+ {0, 0, 0}, Reds).
+
+sum_leaf_sizes(nil, _) ->
+ nil;
+sum_leaf_sizes(_, nil) ->
+ nil;
+sum_leaf_sizes(Size1, Size2) ->
+ Size1 + Size2.
+
+btree_by_seq_reduce(reduce, DocInfos) ->
+ % count the number of documents
+ length(DocInfos);
+btree_by_seq_reduce(rereduce, Reds) ->
+ lists:sum(Reds).
+
+simple_upgrade_record(Old, New) when tuple_size(Old) < tuple_size(New) ->
+ OldSz = tuple_size(Old),
+ NewValuesTail =
+ lists:sublist(tuple_to_list(New), OldSz + 1, tuple_size(New) - OldSz),
+ list_to_tuple(tuple_to_list(Old) ++ NewValuesTail);
+simple_upgrade_record(Old, _New) ->
+ Old.
+
+-define(OLD_DISK_VERSION_ERROR,
+ "Database files from versions smaller than 0.10.0 are no longer supported").
+
+init_db(DbName, Filepath, Fd, ReaderFd, Header0, Options) ->
+ Header1 = simple_upgrade_record(Header0, #db_header{}),
+ Header =
+ case element(2, Header1) of
+ 1 -> throw({database_disk_version_error, ?OLD_DISK_VERSION_ERROR});
+ 2 -> throw({database_disk_version_error, ?OLD_DISK_VERSION_ERROR});
+ 3 -> throw({database_disk_version_error, ?OLD_DISK_VERSION_ERROR});
+ 4 -> Header1#db_header{security_ptr = nil}; % 0.10 and pre 0.11
+ 5 -> Header1; % pre 1.2
+ ?LATEST_DISK_VERSION -> Header1;
+ _ -> throw({database_disk_version_error, "Incorrect disk header version"})
+ end,
+
+ {ok, FsyncOptions} = couch_util:parse_term(
+ couch_config:get("couchdb", "fsync_options",
+ "[before_header, after_header, on_file_open]")),
+
+ case lists:member(on_file_open, FsyncOptions) of
+ true -> ok = couch_file:sync(Fd);
+ _ -> ok
+ end,
+
+ Compression = couch_compress:get_compression_method(),
+
+ {ok, IdBtree} = couch_btree:open(Header#db_header.fulldocinfo_by_id_btree_state, Fd,
+ [{split, fun(X) -> btree_by_id_split(X) end},
+ {join, fun(X,Y) -> btree_by_id_join(X,Y) end},
+ {reduce, fun(X,Y) -> btree_by_id_reduce(X,Y) end},
+ {compression, Compression}]),
+ {ok, SeqBtree} = couch_btree:open(Header#db_header.docinfo_by_seq_btree_state, Fd,
+ [{split, fun(X) -> btree_by_seq_split(X) end},
+ {join, fun(X,Y) -> btree_by_seq_join(X,Y) end},
+ {reduce, fun(X,Y) -> btree_by_seq_reduce(X,Y) end},
+ {compression, Compression}]),
+ {ok, LocalDocsBtree} = couch_btree:open(Header#db_header.local_docs_btree_state, Fd,
+ [{compression, Compression}]),
+ case Header#db_header.security_ptr of
+ nil ->
+ Security = [],
+ SecurityPtr = nil;
+ SecurityPtr ->
+ {ok, Security} = couch_file:pread_term(Fd, SecurityPtr)
+ end,
+ % convert start time tuple to microsecs and store as a binary string
+ {MegaSecs, Secs, MicroSecs} = now(),
+ StartTime = ?l2b(io_lib:format("~p",
+ [(MegaSecs*1000000*1000000) + (Secs*1000000) + MicroSecs])),
+ {ok, RefCntr} = couch_ref_counter:start([Fd, ReaderFd]),
+ #db{
+ update_pid=self(),
+ fd = ReaderFd,
+ updater_fd = Fd,
+ fd_ref_counter = RefCntr,
+ header=Header,
+ fulldocinfo_by_id_btree = IdBtree,
+ docinfo_by_seq_btree = SeqBtree,
+ local_docs_btree = LocalDocsBtree,
+ committed_update_seq = Header#db_header.update_seq,
+ update_seq = Header#db_header.update_seq,
+ name = DbName,
+ filepath = Filepath,
+ security = Security,
+ security_ptr = SecurityPtr,
+ instance_start_time = StartTime,
+ revs_limit = Header#db_header.revs_limit,
+ fsync_options = FsyncOptions,
+ options = Options,
+ compression = Compression,
+ before_doc_update = couch_util:get_value(before_doc_update, Options, nil),
+ after_doc_read = couch_util:get_value(after_doc_read, Options, nil)
+ }.
+
+open_reader_fd(Filepath, Options) ->
+ {ok, Fd} = case lists:member(sys_db, Options) of
+ true ->
+ couch_file:open(Filepath, [read_only, sys_db]);
+ false ->
+ couch_file:open(Filepath, [read_only])
+ end,
+ unlink(Fd),
+ Fd.
+
+close_db(#db{fd_ref_counter = RefCntr}) ->
+ couch_ref_counter:drop(RefCntr).
+
+
+refresh_validate_doc_funs(Db0) ->
+ Db = Db0#db{user_ctx = #user_ctx{roles=[<<"_admin">>]}},
+ DesignDocs = couch_db:get_design_docs(Db),
+ ProcessDocFuns = lists:flatmap(
+ fun(DesignDocInfo) ->
+ {ok, DesignDoc} = couch_db:open_doc_int(
+ Db, DesignDocInfo, [ejson_body]),
+ case couch_doc:get_validate_doc_fun(DesignDoc) of
+ nil -> [];
+ Fun -> [Fun]
+ end
+ end, DesignDocs),
+ Db0#db{validate_doc_funs=ProcessDocFuns}.
+
+% rev tree functions
+
+flush_trees(_Db, [], AccFlushedTrees) ->
+ {ok, lists:reverse(AccFlushedTrees)};
+flush_trees(#db{updater_fd = Fd} = Db,
+ [InfoUnflushed | RestUnflushed], AccFlushed) ->
+ #full_doc_info{update_seq=UpdateSeq, rev_tree=Unflushed} = InfoUnflushed,
+ {Flushed, LeafsSize} = couch_key_tree:mapfold(
+ fun(_Rev, Value, Type, Acc) ->
+ case Value of
+ #doc{deleted = IsDeleted, body = {summary, Summary, AttsFd}} ->
+ % this node value is actually an unwritten document summary,
+ % write to disk.
+ % make sure the Fd in the written bins is the same Fd we are using,
+ % and convert bins, removing the FD.
+ % All bins should have been written to disk already.
+ case {AttsFd, Fd} of
+ {nil, _} ->
+ ok;
+ {SameFd, SameFd} ->
+ ok;
+ _ ->
+ % Fd where the attachments were written to is not the same
+ % as our Fd. This can happen when a database is being
+ % switched out during a compaction.
+ ?LOG_DEBUG("File where the attachments are written has"
+ " changed. Possibly retrying.", []),
+ throw(retry)
+ end,
+ {ok, NewSummaryPointer, SummarySize} =
+ couch_file:append_raw_chunk(Fd, Summary),
+ TotalSize = lists:foldl(
+ fun(#att{att_len = L}, A) -> A + L end,
+ SummarySize, Value#doc.atts),
+ NewValue = {IsDeleted, NewSummaryPointer, UpdateSeq, TotalSize},
+ case Type of
+ leaf ->
+ {NewValue, Acc + TotalSize};
+ branch ->
+ {NewValue, Acc}
+ end;
+ {_, _, _, LeafSize} when Type =:= leaf, LeafSize =/= nil ->
+ {Value, Acc + LeafSize};
+ _ ->
+ {Value, Acc}
+ end
+ end, 0, Unflushed),
+ InfoFlushed = InfoUnflushed#full_doc_info{
+ rev_tree = Flushed,
+ leafs_size = LeafsSize
+ },
+ flush_trees(Db, RestUnflushed, [InfoFlushed | AccFlushed]).
+
+
+send_result(Client, Ref, NewResult) ->
+ % used to send a result to the client
+ catch(Client ! {result, self(), {Ref, NewResult}}).
+
+merge_rev_trees(_Limit, _Merge, [], [], AccNewInfos, AccRemoveSeqs, AccSeq) ->
+ {ok, lists:reverse(AccNewInfos), AccRemoveSeqs, AccSeq};
+merge_rev_trees(Limit, MergeConflicts, [NewDocs|RestDocsList],
+ [OldDocInfo|RestOldInfo], AccNewInfos, AccRemoveSeqs, AccSeq) ->
+ #full_doc_info{id=Id,rev_tree=OldTree,deleted=OldDeleted0,update_seq=OldSeq}
+ = OldDocInfo,
+ {NewRevTree, _} = lists:foldl(
+ fun({Client, {#doc{revs={Pos,[_Rev|PrevRevs]}}=NewDoc, Ref}}, {AccTree, OldDeleted}) ->
+ if not MergeConflicts ->
+ case couch_key_tree:merge(AccTree, couch_doc:to_path(NewDoc),
+ Limit) of
+ {_NewTree, conflicts} when (not OldDeleted) ->
+ send_result(Client, Ref, conflict),
+ {AccTree, OldDeleted};
+ {NewTree, conflicts} when PrevRevs /= [] ->
+ % Check to be sure that, if a prev revision was specified, it's
+ % a leaf node in the tree
+ Leafs = couch_key_tree:get_all_leafs(AccTree),
+ IsPrevLeaf = lists:any(fun({_, {LeafPos, [LeafRevId|_]}}) ->
+ {LeafPos, LeafRevId} == {Pos-1, hd(PrevRevs)}
+ end, Leafs),
+ if IsPrevLeaf ->
+ {NewTree, OldDeleted};
+ true ->
+ send_result(Client, Ref, conflict),
+ {AccTree, OldDeleted}
+ end;
+ {NewTree, no_conflicts} when AccTree == NewTree ->
+ % the tree didn't change at all
+ % meaning we are saving a rev that's already
+ % been edited again.
+ if (Pos == 1) and OldDeleted ->
+ % this means a deleted document is being recreated as a brand
+ % new document (pos 1); graft the new rev onto the existing tree
+ % as a subsequent edit of the deletion
+ #doc_info{revs=[#rev_info{rev={OldPos,OldRev}}|_]} =
+ couch_doc:to_doc_info(OldDocInfo),
+ NewRevId = couch_db:new_revid(
+ NewDoc#doc{revs={OldPos, [OldRev]}}),
+ NewDoc2 = NewDoc#doc{revs={OldPos + 1, [NewRevId, OldRev]}},
+ {NewTree2, _} = couch_key_tree:merge(AccTree,
+ couch_doc:to_path(NewDoc2), Limit),
+ % we changed the rev id, this tells the caller we did
+ send_result(Client, Ref, {ok, {OldPos + 1, NewRevId}}),
+ {NewTree2, OldDeleted};
+ true ->
+ send_result(Client, Ref, conflict),
+ {AccTree, OldDeleted}
+ end;
+ {NewTree, _} ->
+ {NewTree, NewDoc#doc.deleted}
+ end;
+ true ->
+ {NewTree, _} = couch_key_tree:merge(AccTree,
+ couch_doc:to_path(NewDoc), Limit),
+ {NewTree, OldDeleted}
+ end
+ end,
+ {OldTree, OldDeleted0}, NewDocs),
+ if NewRevTree == OldTree ->
+ % nothing changed
+ merge_rev_trees(Limit, MergeConflicts, RestDocsList, RestOldInfo,
+ AccNewInfos, AccRemoveSeqs, AccSeq);
+ true ->
+ % we have updated the document, give it a new seq #
+ NewInfo = #full_doc_info{id=Id,update_seq=AccSeq+1,rev_tree=NewRevTree},
+ RemoveSeqs = case OldSeq of
+ 0 -> AccRemoveSeqs;
+ _ -> [OldSeq | AccRemoveSeqs]
+ end,
+ merge_rev_trees(Limit, MergeConflicts, RestDocsList, RestOldInfo,
+ [NewInfo|AccNewInfos], RemoveSeqs, AccSeq+1)
+ end.
+
+
+
+new_index_entries([], AccById, AccBySeq, AccDDocIds) ->
+ {AccById, AccBySeq, AccDDocIds};
+new_index_entries([FullDocInfo|RestInfos], AccById, AccBySeq, AccDDocIds) ->
+ #doc_info{revs=[#rev_info{deleted=Deleted}|_], id=Id} = DocInfo =
+ couch_doc:to_doc_info(FullDocInfo),
+ AccDDocIds2 = case Id of
+ <<?DESIGN_DOC_PREFIX, _/binary>> ->
+ [Id | AccDDocIds];
+ _ ->
+ AccDDocIds
+ end,
+ new_index_entries(RestInfos,
+ [FullDocInfo#full_doc_info{deleted=Deleted}|AccById],
+ [DocInfo|AccBySeq],
+ AccDDocIds2).
+
+
+stem_full_doc_infos(#db{revs_limit=Limit}, DocInfos) ->
+ [Info#full_doc_info{rev_tree=couch_key_tree:stem(Tree, Limit)} ||
+ #full_doc_info{rev_tree=Tree}=Info <- DocInfos].
+
+update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) ->
+ #db{
+ fulldocinfo_by_id_btree = DocInfoByIdBTree,
+ docinfo_by_seq_btree = DocInfoBySeqBTree,
+ update_seq = LastSeq,
+ revs_limit = RevsLimit
+ } = Db,
+ Ids = [Id || [{_Client, {#doc{id=Id}, _Ref}}|_] <- DocsList],
+ % look up the old documents, if they exist.
+ OldDocLookups = couch_btree:lookup(DocInfoByIdBTree, Ids),
+ OldDocInfos = lists:zipwith(
+ fun(_Id, {ok, FullDocInfo}) ->
+ FullDocInfo;
+ (Id, not_found) ->
+ #full_doc_info{id=Id}
+ end,
+ Ids, OldDocLookups),
+ % Merge the new docs into the revision trees.
+ {ok, NewFullDocInfos, RemoveSeqs, NewSeq} = merge_rev_trees(RevsLimit,
+ MergeConflicts, DocsList, OldDocInfos, [], [], LastSeq),
+
+ % All documents are now ready to write.
+
+ {ok, Db2} = update_local_docs(Db, NonRepDocs),
+
+ % Write out the document summaries (the bodies are stored in the nodes of
+ % the trees, the attachments are already written to disk)
+ {ok, FlushedFullDocInfos} = flush_trees(Db2, NewFullDocInfos, []),
+
+ {IndexFullDocInfos, IndexDocInfos, UpdatedDDocIds} =
+ new_index_entries(FlushedFullDocInfos, [], [], []),
+
+ % and the indexes
+ {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree, IndexFullDocInfos, []),
+ {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, IndexDocInfos, RemoveSeqs),
+
+ Db3 = Db2#db{
+ fulldocinfo_by_id_btree = DocInfoByIdBTree2,
+ docinfo_by_seq_btree = DocInfoBySeqBTree2,
+ update_seq = NewSeq},
+
+ % Check if we just updated any design documents, and update the validation
+ % funs if we did.
+ Db4 = case UpdatedDDocIds of
+ [] ->
+ Db3;
+ _ ->
+ refresh_validate_doc_funs(Db3)
+ end,
+
+ {ok, commit_data(Db4, not FullCommit), UpdatedDDocIds}.
+
+update_local_docs(Db, []) ->
+ {ok, Db};
+update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) ->
+ Ids = [Id || {_Client, {#doc{id=Id}, _Ref}} <- Docs],
+ OldDocLookups = couch_btree:lookup(Btree, Ids),
+ BtreeEntries = lists:zipwith(
+ fun({Client, {#doc{id=Id,deleted=Delete,revs={0,PrevRevs},body=Body}, Ref}}, OldDocLookup) ->
+ case PrevRevs of
+ [RevStr|_] ->
+ PrevRev = list_to_integer(?b2l(RevStr));
+ [] ->
+ PrevRev = 0
+ end,
+ OldRev =
+ case OldDocLookup of
+ {ok, {_, {OldRev0, _}}} -> OldRev0;
+ not_found -> 0
+ end,
+ case OldRev == PrevRev of
+ true ->
+ case Delete of
+ false ->
+ send_result(Client, Ref, {ok,
+ {0, ?l2b(integer_to_list(PrevRev + 1))}}),
+ {update, {Id, {PrevRev + 1, Body}}};
+ true ->
+ send_result(Client, Ref,
+ {ok, {0, <<"0">>}}),
+ {remove, Id}
+ end;
+ false ->
+ send_result(Client, Ref, conflict),
+ ignore
+ end
+ end, Docs, OldDocLookups),
+
+ BtreeIdsRemove = [Id || {remove, Id} <- BtreeEntries],
+ BtreeIdsUpdate = [{Key, Val} || {update, {Key, Val}} <- BtreeEntries],
+
+ {ok, Btree2} =
+ couch_btree:add_remove(Btree, BtreeIdsUpdate, BtreeIdsRemove),
+
+ {ok, Db#db{local_docs_btree = Btree2}}.
+
+
+commit_data(Db) ->
+ commit_data(Db, false).
+
+db_to_header(Db, Header) ->
+ Header#db_header{
+ update_seq = Db#db.update_seq,
+ docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree),
+ fulldocinfo_by_id_btree_state = couch_btree:get_state(Db#db.fulldocinfo_by_id_btree),
+ local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree),
+ security_ptr = Db#db.security_ptr,
+ revs_limit = Db#db.revs_limit}.
+
+commit_data(#db{waiting_delayed_commit=nil} = Db, true) ->
+ Db#db{waiting_delayed_commit=erlang:send_after(1000,self(),delayed_commit)};
+commit_data(Db, true) ->
+ Db;
+commit_data(Db, _) ->
+ #db{
+ updater_fd = Fd,
+ filepath = Filepath,
+ header = OldHeader,
+ fsync_options = FsyncOptions,
+ waiting_delayed_commit = Timer
+ } = Db,
+ if is_reference(Timer) -> erlang:cancel_timer(Timer); true -> ok end,
+ case db_to_header(Db, OldHeader) of
+ OldHeader ->
+ Db#db{waiting_delayed_commit=nil};
+ Header ->
+ case lists:member(before_header, FsyncOptions) of
+ true -> ok = couch_file:sync(Filepath);
+ _ -> ok
+ end,
+
+ ok = couch_file:write_header(Fd, Header),
+
+ case lists:member(after_header, FsyncOptions) of
+ true -> ok = couch_file:sync(Filepath);
+ _ -> ok
+ end,
+
+ Db#db{waiting_delayed_commit=nil,
+ header=Header,
+ committed_update_seq=Db#db.update_seq}
+ end.
+
+
+copy_doc_attachments(#db{updater_fd = SrcFd} = SrcDb, SrcSp, DestFd) ->
+ {ok, {BodyData, BinInfos0}} = couch_db:read_doc(SrcDb, SrcSp),
+ BinInfos = case BinInfos0 of
+ _ when is_binary(BinInfos0) ->
+ couch_compress:decompress(BinInfos0);
+ _ when is_list(BinInfos0) ->
+ % pre 1.2 file format
+ BinInfos0
+ end,
+ % copy the bin values
+ NewBinInfos = lists:map(
+ fun({Name, Type, BinSp, AttLen, RevPos, Md5}) ->
+ % 010 UPGRADE CODE
+ {NewBinSp, AttLen, AttLen, Md5, _IdentityMd5} =
+ couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
+ {Name, Type, NewBinSp, AttLen, AttLen, RevPos, Md5, identity};
+ ({Name, Type, BinSp, AttLen, DiskLen, RevPos, Md5, Enc1}) ->
+ {NewBinSp, AttLen, _, Md5, _IdentityMd5} =
+ couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
+ Enc = case Enc1 of
+ true ->
+ % 0110 UPGRADE CODE
+ gzip;
+ false ->
+ % 0110 UPGRADE CODE
+ identity;
+ _ ->
+ Enc1
+ end,
+ {Name, Type, NewBinSp, AttLen, DiskLen, RevPos, Md5, Enc}
+ end, BinInfos),
+ {BodyData, NewBinInfos}.
+
+copy_docs(Db, #db{updater_fd = DestFd} = NewDb, InfoBySeq0, Retry) ->
+ % COUCHDB-968, make sure we prune duplicates during compaction
+ InfoBySeq = lists:usort(fun(#doc_info{id=A}, #doc_info{id=B}) -> A =< B end,
+ InfoBySeq0),
+ Ids = [Id || #doc_info{id=Id} <- InfoBySeq],
+ LookupResults = couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids),
+
+ NewFullDocInfos1 = lists:map(
+ fun({ok, #full_doc_info{rev_tree=RevTree}=Info}) ->
+ Info#full_doc_info{rev_tree=couch_key_tree:map(
+ fun(_, _, branch) ->
+ ?REV_MISSING;
+ (_Rev, LeafVal, leaf) ->
+ IsDel = element(1, LeafVal),
+ Sp = element(2, LeafVal),
+ Seq = element(3, LeafVal),
+ {_Body, AttsInfo} = Summary = copy_doc_attachments(
+ Db, Sp, DestFd),
+ SummaryChunk = make_doc_summary(NewDb, Summary),
+ {ok, Pos, SummarySize} = couch_file:append_raw_chunk(
+ DestFd, SummaryChunk),
+ TotalLeafSize = lists:foldl(
+ fun({_, _, _, AttLen, _, _, _, _}, S) -> S + AttLen end,
+ SummarySize, AttsInfo),
+ {IsDel, Pos, Seq, TotalLeafSize}
+ end, RevTree)}
+ end, LookupResults),
+
+ NewFullDocInfos = stem_full_doc_infos(Db, NewFullDocInfos1),
+ NewDocInfos = [couch_doc:to_doc_info(Info) || Info <- NewFullDocInfos],
+ RemoveSeqs =
+ case Retry of
+ false ->
+ [];
+ true ->
+ % We are retrying a compaction, meaning the documents we are copying may
+ % already exist in our file and must be removed from the by_seq index.
+ Existing = couch_btree:lookup(NewDb#db.fulldocinfo_by_id_btree, Ids),
+ [Seq || {ok, #full_doc_info{update_seq=Seq}} <- Existing]
+ end,
+
+ {ok, DocInfoBTree} = couch_btree:add_remove(
+ NewDb#db.docinfo_by_seq_btree, NewDocInfos, RemoveSeqs),
+ {ok, FullDocInfoBTree} = couch_btree:add_remove(
+ NewDb#db.fulldocinfo_by_id_btree, NewFullDocInfos, []),
+ update_compact_task(length(NewFullDocInfos)),
+ NewDb#db{ fulldocinfo_by_id_btree=FullDocInfoBTree,
+ docinfo_by_seq_btree=DocInfoBTree}.
+
+
+
+copy_compact(Db, NewDb0, Retry) ->
+ FsyncOptions = [Op || Op <- NewDb0#db.fsync_options, Op == before_header],
+ Compression = couch_compress:get_compression_method(),
+ NewDb = NewDb0#db{fsync_options=FsyncOptions, compression=Compression},
+ TotalChanges = couch_db:count_changes_since(Db, NewDb#db.update_seq),
+ BufferSize = list_to_integer(
+ couch_config:get("database_compaction", "doc_buffer_size", "524288")),
+ CheckpointAfter = couch_util:to_integer(
+ couch_config:get("database_compaction", "checkpoint_after",
+ BufferSize * 10)),
+
+ EnumBySeqFun =
+ fun(#doc_info{high_seq=Seq}=DocInfo, _Offset,
+ {AccNewDb, AccUncopied, AccUncopiedSize, AccCopiedSize}) ->
+
+ AccUncopiedSize2 = AccUncopiedSize + ?term_size(DocInfo),
+ if AccUncopiedSize2 >= BufferSize ->
+ NewDb2 = copy_docs(
+ Db, AccNewDb, lists:reverse([DocInfo | AccUncopied]), Retry),
+ AccCopiedSize2 = AccCopiedSize + AccUncopiedSize2,
+ if AccCopiedSize2 >= CheckpointAfter ->
+ {ok, {commit_data(NewDb2#db{update_seq = Seq}), [], 0, 0}};
+ true ->
+ {ok, {NewDb2#db{update_seq = Seq}, [], 0, AccCopiedSize2}}
+ end;
+ true ->
+ {ok, {AccNewDb, [DocInfo | AccUncopied], AccUncopiedSize2,
+ AccCopiedSize}}
+ end
+ end,
+
+ TaskProps0 = [
+ {type, database_compaction},
+ {database, Db#db.name},
+ {progress, 0},
+ {changes_done, 0},
+ {total_changes, TotalChanges}
+ ],
+ case Retry and couch_task_status:is_task_added() of
+ true ->
+ couch_task_status:update([
+ {retry, true},
+ {progress, 0},
+ {changes_done, 0},
+ {total_changes, TotalChanges}
+ ]);
+ false ->
+ couch_task_status:add_task(TaskProps0),
+ couch_task_status:set_update_frequency(500)
+ end,
+
+ {ok, _, {NewDb2, Uncopied, _, _}} =
+ couch_btree:foldl(Db#db.docinfo_by_seq_btree, EnumBySeqFun,
+ {NewDb, [], 0, 0},
+ [{start_key, NewDb#db.update_seq + 1}]),
+
+ NewDb3 = copy_docs(Db, NewDb2, lists:reverse(Uncopied), Retry),
+
+ % copy misc header values
+ if NewDb3#db.security /= Db#db.security ->
+ {ok, Ptr, _} = couch_file:append_term(
+ NewDb3#db.updater_fd, Db#db.security,
+ [{compression, NewDb3#db.compression}]),
+ NewDb4 = NewDb3#db{security=Db#db.security, security_ptr=Ptr};
+ true ->
+ NewDb4 = NewDb3
+ end,
+
+ commit_data(NewDb4#db{update_seq=Db#db.update_seq}).
+
+start_copy_compact(#db{name=Name,filepath=Filepath,header=#db_header{purge_seq=PurgeSeq}}=Db) ->
+ CompactFile = Filepath ++ ".compact",
+ ?LOG_DEBUG("Compaction process spawned for db \"~s\"", [Name]),
+ case couch_file:open(CompactFile, [nologifmissing]) of
+ {ok, Fd} ->
+ Retry = true,
+ case couch_file:read_header(Fd) of
+ {ok, Header} ->
+ ok;
+ no_valid_header ->
+ ok = couch_file:write_header(Fd, Header=#db_header{})
+ end;
+ {error, enoent} ->
+ {ok, Fd} = couch_file:open(CompactFile, [create]),
+ Retry = false,
+ ok = couch_file:write_header(Fd, Header=#db_header{})
+ end,
+ ReaderFd = open_reader_fd(CompactFile, Db#db.options),
+ NewDb = init_db(Name, CompactFile, Fd, ReaderFd, Header, Db#db.options),
+ NewDb2 = if PurgeSeq > 0 ->
+ {ok, PurgedIdsRevs} = couch_db:get_last_purged(Db),
+ {ok, Pointer, _} = couch_file:append_term(
+ Fd, PurgedIdsRevs, [{compression, NewDb#db.compression}]),
+ NewDb#db{header=Header#db_header{purge_seq=PurgeSeq, purged_docs=Pointer}};
+ true ->
+ NewDb
+ end,
+ unlink(Fd),
+
+ NewDb3 = copy_compact(Db, NewDb2, Retry),
+ close_db(NewDb3),
+ case gen_server:call(
+ Db#db.update_pid, {compact_done, CompactFile}, infinity) of
+ ok ->
+ ok;
+ {retry, CurrentDb} ->
+ start_copy_compact(CurrentDb)
+ end.
+
+update_compact_task(NumChanges) ->
+ [Changes, Total] = couch_task_status:get([changes_done, total_changes]),
+ Changes2 = Changes + NumChanges,
+ Progress = case Total of
+ 0 ->
+ 0;
+ _ ->
+ (Changes2 * 100) div Total
+ end,
+ couch_task_status:update([{changes_done, Changes2}, {progress, Progress}]).
+
+make_doc_summary(#db{compression = Comp}, {Body0, Atts0}) ->
+ Body = case couch_compress:is_compressed(Body0, Comp) of
+ true ->
+ Body0;
+ false ->
+ % pre 1.2 database file format
+ couch_compress:compress(Body0, Comp)
+ end,
+ Atts = case couch_compress:is_compressed(Atts0, Comp) of
+ true ->
+ Atts0;
+ false ->
+ couch_compress:compress(Atts0, Comp)
+ end,
+ SummaryBin = ?term_to_bin({Body, Atts}),
+ couch_file:assemble_file_chunk(SummaryBin, couch_util:md5(SummaryBin)).
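
For orientation, a minimal editorial sketch (not part of the patch) of how a caller
could drive the updater through the handle_call clauses above; UpdaterPid is a
placeholder for the updater's pid, which CouchDB code would normally reach through
the couch_db API rather than call directly:

    %% Fetch the current #db{} record, force any delayed commit to disk,
    %% and kick off compaction (a no-op if one is already running).
    ensure_committed_and_compact(UpdaterPid) ->
        {ok, Db} = gen_server:call(UpdaterPid, get_db),
        ok = gen_server:call(UpdaterPid, full_commit, infinity),
        {ok, _CompactorPid} = gen_server:call(UpdaterPid, start_compact),
        Db.
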
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/a6816bff/src/couch_doc.erl
----------------------------------------------------------------------
diff --git a/src/couch_doc.erl b/src/couch_doc.erl
new file mode 100644
index 0000000..4047370
--- /dev/null
+++ b/src/couch_doc.erl
@@ -0,0 +1,650 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_doc).
+
+-export([to_doc_info/1,to_doc_info_path/1,parse_rev/1,parse_revs/1,rev_to_str/1,revs_to_strs/1]).
+-export([att_foldl/3,range_att_foldl/5,att_foldl_decode/3,get_validate_doc_fun/1]).
+-export([from_json_obj/1,to_json_obj/2,has_stubs/1, merge_stubs/2]).
+-export([validate_docid/1]).
+-export([doc_from_multi_part_stream/2]).
+-export([doc_to_multi_part_stream/5, len_doc_to_multi_part_stream/4]).
+-export([abort_multi_part_stream/1]).
+-export([to_path/1]).
+-export([mp_parse_doc/2]).
+-export([with_ejson_body/1]).
+
+-include("couch_db.hrl").
+
+-spec to_path(#doc{}) -> path().
+to_path(#doc{revs={Start, RevIds}}=Doc) ->
+ [Branch] = to_branch(Doc, lists:reverse(RevIds)),
+ {Start - length(RevIds) + 1, Branch}.
+
+-spec to_branch(#doc{}, [RevId::binary()]) -> [branch()].
+to_branch(Doc, [RevId]) ->
+ [{RevId, Doc, []}];
+to_branch(Doc, [RevId | Rest]) ->
+ [{RevId, ?REV_MISSING, to_branch(Doc, Rest)}].
+
+% helpers used by to_json_obj
+to_json_rev(0, []) ->
+ [];
+to_json_rev(Start, [FirstRevId|_]) ->
+ [{<<"_rev">>, ?l2b([integer_to_list(Start),"-",revid_to_str(FirstRevId)])}].
+
+to_json_body(true, {Body}) ->
+ Body ++ [{<<"_deleted">>, true}];
+to_json_body(false, {Body}) ->
+ Body.
+
+to_json_revisions(Options, Start, RevIds) ->
+ case lists:member(revs, Options) of
+ false -> [];
+ true ->
+ [{<<"_revisions">>, {[{<<"start">>, Start},
+ {<<"ids">>, [revid_to_str(R) ||R <- RevIds]}]}}]
+ end.
+
+revid_to_str(RevId) when size(RevId) =:= 16 ->
+ ?l2b(couch_util:to_hex(RevId));
+revid_to_str(RevId) ->
+ RevId.
+
+rev_to_str({Pos, RevId}) ->
+ ?l2b([integer_to_list(Pos),"-",revid_to_str(RevId)]).
+
+
+revs_to_strs([]) ->
+ [];
+revs_to_strs([{Pos, RevId}| Rest]) ->
+ [rev_to_str({Pos, RevId}) | revs_to_strs(Rest)].
+
+to_json_meta(Meta) ->
+ lists:map(
+ fun({revs_info, Start, RevsInfo}) ->
+ {JsonRevsInfo, _Pos} = lists:mapfoldl(
+ fun({RevId, Status}, PosAcc) ->
+ JsonObj = {[{<<"rev">>, rev_to_str({PosAcc, RevId})},
+ {<<"status">>, ?l2b(atom_to_list(Status))}]},
+ {JsonObj, PosAcc - 1}
+ end, Start, RevsInfo),
+ {<<"_revs_info">>, JsonRevsInfo};
+ ({local_seq, Seq}) ->
+ {<<"_local_seq">>, Seq};
+ ({conflicts, Conflicts}) ->
+ {<<"_conflicts">>, revs_to_strs(Conflicts)};
+ ({deleted_conflicts, DConflicts}) ->
+ {<<"_deleted_conflicts">>, revs_to_strs(DConflicts)}
+ end, Meta).
+
+to_json_attachments(Attachments, Options) ->
+ to_json_attachments(
+ Attachments,
+ lists:member(attachments, Options),
+ lists:member(follows, Options),
+ lists:member(att_encoding_info, Options)
+ ).
+
+to_json_attachments([], _OutputData, _DataToFollow, _ShowEncInfo) ->
+ [];
+to_json_attachments(Atts, OutputData, DataToFollow, ShowEncInfo) ->
+ AttProps = lists:map(
+ fun(#att{disk_len=DiskLen, att_len=AttLen, encoding=Enc}=Att) ->
+ {Att#att.name, {[
+ {<<"content_type">>, Att#att.type},
+ {<<"revpos">>, Att#att.revpos}] ++
+ case Att#att.md5 of
+ <<>> ->
+ [];
+ Md5 ->
+ EncodedMd5 = base64:encode(Md5),
+ [{<<"digest">>, <<"md5-",EncodedMd5/binary>>}]
+ end ++
+ if not OutputData orelse Att#att.data == stub ->
+ [{<<"length">>, DiskLen}, {<<"stub">>, true}];
+ true ->
+ if DataToFollow ->
+ [{<<"length">>, DiskLen}, {<<"follows">>, true}];
+ true ->
+ AttData = case Enc of
+ gzip ->
+ zlib:gunzip(att_to_bin(Att));
+ identity ->
+ att_to_bin(Att)
+ end,
+ [{<<"data">>, base64:encode(AttData)}]
+ end
+ end ++
+ case {ShowEncInfo, Enc} of
+ {false, _} ->
+ [];
+ {true, identity} ->
+ [];
+ {true, _} ->
+ [
+ {<<"encoding">>, couch_util:to_binary(Enc)},
+ {<<"encoded_length">>, AttLen}
+ ]
+ end
+ }}
+ end, Atts),
+ [{<<"_attachments">>, {AttProps}}].
+
+to_json_obj(Doc, Options) ->
+ doc_to_json_obj(with_ejson_body(Doc), Options).
+
+doc_to_json_obj(#doc{id=Id,deleted=Del,body=Body,revs={Start, RevIds},
+ meta=Meta}=Doc,Options)->
+ {[{<<"_id">>, Id}]
+ ++ to_json_rev(Start, RevIds)
+ ++ to_json_body(Del, Body)
+ ++ to_json_revisions(Options, Start, RevIds)
+ ++ to_json_meta(Meta)
+ ++ to_json_attachments(Doc#doc.atts, Options)
+ }.
+
+from_json_obj({Props}) ->
+ transfer_fields(Props, #doc{body=[]});
+
+from_json_obj(_Other) ->
+ throw({bad_request, "Document must be a JSON object"}).
+
+parse_revid(RevId) when size(RevId) =:= 32 ->
+ RevInt = erlang:list_to_integer(?b2l(RevId), 16),
+ <<RevInt:128>>;
+parse_revid(RevId) when length(RevId) =:= 32 ->
+ RevInt = erlang:list_to_integer(RevId, 16),
+ <<RevInt:128>>;
+parse_revid(RevId) when is_binary(RevId) ->
+ RevId;
+parse_revid(RevId) when is_list(RevId) ->
+ ?l2b(RevId).
+
+
+parse_rev(Rev) when is_binary(Rev) ->
+ parse_rev(?b2l(Rev));
+parse_rev(Rev) when is_list(Rev) ->
+ SplitRev = lists:splitwith(fun($-) -> false; (_) -> true end, Rev),
+ case SplitRev of
+ {Pos, [$- | RevId]} -> {list_to_integer(Pos), parse_revid(RevId)};
+ _Else -> throw({bad_request, <<"Invalid rev format">>})
+ end;
+parse_rev(_BadRev) ->
+ throw({bad_request, <<"Invalid rev format">>}).
+
+parse_revs([]) ->
+ [];
+parse_revs([Rev | Rest]) ->
+ [parse_rev(Rev) | parse_revs(Rest)].
+
+
+validate_docid(<<"">>) ->
+ throw({bad_request, <<"Document id must not be empty">>});
+validate_docid(Id) when is_binary(Id) ->
+ case couch_util:validate_utf8(Id) of
+ false -> throw({bad_request, <<"Document id must be valid UTF-8">>});
+ true -> ok
+ end,
+ case Id of
+ <<"_design/", _/binary>> -> ok;
+ <<"_local/", _/binary>> -> ok;
+ <<"_", _/binary>> ->
+ throw({bad_request, <<"Only reserved document ids may start with underscore.">>});
+ _Else -> ok
+ end;
+validate_docid(Id) ->
+ ?LOG_DEBUG("Document id is not a string: ~p", [Id]),
+ throw({bad_request, <<"Document id must be a string">>}).
+
+transfer_fields([], #doc{body=Fields}=Doc) ->
+ % convert fields back to json object
+ Doc#doc{body={lists:reverse(Fields)}};
+
+transfer_fields([{<<"_id">>, Id} | Rest], Doc) ->
+ validate_docid(Id),
+ transfer_fields(Rest, Doc#doc{id=Id});
+
+transfer_fields([{<<"_rev">>, Rev} | Rest], #doc{revs={0, []}}=Doc) ->
+ {Pos, RevId} = parse_rev(Rev),
+ transfer_fields(Rest,
+ Doc#doc{revs={Pos, [RevId]}});
+
+transfer_fields([{<<"_rev">>, _Rev} | Rest], Doc) ->
+ % we already got the rev from the _revisions
+ transfer_fields(Rest,Doc);
+
+transfer_fields([{<<"_attachments">>, {JsonBins}} | Rest], Doc) ->
+ Atts = lists:map(fun({Name, {BinProps}}) ->
+ Md5 = case couch_util:get_value(<<"digest">>, BinProps) of
+ <<"md5-",EncodedMd5/binary>> ->
+ base64:decode(EncodedMd5);
+ _ ->
+ <<>>
+ end,
+ case couch_util:get_value(<<"stub">>, BinProps) of
+ true ->
+ Type = couch_util:get_value(<<"content_type">>, BinProps),
+ RevPos = couch_util:get_value(<<"revpos">>, BinProps, nil),
+ DiskLen = couch_util:get_value(<<"length">>, BinProps),
+ {Enc, EncLen} = att_encoding_info(BinProps),
+ #att{name=Name, data=stub, type=Type, att_len=EncLen,
+ disk_len=DiskLen, encoding=Enc, revpos=RevPos, md5=Md5};
+ _ ->
+ Type = couch_util:get_value(<<"content_type">>, BinProps,
+ ?DEFAULT_ATTACHMENT_CONTENT_TYPE),
+ RevPos = couch_util:get_value(<<"revpos">>, BinProps, 0),
+ case couch_util:get_value(<<"follows">>, BinProps) of
+ true ->
+ DiskLen = couch_util:get_value(<<"length">>, BinProps),
+ {Enc, EncLen} = att_encoding_info(BinProps),
+ #att{name=Name, data=follows, type=Type, encoding=Enc,
+ att_len=EncLen, disk_len=DiskLen, revpos=RevPos, md5=Md5};
+ _ ->
+ Value = couch_util:get_value(<<"data">>, BinProps),
+ Bin = base64:decode(Value),
+ LenBin = size(Bin),
+ #att{name=Name, data=Bin, type=Type, att_len=LenBin,
+ disk_len=LenBin, revpos=RevPos}
+ end
+ end
+ end, JsonBins),
+ transfer_fields(Rest, Doc#doc{atts=Atts});
+
+transfer_fields([{<<"_revisions">>, {Props}} | Rest], Doc) ->
+ RevIds = couch_util:get_value(<<"ids">>, Props),
+ Start = couch_util:get_value(<<"start">>, Props),
+ if not is_integer(Start) ->
+ throw({doc_validation, "_revisions.start isn't an integer."});
+ not is_list(RevIds) ->
+ throw({doc_validation, "_revisions.ids isn't a array."});
+ true ->
+ ok
+ end,
+ [throw({doc_validation, "RevId isn't a string"}) ||
+ RevId <- RevIds, not is_binary(RevId)],
+ RevIds2 = [parse_revid(RevId) || RevId <- RevIds],
+ transfer_fields(Rest, Doc#doc{revs={Start, RevIds2}});
+
+transfer_fields([{<<"_deleted">>, B} | Rest], Doc) when is_boolean(B) ->
+ transfer_fields(Rest, Doc#doc{deleted=B});
+
+% ignored fields
+transfer_fields([{<<"_revs_info">>, _} | Rest], Doc) ->
+ transfer_fields(Rest, Doc);
+transfer_fields([{<<"_local_seq">>, _} | Rest], Doc) ->
+ transfer_fields(Rest, Doc);
+transfer_fields([{<<"_conflicts">>, _} | Rest], Doc) ->
+ transfer_fields(Rest, Doc);
+transfer_fields([{<<"_deleted_conflicts">>, _} | Rest], Doc) ->
+ transfer_fields(Rest, Doc);
+
+% special fields for replication documents
+transfer_fields([{<<"_replication_state">>, _} = Field | Rest],
+ #doc{body=Fields} = Doc) ->
+ transfer_fields(Rest, Doc#doc{body=[Field|Fields]});
+transfer_fields([{<<"_replication_state_time">>, _} = Field | Rest],
+ #doc{body=Fields} = Doc) ->
+ transfer_fields(Rest, Doc#doc{body=[Field|Fields]});
+transfer_fields([{<<"_replication_state_reason">>, _} = Field | Rest],
+ #doc{body=Fields} = Doc) ->
+ transfer_fields(Rest, Doc#doc{body=[Field|Fields]});
+transfer_fields([{<<"_replication_id">>, _} = Field | Rest],
+ #doc{body=Fields} = Doc) ->
+ transfer_fields(Rest, Doc#doc{body=[Field|Fields]});
+transfer_fields([{<<"_replication_stats">>, _} = Field | Rest],
+ #doc{body=Fields} = Doc) ->
+ transfer_fields(Rest, Doc#doc{body=[Field|Fields]});
+
+% unknown special field
+transfer_fields([{<<"_",Name/binary>>, _} | _], _) ->
+ throw({doc_validation,
+ ?l2b(io_lib:format("Bad special document member: _~s", [Name]))});
+
+transfer_fields([Field | Rest], #doc{body=Fields}=Doc) ->
+ transfer_fields(Rest, Doc#doc{body=[Field|Fields]}).
+
+att_encoding_info(BinProps) ->
+ DiskLen = couch_util:get_value(<<"length">>, BinProps),
+ case couch_util:get_value(<<"encoding">>, BinProps) of
+ undefined ->
+ {identity, DiskLen};
+ Enc ->
+ EncodedLen = couch_util:get_value(<<"encoded_length">>, BinProps, DiskLen),
+ {list_to_existing_atom(?b2l(Enc)), EncodedLen}
+ end.
+
+to_doc_info(FullDocInfo) ->
+ {DocInfo, _Path} = to_doc_info_path(FullDocInfo),
+ DocInfo.
+
+max_seq(Tree, UpdateSeq) ->
+ FoldFun = fun({_Pos, _Key}, Value, _Type, MaxOldSeq) ->
+ case Value of
+ {_Deleted, _DiskPos, OldTreeSeq} ->
+ % Older versions didn't track data sizes.
+ erlang:max(MaxOldSeq, OldTreeSeq);
+ {_Deleted, _DiskPos, OldTreeSeq, _Size} ->
+ erlang:max(MaxOldSeq, OldTreeSeq);
+ _ ->
+ MaxOldSeq
+ end
+ end,
+ couch_key_tree:fold(FoldFun, UpdateSeq, Tree).
+
+to_doc_info_path(#full_doc_info{id=Id,rev_tree=Tree,update_seq=Seq}) ->
+ RevInfosAndPath = [
+ {#rev_info{
+ deleted = element(1, LeafVal),
+ body_sp = element(2, LeafVal),
+ seq = element(3, LeafVal),
+ rev = {Pos, RevId}
+ }, Path} || {LeafVal, {Pos, [RevId | _]} = Path} <-
+ couch_key_tree:get_all_leafs(Tree)
+ ],
+ SortedRevInfosAndPath = lists:sort(
+ fun({#rev_info{deleted=DeletedA,rev=RevA}, _PathA},
+ {#rev_info{deleted=DeletedB,rev=RevB}, _PathB}) ->
+ % sort descending by {not deleted, rev}
+ {not DeletedA, RevA} > {not DeletedB, RevB}
+ end, RevInfosAndPath),
+ [{_RevInfo, WinPath}|_] = SortedRevInfosAndPath,
+ RevInfos = [RevInfo || {RevInfo, _Path} <- SortedRevInfosAndPath],
+ {#doc_info{id=Id, high_seq=max_seq(Tree, Seq), revs=RevInfos}, WinPath}.
+
+
+
+
+att_foldl(#att{data=Bin}, Fun, Acc) when is_binary(Bin) ->
+ Fun(Bin, Acc);
+att_foldl(#att{data={Fd,Sp},md5=Md5}, Fun, Acc) ->
+ couch_stream:foldl(Fd, Sp, Md5, Fun, Acc);
+att_foldl(#att{data=DataFun,att_len=Len}, Fun, Acc) when is_function(DataFun) ->
+ fold_streamed_data(DataFun, Len, Fun, Acc).
+
+range_att_foldl(#att{data={Fd,Sp}}, From, To, Fun, Acc) ->
+ couch_stream:range_foldl(Fd, Sp, From, To, Fun, Acc).
+
+att_foldl_decode(#att{data={Fd,Sp},md5=Md5,encoding=Enc}, Fun, Acc) ->
+ couch_stream:foldl_decode(Fd, Sp, Md5, Enc, Fun, Acc);
+att_foldl_decode(#att{data=Fun2,att_len=Len, encoding=identity}, Fun, Acc) ->
+ fold_streamed_data(Fun2, Len, Fun, Acc).
+
+att_to_bin(#att{data=Bin}) when is_binary(Bin) ->
+ Bin;
+att_to_bin(#att{data=Iolist}) when is_list(Iolist) ->
+ iolist_to_binary(Iolist);
+att_to_bin(#att{data={_Fd,_Sp}}=Att) ->
+ iolist_to_binary(
+ lists:reverse(att_foldl(
+ Att,
+ fun(Bin,Acc) -> [Bin|Acc] end,
+ []
+ ))
+ );
+att_to_bin(#att{data=DataFun, att_len=Len}) when is_function(DataFun)->
+ iolist_to_binary(
+ lists:reverse(fold_streamed_data(
+ DataFun,
+ Len,
+ fun(Data, Acc) -> [Data | Acc] end,
+ []
+ ))
+ ).
+
+get_validate_doc_fun(#doc{body={Props}}=DDoc) ->
+ case couch_util:get_value(<<"validate_doc_update">>, Props) of
+ undefined ->
+ nil;
+ _Else ->
+ fun(EditDoc, DiskDoc, Ctx, SecObj) ->
+ couch_query_servers:validate_doc_update(DDoc, EditDoc, DiskDoc, Ctx, SecObj)
+ end
+ end.
+
+
+has_stubs(#doc{atts=Atts}) ->
+ has_stubs(Atts);
+has_stubs([]) ->
+ false;
+has_stubs([#att{data=stub}|_]) ->
+ true;
+has_stubs([_Att|Rest]) ->
+ has_stubs(Rest).
+
+merge_stubs(#doc{id = Id}, nil) ->
+ throw({missing_stub, <<"Previous revision missing for document ", Id/binary>>});
+merge_stubs(#doc{id=Id,atts=MemBins}=StubsDoc, #doc{atts=DiskBins}) ->
+ BinDict = dict:from_list([{Name, Att} || #att{name=Name}=Att <- DiskBins]),
+ MergedBins = lists:map(
+ fun(#att{name=Name, data=stub, revpos=StubRevPos}) ->
+ case dict:find(Name, BinDict) of
+ {ok, #att{revpos=DiskRevPos}=DiskAtt}
+ when DiskRevPos == StubRevPos orelse StubRevPos == nil ->
+ DiskAtt;
+ _ ->
+ throw({missing_stub,
+ <<"id:", Id/binary, ", name:", Name/binary>>})
+ end;
+ (Att) ->
+ Att
+ end, MemBins),
+ StubsDoc#doc{atts= MergedBins}.
+
+fold_streamed_data(_RcvFun, 0, _Fun, Acc) ->
+ Acc;
+fold_streamed_data(RcvFun, LenLeft, Fun, Acc) when LenLeft > 0->
+ Bin = RcvFun(),
+ ResultAcc = Fun(Bin, Acc),
+ fold_streamed_data(RcvFun, LenLeft - size(Bin), Fun, ResultAcc).
+
+len_doc_to_multi_part_stream(Boundary, JsonBytes, Atts, SendEncodedAtts) ->
+ AttsSize = lists:foldl(fun(Att, AccAttsSize) ->
+ #att{
+ data=Data,
+ name=Name,
+ att_len=AttLen,
+ disk_len=DiskLen,
+ type=Type,
+ encoding=Encoding
+ } = Att,
+ case Data of
+ stub ->
+ AccAttsSize;
+ _ ->
+ AccAttsSize +
+ 4 + % "\r\n\r\n"
+ case SendEncodedAtts of
+ true ->
+ % header
+ length(integer_to_list(AttLen)) +
+ AttLen;
+ _ ->
+ % header
+ length(integer_to_list(DiskLen)) +
+ DiskLen
+ end +
+ 4 + % "\r\n--"
+ size(Boundary) +
+
+ % attachment headers
+ % (the length of the Content-Length has already been set)
+ size(Name) +
+ size(Type) +
+ length("\r\nContent-Disposition: attachment; filename=\"\"") +
+ length("\r\nContent-Type: ") +
+ length("\r\nContent-Length: ") +
+ case Encoding of
+ identity ->
+ 0;
+ _ ->
+ length(atom_to_list(Encoding)) +
+ length("\r\nContent-Encoding: ")
+ end
+ end
+ end, 0, Atts),
+ if AttsSize == 0 ->
+ {<<"application/json">>, iolist_size(JsonBytes)};
+ true ->
+ {<<"multipart/related; boundary=\"", Boundary/binary, "\"">>,
+ 2 + % "--"
+ size(Boundary) +
+ 36 + % "\r\ncontent-type: application/json\r\n\r\n"
+ iolist_size(JsonBytes) +
+ 4 + % "\r\n--"
+ size(Boundary) +
+ + AttsSize +
+ 2 % "--"
+ }
+ end.
+
+doc_to_multi_part_stream(Boundary, JsonBytes, Atts, WriteFun,
+ SendEncodedAtts) ->
+ case lists:any(fun(#att{data=Data})-> Data /= stub end, Atts) of
+ true ->
+ WriteFun([<<"--", Boundary/binary,
+ "\r\nContent-Type: application/json\r\n\r\n">>,
+ JsonBytes, <<"\r\n--", Boundary/binary>>]),
+ atts_to_mp(Atts, Boundary, WriteFun, SendEncodedAtts);
+ false ->
+ WriteFun(JsonBytes)
+ end.
+
+atts_to_mp([], _Boundary, WriteFun, _SendEncAtts) ->
+ WriteFun(<<"--">>);
+atts_to_mp([#att{data=stub} | RestAtts], Boundary, WriteFun,
+ SendEncodedAtts) ->
+ atts_to_mp(RestAtts, Boundary, WriteFun, SendEncodedAtts);
+atts_to_mp([Att | RestAtts], Boundary, WriteFun,
+ SendEncodedAtts) ->
+ #att{
+ name=Name,
+ att_len=AttLen,
+ disk_len=DiskLen,
+ type=Type,
+ encoding=Encoding
+ } = Att,
+
+ % write headers
+ LengthBin = case SendEncodedAtts of
+ true -> list_to_binary(integer_to_list(AttLen));
+ false -> list_to_binary(integer_to_list(DiskLen))
+ end,
+ WriteFun(<<"\r\nContent-Disposition: attachment; filename=\"", Name/binary, "\"">>),
+ WriteFun(<<"\r\nContent-Type: ", Type/binary>>),
+ WriteFun(<<"\r\nContent-Length: ", LengthBin/binary>>),
+ case Encoding of
+ identity ->
+ ok;
+ _ ->
+ EncodingBin = atom_to_binary(Encoding, latin1),
+ WriteFun(<<"\r\nContent-Encoding: ", EncodingBin/binary>>)
+ end,
+
+ % write data
+ WriteFun(<<"\r\n\r\n">>),
+ AttFun = case SendEncodedAtts of
+ false ->
+ fun att_foldl_decode/3;
+ true ->
+ fun att_foldl/3
+ end,
+ AttFun(Att, fun(Data, _) -> WriteFun(Data) end, ok),
+ WriteFun(<<"\r\n--", Boundary/binary>>),
+ atts_to_mp(RestAtts, Boundary, WriteFun, SendEncodedAtts).
+
+
+doc_from_multi_part_stream(ContentType, DataFun) ->
+ Parent = self(),
+ Parser = spawn_link(fun() ->
+ {<<"--",_/binary>>, _, _} = couch_httpd:parse_multipart_request(
+ ContentType, DataFun,
+ fun(Next) -> mp_parse_doc(Next, []) end),
+ unlink(Parent),
+ Parent ! {self(), finished}
+ end),
+ Ref = make_ref(),
+ Parser ! {get_doc_bytes, Ref, self()},
+ receive
+ {doc_bytes, Ref, DocBytes} ->
+ Doc = from_json_obj(?JSON_DECODE(DocBytes)),
+ % go through the attachments looking for 'follows' in the data,
+ % and replace it with a function that reads the data from the MIME stream.
+ ReadAttachmentDataFun = fun() ->
+ Parser ! {get_bytes, Ref, self()},
+ receive {bytes, Ref, Bytes} -> Bytes end
+ end,
+ Atts2 = lists:map(
+ fun(#att{data=follows}=A) ->
+ A#att{data=ReadAttachmentDataFun};
+ (A) ->
+ A
+ end, Doc#doc.atts),
+ WaitFun = fun() ->
+ receive {Parser, finished} -> ok end,
+ erlang:put(mochiweb_request_recv, true)
+ end,
+ {ok, Doc#doc{atts=Atts2}, WaitFun, Parser}
+ end.
+
+mp_parse_doc({headers, H}, []) ->
+ case couch_util:get_value("content-type", H) of
+ {"application/json", _} ->
+ fun (Next) ->
+ mp_parse_doc(Next, [])
+ end
+ end;
+mp_parse_doc({body, Bytes}, AccBytes) ->
+ fun (Next) ->
+ mp_parse_doc(Next, [Bytes | AccBytes])
+ end;
+mp_parse_doc(body_end, AccBytes) ->
+ receive {get_doc_bytes, Ref, From} ->
+ From ! {doc_bytes, Ref, lists:reverse(AccBytes)}
+ end,
+ fun mp_parse_atts/1.
+
+mp_parse_atts(eof) ->
+ ok;
+mp_parse_atts({headers, _H}) ->
+ fun mp_parse_atts/1;
+mp_parse_atts({body, Bytes}) ->
+ receive {get_bytes, Ref, From} ->
+ From ! {bytes, Ref, Bytes}
+ end,
+ fun mp_parse_atts/1;
+mp_parse_atts(body_end) ->
+ fun mp_parse_atts/1.
+
+
+abort_multi_part_stream(Parser) ->
+ abort_multi_part_stream(Parser, erlang:monitor(process, Parser)).
+
+abort_multi_part_stream(Parser, MonRef) ->
+ case is_process_alive(Parser) of
+ true ->
+ Parser ! {get_bytes, nil, self()},
+ receive
+ {bytes, nil, _Bytes} ->
+ abort_multi_part_stream(Parser, MonRef);
+ {'DOWN', MonRef, _, _, _} ->
+ ok
+ end;
+ false ->
+ erlang:demonitor(MonRef, [flush])
+ end.
+
+
+with_ejson_body(#doc{body = Body} = Doc) when is_binary(Body) ->
+ Doc#doc{body = couch_compress:decompress(Body)};
+with_ejson_body(#doc{body = {_}} = Doc) ->
+ Doc.
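
As a small editorial illustration (not part of the patch), the revision helpers
above round-trip a {Pos, RevId} pair when RevId is in its 16-byte binary form;
the hex value below is arbitrary:

    rev_roundtrip() ->
        Rev = {1, <<16#967a00dff5e02add41819138abb3284d:128>>},
        RevStr = couch_doc:rev_to_str(Rev),   % <<"1-967a00dff5e02add41819138abb3284d">>
        Rev = couch_doc:parse_rev(RevStr),    % parses back to the same {Pos, RevId}
        RevStr.
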
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/a6816bff/src/couch_ejson_compare.erl
----------------------------------------------------------------------
diff --git a/src/couch_ejson_compare.erl b/src/couch_ejson_compare.erl
new file mode 100644
index 0000000..f46ec35
--- /dev/null
+++ b/src/couch_ejson_compare.erl
@@ -0,0 +1,81 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_ejson_compare).
+
+-export([less/2, less_json_ids/2, less_json/2]).
+
+less_json_ids({JsonA, IdA}, {JsonB, IdB}) ->
+ case less(JsonA, JsonB) of
+ 0 ->
+ IdA < IdB;
+ Result ->
+ Result < 0
+ end.
+
+less_json(A,B) ->
+ less(A, B) < 0.
+
+less(A,A) -> 0;
+
+less(A,B) when is_atom(A), is_atom(B) -> atom_sort(A) - atom_sort(B);
+less(A,_) when is_atom(A) -> -1;
+less(_,B) when is_atom(B) -> 1;
+
+less(A,B) when is_number(A), is_number(B) -> A - B;
+less(A,_) when is_number(A) -> -1;
+less(_,B) when is_number(B) -> 1;
+
+less(A,B) when is_binary(A), is_binary(B) -> couch_collate:collate(A,B);
+less(A,_) when is_binary(A) -> -1;
+less(_,B) when is_binary(B) -> 1;
+
+less(A,B) when is_list(A), is_list(B) -> less_list(A,B);
+less(A,_) when is_list(A) -> -1;
+less(_,B) when is_list(B) -> 1;
+
+less({A},{B}) when is_list(A), is_list(B) -> less_props(A,B);
+less({A},_) when is_list(A) -> -1;
+less(_,{B}) when is_list(B) -> 1.
+
+atom_sort(null) -> 1;
+atom_sort(false) -> 2;
+atom_sort(true) -> 3.
+
+less_props([], [_|_]) ->
+ -1;
+less_props(_, []) ->
+ 1;
+less_props([{AKey, AValue}|RestA], [{BKey, BValue}|RestB]) ->
+ case couch_collate:collate(AKey, BKey) of
+ 0 ->
+ case less(AValue, BValue) of
+ 0 ->
+ less_props(RestA, RestB);
+ Result ->
+ Result
+ end;
+ Result ->
+ Result
+ end.
+
+less_list([], [_|_]) ->
+ -1;
+less_list(_, []) ->
+ 1;
+less_list([A|RestA], [B|RestB]) ->
+ case less(A,B) of
+ 0 ->
+ less_list(RestA, RestB);
+ Result ->
+ Result
+ end.
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/a6816bff/src/couch_event_sup.erl
----------------------------------------------------------------------
diff --git a/src/couch_event_sup.erl b/src/couch_event_sup.erl
new file mode 100644
index 0000000..07c4879
--- /dev/null
+++ b/src/couch_event_sup.erl
@@ -0,0 +1,73 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+%% The purpose of this module is to allow event handlers to participate in Erlang
+%% supervisor trees. It provides a monitorable process that crashes if the event
+%% handler fails. The process, when shut down, deregisters the event handler.
+
+-module(couch_event_sup).
+-behaviour(gen_server).
+
+-include("couch_db.hrl").
+
+-export([start_link/3,start_link/4, stop/1]).
+-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2,code_change/3]).
+
+%
+% Instead of calling
+% ok = gen_event:add_sup_handler(error_logger, my_log, Args)
+%
+% do this:
+% {ok, LinkedPid} = couch_event_sup:start_link(error_logger, my_log, Args)
+%
+% The benefit is that the event handler is now part of the process tree, and
+% can be started, restarted and shut down consistently like the rest of the
+% server components.
+%
+% And now if the "event" crashes, the supervisor is notified and can restart
+% the event handler.
+%
+% Use this form for a named process:
+% {ok, LinkedPid} = couch_event_sup:start_link({local, my_log}, error_logger, my_log, Args)
+%
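+% A minimal sketch (hypothetical supervisor, not part of this diff) of putting
+% such a handler under a supervisor with a standard OTP child spec:
+%
+%   {my_log_handler,
+%    {couch_event_sup, start_link, [error_logger, my_log, []]},
+%    permanent, 1000, worker, [couch_event_sup]}
+%
+% If the my_log handler exits, this process stops with the same reason (see
+% handle_info/2 below), letting the owning supervisor restart it.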
+
+start_link(EventMgr, EventHandler, Args) ->
+ gen_server:start_link(couch_event_sup, {EventMgr, EventHandler, Args}, []).
+
+start_link(ServerName, EventMgr, EventHandler, Args) ->
+ gen_server:start_link(ServerName, couch_event_sup, {EventMgr, EventHandler, Args}, []).
+
+stop(Pid) ->
+ gen_server:cast(Pid, stop).
+
+init({EventMgr, EventHandler, Args}) ->
+ case gen_event:add_sup_handler(EventMgr, EventHandler, Args) of
+ ok ->
+ {ok, {EventMgr, EventHandler}};
+ {stop, Error} ->
+ {stop, Error}
+ end.
+
+terminate(_Reason, _State) ->
+ ok.
+
+handle_call(_Whatever, _From, State) ->
+ {reply, ok, State}.
+
+handle_cast(stop, State) ->
+ {stop, normal, State}.
+
+handle_info({gen_event_EXIT, _Handler, Reason}, State) ->
+ {stop, Reason, State}.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/a6816bff/src/couch_external_manager.erl
----------------------------------------------------------------------
diff --git a/src/couch_external_manager.erl b/src/couch_external_manager.erl
new file mode 100644
index 0000000..0c66ef8
--- /dev/null
+++ b/src/couch_external_manager.erl
@@ -0,0 +1,101 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_external_manager).
+-behaviour(gen_server).
+
+-export([start_link/0, execute/2, config_change/2]).
+-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]).
+
+-include("couch_db.hrl").
+
+start_link() ->
+ gen_server:start_link({local, couch_external_manager},
+ couch_external_manager, [], []).
+
+execute(UrlName, JsonReq) ->
+ Pid = gen_server:call(couch_external_manager, {get, UrlName}),
+ case Pid of
+ {error, Reason} ->
+ Reason;
+ _ ->
+ couch_external_server:execute(Pid, JsonReq)
+ end.
+
+config_change("external", UrlName) ->
+ gen_server:call(couch_external_manager, {config, UrlName}).
+
+% gen_server API
+
+init([]) ->
+ process_flag(trap_exit, true),
+ Handlers = ets:new(couch_external_manager_handlers, [set, private]),
+ couch_config:register(fun ?MODULE:config_change/2),
+ {ok, Handlers}.
+
+terminate(_Reason, Handlers) ->
+ ets:foldl(fun({_UrlName, Pid}, nil) ->
+ couch_external_server:stop(Pid),
+ nil
+ end, nil, Handlers),
+ ok.
+
+handle_call({get, UrlName}, _From, Handlers) ->
+ case ets:lookup(Handlers, UrlName) of
+ [] ->
+ case couch_config:get("external", UrlName, nil) of
+ nil ->
+ Msg = lists:flatten(
+ io_lib:format("No server configured for ~p.", [UrlName])),
+ {reply, {error, {unknown_external_server, ?l2b(Msg)}}, Handlers};
+ Command ->
+ {ok, NewPid} = couch_external_server:start_link(UrlName, Command),
+ true = ets:insert(Handlers, {UrlName, NewPid}),
+ {reply, NewPid, Handlers}
+ end;
+ [{UrlName, Pid}] ->
+ {reply, Pid, Handlers}
+ end;
+handle_call({config, UrlName}, _From, Handlers) ->
+ % A newly added handler and a handler that had its command
+ % changed are treated exactly the same.
+
+ % Shut down the old handler.
+ case ets:lookup(Handlers, UrlName) of
+ [{UrlName, Pid}] ->
+ couch_external_server:stop(Pid);
+ [] ->
+ ok
+ end,
+ % Wait for next request to boot the handler.
+ {reply, ok, Handlers}.
+
+handle_cast(_Whatever, State) ->
+ {noreply, State}.
+
+handle_info({'EXIT', Pid, normal}, Handlers) ->
+ ?LOG_INFO("EXTERNAL: Server ~p terminated normally", [Pid]),
+ % The process terminated normally without us asking; remove Pid from the
+ % handlers table so we don't attempt to reuse it.
+ ets:match_delete(Handlers, {'_', Pid}),
+ {noreply, Handlers};
+
+handle_info({'EXIT', Pid, Reason}, Handlers) ->
+ ?LOG_INFO("EXTERNAL: Server ~p died. (reason: ~p)", [Pid, Reason]),
+ % Remove Pid from the handlers table so we don't try closing
+ % it a second time in terminate/2.
+ ets:match_delete(Handlers, {'_', Pid}),
+ {stop, normal, Handlers}.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/a6816bff/src/couch_external_server.erl
----------------------------------------------------------------------
diff --git a/src/couch_external_server.erl b/src/couch_external_server.erl
new file mode 100644
index 0000000..b52c7ff
--- /dev/null
+++ b/src/couch_external_server.erl
@@ -0,0 +1,70 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_external_server).
+-behaviour(gen_server).
+
+-export([start_link/2, stop/1, execute/2]).
+-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2, code_change/3]).
+
+-include("couch_db.hrl").
+
+% External API
+
+start_link(Name, Command) ->
+ gen_server:start_link(couch_external_server, [Name, Command], []).
+
+stop(Pid) ->
+ gen_server:cast(Pid, stop).
+
+execute(Pid, JsonReq) ->
+ {json, Json} = gen_server:call(Pid, {execute, JsonReq}, infinity),
+ ?JSON_DECODE(Json).
+
+% Gen Server Handlers
+
+init([Name, Command]) ->
+ ?LOG_INFO("EXTERNAL: Starting process for: ~s", [Name]),
+ ?LOG_INFO("COMMAND: ~s", [Command]),
+ process_flag(trap_exit, true),
+ Timeout = list_to_integer(couch_config:get("couchdb", "os_process_timeout",
+ "5000")),
+ {ok, Pid} = couch_os_process:start_link(Command, [{timeout, Timeout}]),
+ couch_config:register(fun("couchdb", "os_process_timeout", NewTimeout) ->
+ couch_os_process:set_timeout(Pid, list_to_integer(NewTimeout))
+ end),
+ {ok, {Name, Command, Pid}}.
+
+terminate(_Reason, {_Name, _Command, Pid}) ->
+ couch_os_process:stop(Pid),
+ ok.
+
+handle_call({execute, JsonReq}, _From, {Name, Command, Pid}) ->
+ {reply, couch_os_process:prompt(Pid, JsonReq), {Name, Command, Pid}}.
+
+handle_info({'EXIT', _Pid, normal}, State) ->
+ {noreply, State};
+handle_info({'EXIT', Pid, Reason}, {Name, Command, Pid}) ->
+ ?LOG_INFO("EXTERNAL: Process for ~s exiting. (reason: ~w)", [Name, Reason]),
+ {stop, Reason, {Name, Command, Pid}}.
+
+handle_cast(stop, {Name, Command, Pid}) ->
+ ?LOG_INFO("EXTERNAL: Shutting down ~s", [Name]),
+ exit(Pid, normal),
+ {stop, normal, {Name, Command, Pid}};
+handle_cast(_Whatever, State) ->
+ {noreply, State}.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+