Posted to commits@couchdb.apache.org by da...@apache.org on 2014/02/11 09:07:09 UTC

[07/41] initial move to rebar compilation

http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/75f30dbe/src/couch_db_updater.erl
----------------------------------------------------------------------
diff --git a/src/couch_db_updater.erl b/src/couch_db_updater.erl
new file mode 100644
index 0000000..af7578e
--- /dev/null
+++ b/src/couch_db_updater.erl
@@ -0,0 +1,1035 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_db_updater).
+-behaviour(gen_server).
+
+-export([btree_by_id_reduce/2,btree_by_seq_reduce/2]).
+-export([make_doc_summary/2]).
+-export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
+
+-include("couch_db.hrl").
+
+
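+% gen_server init/1: open or create the database header, build the #db{}
+% state via init_db/6 and install the validate_doc_update functions. The
+% updater process owns the write side of the database file; document writes,
+% purges and compaction handoffs are all serialized through this server.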
+init({MainPid, DbName, Filepath, Fd, Options}) ->
+    process_flag(trap_exit, true),
+    case lists:member(create, Options) of
+    true ->
+        % create a new header and write it to the file
+        Header =  #db_header{},
+        ok = couch_file:write_header(Fd, Header),
+        % delete any old compaction files that might be hanging around
+        RootDir = couch_config:get("couchdb", "database_dir", "."),
+        couch_file:delete(RootDir, Filepath ++ ".compact");
+    false ->
+        case couch_file:read_header(Fd) of
+        {ok, Header} ->
+            ok;
+        no_valid_header ->
+            % create a new header and write it to the file
+            Header =  #db_header{},
+            ok = couch_file:write_header(Fd, Header),
+            % delete any old compaction files that might be hanging around
+            file:delete(Filepath ++ ".compact")
+        end
+    end,
+    ReaderFd = open_reader_fd(Filepath, Options),
+    Db = init_db(DbName, Filepath, Fd, ReaderFd, Header, Options),
+    Db2 = refresh_validate_doc_funs(Db),
+    {ok, Db2#db{main_pid = MainPid}}.
+
+
+terminate(_Reason, Db) ->
+    ok = couch_file:close(Db#db.updater_fd),
+    ok = couch_file:close(Db#db.fd),
+    couch_util:shutdown_sync(Db#db.compactor_pid),
+    couch_util:shutdown_sync(Db#db.fd_ref_counter),
+    ok.
+
+handle_call(get_db, _From, Db) ->
+    {reply, {ok, Db}, Db};
+handle_call(full_commit, _From, #db{waiting_delayed_commit=nil}=Db) ->
+    {reply, ok, Db}; % no data waiting, return ok immediately
+handle_call(full_commit, _From,  Db) ->
+    {reply, ok, commit_data(Db)}; % commit the data and return ok
+handle_call(increment_update_seq, _From, Db) ->
+    Db2 = commit_data(Db#db{update_seq=Db#db.update_seq+1}),
+    ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
+    couch_db_update_notifier:notify({updated, Db#db.name}),
+    {reply, {ok, Db2#db.update_seq}, Db2};
+
+handle_call({set_security, NewSec}, _From, #db{compression = Comp} = Db) ->
+    {ok, Ptr, _} = couch_file:append_term(
+        Db#db.updater_fd, NewSec, [{compression, Comp}]),
+    Db2 = commit_data(Db#db{security=NewSec, security_ptr=Ptr,
+            update_seq=Db#db.update_seq+1}),
+    ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
+    {reply, ok, Db2};
+
+handle_call({set_revs_limit, Limit}, _From, Db) ->
+    Db2 = commit_data(Db#db{revs_limit=Limit,
+            update_seq=Db#db.update_seq+1}),
+    ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
+    {reply, ok, Db2};
+
+handle_call({purge_docs, _IdRevs}, _From,
+        #db{compactor_pid=Pid}=Db) when Pid /= nil ->
+    {reply, {error, purge_during_compaction}, Db};
+handle_call({purge_docs, IdRevs}, _From, Db) ->
+    #db{
+        updater_fd = Fd,
+        fulldocinfo_by_id_btree = DocInfoByIdBTree,
+        docinfo_by_seq_btree = DocInfoBySeqBTree,
+        update_seq = LastSeq,
+        header = Header = #db_header{purge_seq=PurgeSeq},
+        compression = Comp
+        } = Db,
+    DocLookups = couch_btree:lookup(DocInfoByIdBTree,
+            [Id || {Id, _Revs} <- IdRevs]),
+
+    NewDocInfos = lists:zipwith(
+        fun({_Id, Revs}, {ok, #full_doc_info{rev_tree=Tree}=FullDocInfo}) ->
+            case couch_key_tree:remove_leafs(Tree, Revs) of
+            {_, []=_RemovedRevs} -> % no change
+                nil;
+            {NewTree, RemovedRevs} ->
+                {FullDocInfo#full_doc_info{rev_tree=NewTree},RemovedRevs}
+            end;
+        (_, not_found) ->
+            nil
+        end,
+        IdRevs, DocLookups),
+
+    SeqsToRemove = [Seq
+            || {#full_doc_info{update_seq=Seq},_} <- NewDocInfos],
+
+    FullDocInfoToUpdate = [FullInfo
+            || {#full_doc_info{rev_tree=Tree}=FullInfo,_}
+            <- NewDocInfos, Tree /= []],
+
+    IdRevsPurged = [{Id, Revs}
+            || {#full_doc_info{id=Id}, Revs} <- NewDocInfos],
+
+    {DocInfoToUpdate, NewSeq} = lists:mapfoldl(
+        fun(#full_doc_info{rev_tree=Tree}=FullInfo, SeqAcc) ->
+            Tree2 = couch_key_tree:map_leafs(
+                fun(_RevId, LeafVal) ->
+                    IsDeleted = element(1, LeafVal),
+                    BodyPointer = element(2, LeafVal),
+                    {IsDeleted, BodyPointer, SeqAcc + 1}
+                end, Tree),
+            {couch_doc:to_doc_info(FullInfo#full_doc_info{rev_tree=Tree2}),
+                SeqAcc + 1}
+        end, LastSeq, FullDocInfoToUpdate),
+
+    IdsToRemove = [Id || {#full_doc_info{id=Id,rev_tree=[]},_}
+            <- NewDocInfos],
+
+    {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree,
+            DocInfoToUpdate, SeqsToRemove),
+    {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree,
+            FullDocInfoToUpdate, IdsToRemove),
+    {ok, Pointer, _} = couch_file:append_term(
+            Fd, IdRevsPurged, [{compression, Comp}]),
+
+    Db2 = commit_data(
+        Db#db{
+            fulldocinfo_by_id_btree = DocInfoByIdBTree2,
+            docinfo_by_seq_btree = DocInfoBySeqBTree2,
+            update_seq = NewSeq + 1,
+            header=Header#db_header{purge_seq=PurgeSeq+1, purged_docs=Pointer}}),
+
+    ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
+    couch_db_update_notifier:notify({updated, Db#db.name}),
+    {reply, {ok, (Db2#db.header)#db_header.purge_seq, IdRevsPurged}, Db2};
+handle_call(start_compact, _From, Db) ->
+    case Db#db.compactor_pid of
+    nil ->
+        ?LOG_INFO("Starting compaction for db \"~s\"", [Db#db.name]),
+        Pid = spawn_link(fun() -> start_copy_compact(Db) end),
+        Db2 = Db#db{compactor_pid=Pid},
+        ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}),
+        {reply, {ok, Pid}, Db2};
+    _ ->
+        % a compaction is already running, so this is a no-op
+        {reply, {ok, Db#db.compactor_pid}, Db}
+    end;
+handle_call(cancel_compact, _From, #db{compactor_pid = nil} = Db) ->
+    {reply, ok, Db};
+handle_call(cancel_compact, _From, #db{compactor_pid = Pid} = Db) ->
+    unlink(Pid),
+    exit(Pid, kill),
+    RootDir = couch_config:get("couchdb", "database_dir", "."),
+    ok = couch_file:delete(RootDir, Db#db.filepath ++ ".compact"),
+    {reply, ok, Db#db{compactor_pid = nil}};
+
+
+handle_call({compact_done, CompactFilepath}, _From, #db{filepath=Filepath}=Db) ->
+    {ok, NewFd} = couch_file:open(CompactFilepath),
+    ReaderFd = open_reader_fd(CompactFilepath, Db#db.options),
+    {ok, NewHeader} = couch_file:read_header(NewFd),
+    #db{update_seq=NewSeq} = NewDb =
+        init_db(Db#db.name, Filepath, NewFd, ReaderFd, NewHeader, Db#db.options),
+    unlink(NewFd),
+    case Db#db.update_seq == NewSeq of
+    true ->
+        % suck up all the local docs into memory and write them to the new db
+        {ok, _, LocalDocs} = couch_btree:foldl(Db#db.local_docs_btree,
+                fun(Value, _Offset, Acc) -> {ok, [Value | Acc]} end, []),
+        {ok, NewLocalBtree} = couch_btree:add(NewDb#db.local_docs_btree, LocalDocs),
+
+        NewDb2 = commit_data(NewDb#db{
+            local_docs_btree = NewLocalBtree,
+            main_pid = Db#db.main_pid,
+            filepath = Filepath,
+            instance_start_time = Db#db.instance_start_time,
+            revs_limit = Db#db.revs_limit
+        }),
+
+        ?LOG_DEBUG("CouchDB swapping files ~s and ~s.",
+                [Filepath, CompactFilepath]),
+        RootDir = couch_config:get("couchdb", "database_dir", "."),
+        couch_file:delete(RootDir, Filepath),
+        ok = file:rename(CompactFilepath, Filepath),
+        close_db(Db),
+        NewDb3 = refresh_validate_doc_funs(NewDb2),
+        ok = gen_server:call(Db#db.main_pid, {db_updated, NewDb3}, infinity),
+        couch_db_update_notifier:notify({compacted, NewDb3#db.name}),
+        ?LOG_INFO("Compaction for db \"~s\" completed.", [Db#db.name]),
+        {reply, ok, NewDb3#db{compactor_pid=nil}};
+    false ->
+        ?LOG_INFO("Compaction file still behind main file "
+            "(update seq=~p. compact update seq=~p). Retrying.",
+            [Db#db.update_seq, NewSeq]),
+        close_db(NewDb),
+        {reply, {retry, Db}, Db}
+    end.
+
+
+handle_cast(Msg, #db{name = Name} = Db) ->
+    ?LOG_ERROR("Database `~s` updater received unexpected cast: ~p", [Name, Msg]),
+    {stop, Msg, Db}.
+
+
+handle_info({update_docs, Client, GroupedDocs, NonRepDocs, MergeConflicts,
+        FullCommit}, Db) ->
+    GroupedDocs2 = [[{Client, D} || D <- DocGroup] || DocGroup <- GroupedDocs],
+    if NonRepDocs == [] ->
+        {GroupedDocs3, Clients, FullCommit2} = collect_updates(GroupedDocs2,
+                [Client], MergeConflicts, FullCommit);
+    true ->
+        GroupedDocs3 = GroupedDocs2,
+        FullCommit2 = FullCommit,
+        Clients = [Client]
+    end,
+    NonRepDocs2 = [{Client, NRDoc} || NRDoc <- NonRepDocs],
+    try update_docs_int(Db, GroupedDocs3, NonRepDocs2, MergeConflicts,
+                FullCommit2) of
+    {ok, Db2, UpdatedDDocIds} ->
+        ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}),
+        if Db2#db.update_seq /= Db#db.update_seq ->
+            couch_db_update_notifier:notify({updated, Db2#db.name});
+        true -> ok
+        end,
+        [catch(ClientPid ! {done, self()}) || ClientPid <- Clients],
+        lists:foreach(fun(DDocId) ->
+            couch_db_update_notifier:notify({ddoc_updated, {Db#db.name, DDocId}})
+        end, UpdatedDDocIds),
+        {noreply, Db2}
+    catch
+        throw: retry ->
+            [catch(ClientPid ! {retry, self()}) || ClientPid <- Clients],
+            {noreply, Db}
+    end;
+handle_info(delayed_commit, #db{waiting_delayed_commit=nil}=Db) ->
+    % no outstanding delayed commits, ignore
+    {noreply, Db};
+handle_info(delayed_commit, Db) ->
+    case commit_data(Db) of
+        Db ->
+            {noreply, Db};
+        Db2 ->
+            ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
+            {noreply, Db2}
+    end;
+handle_info({'EXIT', _Pid, normal}, Db) ->
+    {noreply, Db};
+handle_info({'EXIT', _Pid, Reason}, Db) ->
+    {stop, Reason, Db}.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+
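+% merge two lists of update groups, each sorted by document id, into a single
+% id-sorted list; groups that target the same document are concatenated.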
+merge_updates([], RestB, AccOutGroups) ->
+    lists:reverse(AccOutGroups, RestB);
+merge_updates(RestA, [], AccOutGroups) ->
+    lists:reverse(AccOutGroups, RestA);
+merge_updates([[{_, {#doc{id=IdA}, _}}|_]=GroupA | RestA],
+        [[{_, {#doc{id=IdB}, _}}|_]=GroupB | RestB], AccOutGroups) ->
+    if IdA == IdB ->
+        merge_updates(RestA, RestB, [GroupA ++ GroupB | AccOutGroups]);
+    IdA < IdB ->
+        merge_updates(RestA, [GroupB | RestB], [GroupA | AccOutGroups]);
+    true ->
+        merge_updates([GroupA | RestA], RestB, [GroupB | AccOutGroups])
+    end.
+
+collect_updates(GroupedDocsAcc, ClientsAcc, MergeConflicts, FullCommit) ->
+    receive
+        % Only collect updates with the same MergeConflicts flag and without
+        % local docs. It's easier to just avoid multiple _local doc
+        % updaters than deal with their possible conflicts, and local doc
+        % writes are relatively rare. Can be optimized later if really needed.
+        {update_docs, Client, GroupedDocs, [], MergeConflicts, FullCommit2} ->
+            GroupedDocs2 = [[{Client, Doc} || Doc <- DocGroup]
+                    || DocGroup <- GroupedDocs],
+            GroupedDocsAcc2 =
+                merge_updates(GroupedDocsAcc, GroupedDocs2, []),
+            collect_updates(GroupedDocsAcc2, [Client | ClientsAcc],
+                    MergeConflicts, (FullCommit or FullCommit2))
+    after 0 ->
+        {GroupedDocsAcc, ClientsAcc, FullCommit}
+    end.
+
+
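+% by_seq btree key/value format: the key is the doc's highest update seq and
+% the value is {Id, RevInfos, DeletedRevInfos}, where each rev info is a
+% {Rev, Seq, BodyPointer} triple. btree_by_seq_join/2 rebuilds the #doc_info{}.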
+btree_by_seq_split(#doc_info{id=Id, high_seq=KeySeq, revs=Revs}) ->
+    {RevInfos, DeletedRevInfos} = lists:foldl(
+        fun(#rev_info{deleted = false, seq = Seq} = Ri, {Acc, AccDel}) ->
+                {[{Ri#rev_info.rev, Seq, Ri#rev_info.body_sp} | Acc], AccDel};
+            (#rev_info{deleted = true, seq = Seq} = Ri, {Acc, AccDel}) ->
+                {Acc, [{Ri#rev_info.rev, Seq, Ri#rev_info.body_sp} | AccDel]}
+        end,
+        {[], []}, Revs),
+    {KeySeq, {Id, lists:reverse(RevInfos), lists:reverse(DeletedRevInfos)}}.
+
+btree_by_seq_join(KeySeq, {Id, RevInfos, DeletedRevInfos}) ->
+    #doc_info{
+        id = Id,
+        high_seq=KeySeq,
+        revs =
+            [#rev_info{rev=Rev,seq=Seq,deleted=false,body_sp = Bp} ||
+                {Rev, Seq, Bp} <- RevInfos] ++
+            [#rev_info{rev=Rev,seq=Seq,deleted=true,body_sp = Bp} ||
+                {Rev, Seq, Bp} <- DeletedRevInfos]}.
+
+btree_by_id_split(#full_doc_info{id=Id, update_seq=Seq,
+        deleted=Deleted, rev_tree=Tree}) ->
+    DiskTree =
+    couch_key_tree:map(
+        fun(_RevId, ?REV_MISSING) ->
+            ?REV_MISSING;
+        (_RevId, RevValue) ->
+            IsDeleted = element(1, RevValue),
+            BodyPointer = element(2, RevValue),
+            UpdateSeq = element(3, RevValue),
+            Size = case tuple_size(RevValue) of
+            4 ->
+                element(4, RevValue);
+            3 ->
+                % pre 1.2 format, will be upgraded on compaction
+                nil
+            end,
+            {if IsDeleted -> 1; true -> 0 end, BodyPointer, UpdateSeq, Size}
+        end, Tree),
+    {Id, {Seq, if Deleted -> 1; true -> 0 end, DiskTree}}.
+
+btree_by_id_join(Id, {HighSeq, Deleted, DiskTree}) ->
+    {Tree, LeafsSize} =
+    couch_key_tree:mapfold(
+        fun(_RevId, {IsDeleted, BodyPointer, UpdateSeq}, leaf, _Acc) ->
+            % pre 1.2 format, will be upgraded on compaction
+            {{IsDeleted == 1, BodyPointer, UpdateSeq, nil}, nil};
+        (_RevId, {IsDeleted, BodyPointer, UpdateSeq}, branch, Acc) ->
+            {{IsDeleted == 1, BodyPointer, UpdateSeq, nil}, Acc};
+        (_RevId, {IsDeleted, BodyPointer, UpdateSeq, Size}, leaf, Acc) ->
+            Acc2 = sum_leaf_sizes(Acc, Size),
+            {{IsDeleted == 1, BodyPointer, UpdateSeq, Size}, Acc2};
+        (_RevId, {IsDeleted, BodyPointer, UpdateSeq, Size}, branch, Acc) ->
+            {{IsDeleted == 1, BodyPointer, UpdateSeq, Size}, Acc};
+        (_RevId, ?REV_MISSING, _Type, Acc) ->
+            {?REV_MISSING, Acc}
+        end, 0, DiskTree),
+    #full_doc_info{
+        id = Id,
+        update_seq = HighSeq,
+        deleted = (Deleted == 1),
+        rev_tree = Tree,
+        leafs_size = LeafsSize
+    }.
+
+btree_by_id_reduce(reduce, FullDocInfos) ->
+    lists:foldl(
+        fun(Info, {NotDeleted, Deleted, Size}) ->
+            Size2 = sum_leaf_sizes(Size, Info#full_doc_info.leafs_size),
+            case Info#full_doc_info.deleted of
+            true ->
+                {NotDeleted, Deleted + 1, Size2};
+            false ->
+                {NotDeleted + 1, Deleted, Size2}
+            end
+        end,
+        {0, 0, 0}, FullDocInfos);
+btree_by_id_reduce(rereduce, Reds) ->
+    lists:foldl(
+        fun({NotDeleted, Deleted}, {AccNotDeleted, AccDeleted, _AccSize}) ->
+            % pre 1.2 format, will be upgraded on compaction
+            {AccNotDeleted + NotDeleted, AccDeleted + Deleted, nil};
+        ({NotDeleted, Deleted, Size}, {AccNotDeleted, AccDeleted, AccSize}) ->
+            AccSize2 = sum_leaf_sizes(AccSize, Size),
+            {AccNotDeleted + NotDeleted, AccDeleted + Deleted, AccSize2}
+        end,
+        {0, 0, 0}, Reds).
+
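+% leaf sizes are unknown (nil) for entries written by pre-1.2 files; nil is
+% sticky, so a single unknown size makes the whole sum unknown.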
+sum_leaf_sizes(nil, _) ->
+    nil;
+sum_leaf_sizes(_, nil) ->
+    nil;
+sum_leaf_sizes(Size1, Size2) ->
+    Size1 + Size2.
+
+btree_by_seq_reduce(reduce, DocInfos) ->
+    % count the number of documents
+    length(DocInfos);
+btree_by_seq_reduce(rereduce, Reds) ->
+    lists:sum(Reds).
+
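+% pad an older (shorter) record tuple with the default values of any fields
+% that were appended to the record definition since it was written, e.g.
+% simple_upgrade_record({rec, a}, {rec, x, y}) yields {rec, a, y}.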
+simple_upgrade_record(Old, New) when tuple_size(Old) < tuple_size(New) ->
+    OldSz = tuple_size(Old),
+    NewValuesTail =
+        lists:sublist(tuple_to_list(New), OldSz + 1, tuple_size(New) - OldSz),
+    list_to_tuple(tuple_to_list(Old) ++ NewValuesTail);
+simple_upgrade_record(Old, _New) ->
+    Old.
+
+-define(OLD_DISK_VERSION_ERROR,
+    "Database files from versions smaller than 0.10.0 are no longer supported").
+
+init_db(DbName, Filepath, Fd, ReaderFd, Header0, Options) ->
+    Header1 = simple_upgrade_record(Header0, #db_header{}),
+    Header =
+    case element(2, Header1) of
+    1 -> throw({database_disk_version_error, ?OLD_DISK_VERSION_ERROR});
+    2 -> throw({database_disk_version_error, ?OLD_DISK_VERSION_ERROR});
+    3 -> throw({database_disk_version_error, ?OLD_DISK_VERSION_ERROR});
+    4 -> Header1#db_header{security_ptr = nil}; % 0.10 and pre 0.11
+    5 -> Header1; % pre 1.2
+    ?LATEST_DISK_VERSION -> Header1;
+    _ -> throw({database_disk_version_error, "Incorrect disk header version"})
+    end,
+
+    {ok, FsyncOptions} = couch_util:parse_term(
+            couch_config:get("couchdb", "fsync_options",
+                    "[before_header, after_header, on_file_open]")),
+
+    case lists:member(on_file_open, FsyncOptions) of
+    true -> ok = couch_file:sync(Fd);
+    _ -> ok
+    end,
+
+    Compression = couch_compress:get_compression_method(),
+
+    {ok, IdBtree} = couch_btree:open(Header#db_header.fulldocinfo_by_id_btree_state, Fd,
+        [{split, fun(X) -> btree_by_id_split(X) end},
+        {join, fun(X,Y) -> btree_by_id_join(X,Y) end},
+        {reduce, fun(X,Y) -> btree_by_id_reduce(X,Y) end},
+        {compression, Compression}]),
+    {ok, SeqBtree} = couch_btree:open(Header#db_header.docinfo_by_seq_btree_state, Fd,
+            [{split, fun(X) -> btree_by_seq_split(X) end},
+            {join, fun(X,Y) -> btree_by_seq_join(X,Y) end},
+            {reduce, fun(X,Y) -> btree_by_seq_reduce(X,Y) end},
+            {compression, Compression}]),
+    {ok, LocalDocsBtree} = couch_btree:open(Header#db_header.local_docs_btree_state, Fd,
+        [{compression, Compression}]),
+    case Header#db_header.security_ptr of
+    nil ->
+        Security = [],
+        SecurityPtr = nil;
+    SecurityPtr ->
+        {ok, Security} = couch_file:pread_term(Fd, SecurityPtr)
+    end,
+    % convert start time tuple to microsecs and store as a binary string
+    {MegaSecs, Secs, MicroSecs} = now(),
+    StartTime = ?l2b(io_lib:format("~p",
+            [(MegaSecs*1000000*1000000) + (Secs*1000000) + MicroSecs])),
+    {ok, RefCntr} = couch_ref_counter:start([Fd, ReaderFd]),
+    #db{
+        update_pid=self(),
+        fd = ReaderFd,
+        updater_fd = Fd,
+        fd_ref_counter = RefCntr,
+        header=Header,
+        fulldocinfo_by_id_btree = IdBtree,
+        docinfo_by_seq_btree = SeqBtree,
+        local_docs_btree = LocalDocsBtree,
+        committed_update_seq = Header#db_header.update_seq,
+        update_seq = Header#db_header.update_seq,
+        name = DbName,
+        filepath = Filepath,
+        security = Security,
+        security_ptr = SecurityPtr,
+        instance_start_time = StartTime,
+        revs_limit = Header#db_header.revs_limit,
+        fsync_options = FsyncOptions,
+        options = Options,
+        compression = Compression,
+        before_doc_update = couch_util:get_value(before_doc_update, Options, nil),
+        after_doc_read = couch_util:get_value(after_doc_read, Options, nil)
+        }.
+
+open_reader_fd(Filepath, Options) ->
+    {ok, Fd} = case lists:member(sys_db, Options) of
+    true ->
+        couch_file:open(Filepath, [read_only, sys_db]);
+    false ->
+        couch_file:open(Filepath, [read_only])
+    end,
+    unlink(Fd),
+    Fd.
+
+close_db(#db{fd_ref_counter = RefCntr}) ->
+    couch_ref_counter:drop(RefCntr).
+
+
+refresh_validate_doc_funs(Db0) ->
+    Db = Db0#db{user_ctx = #user_ctx{roles=[<<"_admin">>]}},
+    DesignDocs = couch_db:get_design_docs(Db),
+    ProcessDocFuns = lists:flatmap(
+        fun(DesignDocInfo) ->
+            {ok, DesignDoc} = couch_db:open_doc_int(
+                Db, DesignDocInfo, [ejson_body]),
+            case couch_doc:get_validate_doc_fun(DesignDoc) of
+            nil -> [];
+            Fun -> [Fun]
+            end
+        end, DesignDocs),
+    Db0#db{validate_doc_funs=ProcessDocFuns}.
+
+% rev tree functions
+
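+% walk each unflushed rev tree and write any in-memory document summaries
+% ({summary, Summary, AttsFd} bodies) to the updater fd, replacing them with
+% {IsDeleted, SummaryPointer, UpdateSeq, TotalSize} leaf values and summing
+% the leaf sizes for the #full_doc_info{}.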
+flush_trees(_Db, [], AccFlushedTrees) ->
+    {ok, lists:reverse(AccFlushedTrees)};
+flush_trees(#db{updater_fd = Fd} = Db,
+        [InfoUnflushed | RestUnflushed], AccFlushed) ->
+    #full_doc_info{update_seq=UpdateSeq, rev_tree=Unflushed} = InfoUnflushed,
+    {Flushed, LeafsSize} = couch_key_tree:mapfold(
+        fun(_Rev, Value, Type, Acc) ->
+            case Value of
+            #doc{deleted = IsDeleted, body = {summary, Summary, AttsFd}} ->
+                % this node value is actually an unwritten document summary,
+                % write to disk.
+                % make sure the Fd in the written bins is the same Fd we are
+                % and convert bins, removing the FD.
+                % All bins should have been written to disk already.
+                case {AttsFd, Fd} of
+                {nil, _} ->
+                    ok;
+                {SameFd, SameFd} ->
+                    ok;
+                _ ->
+                    % Fd where the attachments were written to is not the same
+                    % as our Fd. This can happen when a database is being
+                    % switched out during a compaction.
+                    ?LOG_DEBUG("File where the attachments are written has"
+                            " changed. Possibly retrying.", []),
+                    throw(retry)
+                end,
+                {ok, NewSummaryPointer, SummarySize} =
+                    couch_file:append_raw_chunk(Fd, Summary),
+                TotalSize = lists:foldl(
+                    fun(#att{att_len = L}, A) -> A + L end,
+                    SummarySize, Value#doc.atts),
+                NewValue = {IsDeleted, NewSummaryPointer, UpdateSeq, TotalSize},
+                case Type of
+                leaf ->
+                    {NewValue, Acc + TotalSize};
+                branch ->
+                    {NewValue, Acc}
+                end;
+             {_, _, _, LeafSize} when Type =:= leaf, LeafSize =/= nil ->
+                {Value, Acc + LeafSize};
+             _ ->
+                {Value, Acc}
+            end
+        end, 0, Unflushed),
+    InfoFlushed = InfoUnflushed#full_doc_info{
+        rev_tree = Flushed,
+        leafs_size = LeafsSize
+    },
+    flush_trees(Db, RestUnflushed, [InfoFlushed | AccFlushed]).
+
+
+send_result(Client, Ref, NewResult) ->
+    % used to send a result to the client
+    catch(Client ! {result, self(), {Ref, NewResult}}).
+
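+% merge each group of incoming edits into the existing rev tree for its
+% document, replying conflict to clients whose edits do not extend a leaf
+% (unless MergeConflicts is set), and assigning a new update seq to every
+% document whose tree actually changed.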
+merge_rev_trees(_Limit, _Merge, [], [], AccNewInfos, AccRemoveSeqs, AccSeq) ->
+    {ok, lists:reverse(AccNewInfos), AccRemoveSeqs, AccSeq};
+merge_rev_trees(Limit, MergeConflicts, [NewDocs|RestDocsList],
+        [OldDocInfo|RestOldInfo], AccNewInfos, AccRemoveSeqs, AccSeq) ->
+    #full_doc_info{id=Id,rev_tree=OldTree,deleted=OldDeleted0,update_seq=OldSeq}
+            = OldDocInfo,
+    {NewRevTree, _} = lists:foldl(
+        fun({Client, {#doc{revs={Pos,[_Rev|PrevRevs]}}=NewDoc, Ref}}, {AccTree, OldDeleted}) ->
+            if not MergeConflicts ->
+                case couch_key_tree:merge(AccTree, couch_doc:to_path(NewDoc),
+                    Limit) of
+                {_NewTree, conflicts} when (not OldDeleted) ->
+                    send_result(Client, Ref, conflict),
+                    {AccTree, OldDeleted};
+                {NewTree, conflicts} when PrevRevs /= [] ->
+                    % If a previous revision was specified, check that it is
+                    % a leaf node in the tree
+                    Leafs = couch_key_tree:get_all_leafs(AccTree),
+                    IsPrevLeaf = lists:any(fun({_, {LeafPos, [LeafRevId|_]}}) ->
+                            {LeafPos, LeafRevId} == {Pos-1, hd(PrevRevs)}
+                        end, Leafs),
+                    if IsPrevLeaf ->
+                        {NewTree, OldDeleted};
+                    true ->
+                        send_result(Client, Ref, conflict),
+                        {AccTree, OldDeleted}
+                    end;
+                {NewTree, no_conflicts} when  AccTree == NewTree ->
+                    % the tree didn't change at all
+                    % meaning we are saving a rev that's already
+                    % been edited again.
+                    if (Pos == 1) and OldDeleted ->
+                        % this means we are recreating a brand new document
+                        % into a state that already existed before.
+                        % put the rev into a subsequent edit of the deletion
+                        #doc_info{revs=[#rev_info{rev={OldPos,OldRev}}|_]} =
+                                couch_doc:to_doc_info(OldDocInfo),
+                        NewRevId = couch_db:new_revid(
+                                NewDoc#doc{revs={OldPos, [OldRev]}}),
+                        NewDoc2 = NewDoc#doc{revs={OldPos + 1, [NewRevId, OldRev]}},
+                        {NewTree2, _} = couch_key_tree:merge(AccTree,
+                                couch_doc:to_path(NewDoc2), Limit),
+                        % we changed the rev id, this tells the caller we did
+                        send_result(Client, Ref, {ok, {OldPos + 1, NewRevId}}),
+                        {NewTree2, OldDeleted};
+                    true ->
+                        send_result(Client, Ref, conflict),
+                        {AccTree, OldDeleted}
+                    end;
+                {NewTree, _} ->
+                    {NewTree, NewDoc#doc.deleted}
+                end;
+            true ->
+                {NewTree, _} = couch_key_tree:merge(AccTree,
+                            couch_doc:to_path(NewDoc), Limit),
+                {NewTree, OldDeleted}
+            end
+        end,
+        {OldTree, OldDeleted0}, NewDocs),
+    if NewRevTree == OldTree ->
+        % nothing changed
+        merge_rev_trees(Limit, MergeConflicts, RestDocsList, RestOldInfo,
+            AccNewInfos, AccRemoveSeqs, AccSeq);
+    true ->
+        % we have updated the document, give it a new seq #
+        NewInfo = #full_doc_info{id=Id,update_seq=AccSeq+1,rev_tree=NewRevTree},
+        RemoveSeqs = case OldSeq of
+            0 -> AccRemoveSeqs;
+            _ -> [OldSeq | AccRemoveSeqs]
+        end,
+        merge_rev_trees(Limit, MergeConflicts, RestDocsList, RestOldInfo,
+            [NewInfo|AccNewInfos], RemoveSeqs, AccSeq+1)
+    end.
+
+
+
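+% build the by_id and by_seq index entries for the flushed doc infos and
+% collect the ids of any design documents that were updated.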
+new_index_entries([], AccById, AccBySeq, AccDDocIds) ->
+    {AccById, AccBySeq, AccDDocIds};
+new_index_entries([FullDocInfo|RestInfos], AccById, AccBySeq, AccDDocIds) ->
+    #doc_info{revs=[#rev_info{deleted=Deleted}|_], id=Id} = DocInfo =
+            couch_doc:to_doc_info(FullDocInfo),
+    AccDDocIds2 = case Id of
+    <<?DESIGN_DOC_PREFIX, _/binary>> ->
+        [Id | AccDDocIds];
+    _ ->
+        AccDDocIds
+    end,
+    new_index_entries(RestInfos,
+        [FullDocInfo#full_doc_info{deleted=Deleted}|AccById],
+        [DocInfo|AccBySeq],
+        AccDDocIds2).
+
+
+stem_full_doc_infos(#db{revs_limit=Limit}, DocInfos) ->
+    [Info#full_doc_info{rev_tree=couch_key_tree:stem(Tree, Limit)} ||
+            #full_doc_info{rev_tree=Tree}=Info <- DocInfos].
+
+update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) ->
+    #db{
+        fulldocinfo_by_id_btree = DocInfoByIdBTree,
+        docinfo_by_seq_btree = DocInfoBySeqBTree,
+        update_seq = LastSeq,
+        revs_limit = RevsLimit
+        } = Db,
+    Ids = [Id || [{_Client, {#doc{id=Id}, _Ref}}|_] <- DocsList],
+    % look up the old documents, if they exist.
+    OldDocLookups = couch_btree:lookup(DocInfoByIdBTree, Ids),
+    OldDocInfos = lists:zipwith(
+        fun(_Id, {ok, FullDocInfo}) ->
+            FullDocInfo;
+        (Id, not_found) ->
+            #full_doc_info{id=Id}
+        end,
+        Ids, OldDocLookups),
+    % Merge the new docs into the revision trees.
+    {ok, NewFullDocInfos, RemoveSeqs, NewSeq} = merge_rev_trees(RevsLimit,
+            MergeConflicts, DocsList, OldDocInfos, [], [], LastSeq),
+
+    % All documents are now ready to write.
+
+    {ok, Db2}  = update_local_docs(Db, NonRepDocs),
+
+    % Write out the document summaries (the bodies are stored in the nodes of
+    % the trees, the attachments are already written to disk)
+    {ok, FlushedFullDocInfos} = flush_trees(Db2, NewFullDocInfos, []),
+
+    {IndexFullDocInfos, IndexDocInfos, UpdatedDDocIds} =
+            new_index_entries(FlushedFullDocInfos, [], [], []),
+
+    % and the indexes
+    {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree, IndexFullDocInfos, []),
+    {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, IndexDocInfos, RemoveSeqs),
+
+    Db3 = Db2#db{
+        fulldocinfo_by_id_btree = DocInfoByIdBTree2,
+        docinfo_by_seq_btree = DocInfoBySeqBTree2,
+        update_seq = NewSeq},
+
+    % Check if we just updated any design documents, and update the validation
+    % funs if we did.
+    Db4 = case UpdatedDDocIds of
+    [] ->
+        Db3;
+    _ ->
+        refresh_validate_doc_funs(Db3)
+    end,
+
+    {ok, commit_data(Db4, not FullCommit), UpdatedDDocIds}.
+
+update_local_docs(Db, []) ->
+    {ok, Db};
+update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) ->
+    Ids = [Id || {_Client, {#doc{id=Id}, _Ref}} <- Docs],
+    OldDocLookups = couch_btree:lookup(Btree, Ids),
+    BtreeEntries = lists:zipwith(
+        fun({Client, {#doc{id=Id,deleted=Delete,revs={0,PrevRevs},body=Body}, Ref}}, OldDocLookup) ->
+            case PrevRevs of
+            [RevStr|_] ->
+                PrevRev = list_to_integer(?b2l(RevStr));
+            [] ->
+                PrevRev = 0
+            end,
+            OldRev =
+            case OldDocLookup of
+                {ok, {_, {OldRev0, _}}} -> OldRev0;
+                not_found -> 0
+            end,
+            case OldRev == PrevRev of
+            true ->
+                case Delete of
+                    false ->
+                        send_result(Client, Ref, {ok,
+                                {0, ?l2b(integer_to_list(PrevRev + 1))}}),
+                        {update, {Id, {PrevRev + 1, Body}}};
+                    true  ->
+                        send_result(Client, Ref,
+                                {ok, {0, <<"0">>}}),
+                        {remove, Id}
+                end;
+            false ->
+                send_result(Client, Ref, conflict),
+                ignore
+            end
+        end, Docs, OldDocLookups),
+
+    BtreeIdsRemove = [Id || {remove, Id} <- BtreeEntries],
+    BtreeIdsUpdate = [{Key, Val} || {update, {Key, Val}} <- BtreeEntries],
+
+    {ok, Btree2} =
+        couch_btree:add_remove(Btree, BtreeIdsUpdate, BtreeIdsRemove),
+
+    {ok, Db#db{local_docs_btree = Btree2}}.
+
+
+commit_data(Db) ->
+    commit_data(Db, false).
+
+db_to_header(Db, Header) ->
+    Header#db_header{
+        update_seq = Db#db.update_seq,
+        docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree),
+        fulldocinfo_by_id_btree_state = couch_btree:get_state(Db#db.fulldocinfo_by_id_btree),
+        local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree),
+        security_ptr = Db#db.security_ptr,
+        revs_limit = Db#db.revs_limit}.
+
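+% commit_data(Db, Delay): with Delay = true, schedule a delayed_commit message
+% in one second instead of syncing now; otherwise write a new header (with the
+% configured fsyncs before/after) only if it differs from the current one.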
+commit_data(#db{waiting_delayed_commit=nil} = Db, true) ->
+    Db#db{waiting_delayed_commit=erlang:send_after(1000,self(),delayed_commit)};
+commit_data(Db, true) ->
+    Db;
+commit_data(Db, _) ->
+    #db{
+        updater_fd = Fd,
+        filepath = Filepath,
+        header = OldHeader,
+        fsync_options = FsyncOptions,
+        waiting_delayed_commit = Timer
+    } = Db,
+    if is_reference(Timer) -> erlang:cancel_timer(Timer); true -> ok end,
+    case db_to_header(Db, OldHeader) of
+    OldHeader ->
+        Db#db{waiting_delayed_commit=nil};
+    Header ->
+        case lists:member(before_header, FsyncOptions) of
+        true -> ok = couch_file:sync(Filepath);
+        _    -> ok
+        end,
+
+        ok = couch_file:write_header(Fd, Header),
+
+        case lists:member(after_header, FsyncOptions) of
+        true -> ok = couch_file:sync(Filepath);
+        _    -> ok
+        end,
+
+        Db#db{waiting_delayed_commit=nil,
+            header=Header,
+            committed_update_seq=Db#db.update_seq}
+    end.
+
+
+copy_doc_attachments(#db{updater_fd = SrcFd} = SrcDb, SrcSp, DestFd) ->
+    {ok, {BodyData, BinInfos0}} = couch_db:read_doc(SrcDb, SrcSp),
+    BinInfos = case BinInfos0 of
+    _ when is_binary(BinInfos0) ->
+        couch_compress:decompress(BinInfos0);
+    _ when is_list(BinInfos0) ->
+        % pre 1.2 file format
+        BinInfos0
+    end,
+    % copy the bin values
+    NewBinInfos = lists:map(
+        fun({Name, Type, BinSp, AttLen, RevPos, Md5}) ->
+            % 010 UPGRADE CODE
+            {NewBinSp, AttLen, AttLen, Md5, _IdentityMd5} =
+                couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
+            {Name, Type, NewBinSp, AttLen, AttLen, RevPos, Md5, identity};
+        ({Name, Type, BinSp, AttLen, DiskLen, RevPos, Md5, Enc1}) ->
+            {NewBinSp, AttLen, _, Md5, _IdentityMd5} =
+                couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
+            Enc = case Enc1 of
+            true ->
+                % 0110 UPGRADE CODE
+                gzip;
+            false ->
+                % 0110 UPGRADE CODE
+                identity;
+            _ ->
+                Enc1
+            end,
+            {Name, Type, NewBinSp, AttLen, DiskLen, RevPos, Md5, Enc}
+        end, BinInfos),
+    {BodyData, NewBinInfos}.
+
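+% copy one batch of documents into the compaction file: attachments and
+% summaries are rewritten to DestFd and the rev tree leaves are repointed at
+% the new positions before the by_id/by_seq btrees of the new db are updated.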
+copy_docs(Db, #db{updater_fd = DestFd} = NewDb, InfoBySeq0, Retry) ->
+    % COUCHDB-968, make sure we prune duplicates during compaction
+    InfoBySeq = lists:usort(fun(#doc_info{id=A}, #doc_info{id=B}) -> A =< B end,
+        InfoBySeq0),
+    Ids = [Id || #doc_info{id=Id} <- InfoBySeq],
+    LookupResults = couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids),
+
+    NewFullDocInfos1 = lists:map(
+        fun({ok, #full_doc_info{rev_tree=RevTree}=Info}) ->
+            Info#full_doc_info{rev_tree=couch_key_tree:map(
+                fun(_, _, branch) ->
+                    ?REV_MISSING;
+                (_Rev, LeafVal, leaf) ->
+                    IsDel = element(1, LeafVal),
+                    Sp = element(2, LeafVal),
+                    Seq = element(3, LeafVal),
+                    {_Body, AttsInfo} = Summary = copy_doc_attachments(
+                        Db, Sp, DestFd),
+                    SummaryChunk = make_doc_summary(NewDb, Summary),
+                    {ok, Pos, SummarySize} = couch_file:append_raw_chunk(
+                        DestFd, SummaryChunk),
+                    TotalLeafSize = lists:foldl(
+                        fun({_, _, _, AttLen, _, _, _, _}, S) -> S + AttLen end,
+                        SummarySize, AttsInfo),
+                    {IsDel, Pos, Seq, TotalLeafSize}
+                end, RevTree)}
+        end, LookupResults),
+
+    NewFullDocInfos = stem_full_doc_infos(Db, NewFullDocInfos1),
+    NewDocInfos = [couch_doc:to_doc_info(Info) || Info <- NewFullDocInfos],
+    RemoveSeqs =
+    case Retry of
+    false ->
+        [];
+    true ->
+        % We are retrying a compaction, meaning the documents we are copying may
+        % already exist in our file and must be removed from the by_seq index.
+        Existing = couch_btree:lookup(NewDb#db.fulldocinfo_by_id_btree, Ids),
+        [Seq || {ok, #full_doc_info{update_seq=Seq}} <- Existing]
+    end,
+
+    {ok, DocInfoBTree} = couch_btree:add_remove(
+            NewDb#db.docinfo_by_seq_btree, NewDocInfos, RemoveSeqs),
+    {ok, FullDocInfoBTree} = couch_btree:add_remove(
+            NewDb#db.fulldocinfo_by_id_btree, NewFullDocInfos, []),
+    update_compact_task(length(NewFullDocInfos)),
+    NewDb#db{ fulldocinfo_by_id_btree=FullDocInfoBTree,
+              docinfo_by_seq_btree=DocInfoBTree}.
+
+
+
+copy_compact(Db, NewDb0, Retry) ->
+    FsyncOptions = [Op || Op <- NewDb0#db.fsync_options, Op == before_header],
+    Compression = couch_compress:get_compression_method(),
+    NewDb = NewDb0#db{fsync_options=FsyncOptions, compression=Compression},
+    TotalChanges = couch_db:count_changes_since(Db, NewDb#db.update_seq),
+    BufferSize = list_to_integer(
+        couch_config:get("database_compaction", "doc_buffer_size", "524288")),
+    CheckpointAfter = couch_util:to_integer(
+        couch_config:get("database_compaction", "checkpoint_after",
+            BufferSize * 10)),
+
+    EnumBySeqFun =
+    fun(#doc_info{high_seq=Seq}=DocInfo, _Offset,
+        {AccNewDb, AccUncopied, AccUncopiedSize, AccCopiedSize}) ->
+
+        AccUncopiedSize2 = AccUncopiedSize + ?term_size(DocInfo),
+        if AccUncopiedSize2 >= BufferSize ->
+            NewDb2 = copy_docs(
+                Db, AccNewDb, lists:reverse([DocInfo | AccUncopied]), Retry),
+            AccCopiedSize2 = AccCopiedSize + AccUncopiedSize2,
+            if AccCopiedSize2 >= CheckpointAfter ->
+                {ok, {commit_data(NewDb2#db{update_seq = Seq}), [], 0, 0}};
+            true ->
+                {ok, {NewDb2#db{update_seq = Seq}, [], 0, AccCopiedSize2}}
+            end;
+        true ->
+            {ok, {AccNewDb, [DocInfo | AccUncopied], AccUncopiedSize2,
+                AccCopiedSize}}
+        end
+    end,
+
+    TaskProps0 = [
+        {type, database_compaction},
+        {database, Db#db.name},
+        {progress, 0},
+        {changes_done, 0},
+        {total_changes, TotalChanges}
+    ],
+    case Retry and couch_task_status:is_task_added() of
+    true ->
+        couch_task_status:update([
+            {retry, true},
+            {progress, 0},
+            {changes_done, 0},
+            {total_changes, TotalChanges}
+        ]);
+    false ->
+        couch_task_status:add_task(TaskProps0),
+        couch_task_status:set_update_frequency(500)
+    end,
+
+    {ok, _, {NewDb2, Uncopied, _, _}} =
+        couch_btree:foldl(Db#db.docinfo_by_seq_btree, EnumBySeqFun,
+            {NewDb, [], 0, 0},
+            [{start_key, NewDb#db.update_seq + 1}]),
+
+    NewDb3 = copy_docs(Db, NewDb2, lists:reverse(Uncopied), Retry),
+
+    % copy misc header values
+    if NewDb3#db.security /= Db#db.security ->
+        {ok, Ptr, _} = couch_file:append_term(
+            NewDb3#db.updater_fd, Db#db.security,
+            [{compression, NewDb3#db.compression}]),
+        NewDb4 = NewDb3#db{security=Db#db.security, security_ptr=Ptr};
+    true ->
+        NewDb4 = NewDb3
+    end,
+
+    commit_data(NewDb4#db{update_seq=Db#db.update_seq}).
+
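+% entry point for the compactor process: reuse an existing .compact file if a
+% previous compaction was interrupted (Retry = true), otherwise create one,
+% then copy everything over and hand the file back to the updater with a
+% compact_done call, retrying if the main file moved ahead in the meantime.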
+start_copy_compact(#db{name=Name,filepath=Filepath,header=#db_header{purge_seq=PurgeSeq}}=Db) ->
+    CompactFile = Filepath ++ ".compact",
+    ?LOG_DEBUG("Compaction process spawned for db \"~s\"", [Name]),
+    case couch_file:open(CompactFile, [nologifmissing]) of
+    {ok, Fd} ->
+        Retry = true,
+        case couch_file:read_header(Fd) of
+        {ok, Header} ->
+            ok;
+        no_valid_header ->
+            ok = couch_file:write_header(Fd, Header=#db_header{})
+        end;
+    {error, enoent} ->
+        {ok, Fd} = couch_file:open(CompactFile, [create]),
+        Retry = false,
+        ok = couch_file:write_header(Fd, Header=#db_header{})
+    end,
+    ReaderFd = open_reader_fd(CompactFile, Db#db.options),
+    NewDb = init_db(Name, CompactFile, Fd, ReaderFd, Header, Db#db.options),
+    NewDb2 = if PurgeSeq > 0 ->
+        {ok, PurgedIdsRevs} = couch_db:get_last_purged(Db),
+        {ok, Pointer, _} = couch_file:append_term(
+            Fd, PurgedIdsRevs, [{compression, NewDb#db.compression}]),
+        NewDb#db{header=Header#db_header{purge_seq=PurgeSeq, purged_docs=Pointer}};
+    true ->
+        NewDb
+    end,
+    unlink(Fd),
+
+    NewDb3 = copy_compact(Db, NewDb2, Retry),
+    close_db(NewDb3),
+    case gen_server:call(
+        Db#db.update_pid, {compact_done, CompactFile}, infinity) of
+    ok ->
+        ok;
+    {retry, CurrentDb} ->
+        start_copy_compact(CurrentDb)
+    end.
+
+update_compact_task(NumChanges) ->
+    [Changes, Total] = couch_task_status:get([changes_done, total_changes]),
+    Changes2 = Changes + NumChanges,
+    Progress = case Total of
+    0 ->
+        0;
+    _ ->
+        (Changes2 * 100) div Total
+    end,
+    couch_task_status:update([{changes_done, Changes2}, {progress, Progress}]).
+
+make_doc_summary(#db{compression = Comp}, {Body0, Atts0}) ->
+    Body = case couch_compress:is_compressed(Body0, Comp) of
+    true ->
+        Body0;
+    false ->
+        % pre 1.2 database file format
+        couch_compress:compress(Body0, Comp)
+    end,
+    Atts = case couch_compress:is_compressed(Atts0, Comp) of
+    true ->
+        Atts0;
+    false ->
+        couch_compress:compress(Atts0, Comp)
+    end,
+    SummaryBin = ?term_to_bin({Body, Atts}),
+    couch_file:assemble_file_chunk(SummaryBin, couch_util:md5(SummaryBin)).

http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/75f30dbe/src/couch_doc.erl
----------------------------------------------------------------------
diff --git a/src/couch_doc.erl b/src/couch_doc.erl
new file mode 100644
index 0000000..4047370
--- /dev/null
+++ b/src/couch_doc.erl
@@ -0,0 +1,650 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_doc).
+
+-export([to_doc_info/1,to_doc_info_path/1,parse_rev/1,parse_revs/1,rev_to_str/1,revs_to_strs/1]).
+-export([att_foldl/3,range_att_foldl/5,att_foldl_decode/3,get_validate_doc_fun/1]).
+-export([from_json_obj/1,to_json_obj/2,has_stubs/1, merge_stubs/2]).
+-export([validate_docid/1]).
+-export([doc_from_multi_part_stream/2]).
+-export([doc_to_multi_part_stream/5, len_doc_to_multi_part_stream/4]).
+-export([abort_multi_part_stream/1]).
+-export([to_path/1]).
+-export([mp_parse_doc/2]).
+-export([with_ejson_body/1]).
+
+-include("couch_db.hrl").
+
+-spec to_path(#doc{}) -> path().
+to_path(#doc{revs={Start, RevIds}}=Doc) ->
+    [Branch] = to_branch(Doc, lists:reverse(RevIds)),
+    {Start - length(RevIds) + 1, Branch}.
+
+-spec to_branch(#doc{}, [RevId::binary()]) -> [branch()].
+to_branch(Doc, [RevId]) ->
+    [{RevId, Doc, []}];
+to_branch(Doc, [RevId | Rest]) ->
+    [{RevId, ?REV_MISSING, to_branch(Doc, Rest)}].
+
+% helpers used by to_json_obj
+to_json_rev(0, []) ->
+    [];
+to_json_rev(Start, [FirstRevId|_]) ->
+    [{<<"_rev">>, ?l2b([integer_to_list(Start),"-",revid_to_str(FirstRevId)])}].
+
+to_json_body(true, {Body}) ->
+    Body ++ [{<<"_deleted">>, true}];
+to_json_body(false, {Body}) ->
+    Body.
+
+to_json_revisions(Options, Start, RevIds) ->
+    case lists:member(revs, Options) of
+    false -> [];
+    true ->
+        [{<<"_revisions">>, {[{<<"start">>, Start},
+                {<<"ids">>, [revid_to_str(R) ||R <- RevIds]}]}}]
+    end.
+
+revid_to_str(RevId) when size(RevId) =:= 16 ->
+    ?l2b(couch_util:to_hex(RevId));
+revid_to_str(RevId) ->
+    RevId.
+
+rev_to_str({Pos, RevId}) ->
+    ?l2b([integer_to_list(Pos),"-",revid_to_str(RevId)]).
+
+
+revs_to_strs([]) ->
+    [];
+revs_to_strs([{Pos, RevId}| Rest]) ->
+    [rev_to_str({Pos, RevId}) | revs_to_strs(Rest)].
+
+to_json_meta(Meta) ->
+    lists:map(
+        fun({revs_info, Start, RevsInfo}) ->
+            {JsonRevsInfo, _Pos}  = lists:mapfoldl(
+                fun({RevId, Status}, PosAcc) ->
+                    JsonObj = {[{<<"rev">>, rev_to_str({PosAcc, RevId})},
+                        {<<"status">>, ?l2b(atom_to_list(Status))}]},
+                    {JsonObj, PosAcc - 1}
+                end, Start, RevsInfo),
+            {<<"_revs_info">>, JsonRevsInfo};
+        ({local_seq, Seq}) ->
+            {<<"_local_seq">>, Seq};
+        ({conflicts, Conflicts}) ->
+            {<<"_conflicts">>, revs_to_strs(Conflicts)};
+        ({deleted_conflicts, DConflicts}) ->
+            {<<"_deleted_conflicts">>, revs_to_strs(DConflicts)}
+        end, Meta).
+
+to_json_attachments(Attachments, Options) ->
+    to_json_attachments(
+        Attachments,
+        lists:member(attachments, Options),
+        lists:member(follows, Options),
+        lists:member(att_encoding_info, Options)
+    ).
+
+to_json_attachments([], _OutputData, _DataToFollow, _ShowEncInfo) ->
+    [];
+to_json_attachments(Atts, OutputData, DataToFollow, ShowEncInfo) ->
+    AttProps = lists:map(
+        fun(#att{disk_len=DiskLen, att_len=AttLen, encoding=Enc}=Att) ->
+            {Att#att.name, {[
+                {<<"content_type">>, Att#att.type},
+                {<<"revpos">>, Att#att.revpos}] ++
+                case Att#att.md5 of
+                    <<>> ->
+                        [];
+                    Md5 ->
+                        EncodedMd5 = base64:encode(Md5),
+                        [{<<"digest">>, <<"md5-",EncodedMd5/binary>>}]
+                end ++
+                if not OutputData orelse Att#att.data == stub ->
+                    [{<<"length">>, DiskLen}, {<<"stub">>, true}];
+                true ->
+                    if DataToFollow ->
+                        [{<<"length">>, DiskLen}, {<<"follows">>, true}];
+                    true ->
+                        AttData = case Enc of
+                        gzip ->
+                            zlib:gunzip(att_to_bin(Att));
+                        identity ->
+                            att_to_bin(Att)
+                        end,
+                        [{<<"data">>, base64:encode(AttData)}]
+                    end
+                end ++
+                    case {ShowEncInfo, Enc} of
+                    {false, _} ->
+                        [];
+                    {true, identity} ->
+                        [];
+                    {true, _} ->
+                        [
+                            {<<"encoding">>, couch_util:to_binary(Enc)},
+                            {<<"encoded_length">>, AttLen}
+                        ]
+                    end
+            }}
+        end, Atts),
+    [{<<"_attachments">>, {AttProps}}].
+
+to_json_obj(Doc, Options) ->
+    doc_to_json_obj(with_ejson_body(Doc), Options).
+
+doc_to_json_obj(#doc{id=Id,deleted=Del,body=Body,revs={Start, RevIds},
+            meta=Meta}=Doc,Options)->
+    {[{<<"_id">>, Id}]
+        ++ to_json_rev(Start, RevIds)
+        ++ to_json_body(Del, Body)
+        ++ to_json_revisions(Options, Start, RevIds)
+        ++ to_json_meta(Meta)
+        ++ to_json_attachments(Doc#doc.atts, Options)
+    }.
+
+from_json_obj({Props}) ->
+    transfer_fields(Props, #doc{body=[]});
+
+from_json_obj(_Other) ->
+    throw({bad_request, "Document must be a JSON object"}).
+
+parse_revid(RevId) when size(RevId) =:= 32 ->
+    RevInt = erlang:list_to_integer(?b2l(RevId), 16),
+     <<RevInt:128>>;
+parse_revid(RevId) when length(RevId) =:= 32 ->
+    RevInt = erlang:list_to_integer(RevId, 16),
+     <<RevInt:128>>;
+parse_revid(RevId) when is_binary(RevId) ->
+    RevId;
+parse_revid(RevId) when is_list(RevId) ->
+    ?l2b(RevId).
+
+
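+% parse a "Pos-RevId" revision string into a {Pos, RevId} tuple, e.g.
+% parse_rev(<<"1-abc">>) -> {1, <<"abc">>}; 32-character hex rev ids are
+% packed into a 128 bit binary by parse_revid/1.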
+parse_rev(Rev) when is_binary(Rev) ->
+    parse_rev(?b2l(Rev));
+parse_rev(Rev) when is_list(Rev) ->
+    SplitRev = lists:splitwith(fun($-) -> false; (_) -> true end, Rev),
+    case SplitRev of
+        {Pos, [$- | RevId]} -> {list_to_integer(Pos), parse_revid(RevId)};
+        _Else -> throw({bad_request, <<"Invalid rev format">>})
+    end;
+parse_rev(_BadRev) ->
+    throw({bad_request, <<"Invalid rev format">>}).
+
+parse_revs([]) ->
+    [];
+parse_revs([Rev | Rest]) ->
+    [parse_rev(Rev) | parse_revs(Rest)].
+
+
+validate_docid(<<"">>) ->
+    throw({bad_request, <<"Document id must not be empty">>});
+validate_docid(Id) when is_binary(Id) ->
+    case couch_util:validate_utf8(Id) of
+        false -> throw({bad_request, <<"Document id must be valid UTF-8">>});
+        true -> ok
+    end,
+    case Id of
+    <<"_design/", _/binary>> -> ok;
+    <<"_local/", _/binary>> -> ok;
+    <<"_", _/binary>> ->
+        throw({bad_request, <<"Only reserved document ids may start with underscore.">>});
+    _Else -> ok
+    end;
+validate_docid(Id) ->
+    ?LOG_DEBUG("Document id is not a string: ~p", [Id]),
+    throw({bad_request, <<"Document id must be a string">>}).
+
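+% fold the decoded JSON members into a #doc{} record: reserved underscore
+% fields (_id, _rev, _attachments, _revisions, _deleted, ...) are interpreted,
+% a handful of _replication_* fields are passed through to the body, and any
+% other underscore-prefixed member is rejected.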
+transfer_fields([], #doc{body=Fields}=Doc) ->
+    % convert fields back to json object
+    Doc#doc{body={lists:reverse(Fields)}};
+
+transfer_fields([{<<"_id">>, Id} | Rest], Doc) ->
+    validate_docid(Id),
+    transfer_fields(Rest, Doc#doc{id=Id});
+
+transfer_fields([{<<"_rev">>, Rev} | Rest], #doc{revs={0, []}}=Doc) ->
+    {Pos, RevId} = parse_rev(Rev),
+    transfer_fields(Rest,
+            Doc#doc{revs={Pos, [RevId]}});
+
+transfer_fields([{<<"_rev">>, _Rev} | Rest], Doc) ->
+    % we already got the rev from the _revisions
+    transfer_fields(Rest,Doc);
+
+transfer_fields([{<<"_attachments">>, {JsonBins}} | Rest], Doc) ->
+    Atts = lists:map(fun({Name, {BinProps}}) ->
+        Md5 = case couch_util:get_value(<<"digest">>, BinProps) of
+            <<"md5-",EncodedMd5/binary>> ->
+                base64:decode(EncodedMd5);
+            _ ->
+               <<>>
+        end,
+        case couch_util:get_value(<<"stub">>, BinProps) of
+        true ->
+            Type = couch_util:get_value(<<"content_type">>, BinProps),
+            RevPos = couch_util:get_value(<<"revpos">>, BinProps, nil),
+            DiskLen = couch_util:get_value(<<"length">>, BinProps),
+            {Enc, EncLen} = att_encoding_info(BinProps),
+            #att{name=Name, data=stub, type=Type, att_len=EncLen,
+                disk_len=DiskLen, encoding=Enc, revpos=RevPos, md5=Md5};
+        _ ->
+            Type = couch_util:get_value(<<"content_type">>, BinProps,
+                    ?DEFAULT_ATTACHMENT_CONTENT_TYPE),
+            RevPos = couch_util:get_value(<<"revpos">>, BinProps, 0),
+            case couch_util:get_value(<<"follows">>, BinProps) of
+            true ->
+                DiskLen = couch_util:get_value(<<"length">>, BinProps),
+                {Enc, EncLen} = att_encoding_info(BinProps),
+                #att{name=Name, data=follows, type=Type, encoding=Enc,
+                    att_len=EncLen, disk_len=DiskLen, revpos=RevPos, md5=Md5};
+            _ ->
+                Value = couch_util:get_value(<<"data">>, BinProps),
+                Bin = base64:decode(Value),
+                LenBin = size(Bin),
+                #att{name=Name, data=Bin, type=Type, att_len=LenBin,
+                        disk_len=LenBin, revpos=RevPos}
+            end
+        end
+    end, JsonBins),
+    transfer_fields(Rest, Doc#doc{atts=Atts});
+
+transfer_fields([{<<"_revisions">>, {Props}} | Rest], Doc) ->
+    RevIds = couch_util:get_value(<<"ids">>, Props),
+    Start = couch_util:get_value(<<"start">>, Props),
+    if not is_integer(Start) ->
+        throw({doc_validation, "_revisions.start isn't an integer."});
+    not is_list(RevIds) ->
+        throw({doc_validation, "_revisions.ids isn't a array."});
+    true ->
+        ok
+    end,
+    [throw({doc_validation, "RevId isn't a string"}) ||
+            RevId <- RevIds, not is_binary(RevId)],
+    RevIds2 = [parse_revid(RevId) || RevId <- RevIds],
+    transfer_fields(Rest, Doc#doc{revs={Start, RevIds2}});
+
+transfer_fields([{<<"_deleted">>, B} | Rest], Doc) when is_boolean(B) ->
+    transfer_fields(Rest, Doc#doc{deleted=B});
+
+% ignored fields
+transfer_fields([{<<"_revs_info">>, _} | Rest], Doc) ->
+    transfer_fields(Rest, Doc);
+transfer_fields([{<<"_local_seq">>, _} | Rest], Doc) ->
+    transfer_fields(Rest, Doc);
+transfer_fields([{<<"_conflicts">>, _} | Rest], Doc) ->
+    transfer_fields(Rest, Doc);
+transfer_fields([{<<"_deleted_conflicts">>, _} | Rest], Doc) ->
+    transfer_fields(Rest, Doc);
+
+% special fields for replication documents
+transfer_fields([{<<"_replication_state">>, _} = Field | Rest],
+    #doc{body=Fields} = Doc) ->
+    transfer_fields(Rest, Doc#doc{body=[Field|Fields]});
+transfer_fields([{<<"_replication_state_time">>, _} = Field | Rest],
+    #doc{body=Fields} = Doc) ->
+    transfer_fields(Rest, Doc#doc{body=[Field|Fields]});
+transfer_fields([{<<"_replication_state_reason">>, _} = Field | Rest],
+    #doc{body=Fields} = Doc) ->
+    transfer_fields(Rest, Doc#doc{body=[Field|Fields]});
+transfer_fields([{<<"_replication_id">>, _} = Field | Rest],
+    #doc{body=Fields} = Doc) ->
+    transfer_fields(Rest, Doc#doc{body=[Field|Fields]});
+transfer_fields([{<<"_replication_stats">>, _} = Field | Rest],
+    #doc{body=Fields} = Doc) ->
+    transfer_fields(Rest, Doc#doc{body=[Field|Fields]});
+
+% unknown special field
+transfer_fields([{<<"_",Name/binary>>, _} | _], _) ->
+    throw({doc_validation,
+            ?l2b(io_lib:format("Bad special document member: _~s", [Name]))});
+
+transfer_fields([Field | Rest], #doc{body=Fields}=Doc) ->
+    transfer_fields(Rest, Doc#doc{body=[Field|Fields]}).
+
+att_encoding_info(BinProps) ->
+    DiskLen = couch_util:get_value(<<"length">>, BinProps),
+    case couch_util:get_value(<<"encoding">>, BinProps) of
+    undefined ->
+        {identity, DiskLen};
+    Enc ->
+        EncodedLen = couch_util:get_value(<<"encoded_length">>, BinProps, DiskLen),
+        {list_to_existing_atom(?b2l(Enc)), EncodedLen}
+    end.
+
+to_doc_info(FullDocInfo) ->
+    {DocInfo, _Path} = to_doc_info_path(FullDocInfo),
+    DocInfo.
+
+max_seq(Tree, UpdateSeq) ->
+    FoldFun = fun({_Pos, _Key}, Value, _Type, MaxOldSeq) ->
+        case Value of
+            {_Deleted, _DiskPos, OldTreeSeq} ->
+                % Older versions didn't track data sizes.
+                erlang:max(MaxOldSeq, OldTreeSeq);
+            {_Deleted, _DiskPos, OldTreeSeq, _Size} ->
+                erlang:max(MaxOldSeq, OldTreeSeq);
+            _ ->
+                MaxOldSeq
+        end
+    end,
+    couch_key_tree:fold(FoldFun, UpdateSeq, Tree).
+
+to_doc_info_path(#full_doc_info{id=Id,rev_tree=Tree,update_seq=Seq}) ->
+    RevInfosAndPath = [
+        {#rev_info{
+            deleted = element(1, LeafVal),
+            body_sp = element(2, LeafVal),
+            seq = element(3, LeafVal),
+            rev = {Pos, RevId}
+        }, Path} || {LeafVal, {Pos, [RevId | _]} = Path} <-
+            couch_key_tree:get_all_leafs(Tree)
+    ],
+    SortedRevInfosAndPath = lists:sort(
+            fun({#rev_info{deleted=DeletedA,rev=RevA}, _PathA},
+                {#rev_info{deleted=DeletedB,rev=RevB}, _PathB}) ->
+            % sort descending by {not deleted, rev}
+            {not DeletedA, RevA} > {not DeletedB, RevB}
+        end, RevInfosAndPath),
+    [{_RevInfo, WinPath}|_] = SortedRevInfosAndPath,
+    RevInfos = [RevInfo || {RevInfo, _Path} <- SortedRevInfosAndPath],
+    {#doc_info{id=Id, high_seq=max_seq(Tree, Seq), revs=RevInfos}, WinPath}.
+
+
+
+
+att_foldl(#att{data=Bin}, Fun, Acc) when is_binary(Bin) ->
+    Fun(Bin, Acc);
+att_foldl(#att{data={Fd,Sp},md5=Md5}, Fun, Acc) ->
+    couch_stream:foldl(Fd, Sp, Md5, Fun, Acc);
+att_foldl(#att{data=DataFun,att_len=Len}, Fun, Acc) when is_function(DataFun) ->
+    fold_streamed_data(DataFun, Len, Fun, Acc).
+
+range_att_foldl(#att{data={Fd,Sp}}, From, To, Fun, Acc) ->
+    couch_stream:range_foldl(Fd, Sp, From, To, Fun, Acc).
+
+att_foldl_decode(#att{data={Fd,Sp},md5=Md5,encoding=Enc}, Fun, Acc) ->
+    couch_stream:foldl_decode(Fd, Sp, Md5, Enc, Fun, Acc);
+att_foldl_decode(#att{data=Fun2,att_len=Len, encoding=identity}, Fun, Acc) ->
+    fold_streamed_data(Fun2, Len, Fun, Acc).
+
+att_to_bin(#att{data=Bin}) when is_binary(Bin) ->
+    Bin;
+att_to_bin(#att{data=Iolist}) when is_list(Iolist) ->
+    iolist_to_binary(Iolist);
+att_to_bin(#att{data={_Fd,_Sp}}=Att) ->
+    iolist_to_binary(
+        lists:reverse(att_foldl(
+                Att,
+                fun(Bin,Acc) -> [Bin|Acc] end,
+                []
+        ))
+    );
+att_to_bin(#att{data=DataFun, att_len=Len}) when is_function(DataFun) ->
+    iolist_to_binary(
+        lists:reverse(fold_streamed_data(
+            DataFun,
+            Len,
+            fun(Data, Acc) -> [Data | Acc] end,
+            []
+        ))
+    ).
+
+get_validate_doc_fun(#doc{body={Props}}=DDoc) ->
+    case couch_util:get_value(<<"validate_doc_update">>, Props) of
+    undefined ->
+        nil;
+    _Else ->
+        fun(EditDoc, DiskDoc, Ctx, SecObj) ->
+            couch_query_servers:validate_doc_update(DDoc, EditDoc, DiskDoc, Ctx, SecObj)
+        end
+    end.
+
+
+has_stubs(#doc{atts=Atts}) ->
+    has_stubs(Atts);
+has_stubs([]) ->
+    false;
+has_stubs([#att{data=stub}|_]) ->
+    true;
+has_stubs([_Att|Rest]) ->
+    has_stubs(Rest).
+
+merge_stubs(#doc{id = Id}, nil) ->
+    throw({missing_stub, <<"Previous revision missing for document ", Id/binary>>});
+merge_stubs(#doc{id=Id,atts=MemBins}=StubsDoc, #doc{atts=DiskBins}) ->
+    BinDict = dict:from_list([{Name, Att} || #att{name=Name}=Att <- DiskBins]),
+    MergedBins = lists:map(
+        fun(#att{name=Name, data=stub, revpos=StubRevPos}) ->
+            case dict:find(Name, BinDict) of
+            {ok, #att{revpos=DiskRevPos}=DiskAtt}
+                    when DiskRevPos == StubRevPos orelse StubRevPos == nil ->
+                DiskAtt;
+            _ ->
+                throw({missing_stub,
+                        <<"id:", Id/binary, ", name:", Name/binary>>})
+            end;
+        (Att) ->
+            Att
+        end, MemBins),
+    StubsDoc#doc{atts=MergedBins}.
+
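+% Repeatedly call RcvFun/0 until LenLeft bytes' worth of chunks have been
+% folded into the accumulator.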
+fold_streamed_data(_RcvFun, 0, _Fun, Acc) ->
+    Acc;
+fold_streamed_data(RcvFun, LenLeft, Fun, Acc) when LenLeft > 0 ->
+    Bin = RcvFun(),
+    ResultAcc = Fun(Bin, Acc),
+    fold_streamed_data(RcvFun, LenLeft - size(Bin), Fun, ResultAcc).
+
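+% Compute the Content-Type and total byte length of the multipart body up
+% front, mirroring what doc_to_multi_part_stream/5 will actually write, and
+% fall back to plain application/json when there is no attachment data to send.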
+len_doc_to_multi_part_stream(Boundary, JsonBytes, Atts, SendEncodedAtts) ->
+    AttsSize = lists:foldl(fun(Att, AccAttsSize) ->
+            #att{
+                data=Data,
+                name=Name,
+                att_len=AttLen,
+                disk_len=DiskLen,
+                type=Type,
+                encoding=Encoding
+            } = Att,
+            case Data of
+            stub ->
+                AccAttsSize;
+            _ ->
+                AccAttsSize +
+                4 + % "\r\n\r\n"
+                case SendEncodedAtts of
+                true ->
+                    % header
+                    length(integer_to_list(AttLen)) +
+                    AttLen;
+                _ ->
+                    % header
+                    length(integer_to_list(DiskLen)) +
+                    DiskLen
+                end +
+                4 + % "\r\n--"
+                size(Boundary) +
+
+                % attachment headers
+                % (the length of the Content-Length has already been set)
+                size(Name) +
+                size(Type) +
+                length("\r\nContent-Disposition: attachment; filename=\"\"") +
+                length("\r\nContent-Type: ") +
+                length("\r\nContent-Length: ") +
+                case Encoding of
+                identity ->
+                    0;
+                _ ->
+                    length(atom_to_list(Encoding)) +
+                    length("\r\nContent-Encoding: ")
+                end
+            end
+        end, 0, Atts),
+    if AttsSize == 0 ->
+        {<<"application/json">>, iolist_size(JsonBytes)};
+    true ->
+        {<<"multipart/related; boundary=\"", Boundary/binary, "\"">>,
+            2 + % "--"
+            size(Boundary) +
+            36 + % "\r\ncontent-type: application/json\r\n\r\n"
+            iolist_size(JsonBytes) +
+            4 + % "\r\n--"
+            size(Boundary) +
+            AttsSize +
+            2 % "--"
+            }
+    end.
+
+doc_to_multi_part_stream(Boundary, JsonBytes, Atts, WriteFun,
+    SendEncodedAtts) ->
+    case lists:any(fun(#att{data=Data}) -> Data /= stub end, Atts) of
+    true ->
+        WriteFun([<<"--", Boundary/binary,
+                "\r\nContent-Type: application/json\r\n\r\n">>,
+                JsonBytes, <<"\r\n--", Boundary/binary>>]),
+        atts_to_mp(Atts, Boundary, WriteFun, SendEncodedAtts);
+    false ->
+        WriteFun(JsonBytes)
+    end.
+
+atts_to_mp([], _Boundary, WriteFun, _SendEncAtts) ->
+    WriteFun(<<"--">>);
+atts_to_mp([#att{data=stub} | RestAtts], Boundary, WriteFun,
+        SendEncodedAtts) ->
+    atts_to_mp(RestAtts, Boundary, WriteFun, SendEncodedAtts);
+atts_to_mp([Att | RestAtts], Boundary, WriteFun,
+        SendEncodedAtts)  ->
+    #att{
+        name=Name,
+        att_len=AttLen,
+        disk_len=DiskLen,
+        type=Type,
+        encoding=Encoding
+    } = Att,
+
+    % write headers
+    LengthBin = case SendEncodedAtts of
+    true -> list_to_binary(integer_to_list(AttLen));
+    false -> list_to_binary(integer_to_list(DiskLen))
+    end,
+    WriteFun(<<"\r\nContent-Disposition: attachment; filename=\"", Name/binary, "\"">>),
+    WriteFun(<<"\r\nContent-Type: ", Type/binary>>),
+    WriteFun(<<"\r\nContent-Length: ", LengthBin/binary>>),
+    case Encoding of
+    identity ->
+        ok;
+    _ ->
+        EncodingBin = atom_to_binary(Encoding, latin1),
+        WriteFun(<<"\r\nContent-Encoding: ", EncodingBin/binary>>)
+    end,
+
+    % write data
+    WriteFun(<<"\r\n\r\n">>),
+    AttFun = case SendEncodedAtts of
+    false ->
+        fun att_foldl_decode/3;
+    true ->
+        fun att_foldl/3
+    end,
+    AttFun(Att, fun(Data, _) -> WriteFun(Data) end, ok),
+    WriteFun(<<"\r\n--", Boundary/binary>>),
+    atts_to_mp(RestAtts, Boundary, WriteFun, SendEncodedAtts).
+
+
+doc_from_multi_part_stream(ContentType, DataFun) ->
+    Parent = self(),
+    Parser = spawn_link(fun() ->
+        {<<"--",_/binary>>, _, _} = couch_httpd:parse_multipart_request(
+            ContentType, DataFun,
+            fun(Next) -> mp_parse_doc(Next, []) end),
+        unlink(Parent),
+        Parent ! {self(), finished}
+        end),
+    Ref = make_ref(),
+    Parser ! {get_doc_bytes, Ref, self()},
+    receive
+    {doc_bytes, Ref, DocBytes} ->
+        Doc = from_json_obj(?JSON_DECODE(DocBytes)),
+        % go through the attachments looking for 'follows' in the data and
+        % replace it with a function that reads the data from the MIME stream.
+        ReadAttachmentDataFun = fun() ->
+            Parser ! {get_bytes, Ref, self()},
+            receive {bytes, Ref, Bytes} -> Bytes end
+        end,
+        Atts2 = lists:map(
+            fun(#att{data=follows}=A) ->
+                A#att{data=ReadAttachmentDataFun};
+            (A) ->
+                A
+            end, Doc#doc.atts),
+        WaitFun = fun() ->
+            receive {Parser, finished} -> ok end,
+            erlang:put(mochiweb_request_recv, true)
+        end,
+        {ok, Doc#doc{atts=Atts2}, WaitFun, Parser}
+    end.
+
+mp_parse_doc({headers, H}, []) ->
+    case couch_util:get_value("content-type", H) of
+    {"application/json", _} ->
+        fun (Next) ->
+            mp_parse_doc(Next, [])
+        end
+    end;
+mp_parse_doc({body, Bytes}, AccBytes) ->
+    fun (Next) ->
+        mp_parse_doc(Next, [Bytes | AccBytes])
+    end;
+mp_parse_doc(body_end, AccBytes) ->
+    receive {get_doc_bytes, Ref, From} ->
+        From ! {doc_bytes, Ref, lists:reverse(AccBytes)}
+    end,
+    fun mp_parse_atts/1.
+
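+% Attachment bytes are never buffered here; each {body, Bytes} chunk is handed
+% straight to whichever process asked for it with a {get_bytes, Ref, From}
+% message.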
+mp_parse_atts(eof) ->
+    ok;
+mp_parse_atts({headers, _H}) ->
+    fun mp_parse_atts/1;
+mp_parse_atts({body, Bytes}) ->
+    receive {get_bytes, Ref, From} ->
+        From ! {bytes, Ref, Bytes}
+    end,
+    fun mp_parse_atts/1;
+mp_parse_atts(body_end) ->
+    fun mp_parse_atts/1.
+
+
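+% Keep requesting bytes so the abandoned parser can run to the end of the
+% stream and exit, instead of blocking forever in its receive.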
+abort_multi_part_stream(Parser) ->
+    abort_multi_part_stream(Parser, erlang:monitor(process, Parser)).
+
+abort_multi_part_stream(Parser, MonRef) ->
+    case is_process_alive(Parser) of
+    true ->
+        Parser ! {get_bytes, nil, self()},
+        receive
+        {bytes, nil, _Bytes} ->
+             abort_multi_part_stream(Parser, MonRef);
+        {'DOWN', MonRef, _, _, _} ->
+             ok
+        end;
+    false ->
+        erlang:demonitor(MonRef, [flush])
+    end.
+
+
+with_ejson_body(#doc{body = Body} = Doc) when is_binary(Body) ->
+    Doc#doc{body = couch_compress:decompress(Body)};
+with_ejson_body(#doc{body = {_}} = Doc) ->
+    Doc.
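
A minimal sketch of how a caller might drive doc_from_multi_part_stream/2,
written as if it lived alongside the functions above; the surrounding request
handling is assumed, and only the hand-off (doc bytes first, then per-attachment
byte funs, then WaitFun) is taken from the code:

    % Sketch only: ContentType and DataFun are assumed to come from the HTTP
    % layer, and each attachment's length is assumed known from the JSON body.
    read_multipart_doc(ContentType, DataFun) ->
        {ok, Doc, WaitFun, _Parser} =
            doc_from_multi_part_stream(ContentType, DataFun),
        % attachments whose data was 'follows' now carry a fun that pulls their
        % bytes from the MIME stream; folding them drives the parser forward
        Bins = [att_to_bin(Att) || Att <- Doc#doc.atts],
        WaitFun(),  % returns once the parser has seen the closing boundary
        {Doc#doc.id, Bins}.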

http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/75f30dbe/src/couch_ejson_compare.erl
----------------------------------------------------------------------
diff --git a/src/couch_ejson_compare.erl b/src/couch_ejson_compare.erl
new file mode 100644
index 0000000..f46ec35
--- /dev/null
+++ b/src/couch_ejson_compare.erl
@@ -0,0 +1,81 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_ejson_compare).
+
+-export([less/2, less_json_ids/2, less_json/2]).
+
+less_json_ids({JsonA, IdA}, {JsonB, IdB}) ->
+    case less(JsonA, JsonB) of
+    0 ->
+        IdA < IdB;
+    Result ->
+        Result < 0
+    end.
+
+less_json(A,B) ->
+    less(A, B) < 0.
+
+less(A,A)                                 -> 0;
+
+less(A,B) when is_atom(A), is_atom(B)     -> atom_sort(A) - atom_sort(B);
+less(A,_) when is_atom(A)                 -> -1;
+less(_,B) when is_atom(B)                 -> 1;
+
+less(A,B) when is_number(A), is_number(B) -> A - B;
+less(A,_) when is_number(A)               -> -1;
+less(_,B) when is_number(B)               -> 1;
+
+less(A,B) when is_binary(A), is_binary(B) -> couch_collate:collate(A,B);
+less(A,_) when is_binary(A)               -> -1;
+less(_,B) when is_binary(B)               -> 1;
+
+less(A,B) when is_list(A), is_list(B)     -> less_list(A,B);
+less(A,_) when is_list(A)                 -> -1;
+less(_,B) when is_list(B)                 -> 1;
+
+less({A},{B}) when is_list(A), is_list(B) -> less_props(A,B);
+less({A},_) when is_list(A)               -> -1;
+less(_,{B}) when is_list(B)               -> 1.
+
+atom_sort(null) -> 1;
+atom_sort(false) -> 2;
+atom_sort(true) -> 3.
+
+less_props([], [_|_]) ->
+    -1;
+less_props(_, []) ->
+    1;
+less_props([{AKey, AValue}|RestA], [{BKey, BValue}|RestB]) ->
+    case couch_collate:collate(AKey, BKey) of
+    0 ->
+        case less(AValue, BValue) of
+        0 ->
+            less_props(RestA, RestB);
+        Result ->
+            Result
+        end;
+    Result ->
+        Result
+    end.
+
+less_list([], [_|_]) ->
+    -1;
+less_list(_, []) ->
+    1;
+less_list([A|RestA], [B|RestB]) ->
+    case less(A,B) of
+    0 ->
+        less_list(RestA, RestB);
+    Result ->
+        Result
+    end.
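
To make the collation rules above concrete, a few comparisons as they might
appear inside a test function (the module must be loaded and couch_collate
available); the values are illustrative only:

    true  = couch_ejson_compare:less_json(null, 0),                   % atoms before numbers
    true  = couch_ejson_compare:less_json(1, <<"a">>),                % numbers before strings
    true  = couch_ejson_compare:less_json(<<"a">>, [1, 2]),           % strings before arrays
    true  = couch_ejson_compare:less_json([1, 2], {[{<<"a">>, 1}]}),  % arrays before objects
    false = couch_ejson_compare:less_json([1, 3], [1, 2]).            % lists compare element-wise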

http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/75f30dbe/src/couch_event_sup.erl
----------------------------------------------------------------------
diff --git a/src/couch_event_sup.erl b/src/couch_event_sup.erl
new file mode 100644
index 0000000..07c4879
--- /dev/null
+++ b/src/couch_event_sup.erl
@@ -0,0 +1,73 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+%% The purpose of this module is to allow event handlers to participate in Erlang
+%% supervisor trees. It provides a monitorable process that crashes if the event
+%% handler fails. The process, when shut down, deregisters the event handler.
+
+-module(couch_event_sup).
+-behaviour(gen_server).
+
+-include("couch_db.hrl").
+
+-export([start_link/3,start_link/4, stop/1]).
+-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2,code_change/3]).
+
+%
+% Instead of calling
+% ok = gen_event:add_sup_handler(error_logger, my_log, Args)
+%
+% do this:
+% {ok, LinkedPid} = couch_event_sup:start_link(error_logger, my_log, Args)
+%
+% The benefit is that the event handler is now part of the process tree, and
+% can be started, restarted and shut down consistently like the rest of the
+% server components.
+%
+% And now if the "event" crashes, the supervisor is notified and can restart
+% the event handler.
+%
+% Use this form for a named process:
+% {ok, LinkedPid} = couch_event_sup:start_link({local, my_log}, error_logger, my_log, Args)
+%
+
+start_link(EventMgr, EventHandler, Args) ->
+    gen_server:start_link(couch_event_sup, {EventMgr, EventHandler, Args}, []).
+
+start_link(ServerName, EventMgr, EventHandler, Args) ->
+    gen_server:start_link(ServerName, couch_event_sup, {EventMgr, EventHandler, Args}, []).
+
+stop(Pid) ->
+    gen_server:cast(Pid, stop).
+
+init({EventMgr, EventHandler, Args}) ->
+    case gen_event:add_sup_handler(EventMgr, EventHandler, Args) of
+    ok ->
+        {ok, {EventMgr, EventHandler}};
+    {stop, Error} ->
+        {stop, Error}
+    end.
+
+terminate(_Reason, _State) ->
+    ok.
+
+handle_call(_Whatever, _From, State) ->
+    {ok, State}.
+
+handle_cast(stop, State) ->
+    {stop, normal, State}.
+
+handle_info({gen_event_EXIT, _Handler, Reason}, State) ->
+    {stop, Reason, State}.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
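
For orientation, a sketch of how this wrapper is meant to be used from a
supervisor; the event manager name, handler module and restart values below are
illustrative, not part of this commit:

    %% hypothetical supervisor callback wrapping a gen_event handler
    init([]) ->
        EventHandlerSup =
            {my_update_handler_sup,
             {couch_event_sup, start_link,
              [my_event_mgr, my_update_handler, []]},
             permanent, 1000, worker, [couch_event_sup]},
        {ok, {{one_for_one, 10, 3600}, [EventHandlerSup]}}.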

http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/75f30dbe/src/couch_external_manager.erl
----------------------------------------------------------------------
diff --git a/src/couch_external_manager.erl b/src/couch_external_manager.erl
new file mode 100644
index 0000000..0c66ef8
--- /dev/null
+++ b/src/couch_external_manager.erl
@@ -0,0 +1,101 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_external_manager).
+-behaviour(gen_server).
+
+-export([start_link/0, execute/2, config_change/2]).
+-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]).
+
+-include("couch_db.hrl").
+
+start_link() ->
+    gen_server:start_link({local, couch_external_manager},
+        couch_external_manager, [], []).
+
+execute(UrlName, JsonReq) ->
+    Pid = gen_server:call(couch_external_manager, {get, UrlName}),
+    case Pid of
+    {error, Reason} ->
+        Reason;
+    _ ->
+        couch_external_server:execute(Pid, JsonReq)
+    end.
+
+config_change("external", UrlName) ->
+    gen_server:call(couch_external_manager, {config, UrlName}).
+
+% gen_server API
+
+init([]) ->
+    process_flag(trap_exit, true),
+    Handlers = ets:new(couch_external_manager_handlers, [set, private]),
+    couch_config:register(fun ?MODULE:config_change/2),
+    {ok, Handlers}.
+
+terminate(_Reason, Handlers) ->
+    ets:foldl(fun({_UrlName, Pid}, nil) ->
+        couch_external_server:stop(Pid),
+        nil
+    end, nil, Handlers),
+    ok.
+
+handle_call({get, UrlName}, _From, Handlers) ->
+    case ets:lookup(Handlers, UrlName) of
+    [] ->
+        case couch_config:get("external", UrlName, nil) of
+        nil ->
+            Msg = lists:flatten(
+                io_lib:format("No server configured for ~p.", [UrlName])),
+            {reply, {error, {unknown_external_server, ?l2b(Msg)}}, Handlers};
+        Command ->
+            {ok, NewPid} = couch_external_server:start_link(UrlName, Command),
+            true = ets:insert(Handlers, {UrlName, NewPid}),
+            {reply, NewPid, Handlers}
+        end;
+    [{UrlName, Pid}] ->
+        {reply, Pid, Handlers}
+    end;
+handle_call({config, UrlName}, _From, Handlers) ->
+    % A newly added handler and a handler that had its command
+    % changed are treated exactly the same.
+
+    % Shutdown the old handler.
+    case ets:lookup(Handlers, UrlName) of
+    [{UrlName, Pid}] ->
+        couch_external_server:stop(Pid);
+    [] ->
+        ok
+    end,
+    % Wait for next request to boot the handler.
+    {reply, ok, Handlers}.
+
+handle_cast(_Whatever, State) ->
+    {noreply, State}.
+
+handle_info({'EXIT', Pid, normal}, Handlers) ->
+    ?LOG_INFO("EXTERNAL: Server ~p terminated normally", [Pid]),
+    % The process terminated normally without us asking - remove the Pid from
+    % the handlers table so we don't attempt to reuse it.
+    ets:match_delete(Handlers, {'_', Pid}),
+    {noreply, Handlers};
+
+handle_info({'EXIT', Pid, Reason}, Handlers) ->
+    ?LOG_INFO("EXTERNAL: Server ~p died. (reason: ~p)", [Pid, Reason]),
+    % Remove Pid from the handlers table so we don't try closing
+    % it a second time in terminate/2.
+    ets:match_delete(Handlers, {'_', Pid}),
+    {stop, normal, Handlers}.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
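
For context, handler processes are booted lazily: the first execute/2 call for a
name looks its command up in the "external" config section, starts a
couch_external_server and caches the Pid in the ETS table; a config change stops
the old process and the next request restarts it. A sketch with a hypothetical
name and command:

    %% the command is looked up in the "external" config section; the entry and
    %% handler name below are made up for illustration:
    %%   [external]
    %%   myquery = /usr/bin/my_external_handler.py
    %% JsonReq stands in for whatever JSON request term the HTTP layer builds
    Response = couch_external_manager:execute("myquery", JsonReq).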

http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/75f30dbe/src/couch_external_server.erl
----------------------------------------------------------------------
diff --git a/src/couch_external_server.erl b/src/couch_external_server.erl
new file mode 100644
index 0000000..b52c7ff
--- /dev/null
+++ b/src/couch_external_server.erl
@@ -0,0 +1,70 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_external_server).
+-behaviour(gen_server).
+
+-export([start_link/2, stop/1, execute/2]).
+-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2, code_change/3]).
+
+-include("couch_db.hrl").
+
+% External API
+
+start_link(Name, Command) ->
+    gen_server:start_link(couch_external_server, [Name, Command], []).
+
+stop(Pid) ->
+    gen_server:cast(Pid, stop).
+
+execute(Pid, JsonReq) ->
+    {json, Json} = gen_server:call(Pid, {execute, JsonReq}, infinity),
+    ?JSON_DECODE(Json).
+
+% Gen Server Handlers
+
+init([Name, Command]) ->
+    ?LOG_INFO("EXTERNAL: Starting process for: ~s", [Name]),
+    ?LOG_INFO("COMMAND: ~s", [Command]),
+    process_flag(trap_exit, true),
+    Timeout = list_to_integer(couch_config:get("couchdb", "os_process_timeout",
+        "5000")),
+    {ok, Pid} = couch_os_process:start_link(Command, [{timeout, Timeout}]),
+    couch_config:register(fun("couchdb", "os_process_timeout", NewTimeout) ->
+        couch_os_process:set_timeout(Pid, list_to_integer(NewTimeout))
+    end),
+    {ok, {Name, Command, Pid}}.
+
+terminate(_Reason, {_Name, _Command, Pid}) ->
+    couch_os_process:stop(Pid),
+    ok.
+
+handle_call({execute, JsonReq}, _From, {Name, Command, Pid}) ->
+    {reply, couch_os_process:prompt(Pid, JsonReq), {Name, Command, Pid}}.
+
+handle_info({'EXIT', _Pid, normal}, State) ->
+    {noreply, State};
+handle_info({'EXIT', Pid, Reason}, {Name, Command, Pid}) ->
+    ?LOG_INFO("EXTERNAL: Process for ~s exiting. (reason: ~w)", [Name, Reason]),
+    {stop, Reason, {Name, Command, Pid}}.
+
+handle_cast(stop, {Name, Command, Pid}) ->
+    ?LOG_INFO("EXTERNAL: Shutting down ~s", [Name]),
+    exit(Pid, normal),
+    {stop, normal, {Name, Command, Pid}};
+handle_cast(_Whatever, State) ->
+    {noreply, State}.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+