Posted to commits@couchdb.apache.org by da...@apache.org on 2017/09/13 15:10:59 UTC

[couchdb] 02/05: Add legacy storage engine implementation

This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch COUCHDB-3287-pluggable-storage-engines
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit d7e3278d10b57835a2aa59ebbe3a0620ec74f35f
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Fri Feb 5 11:51:31 2016 -0600

    Add legacy storage engine implementation
    
    This is the legacy storage engine code. I've kept it as part of the core
    couch application because we'll always need to have at least one
    storage engine available.
    
    COUCHDB-3287
---
 src/couch/src/couch_bt_engine.erl           | 943 ++++++++++++++++++++++++++++
 src/couch/src/couch_bt_engine.hrl           |  24 +
 src/couch/src/couch_bt_engine_compactor.erl | 497 +++++++++++++++
 src/couch/src/couch_bt_engine_header.erl    | 434 +++++++++++++
 src/couch/src/couch_bt_engine_stream.erl    |  70 +++
 5 files changed, 1968 insertions(+)

diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl
new file mode 100644
index 0000000..c28f0d1
--- /dev/null
+++ b/src/couch/src/couch_bt_engine.erl
@@ -0,0 +1,943 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_bt_engine).
+-behavior(couch_db_engine).
+
+-export([
+    exists/1,
+
+    delete/3,
+    delete_compaction_files/3,
+
+    init/2,
+    terminate/2,
+    handle_db_updater_call/2,
+    handle_db_updater_info/2,
+
+    incref/1,
+    decref/1,
+    monitored_by/1,
+
+    get_compacted_seq/1,
+    get_del_doc_count/1,
+    get_disk_version/1,
+    get_doc_count/1,
+    get_epochs/1,
+    get_last_purged/1,
+    get_purge_seq/1,
+    get_revs_limit/1,
+    get_security/1,
+    get_size_info/1,
+    get_update_seq/1,
+    get_uuid/1,
+
+    set_revs_limit/2,
+    set_security/2,
+
+    open_docs/2,
+    open_local_docs/2,
+    read_doc_body/2,
+
+    serialize_doc/2,
+    write_doc_body/2,
+    write_doc_infos/4,
+
+    commit_data/1,
+
+    open_write_stream/2,
+    open_read_stream/2,
+    is_active_stream/2,
+
+    fold_docs/4,
+    fold_local_docs/4,
+    fold_changes/5,
+    count_changes_since/2,
+
+    start_compaction/4,
+    finish_compaction/4
+]).
+
+
+-export([
+    init_state/4
+]).
+
+
+-export([
+    id_tree_split/1,
+    id_tree_join/2,
+    id_tree_reduce/2,
+
+    seq_tree_split/1,
+    seq_tree_join/2,
+    seq_tree_reduce/2,
+
+    local_tree_split/1,
+    local_tree_join/2
+]).
+
+
+% Used by the compactor
+-export([
+    set_update_seq/2,
+    copy_security/2
+]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+-include("couch_bt_engine.hrl").
+
+
+exists(FilePath) ->
+    case filelib:is_file(FilePath) of
+        true ->
+            true;
+        false ->
+            filelib:is_file(FilePath ++ ".compact")
+    end.
+
+
+delete(RootDir, FilePath, Async) ->
+    %% Delete any leftover compaction files. If we don't do this, a
+    %% subsequent request for this DB will try to open them and use
+    %% them for recovery.
+    delete_compaction_files(RootDir, FilePath, [{context, delete}]),
+
+    % Delete the actual database file
+    couch_file:delete(RootDir, FilePath, Async).
+
+
+delete_compaction_files(RootDir, FilePath, DelOpts) ->
+    lists:foreach(fun(Ext) ->
+        couch_file:delete(RootDir, FilePath ++ Ext, DelOpts)
+    end, [".compact", ".compact.data", ".compact.meta"]).
+
+
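+% init/2 opens (or creates) the database file and returns the engine state.
+% A fresh header is written when the file is newly created or when no valid
+% header can be read (e.g. after a crash mid-write); otherwise the existing
+% header is reused as-is.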
+init(FilePath, Options) ->
+    {ok, Fd} = open_db_file(FilePath, Options),
+    Header = case lists:member(create, Options) of
+        true ->
+            delete_compaction_files(FilePath),
+            Header0 = couch_bt_engine_header:new(),
+            ok = couch_file:write_header(Fd, Header0),
+            Header0;
+        false ->
+            case couch_file:read_header(Fd) of
+                {ok, Header0} ->
+                    Header0;
+                no_valid_header ->
+                    delete_compaction_files(FilePath),
+                    Header0 = couch_bt_engine_header:new(),
+                    ok = couch_file:write_header(Fd, Header0),
+                    Header0
+            end
+    end,
+    {ok, init_state(FilePath, Fd, Header, Options)}.
+
+
+terminate(_Reason, St) ->
+    % If the reason we died is because our fd disappeared
+    % then we don't need to try closing it again.
+    Ref = St#st.fd_monitor,
+    if Ref == closed -> ok; true ->
+        ok = couch_file:close(St#st.fd),
+        receive
+            {'DOWN', Ref, _, _, _} ->
+                ok
+        after 500 ->
+            ok
+        end
+    end,
+    couch_util:shutdown_sync(St#st.fd),
+    ok.
+
+
+handle_db_updater_call(Msg, St) ->
+    {stop, {invalid_call, Msg}, {invalid_call, Msg}, St}.
+
+
+handle_db_updater_info({'DOWN', Ref, _, _, _}, #st{fd_monitor=Ref} = St) ->
+    {stop, normal, St#st{fd=undefined, fd_monitor=closed}}.
+
+
+incref(St) ->
+    {ok, St#st{fd_monitor = erlang:monitor(process, St#st.fd)}}.
+
+
+decref(St) ->
+    true = erlang:demonitor(St#st.fd_monitor, [flush]),
+    ok.
+
+
+monitored_by(St) ->
+    case erlang:process_info(St#st.fd, monitored_by) of
+        {monitored_by, Pids} ->
+            Pids;
+        _ ->
+            []
+    end.
+
+
+get_compacted_seq(#st{header = Header}) ->
+    couch_bt_engine_header:get(Header, compacted_seq).
+
+
+get_del_doc_count(#st{} = St) ->
+    {ok, Reds} = couch_btree:full_reduce(St#st.id_tree),
+    element(2, Reds).
+
+
+get_disk_version(#st{header = Header}) ->
+    couch_bt_engine_header:get(Header, disk_version).
+
+
+get_doc_count(#st{} = St) ->
+    {ok, Reds} = couch_btree:full_reduce(St#st.id_tree),
+    element(1, Reds).
+
+
+get_epochs(#st{header = Header}) ->
+    couch_bt_engine_header:get(Header, epochs).
+
+
+get_last_purged(#st{header = Header} = St) ->
+    case couch_bt_engine_header:get(Header, purged_docs) of
+        nil ->
+            [];
+        Pointer ->
+            {ok, PurgeInfo} = couch_file:pread_term(St#st.fd, Pointer),
+            PurgeInfo
+    end.
+
+
+get_purge_seq(#st{header = Header}) ->
+    couch_bt_engine_header:get(Header, purge_seq).
+
+
+get_revs_limit(#st{header = Header}) ->
+    couch_bt_engine_header:get(Header, revs_limit).
+
+
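+% Returns a proplist of sizes in bytes, e.g. (values illustrative):
+%   [{active, 1070}, {external, 2310}, {file, 4192}]
+% where `active' covers live B-tree and document data, `external' roughly
+% tracks the uncompressed document data, and `file' is the on-disk file size.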
+get_size_info(#st{} = St) ->
+    {ok, FileSize} = couch_file:bytes(St#st.fd),
+    {ok, DbReduction} = couch_btree:full_reduce(St#st.id_tree),
+    SizeInfo0 = element(3, DbReduction),
+    SizeInfo = case SizeInfo0 of
+        SI when is_record(SI, size_info) ->
+            SI;
+        {AS, ES} ->
+            #size_info{active=AS, external=ES};
+        AS ->
+            #size_info{active=AS}
+    end,
+    ActiveSize = active_size(St, SizeInfo),
+    ExternalSize = SizeInfo#size_info.external,
+    [
+        {active, ActiveSize},
+        {external, ExternalSize},
+        {file, FileSize}
+    ].
+
+
+get_security(#st{header = Header} = St) ->
+    case couch_bt_engine_header:get(Header, security_ptr) of
+        undefined ->
+            [];
+        Pointer ->
+            {ok, SecProps} = couch_file:pread_term(St#st.fd, Pointer),
+            SecProps
+    end.
+
+
+get_update_seq(#st{header = Header}) ->
+    couch_bt_engine_header:get(Header, update_seq).
+
+
+get_uuid(#st{header = Header}) ->
+    couch_bt_engine_header:get(Header, uuid).
+
+
+set_revs_limit(#st{header = Header} = St, RevsLimit) ->
+    NewSt = St#st{
+        header = couch_bt_engine_header:set(Header, [
+            {revs_limit, RevsLimit}
+        ]),
+        needs_commit = true
+    },
+    {ok, increment_update_seq(NewSt)}.
+
+
+set_security(#st{header = Header} = St, NewSecurity) ->
+    Options = [{compression, St#st.compression}],
+    {ok, Ptr, _} = couch_file:append_term(St#st.fd, NewSecurity, Options),
+    NewSt = St#st{
+        header = couch_bt_engine_header:set(Header, [
+            {security_ptr, Ptr}
+        ]),
+        needs_commit = true
+    },
+    {ok, increment_update_seq(NewSt)}.
+
+
+open_docs(#st{} = St, DocIds) ->
+    Results = couch_btree:lookup(St#st.id_tree, DocIds),
+    lists:map(fun
+        ({ok, FDI}) -> FDI;
+        (not_found) -> not_found
+    end, Results).
+
+
+open_local_docs(#st{} = St, DocIds) ->
+    Results = couch_btree:lookup(St#st.local_tree, DocIds),
+    lists:map(fun
+        ({ok, Doc}) -> Doc;
+        (not_found) -> not_found
+    end, Results).
+
+
+read_doc_body(#st{} = St, #doc{} = Doc) ->
+    {ok, {Body, Atts}} = couch_file:pread_term(St#st.fd, Doc#doc.body),
+    Doc#doc{
+        body = Body,
+        atts = Atts
+    }.
+
+
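+% The serialized doc body written to disk is a {Body, Atts} tuple whose
+% elements are individually compressed, serialized with ?term_to_bin, and
+% wrapped in a file chunk together with its MD5 so reads can verify
+% integrity before decoding the term.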
+serialize_doc(#st{} = St, #doc{} = Doc) ->
+    Compress = fun(Term) ->
+        case couch_compress:is_compressed(Term, St#st.compression) of
+            true -> Term;
+            false -> couch_compress:compress(Term, St#st.compression)
+        end
+    end,
+    Body = Compress(Doc#doc.body),
+    Atts = Compress(Doc#doc.atts),
+    SummaryBin = ?term_to_bin({Body, Atts}),
+    Md5 = crypto:hash(md5, SummaryBin),
+    Data = couch_file:assemble_file_chunk(SummaryBin, Md5),
+    Doc#doc{body = Data}.
+
+
+write_doc_body(St, #doc{} = Doc) ->
+    #st{
+        fd = Fd
+    } = St,
+    {ok, Ptr, Written} = couch_file:append_raw_chunk(Fd, Doc#doc.body),
+    {ok, Doc#doc{body = Ptr}, Written}.
+
+
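+% Pairs is a list of {OldFDI, NewFDI} tuples: a not_found -> FDI pair is a
+% create, FDI -> FDI is an update (the old update_seq leaves the seq tree),
+% and FDI -> not_found is a removal. Purges additionally bump both the
+% purge_seq and the update_seq so indexers notice the new purge info.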
+write_doc_infos(#st{} = St, Pairs, LocalDocs, PurgedIdRevs) ->
+    #st{
+        id_tree = IdTree,
+        seq_tree = SeqTree,
+        local_tree = LocalTree
+    } = St,
+    FinalAcc = lists:foldl(fun({OldFDI, NewFDI}, Acc) ->
+        {AddAcc, RemIdsAcc, RemSeqsAcc} = Acc,
+        case {OldFDI, NewFDI} of
+            {not_found, #full_doc_info{}} ->
+                {[NewFDI | AddAcc], RemIdsAcc, RemSeqsAcc};
+            {#full_doc_info{id = Id}, #full_doc_info{id = Id}} ->
+                NewAddAcc = [NewFDI | AddAcc],
+                NewRemSeqsAcc = [OldFDI#full_doc_info.update_seq | RemSeqsAcc],
+                {NewAddAcc, RemIdsAcc, NewRemSeqsAcc};
+            {#full_doc_info{id = Id}, not_found} ->
+                NewRemIdsAcc = [Id | RemIdsAcc],
+                NewRemSeqsAcc = [OldFDI#full_doc_info.update_seq | RemSeqsAcc],
+                {AddAcc, NewRemIdsAcc, NewRemSeqsAcc}
+        end
+    end, {[], [], []}, Pairs),
+
+    {Add, RemIds, RemSeqs} = FinalAcc,
+    {ok, IdTree2} = couch_btree:add_remove(IdTree, Add, RemIds),
+    {ok, SeqTree2} = couch_btree:add_remove(SeqTree, Add, RemSeqs),
+
+    {AddLDocs, RemLDocIds} = lists:foldl(fun(Doc, {AddAcc, RemAcc}) ->
+        case Doc#doc.deleted of
+            true ->
+                {AddAcc, [Doc#doc.id | RemAcc]};
+            false ->
+                {[Doc | AddAcc], RemAcc}
+        end
+    end, {[], []}, LocalDocs),
+    {ok, LocalTree2} = couch_btree:add_remove(LocalTree, AddLDocs, RemLDocIds),
+
+    NewUpdateSeq = lists:foldl(fun(#full_doc_info{update_seq=Seq}, Acc) ->
+        erlang:max(Seq, Acc)
+    end, get_update_seq(St), Add),
+
+    NewHeader = case PurgedIdRevs of
+        [] ->
+            couch_bt_engine_header:set(St#st.header, [
+                {update_seq, NewUpdateSeq}
+            ]);
+        _ ->
+            {ok, Ptr, _} = couch_file:append_term(St#st.fd, PurgedIdRevs),
+            OldPurgeSeq = couch_bt_engine_header:get(St#st.header, purge_seq),
+            % We bump NewUpdateSeq because we have to ensure that
+            % indexers see that they need to process the new purge
+            % information.
+            couch_bt_engine_header:set(St#st.header, [
+                {update_seq, NewUpdateSeq + 1},
+                {purge_seq, OldPurgeSeq + 1},
+                {purged_docs, Ptr}
+            ])
+    end,
+
+    {ok, St#st{
+        header = NewHeader,
+        id_tree = IdTree2,
+        seq_tree = SeqTree2,
+        local_tree = LocalTree2,
+        needs_commit = true
+    }}.
+
+
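+% Only writes a header when the state actually changed. The sync points
+% around the header write are driven by the "fsync_options" config; with
+% the default [before_header, after_header, on_file_open] the file is
+% synced on both sides of the header write.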
+commit_data(St) ->
+    #st{
+        fd = Fd,
+        fsync_options = FsyncOptions,
+        header = OldHeader,
+        needs_commit = NeedsCommit
+    } = St,
+
+    NewHeader = update_header(St, OldHeader),
+
+    case NewHeader /= OldHeader orelse NeedsCommit of
+        true ->
+            Before = lists:member(before_header, FsyncOptions),
+            After = lists:member(after_header, FsyncOptions),
+
+            if Before -> couch_file:sync(Fd); true -> ok end,
+            ok = couch_file:write_header(Fd, NewHeader),
+            if After -> couch_file:sync(Fd); true -> ok end,
+
+            {ok, St#st{
+                header = NewHeader,
+                needs_commit = false
+            }};
+        false ->
+            {ok, St}
+    end.
+
+
+open_write_stream(#st{} = St, Options) ->
+    couch_stream:open({couch_bt_engine_stream, {St#st.fd, []}}, Options).
+
+
+open_read_stream(#st{} = St, StreamSt) ->
+    {ok, {couch_bt_engine_stream, {St#st.fd, StreamSt}}}.
+
+
+is_active_stream(#st{} = St, {couch_bt_engine_stream, {Fd, _}}) ->
+    St#st.fd == Fd;
+is_active_stream(_, _) ->
+    false.
+
+
+fold_docs(St, UserFun, UserAcc, Options) ->
+    fold_docs_int(St#st.id_tree, UserFun, UserAcc, Options).
+
+
+fold_local_docs(St, UserFun, UserAcc, Options) ->
+    fold_docs_int(St#st.local_tree, UserFun, UserAcc, Options).
+
+
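+% Illustrative usage (hypothetical fun): collect every #full_doc_info{}
+% updated after sequence 0:
+%   Fun = fun(FDI, Acc) -> {ok, [FDI | Acc]} end,
+%   {ok, Infos} = fold_changes(St, 0, Fun, [], []),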
+fold_changes(St, SinceSeq, UserFun, UserAcc, Options) ->
+    Fun = fun drop_reductions/4,
+    InAcc = {UserFun, UserAcc},
+    Opts = [{start_key, SinceSeq + 1}] ++ Options,
+    {ok, _, OutAcc} = couch_btree:fold(St#st.seq_tree, Fun, InAcc, Opts),
+    {_, FinalUserAcc} = OutAcc,
+    {ok, FinalUserAcc}.
+
+
+count_changes_since(St, SinceSeq) ->
+    BTree = St#st.seq_tree,
+    FoldFun = fun(_SeqStart, PartialReds, 0) ->
+        {ok, couch_btree:final_reduce(BTree, PartialReds)}
+    end,
+    Opts = [{start_key, SinceSeq + 1}],
+    {ok, Changes} = couch_btree:fold_reduce(BTree, FoldFun, 0, Opts),
+    Changes.
+
+
+start_compaction(St, DbName, Options, Parent) ->
+    Args = [St, DbName, Options, Parent],
+    Pid = spawn_link(couch_bt_engine_compactor, start, Args),
+    {ok, St, Pid}.
+
+
+finish_compaction(OldState, DbName, Options, CompactFilePath) ->
+    {ok, NewState1} = ?MODULE:init(CompactFilePath, Options),
+    OldSeq = get_update_seq(OldState),
+    NewSeq = get_update_seq(NewState1),
+    case OldSeq == NewSeq of
+        true ->
+            finish_compaction_int(OldState, NewState1);
+        false ->
+            couch_log:info("Compaction file still behind main file "
+                           "(update seq=~p, compact update seq=~p). Retrying.",
+                           [OldSeq, NewSeq]),
+            ok = decref(NewState1),
+            start_compaction(OldState, DbName, Options, self())
+    end.
+
+
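+% On disk an id_tree entry is {Id, {UpdateSeq, Deleted, Sizes, DiskTree}}
+% with Deleted as 0/1 and Sizes as an {Active, External} pair; the split
+% and join functions below translate between that and #full_doc_info{}.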
+id_tree_split(#full_doc_info{}=Info) ->
+    #full_doc_info{
+        id = Id,
+        update_seq = Seq,
+        deleted = Deleted,
+        sizes = SizeInfo,
+        rev_tree = Tree
+    } = Info,
+    {Id, {Seq, ?b2i(Deleted), split_sizes(SizeInfo), disk_tree(Tree)}}.
+
+
+id_tree_join(Id, {HighSeq, Deleted, DiskTree}) ->
+    % Handle old formats before data_size was added
+    id_tree_join(Id, {HighSeq, Deleted, #size_info{}, DiskTree});
+
+id_tree_join(Id, {HighSeq, Deleted, Sizes, DiskTree}) ->
+    #full_doc_info{
+        id = Id,
+        update_seq = HighSeq,
+        deleted = ?i2b(Deleted),
+        sizes = couch_db_updater:upgrade_sizes(Sizes),
+        rev_tree = rev_tree(DiskTree)
+    }.
+
+
+id_tree_reduce(reduce, FullDocInfos) ->
+    lists:foldl(fun(Info, {NotDeleted, Deleted, Sizes}) ->
+        Sizes2 = reduce_sizes(Sizes, Info#full_doc_info.sizes),
+        case Info#full_doc_info.deleted of
+        true ->
+            {NotDeleted, Deleted + 1, Sizes2};
+        false ->
+            {NotDeleted + 1, Deleted, Sizes2}
+        end
+    end, {0, 0, #size_info{}}, FullDocInfos);
+id_tree_reduce(rereduce, Reds) ->
+    lists:foldl(fun
+        ({NotDeleted, Deleted}, {AccNotDeleted, AccDeleted, _AccSizes}) ->
+            % pre 1.2 format, will be upgraded on compaction
+            {AccNotDeleted + NotDeleted, AccDeleted + Deleted, nil};
+        ({NotDeleted, Deleted, Sizes}, {AccNotDeleted, AccDeleted, AccSizes}) ->
+            AccSizes2 = reduce_sizes(AccSizes, Sizes),
+            {AccNotDeleted + NotDeleted, AccDeleted + Deleted, AccSizes2}
+    end, {0, 0, #size_info{}}, Reds).
+
+
+seq_tree_split(#full_doc_info{}=Info) ->
+    #full_doc_info{
+        id = Id,
+        update_seq = Seq,
+        deleted = Del,
+        sizes = SizeInfo,
+        rev_tree = Tree
+    } = Info,
+    {Seq, {Id, ?b2i(Del), split_sizes(SizeInfo), disk_tree(Tree)}}.
+
+
+seq_tree_join(Seq, {Id, Del, DiskTree}) when is_integer(Del) ->
+    seq_tree_join(Seq, {Id, Del, {0, 0}, DiskTree});
+
+seq_tree_join(Seq, {Id, Del, Sizes, DiskTree}) when is_integer(Del) ->
+    #full_doc_info{
+        id = Id,
+        update_seq = Seq,
+        deleted = ?i2b(Del),
+        sizes = join_sizes(Sizes),
+        rev_tree = rev_tree(DiskTree)
+    };
+
+seq_tree_join(KeySeq, {Id, RevInfos, DeletedRevInfos}) ->
+    % Older versions stored #doc_info records in the seq_tree.
+    % Compact to upgrade.
+    Revs = lists:map(fun({Rev, Seq, Bp}) ->
+        #rev_info{rev = Rev, seq = Seq, deleted = false, body_sp = Bp}
+    end, RevInfos),
+    DeletedRevs = lists:map(fun({Rev, Seq, Bp}) ->
+        #rev_info{rev = Rev, seq = Seq, deleted = true, body_sp = Bp}
+    end, DeletedRevInfos),
+    #doc_info{
+        id = Id,
+        high_seq = KeySeq,
+        revs = Revs ++ DeletedRevs
+    }.
+
+
+seq_tree_reduce(reduce, DocInfos) ->
+    % count the number of documents
+    length(DocInfos);
+seq_tree_reduce(rereduce, Reds) ->
+    lists:sum(Reds).
+
+
+local_tree_split(#doc{revs = {0, [Rev]}} = Doc) when is_binary(Rev) ->
+    #doc{
+        id = Id,
+        body = BodyData
+    } = Doc,
+    {Id, {binary_to_integer(Rev), BodyData}};
+
+local_tree_split(#doc{revs = {0, [Rev]}} = Doc) when is_integer(Rev) ->
+    #doc{
+        id = Id,
+        body = BodyData
+    } = Doc,
+    {Id, {Rev, BodyData}}.
+
+
+local_tree_join(Id, {Rev, BodyData}) when is_integer(Rev) ->
+    #doc{
+        id = Id,
+        revs = {0, [integer_to_binary(Rev)]},
+        body = BodyData
+    }.
+
+
+set_update_seq(#st{header = Header} = St, UpdateSeq) ->
+    {ok, St#st{
+        header = couch_bt_engine_header:set(Header, [
+            {update_seq, UpdateSeq}
+        ]),
+        needs_commit = true
+    }}.
+
+
+copy_security(#st{header = Header} = St, SecProps) ->
+    Options = [{compression, St#st.compression}],
+    {ok, Ptr, _} = couch_file:append_term(St#st.fd, SecProps, Options),
+    {ok, St#st{
+        header = couch_bt_engine_header:set(Header, [
+            {security_ptr, Ptr}
+        ]),
+        needs_commit = true
+    }}.
+
+
+open_db_file(FilePath, Options) ->
+    case couch_file:open(FilePath, Options) of
+        {ok, Fd} ->
+            {ok, Fd};
+        {error, enoent} ->
+            % Couldn't find the file. Is there a compact version? This can
+            % happen (rarely) if we crashed during the file switch.
+            case couch_file:open(FilePath ++ ".compact", [nologifmissing]) of
+                {ok, Fd} ->
+                    Fmt = "Recovering from compaction file: ~s~s",
+                    couch_log:info(Fmt, [FilePath, ".compact"]),
+                    ok = file:rename(FilePath ++ ".compact", FilePath),
+                    ok = couch_file:sync(Fd),
+                    {ok, Fd};
+                {error, enoent} ->
+                    throw({not_found, no_db_file})
+            end;
+        Error ->
+            throw(Error)
+    end.
+
+
+init_state(FilePath, Fd, Header0, Options) ->
+    DefaultFSync = "[before_header, after_header, on_file_open]",
+    FsyncStr = config:get("couchdb", "fsync_options", DefaultFSync),
+    {ok, FsyncOptions} = couch_util:parse_term(FsyncStr),
+
+    case lists:member(on_file_open, FsyncOptions) of
+        true -> ok = couch_file:sync(Fd);
+        _ -> ok
+    end,
+
+    Compression = couch_compress:get_compression_method(),
+
+    Header1 = couch_bt_engine_header:upgrade(Header0),
+    Header = set_default_security_object(Fd, Header1, Compression, Options),
+
+    IdTreeState = couch_bt_engine_header:id_tree_state(Header),
+    {ok, IdTree} = couch_btree:open(IdTreeState, Fd, [
+            {split, fun ?MODULE:id_tree_split/1},
+            {join, fun ?MODULE:id_tree_join/2},
+            {reduce, fun ?MODULE:id_tree_reduce/2},
+            {compression, Compression}
+        ]),
+
+    SeqTreeState = couch_bt_engine_header:seq_tree_state(Header),
+    {ok, SeqTree} = couch_btree:open(SeqTreeState, Fd, [
+            {split, fun ?MODULE:seq_tree_split/1},
+            {join, fun ?MODULE:seq_tree_join/2},
+            {reduce, fun ?MODULE:seq_tree_reduce/2},
+            {compression, Compression}
+        ]),
+
+    LocalTreeState = couch_bt_engine_header:local_tree_state(Header),
+    {ok, LocalTree} = couch_btree:open(LocalTreeState, Fd, [
+            {split, fun ?MODULE:local_tree_split/1},
+            {join, fun ?MODULE:local_tree_join/2},
+            {compression, Compression}
+        ]),
+
+    ok = couch_file:set_db_pid(Fd, self()),
+
+    St = #st{
+        filepath = FilePath,
+        fd = Fd,
+        fd_monitor = erlang:monitor(process, Fd),
+        fsync_options = FsyncOptions,
+        header = Header,
+        needs_commit = false,
+        id_tree = IdTree,
+        seq_tree = SeqTree,
+        local_tree = LocalTree,
+        compression = Compression
+    },
+
+    % If this is a new database, we've just created a
+    % new UUID and default security object, which need
+    % to be written to disk.
+    case Header /= Header0 of
+        true ->
+            {ok, NewSt} = commit_data(St),
+            NewSt;
+        false ->
+            St
+    end.
+
+
+update_header(St, Header) ->
+    couch_bt_engine_header:set(Header, [
+        {seq_tree_state, couch_btree:get_state(St#st.seq_tree)},
+        {id_tree_state, couch_btree:get_state(St#st.id_tree)},
+        {local_tree_state, couch_btree:get_state(St#st.local_tree)}
+    ]).
+
+
+increment_update_seq(#st{header = Header} = St) ->
+    UpdateSeq = couch_bt_engine_header:get(Header, update_seq),
+    St#st{
+        header = couch_bt_engine_header:set(Header, [
+            {update_seq, UpdateSeq + 1}
+        ])
+    }.
+
+
+set_default_security_object(Fd, Header, Compression, Options) ->
+    case couch_bt_engine_header:get(Header, security_ptr) of
+        Pointer when is_integer(Pointer) ->
+            Header;
+        _ ->
+            Default = couch_util:get_value(default_security_object, Options),
+            AppendOpts = [{compression, Compression}],
+            {ok, Ptr, _} = couch_file:append_term(Fd, Default, AppendOpts),
+            couch_bt_engine_header:set(Header, security_ptr, Ptr)
+    end.
+
+
+delete_compaction_files(FilePath) ->
+    RootDir = config:get("couchdb", "database_dir", "."),
+    DelOpts = [{context, delete}],
+    delete_compaction_files(RootDir, FilePath, DelOpts).
+
+
+rev_tree(DiskTree) ->
+    couch_key_tree:map(fun
+        (_RevId, {Del, Ptr, Seq}) ->
+            #leaf{
+                deleted = ?i2b(Del),
+                ptr = Ptr,
+                seq = Seq
+            };
+        (_RevId, {Del, Ptr, Seq, Size}) ->
+            #leaf{
+                deleted = ?i2b(Del),
+                ptr = Ptr,
+                seq = Seq,
+                sizes = couch_db_updater:upgrade_sizes(Size)
+            };
+        (_RevId, {Del, Ptr, Seq, Sizes, Atts}) ->
+            #leaf{
+                deleted = ?i2b(Del),
+                ptr = Ptr,
+                seq = Seq,
+                sizes = couch_db_updater:upgrade_sizes(Sizes),
+                atts = Atts
+            };
+        (_RevId, ?REV_MISSING) ->
+            ?REV_MISSING
+    end, DiskTree).
+
+
+disk_tree(RevTree) ->
+    couch_key_tree:map(fun
+        (_RevId, ?REV_MISSING) ->
+            ?REV_MISSING;
+        (_RevId, #leaf{} = Leaf) ->
+            #leaf{
+                deleted = Del,
+                ptr = Ptr,
+                seq = Seq,
+                sizes = Sizes,
+                atts = Atts
+            } = Leaf,
+            {?b2i(Del), Ptr, Seq, split_sizes(Sizes), Atts}
+    end, RevTree).
+
+
+split_sizes(#size_info{}=SI) ->
+    {SI#size_info.active, SI#size_info.external}.
+
+
+join_sizes({Active, External}) when is_integer(Active), is_integer(External) ->
+    #size_info{active=Active, external=External}.
+
+
+reduce_sizes(nil, _) ->
+    nil;
+reduce_sizes(_, nil) ->
+    nil;
+reduce_sizes(#size_info{}=S1, #size_info{}=S2) ->
+    #size_info{
+        active = S1#size_info.active + S2#size_info.active,
+        external = S1#size_info.external + S2#size_info.external
+    };
+reduce_sizes(S1, S2) ->
+    US1 = couch_db_updater:upgrade_sizes(S1),
+    US2 = couch_db_updater:upgrade_sizes(S2),
+    reduce_sizes(US1, US2).
+
+
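+% B-trees from files that predate size tracking report their size as nil;
+% once any tree does, we return null instead of a misleading partial sum.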
+active_size(#st{} = St, #size_info{} = SI) ->
+    Trees = [
+        St#st.id_tree,
+        St#st.seq_tree,
+        St#st.local_tree
+    ],
+    lists:foldl(fun(T, Acc) ->
+        case couch_btree:size(T) of
+            _ when Acc == null ->
+                null;
+            nil ->
+                null;
+            Size ->
+                Acc + Size
+        end
+    end, SI#size_info.active, Trees).
+
+
+fold_docs_int(Tree, UserFun, UserAcc, Options) ->
+    Fun = case lists:member(include_deleted, Options) of
+        true -> fun include_deleted/4;
+        false -> fun skip_deleted/4
+    end,
+    RedFun = case lists:member(include_reductions, Options) of
+        true -> fun include_reductions/4;
+        false -> fun drop_reductions/4
+    end,
+    InAcc = {RedFun, {UserFun, UserAcc}},
+    {ok, Reds, OutAcc} = couch_btree:fold(Tree, Fun, InAcc, Options),
+    {_, {_, FinalUserAcc}} = OutAcc,
+    case lists:member(include_reductions, Options) of
+        true ->
+            {ok, fold_docs_reduce_to_count(Reds), FinalUserAcc};
+        false ->
+            {ok, FinalUserAcc}
+    end.
+
+
+include_deleted(Case, Entry, Reds, {UserFun, UserAcc}) ->
+    {Go, NewUserAcc} = UserFun(Case, Entry, Reds, UserAcc),
+    {Go, {UserFun, NewUserAcc}}.
+
+
+% First element of the reductions is the total
+% number of undeleted documents.
+skip_deleted(traverse, _Entry, {0, _, _} = _Reds, Acc) ->
+    {skip, Acc};
+skip_deleted(visit, #full_doc_info{deleted = true}, _, Acc) ->
+    {ok, Acc};
+skip_deleted(Case, Entry, Reds, {UserFun, UserAcc}) ->
+    {Go, NewUserAcc} = UserFun(Case, Entry, Reds, UserAcc),
+    {Go, {UserFun, NewUserAcc}}.
+
+
+include_reductions(visit, FDI, Reds, {UserFun, UserAcc}) ->
+    {Go, NewUserAcc} = UserFun(FDI, Reds, UserAcc),
+    {Go, {UserFun, NewUserAcc}};
+include_reductions(_, _, _, Acc) ->
+    {ok, Acc}.
+
+
+drop_reductions(visit, FDI, _Reds, {UserFun, UserAcc}) ->
+    {Go, NewUserAcc} = UserFun(FDI, UserAcc),
+    {Go, {UserFun, NewUserAcc}};
+drop_reductions(_, _, _, Acc) ->
+    {ok, Acc}.
+
+
+fold_docs_reduce_to_count(Reds) ->
+    RedFun = fun id_tree_reduce/2,
+    FinalRed = couch_btree:final_reduce(RedFun, Reds),
+    element(1, FinalRed).
+
+
+finish_compaction_int(#st{} = OldSt, #st{} = NewSt1) ->
+    #st{
+        filepath = FilePath,
+        local_tree = OldLocal
+    } = OldSt,
+    #st{
+        filepath = CompactDataPath,
+        header = Header,
+        local_tree = NewLocal1
+    } = NewSt1,
+
+    % suck up all the local docs into memory and write them to the new db
+    LoadFun = fun(Value, _Offset, Acc) ->
+        {ok, [Value | Acc]}
+    end,
+    {ok, _, LocalDocs} = couch_btree:foldl(OldLocal, LoadFun, []),
+    {ok, NewLocal2} = couch_btree:add(NewLocal1, LocalDocs),
+
+    {ok, NewSt2} = commit_data(NewSt1#st{
+        header = couch_bt_engine_header:set(Header, [
+            {compacted_seq, get_update_seq(OldSt)},
+            {revs_limit, get_revs_limit(OldSt)}
+        ]),
+        local_tree = NewLocal2
+    }),
+
+    % Rename our *.compact.data file to *.compact so that if we
+    % die between deleting the old file and renaming *.compact
+    % we can recover correctly.
+    ok = file:rename(CompactDataPath, FilePath ++ ".compact"),
+
+    % Remove the uncompacted database file
+    RootDir = config:get("couchdb", "database_dir", "."),
+    couch_file:delete(RootDir, FilePath),
+
+    % Move our compacted file into its final location
+    ok = file:rename(FilePath ++ ".compact", FilePath),
+
+    % Delete the old meta compaction file after promoting
+    % the compaction file.
+    couch_file:delete(RootDir, FilePath ++ ".compact.meta"),
+
+    % We're finished with our old state
+    decref(OldSt),
+
+    % And return our finished new state
+    {ok, NewSt2#st{
+        filepath = FilePath
+    }, undefined}.
diff --git a/src/couch/src/couch_bt_engine.hrl b/src/couch/src/couch_bt_engine.hrl
new file mode 100644
index 0000000..7f52d8f
--- /dev/null
+++ b/src/couch/src/couch_bt_engine.hrl
@@ -0,0 +1,24 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-record(st, {
+    filepath,
+    fd,
+    fd_monitor,
+    fsync_options,
+    header,
+    needs_commit,
+    id_tree,
+    seq_tree,
+    local_tree,
+    compression
+}).
diff --git a/src/couch/src/couch_bt_engine_compactor.erl b/src/couch/src/couch_bt_engine_compactor.erl
new file mode 100644
index 0000000..843da57
--- /dev/null
+++ b/src/couch/src/couch_bt_engine_compactor.erl
@@ -0,0 +1,497 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_bt_engine_compactor).
+
+
+-export([
+    start/4
+]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+-include("couch_bt_engine.hrl").
+
+
+-record(comp_header, {
+    db_header,
+    meta_state
+}).
+
+-record(merge_st, {
+    id_tree,
+    seq_tree,
+    curr,
+    rem_seqs,
+    infos
+}).
+
+
+start(#st{} = St, DbName, Options, Parent) ->
+    erlang:put(io_priority, {db_compact, DbName}),
+    #st{
+        filepath = FilePath,
+        header = Header
+    } = St,
+    couch_log:debug("Compaction process spawned for db \"~s\"", [DbName]),
+
+    {ok, NewSt, DName, DFd, MFd, Retry} =
+            open_compaction_files(Header, FilePath, Options),
+    erlang:monitor(process, MFd),
+
+    % This is a bit worrisome. init_db/4 will monitor the data fd
+    % but it doesn't know about the meta fd. For now I'll maintain
+    % that the data fd is the old normal fd and meta fd is special
+    % and hope everything works out for the best.
+    unlink(DFd),
+
+    NewSt1 = copy_purge_info(St, NewSt),
+    NewSt2 = copy_compact(DbName, St, NewSt1, Retry),
+    NewSt3 = sort_meta_data(NewSt2),
+    NewSt4 = commit_compaction_data(NewSt3),
+    NewSt5 = copy_meta_data(NewSt4),
+    {ok, NewSt6} = couch_bt_engine:commit_data(NewSt5),
+    ok = couch_bt_engine:decref(NewSt6),
+    ok = couch_file:close(MFd),
+
+    % Done
+    gen_server:cast(Parent, {compact_done, couch_bt_engine, DName}).
+
+
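+% Three restart cases: both files carry the same #comp_header{} (resume the
+% previous compaction exactly where it left off), the data file has a plain
+% db header (the meta file is unusable, so reset it and retry), or neither
+% header is usable (reset both files and start from scratch).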
+open_compaction_files(SrcHdr, DbFilePath, Options) ->
+    DataFile = DbFilePath ++ ".compact.data",
+    MetaFile = DbFilePath ++ ".compact.meta",
+    {ok, DataFd, DataHdr} = open_compaction_file(DataFile),
+    {ok, MetaFd, MetaHdr} = open_compaction_file(MetaFile),
+    DataHdrIsDbHdr = couch_bt_engine_header:is_header(DataHdr),
+    case {DataHdr, MetaHdr} of
+        {#comp_header{}=A, #comp_header{}=A} ->
+            DbHeader = A#comp_header.db_header,
+            St0 = couch_bt_engine:init_state(
+                    DataFile, DataFd, DbHeader, Options),
+            St1 = bind_emsort(St0, MetaFd, A#comp_header.meta_state),
+            {ok, St1, DataFile, DataFd, MetaFd, St0#st.id_tree};
+        _ when DataHdrIsDbHdr ->
+            Header = couch_bt_engine_header:from(SrcHdr),
+            ok = reset_compaction_file(MetaFd, Header),
+            St0 = couch_bt_engine:init_state(
+                    DataFile, DataFd, DataHdr, Options),
+            St1 = bind_emsort(St0, MetaFd, nil),
+            {ok, St1, DataFile, DataFd, MetaFd, St0#st.id_tree};
+        _ ->
+            Header = couch_bt_engine_header:from(SrcHdr),
+            ok = reset_compaction_file(DataFd, Header),
+            ok = reset_compaction_file(MetaFd, Header),
+            St0 = couch_bt_engine:init_state(DataFile, DataFd, Header, Options),
+            St1 = bind_emsort(St0, MetaFd, nil),
+            {ok, St1, DataFile, DataFd, MetaFd, nil}
+    end.
+
+
+copy_purge_info(OldSt, NewSt) ->
+    OldHdr = OldSt#st.header,
+    NewHdr = NewSt#st.header,
+    OldPurgeSeq = couch_bt_engine_header:purge_seq(OldHdr),
+    case OldPurgeSeq > 0 of
+        true ->
+            Purged = couch_bt_engine:get_last_purged(OldSt),
+            Opts = [{compression, NewSt#st.compression}],
+            {ok, Ptr, _} = couch_file:append_term(NewSt#st.fd, Purged, Opts),
+            NewNewHdr = couch_bt_engine_header:set(NewHdr, [
+                {purge_seq, OldPurgeSeq},
+                {purged_docs, Ptr}
+            ]),
+            NewSt#st{header = NewNewHdr};
+        false ->
+            NewSt
+    end.
+
+
+copy_compact(DbName, St, NewSt0, Retry) ->
+    Compression = couch_compress:get_compression_method(),
+    NewSt = NewSt0#st{compression = Compression},
+    NewUpdateSeq = couch_bt_engine:get_update_seq(NewSt0),
+    TotalChanges = couch_bt_engine:count_changes_since(St, NewUpdateSeq),
+    BufferSize = list_to_integer(
+        config:get("database_compaction", "doc_buffer_size", "524288")),
+    CheckpointAfter = couch_util:to_integer(
+        config:get("database_compaction", "checkpoint_after",
+            BufferSize * 10)),
+
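+    % Buffer doc infos until doc_buffer_size bytes accumulate, copy them as
+    % a batch, and checkpoint a header once checkpoint_after bytes have been
+    % copied so an interrupted compaction can pick up where it left off.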
+    EnumBySeqFun =
+    fun(DocInfo, _Offset,
+            {AccNewSt, AccUncopied, AccUncopiedSize, AccCopiedSize}) ->
+
+        Seq = case DocInfo of
+            #full_doc_info{} -> DocInfo#full_doc_info.update_seq;
+            #doc_info{} -> DocInfo#doc_info.high_seq
+        end,
+
+        AccUncopiedSize2 = AccUncopiedSize + ?term_size(DocInfo),
+        if AccUncopiedSize2 >= BufferSize ->
+            NewSt2 = copy_docs(
+                St, AccNewSt, lists:reverse([DocInfo | AccUncopied]), Retry),
+            AccCopiedSize2 = AccCopiedSize + AccUncopiedSize2,
+            if AccCopiedSize2 >= CheckpointAfter ->
+                {ok, NewSt3} = couch_bt_engine:set_update_seq(NewSt2, Seq),
+                CommNewSt3 = commit_compaction_data(NewSt3),
+                {ok, {CommNewSt3, [], 0, 0}};
+            true ->
+                {ok, NewSt3} = couch_bt_engine:set_update_seq(NewSt2, Seq),
+                {ok, {NewSt3, [], 0, AccCopiedSize2}}
+            end;
+        true ->
+            {ok, {AccNewSt, [DocInfo | AccUncopied], AccUncopiedSize2,
+                AccCopiedSize}}
+        end
+    end,
+
+    TaskProps0 = [
+        {type, database_compaction},
+        {database, DbName},
+        {progress, 0},
+        {changes_done, 0},
+        {total_changes, TotalChanges}
+    ],
+    case (Retry =/= nil) and couch_task_status:is_task_added() of
+    true ->
+        couch_task_status:update([
+            {retry, true},
+            {progress, 0},
+            {changes_done, 0},
+            {total_changes, TotalChanges}
+        ]);
+    false ->
+        couch_task_status:add_task(TaskProps0),
+        couch_task_status:set_update_frequency(500)
+    end,
+
+    {ok, _, {NewSt2, Uncopied, _, _}} =
+        couch_btree:foldl(St#st.seq_tree, EnumBySeqFun,
+            {NewSt, [], 0, 0},
+            [{start_key, NewUpdateSeq + 1}]),
+
+    NewSt3 = copy_docs(St, NewSt2, lists:reverse(Uncopied), Retry),
+
+    % Copy the security information over
+    SecProps = couch_bt_engine:get_security(St),
+    {ok, NewSt4} = couch_bt_engine:copy_security(NewSt3, SecProps),
+
+    FinalUpdateSeq = couch_bt_engine:get_update_seq(St),
+    {ok, NewSt5} = couch_bt_engine:set_update_seq(NewSt4, FinalUpdateSeq),
+    commit_compaction_data(NewSt5).
+
+
+copy_docs(St, #st{} = NewSt, MixedInfos, Retry) ->
+    DocInfoIds = [Id || #doc_info{id=Id} <- MixedInfos],
+    LookupResults = couch_btree:lookup(St#st.id_tree, DocInfoIds),
+    % COUCHDB-968, make sure we prune duplicates during compaction
+    NewInfos0 = lists:usort(fun(#full_doc_info{id=A}, #full_doc_info{id=B}) ->
+        A =< B
+    end, merge_lookups(MixedInfos, LookupResults)),
+
+    NewInfos1 = lists:map(fun(Info) ->
+        {NewRevTree, FinalAcc} = couch_key_tree:mapfold(fun
+            ({RevPos, RevId}, #leaf{ptr=Sp}=Leaf, leaf, SizesAcc) ->
+                {Body, AttInfos} = copy_doc_attachments(St, Sp, NewSt),
+                % In the future, we should figure out how to do this for
+                % upgrade purposes.
+                EJsonBody = case is_binary(Body) of
+                    true ->
+                        couch_compress:decompress(Body);
+                    false ->
+                        Body
+                end,
+                Doc0 = #doc{
+                    id = Info#full_doc_info.id,
+                    revs = {RevPos, [RevId]},
+                    deleted = Leaf#leaf.deleted,
+                    body = Body,
+                    atts = AttInfos
+                },
+                Doc1 = couch_bt_engine:serialize_doc(NewSt, Doc0),
+                ExternalSize = ?term_size(EJsonBody),
+                {ok, Doc2, ActiveSize} =
+                        couch_bt_engine:write_doc_body(NewSt, Doc1),
+                AttSizes = [{element(3,A), element(4,A)} || A <- AttInfos],
+                NewLeaf = Leaf#leaf{
+                    ptr = Doc2#doc.body,
+                    sizes = #size_info{
+                        active = ActiveSize,
+                        external = ExternalSize
+                    },
+                    atts = AttSizes
+                },
+                {NewLeaf, couch_db_updater:add_sizes(leaf, NewLeaf, SizesAcc)};
+            (_Rev, _Leaf, branch, SizesAcc) ->
+                {?REV_MISSING, SizesAcc}
+        end, {0, 0, []}, Info#full_doc_info.rev_tree),
+        {FinalAS, FinalES, FinalAtts} = FinalAcc,
+        TotalAttSize = lists:foldl(fun({_, S}, A) -> S + A end, 0, FinalAtts),
+        NewActiveSize = FinalAS + TotalAttSize,
+        NewExternalSize = FinalES + TotalAttSize,
+        Info#full_doc_info{
+            rev_tree = NewRevTree,
+            sizes = #size_info{
+                active = NewActiveSize,
+                external = NewExternalSize
+            }
+        }
+    end, NewInfos0),
+
+    Limit = couch_bt_engine:get_revs_limit(St),
+    NewInfos = lists:map(fun(FDI) ->
+        FDI#full_doc_info{
+            rev_tree = couch_key_tree:stem(FDI#full_doc_info.rev_tree, Limit)
+        }
+    end, NewInfos1),
+
+    RemoveSeqs =
+    case Retry of
+    nil ->
+        [];
+    OldDocIdTree ->
+        % Compaction is being rerun to catch up to writes during the
+        % first pass. This means we may have docs that already exist
+        % in the seq_tree in the .data file. Here we look up any old
+        % update_seqs so that they can be removed.
+        Ids = [Id || #full_doc_info{id=Id} <- NewInfos],
+        Existing = couch_btree:lookup(OldDocIdTree, Ids),
+        [Seq || {ok, #full_doc_info{update_seq=Seq}} <- Existing]
+    end,
+
+    {ok, SeqTree} = couch_btree:add_remove(
+            NewSt#st.seq_tree, NewInfos, RemoveSeqs),
+
+    FDIKVs = lists:map(fun(#full_doc_info{id=Id, update_seq=Seq}=FDI) ->
+        {{Id, Seq}, FDI}
+    end, NewInfos),
+    {ok, IdEms} = couch_emsort:add(NewSt#st.id_tree, FDIKVs),
+    update_compact_task(length(NewInfos)),
+    NewSt#st{id_tree=IdEms, seq_tree=SeqTree}.
+
+
+copy_doc_attachments(#st{} = SrcSt, SrcSp, DstSt) ->
+    {ok, {BodyData, BinInfos0}} = couch_file:pread_term(SrcSt#st.fd, SrcSp),
+    BinInfos = case BinInfos0 of
+    _ when is_binary(BinInfos0) ->
+        couch_compress:decompress(BinInfos0);
+    _ when is_list(BinInfos0) ->
+        % pre 1.2 file format
+        BinInfos0
+    end,
+    % copy the bin values
+    NewBinInfos = lists:map(
+        fun({Name, Type, BinSp, AttLen, RevPos, ExpectedMd5}) ->
+            % 010 UPGRADE CODE
+            {ok, SrcStream} = couch_bt_engine:open_read_stream(SrcSt, BinSp),
+            {ok, DstStream} = couch_bt_engine:open_write_stream(DstSt, []),
+            ok = couch_stream:copy(SrcStream, DstStream),
+            {NewStream, AttLen, AttLen, ActualMd5, _IdentityMd5} =
+                couch_stream:close(DstStream),
+            {ok, NewBinSp} = couch_stream:to_disk_term(NewStream),
+            couch_util:check_md5(ExpectedMd5, ActualMd5),
+            {Name, Type, NewBinSp, AttLen, AttLen, RevPos, ExpectedMd5, identity};
+        ({Name, Type, BinSp, AttLen, DiskLen, RevPos, ExpectedMd5, Enc1}) ->
+            {ok, SrcStream} = couch_bt_engine:open_read_stream(SrcSt, BinSp),
+            {ok, DstStream} = couch_bt_engine:open_write_stream(DstSt, []),
+            ok = couch_stream:copy(SrcStream, DstStream),
+            {NewStream, AttLen, _, ActualMd5, _IdentityMd5} =
+                couch_stream:close(DstStream),
+            {ok, NewBinSp} = couch_stream:to_disk_term(NewStream),
+            couch_util:check_md5(ExpectedMd5, ActualMd5),
+            Enc = case Enc1 of
+            true ->
+                % 0110 UPGRADE CODE
+                gzip;
+            false ->
+                % 0110 UPGRADE CODE
+                identity;
+            _ ->
+                Enc1
+            end,
+            {Name, Type, NewBinSp, AttLen, DiskLen, RevPos, ExpectedMd5, Enc}
+        end, BinInfos),
+    {BodyData, NewBinInfos}.
+
+
+sort_meta_data(St0) ->
+    {ok, Ems} = couch_emsort:merge(St0#st.id_tree),
+    St0#st{id_tree=Ems}.
+
+
+copy_meta_data(#st{} = St) ->
+    #st{
+        fd = Fd,
+        header = Header,
+        id_tree = Src
+    } = St,
+    DstState = couch_bt_engine_header:id_tree_state(Header),
+    {ok, IdTree0} = couch_btree:open(DstState, Fd, [
+        {split, fun couch_bt_engine:id_tree_split/1},
+        {join, fun couch_bt_engine:id_tree_join/2},
+        {reduce, fun couch_bt_engine:id_tree_reduce/2}
+    ]),
+    {ok, Iter} = couch_emsort:iter(Src),
+    Acc0 = #merge_st{
+        id_tree=IdTree0,
+        seq_tree=St#st.seq_tree,
+        rem_seqs=[],
+        infos=[]
+    },
+    Acc = merge_docids(Iter, Acc0),
+    {ok, IdTree} = couch_btree:add(Acc#merge_st.id_tree, Acc#merge_st.infos),
+    {ok, SeqTree} = couch_btree:add_remove(
+        Acc#merge_st.seq_tree, [], Acc#merge_st.rem_seqs
+    ),
+    St#st{id_tree=IdTree, seq_tree=SeqTree}.
+
+
+open_compaction_file(FilePath) ->
+    case couch_file:open(FilePath, [nologifmissing]) of
+        {ok, Fd} ->
+            case couch_file:read_header(Fd) of
+                {ok, Header} -> {ok, Fd, Header};
+                no_valid_header -> {ok, Fd, nil}
+            end;
+        {error, enoent} ->
+            {ok, Fd} = couch_file:open(FilePath, [create]),
+            {ok, Fd, nil}
+    end.
+
+
+reset_compaction_file(Fd, Header) ->
+    ok = couch_file:truncate(Fd, 0),
+    ok = couch_file:write_header(Fd, Header).
+
+
+commit_compaction_data(#st{}=St) ->
+    % Compaction needs to write headers to both the data file
+    % and the meta file so if we need to restart we can pick
+    % back up from where we left off.
+    commit_compaction_data(St, couch_emsort:get_fd(St#st.id_tree)),
+    commit_compaction_data(St, St#st.fd).
+
+
+commit_compaction_data(#st{header = OldHeader} = St0, Fd) ->
+    DataState = couch_bt_engine_header:id_tree_state(OldHeader),
+    MetaFd = couch_emsort:get_fd(St0#st.id_tree),
+    MetaState = couch_emsort:get_state(St0#st.id_tree),
+    St1 = bind_id_tree(St0, St0#st.fd, DataState),
+    Header = St1#st.header,
+    CompHeader = #comp_header{
+        db_header = Header,
+        meta_state = MetaState
+    },
+    ok = couch_file:sync(Fd),
+    ok = couch_file:write_header(Fd, CompHeader),
+    St2 = St1#st{
+        header = Header
+    },
+    bind_emsort(St2, MetaFd, MetaState).
+
+
+bind_emsort(St, Fd, nil) ->
+    {ok, Ems} = couch_emsort:open(Fd),
+    St#st{id_tree=Ems};
+bind_emsort(St, Fd, State) ->
+    {ok, Ems} = couch_emsort:open(Fd, [{root, State}]),
+    St#st{id_tree=Ems}.
+
+
+bind_id_tree(St, Fd, State) ->
+    {ok, IdBtree} = couch_btree:open(State, Fd, [
+        {split, fun couch_bt_engine:id_tree_split/1},
+        {join, fun couch_bt_engine:id_tree_join/2},
+        {reduce, fun couch_bt_engine:id_tree_reduce/2}
+    ]),
+    St#st{id_tree=IdBtree}.
+
+
+merge_lookups(Infos, []) ->
+    Infos;
+merge_lookups([], _) ->
+    [];
+merge_lookups([#doc_info{}=DI | RestInfos], [{ok, FDI} | RestLookups]) ->
+    % Assert we've matched our lookups
+    if DI#doc_info.id == FDI#full_doc_info.id -> ok; true ->
+        erlang:error({mismatched_doc_infos, DI#doc_info.id})
+    end,
+    [FDI | merge_lookups(RestInfos, RestLookups)];
+merge_lookups([FDI | RestInfos], Lookups) ->
+    [FDI | merge_lookups(RestInfos, Lookups)].
+
+
+merge_docids(Iter, #merge_st{infos=Infos}=Acc) when length(Infos) > 1000 ->
+    #merge_st{
+        id_tree=IdTree0,
+        seq_tree=SeqTree0,
+        rem_seqs=RemSeqs
+    } = Acc,
+    {ok, IdTree1} = couch_btree:add(IdTree0, Infos),
+    {ok, SeqTree1} = couch_btree:add_remove(SeqTree0, [], RemSeqs),
+    Acc1 = Acc#merge_st{
+        id_tree=IdTree1,
+        seq_tree=SeqTree1,
+        rem_seqs=[],
+        infos=[]
+    },
+    merge_docids(Iter, Acc1);
+merge_docids(Iter, #merge_st{curr=Curr}=Acc) ->
+    case next_info(Iter, Curr, []) of
+        {NextIter, NewCurr, FDI, Seqs} ->
+            Acc1 = Acc#merge_st{
+                infos = [FDI | Acc#merge_st.infos],
+                rem_seqs = Seqs ++ Acc#merge_st.rem_seqs,
+                curr = NewCurr
+            },
+            merge_docids(NextIter, Acc1);
+        {finished, FDI, Seqs} ->
+            Acc#merge_st{
+                infos = [FDI | Acc#merge_st.infos],
+                rem_seqs = Seqs ++ Acc#merge_st.rem_seqs,
+                curr = undefined
+            };
+        empty ->
+            Acc
+    end.
+
+
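+% Groups consecutive emsort entries (sorted by {Id, Seq}) that share a doc
+% id: the last (highest-seq) #full_doc_info{} wins and the superseded seqs
+% are collected for removal from the seq tree.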
+next_info(Iter, undefined, []) ->
+    case couch_emsort:next(Iter) of
+        {ok, {{Id, Seq}, FDI}, NextIter} ->
+            next_info(NextIter, {Id, Seq, FDI}, []);
+        finished ->
+            empty
+    end;
+next_info(Iter, {Id, Seq, FDI}, Seqs) ->
+    case couch_emsort:next(Iter) of
+        {ok, {{Id, NSeq}, NFDI}, NextIter} ->
+            next_info(NextIter, {Id, NSeq, NFDI}, [Seq | Seqs]);
+        {ok, {{NId, NSeq}, NFDI}, NextIter} ->
+            {NextIter, {NId, NSeq, NFDI}, FDI, Seqs};
+        finished ->
+            {finished, FDI, Seqs}
+    end.
+
+
+update_compact_task(NumChanges) ->
+    [Changes, Total] = couch_task_status:get([changes_done, total_changes]),
+    Changes2 = Changes + NumChanges,
+    Progress = case Total of
+    0 ->
+        0;
+    _ ->
+        (Changes2 * 100) div Total
+    end,
+    couch_task_status:update([{changes_done, Changes2}, {progress, Progress}]).
+
diff --git a/src/couch/src/couch_bt_engine_header.erl b/src/couch/src/couch_bt_engine_header.erl
new file mode 100644
index 0000000..3d24f31
--- /dev/null
+++ b/src/couch/src/couch_bt_engine_header.erl
@@ -0,0 +1,434 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_bt_engine_header).
+
+
+-export([
+    new/0,
+    from/1,
+    is_header/1,
+    upgrade/1,
+    get/2,
+    get/3,
+    set/2,
+    set/3
+]).
+
+-export([
+    disk_version/1,
+    update_seq/1,
+    id_tree_state/1,
+    seq_tree_state/1,
+    latest/1,
+    local_tree_state/1,
+    purge_seq/1,
+    purged_docs/1,
+    security_ptr/1,
+    revs_limit/1,
+    uuid/1,
+    epochs/1,
+    compacted_seq/1
+]).
+
+
+% This should be updated anytime a header change happens that requires more
+% than filling in new defaults.
+%
+% As long as the changes are limited to new header fields (with inline
+% defaults) added to the end of the record, then there is no need to increment
+% the disk revision number.
+%
+% If the disk revision is incremented, then new upgrade logic will need to be
+% added to couch_db_updater:init_db.
+
+-define(LATEST_DISK_VERSION, 6).
+
+-record(db_header, {
+    disk_version = ?LATEST_DISK_VERSION,
+    update_seq = 0,
+    unused = 0,
+    id_tree_state = nil,
+    seq_tree_state = nil,
+    local_tree_state = nil,
+    purge_seq = 0,
+    purged_docs = nil,
+    security_ptr = nil,
+    revs_limit = 1000,
+    uuid,
+    epochs,
+    compacted_seq
+}).
+
+
+new() ->
+    #db_header{
+        uuid = couch_uuids:random(),
+        epochs = [{node(), 0}]
+    }.
+
+
+from(Header0) ->
+    Header = upgrade(Header0),
+    #db_header{
+        uuid = Header#db_header.uuid,
+        epochs = Header#db_header.epochs,
+        compacted_seq = Header#db_header.compacted_seq
+    }.
+
+
+is_header(Header) ->
+    try
+        upgrade(Header),
+        true
+    catch _:_ ->
+        false
+    end.
+
+
+upgrade(Header) ->
+    Funs = [
+        fun upgrade_tuple/1,
+        fun upgrade_disk_version/1,
+        fun upgrade_uuid/1,
+        fun upgrade_epochs/1,
+        fun upgrade_compacted_seq/1
+    ],
+    lists:foldl(fun(F, HdrAcc) ->
+        F(HdrAcc)
+    end, Header, Funs).
+
+
+get(Header, Key) ->
+    ?MODULE:get(Header, Key, undefined).
+
+
+get(Header, Key, Default) ->
+    get_field(Header, Key, Default).
+
+
+set(Header, Key, Value) ->
+    ?MODULE:set(Header, [{Key, Value}]).
+
+
+set(Header0, Fields) ->
+    % A subtlety here: if a database was open during the release
+    % upgrade that introduced uuids and epochs, then this dynamic
+    % upgrade assigns it a uuid and epoch as well.
+    Header = upgrade(Header0),
+    lists:foldl(fun({Field, Value}, HdrAcc) ->
+        set_field(HdrAcc, Field, Value)
+    end, Header, Fields).
+
+
+disk_version(Header) ->
+    get_field(Header, disk_version).
+
+
+update_seq(Header) ->
+    get_field(Header, update_seq).
+
+
+id_tree_state(Header) ->
+    get_field(Header, id_tree_state).
+
+
+seq_tree_state(Header) ->
+    get_field(Header, seq_tree_state).
+
+
+local_tree_state(Header) ->
+    get_field(Header, local_tree_state).
+
+
+purge_seq(Header) ->
+    get_field(Header, purge_seq).
+
+
+purged_docs(Header) ->
+    get_field(Header, purged_docs).
+
+
+security_ptr(Header) ->
+    get_field(Header, security_ptr).
+
+
+revs_limit(Header) ->
+    get_field(Header, revs_limit).
+
+
+uuid(Header) ->
+    get_field(Header, uuid).
+
+
+epochs(Header) ->
+    get_field(Header, epochs).
+
+
+compacted_seq(Header) ->
+    get_field(Header, compacted_seq).
+
+
+get_field(Header, Field) ->
+    get_field(Header, Field, undefined).
+
+
+get_field(Header, Field, Default) ->
+    Idx = index(Field),
+    case Idx > tuple_size(Header) of
+        true -> Default;
+        false -> element(index(Field), Header)
+    end.
+
+
+set_field(Header, Field, Value) ->
+    setelement(index(Field), Header, Value).
+
+
+index(Field) ->
+    couch_util:get_value(Field, indexes()).
+
+
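+% Maps each #db_header{} field name to its tuple position, e.g.
+% [{disk_version, 2}, {update_seq, 3}, ...] (element 1 is the record tag).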
+indexes() ->
+    Fields = record_info(fields, db_header),
+    Indexes = lists:seq(2, record_info(size, db_header)),
+    lists:zip(Fields, Indexes).
+
+
+upgrade_tuple(Old) when is_record(Old, db_header) ->
+    Old;
+upgrade_tuple(Old) when is_tuple(Old) ->
+    NewSize = record_info(size, db_header),
+    if tuple_size(Old) < NewSize -> ok; true ->
+        erlang:error({invalid_header_size, Old})
+    end,
+    {_, New} = lists:foldl(fun(Val, {Idx, Hdr}) ->
+        {Idx+1, setelement(Idx, Hdr, Val)}
+    end, {1, #db_header{}}, tuple_to_list(Old)),
+    if is_record(New, db_header) -> ok; true ->
+        erlang:error({invalid_header_extension, {Old, New}})
+    end,
+    New.
+
+-define(OLD_DISK_VERSION_ERROR,
+    "Database files from versions smaller than 0.10.0 are no longer supported").
+
+upgrade_disk_version(#db_header{}=Header) ->
+    case element(2, Header) of
+        1 -> throw({database_disk_version_error, ?OLD_DISK_VERSION_ERROR});
+        2 -> throw({database_disk_version_error, ?OLD_DISK_VERSION_ERROR});
+        3 -> throw({database_disk_version_error, ?OLD_DISK_VERSION_ERROR});
+        4 -> Header#db_header{security_ptr = nil}; % [0.10 - 0.11)
+        5 -> Header; % pre 1.2
+        ?LATEST_DISK_VERSION -> Header;
+        _ ->
+            Reason = "Incorrect disk header version",
+            throw({database_disk_version_error, Reason})
+    end.
+
+
+upgrade_uuid(#db_header{}=Header) ->
+    case Header#db_header.uuid of
+        undefined ->
+            % Upgrading this old db file to a newer
+            % on disk format that includes a UUID.
+            Header#db_header{uuid=couch_uuids:random()};
+        _ ->
+            Header
+    end.
+
+
+upgrade_epochs(#db_header{}=Header) ->
+    NewEpochs = case Header#db_header.epochs of
+        undefined ->
+            % This node is taking over ownership of a shard with
+            % an old version of the couch file. Before epochs there
+            % was always an implicit assumption that a file was
+            % owned since eternity by the node it was on. This
+            % just codifies that assumption.
+            [{node(), 0}];
+        [{Node, _} | _] = Epochs0 when Node == node() ->
+            % Current node is the current owner of this db
+            Epochs0;
+        Epochs1 ->
+            % This node is taking over ownership of this db
+            % and marking the update sequence where it happened.
+            [{node(), Header#db_header.update_seq} | Epochs1]
+    end,
+% It's possible for a node to open a db and claim
+    % ownership but never make a write to the db. This
+    % removes nodes that claimed ownership but never
+    % changed the database.
+    DedupedEpochs = remove_dup_epochs(NewEpochs),
+    Header#db_header{epochs=DedupedEpochs}.
+
+
+% This relies slightly on the update_seqs in epochs being
+% sorted, which holds because we only ever push entries onto
+% the front. If the update_seq were ever not monotonically
+% increasing, it's unclear we'd want to remove dupes (by
+% sorting the input to this function), so for now we don't
+% sort and instead rely on epochs always being sorted.
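+%
+% Illustrative example:
+%   remove_dup_epochs([{'a@h', 1}, {'b@h', 1}, {'c@h', 0}])
+% returns [{'a@h', 1}, {'c@h', 0}]; 'b@h' claimed ownership at seq 1 but
+% never wrote anything before 'a@h' took over.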
+remove_dup_epochs([_]=Epochs) ->
+    Epochs;
+remove_dup_epochs([{N1, S}, {_N2, S}]) ->
+    % Seqs match, keep the most recent owner
+    [{N1, S}];
+remove_dup_epochs([_, _]=Epochs) ->
+    % Seqs don't match.
+    Epochs;
+remove_dup_epochs([{N1, S}, {_N2, S} | Rest]) ->
+    % Seqs match, keep the most recent owner
+    remove_dup_epochs([{N1, S} | Rest]);
+remove_dup_epochs([{N1, S1}, {N2, S2} | Rest]) ->
+    % Seqs don't match, recurse to check others
+    [{N1, S1} | remove_dup_epochs([{N2, S2} | Rest])].
+
+
+upgrade_compacted_seq(#db_header{}=Header) ->
+    case Header#db_header.compacted_seq of
+        undefined ->
+            Header#db_header{compacted_seq=0};
+        _ ->
+            Header
+    end.
+
+latest(?LATEST_DISK_VERSION) ->
+    true;
+latest(N) when is_integer(N), N < ?LATEST_DISK_VERSION ->
+    false;
+latest(_Else) ->
+    undefined.
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
+mk_header(Vsn) ->
+    {
+        db_header, % record name
+        Vsn, % disk version
+        100, % update_seq
+        0, % unused
+        foo, % id_tree_state
+        bar, % seq_tree_state
+        bam, % local_tree_state
+        1, % purge_seq
+        baz, % purged_docs
+        bang, % security_ptr
+        999 % revs_limit
+    }.
+
+
+-ifdef(run_broken_tests).
+
+upgrade_v3_test() ->
+    Vsn3Header = mk_header(3),
+    NewHeader = upgrade_tuple(Vsn3Header),
+
+    % Tuple upgrades don't change existing fields
+    ?assert(is_record(NewHeader, db_header)),
+    ?assertEqual(3, disk_version(NewHeader)),
+    ?assertEqual(100, update_seq(NewHeader)),
+    ?assertEqual(foo, id_tree_state(NewHeader)),
+    ?assertEqual(bar, seq_tree_state(NewHeader)),
+    ?assertEqual(bam, local_tree_state(NewHeader)),
+    ?assertEqual(1, purge_seq(NewHeader)),
+    ?assertEqual(baz, purged_docs(NewHeader)),
+    ?assertEqual(bang, security_ptr(NewHeader)),
+    ?assertEqual(999, revs_limit(NewHeader)),
+    ?assertEqual(undefined, uuid(NewHeader)),
+    ?assertEqual(undefined, epochs(NewHeader)),
+
+    % Security ptr isn't changed until upgrade_disk_version/1
+    NewNewHeader = upgrade_disk_version(NewHeader),
+    ?assert(is_record(NewNewHeader, db_header)),
+    ?assertEqual(nil, security_ptr(NewNewHeader)),
+
+    % Assert upgrade works on really old headers
+    NewestHeader = upgrade(Vsn3Header),
+    ?assertMatch(<<_:32/binary>>, uuid(NewestHeader)),
+    ?assertEqual([{node(), 0}], epochs(NewestHeader)).
+
+-endif.
+
+upgrade_v5_test() ->
+    Vsn5Header = mk_header(5),
+    NewHeader = upgrade_disk_version(upgrade_tuple(Vsn5Header)),
+
+    ?assert(is_record(NewHeader, db_header)),
+    ?assertEqual(5, disk_version(NewHeader)),
+
+    % Security ptr isn't changed for v5 headers
+    ?assertEqual(bang, security_ptr(NewHeader)).
+
+
+upgrade_uuid_test() ->
+    Vsn5Header = mk_header(5),
+
+    % Upgraded headers get a new UUID
+    NewHeader = upgrade_uuid(upgrade_disk_version(upgrade_tuple(Vsn5Header))),
+    ?assertMatch(<<_:32/binary>>, uuid(NewHeader)),
+
+    % Headers with a UUID don't have their UUID changed
+    NewNewHeader = upgrade_uuid(upgrade_disk_version(upgrade_tuple(NewHeader))),
+    ?assertEqual(uuid(NewHeader), uuid(NewNewHeader)),
+
+    % Derived empty headers maintain the same UUID
+    ResetHeader = from(NewNewHeader),
+    ?assertEqual(uuid(NewHeader), uuid(ResetHeader)).
+
+
+upgrade_epochs_test() ->
+    Vsn5Header = mk_header(5),
+
+    % Upgraded headers get a default epochs set
+    NewHeader = upgrade(Vsn5Header),
+    ?assertEqual([{node(), 0}], epochs(NewHeader)),
+
+    % Fake an old entry in epochs
+    FakeFields = [
+        {update_seq, 20},
+        {epochs, [{'someothernode@someotherhost', 0}]}
+    ],
+    NotOwnedHeader = set(NewHeader, FakeFields),
+
+    OwnedEpochs = [
+        {node(), 20},
+        {'someothernode@someotherhost', 0}
+    ],
+
+    % Upgrading a header not owned by the local node updates
+    % the epochs appropriately.
+    NowOwnedHeader = upgrade(NotOwnedHeader),
+    ?assertEqual(OwnedEpochs, epochs(NowOwnedHeader)),
+
+    % Headers with epochs stay the same after upgrades
+    NewNewHeader = upgrade(NowOwnedHeader),
+    ?assertEqual(OwnedEpochs, epochs(NewNewHeader)),
+
+    % Getting a reset header maintains the epoch data
+    ResetHeader = from(NewNewHeader),
+    ?assertEqual(OwnedEpochs, epochs(ResetHeader)).
+
+
+get_uuid_from_old_header_test() ->
+    Vsn5Header = mk_header(5),
+    ?assertEqual(undefined, uuid(Vsn5Header)).
+
+
+get_epochs_from_old_header_test() ->
+    Vsn5Header = mk_header(5),
+    ?assertEqual(undefined, epochs(Vsn5Header)).
+
+
+-endif.
diff --git a/src/couch/src/couch_bt_engine_stream.erl b/src/couch/src/couch_bt_engine_stream.erl
new file mode 100644
index 0000000..431894a
--- /dev/null
+++ b/src/couch/src/couch_bt_engine_stream.erl
@@ -0,0 +1,70 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_bt_engine_stream).
+
+-export([
+    foldl/3,
+    seek/2,
+    write/2,
+    finalize/1,
+    to_disk_term/1
+]).
+
+
+foldl({_Fd, []}, _Fun, Acc) ->
+    Acc;
+
+foldl({Fd, [{Pos, _} | Rest]}, Fun, Acc) ->
+    foldl({Fd, [Pos | Rest]}, Fun, Acc);
+
+foldl({Fd, [Bin | Rest]}, Fun, Acc) when is_binary(Bin) ->
+    % We're processing the first bit of data
+    % after we did a seek for a range fold.
+    foldl({Fd, Rest}, Fun, Fun(Bin, Acc));
+
+foldl({Fd, [Pos | Rest]}, Fun, Acc) when is_integer(Pos) ->
+    {ok, Bin} = couch_file:pread_binary(Fd, Pos),
+    foldl({Fd, Rest}, Fun, Fun(Bin, Acc)).
+
+
+seek({Fd, [{Pos, Length} | Rest]}, Offset) ->
+    case Length =< Offset of
+        true ->
+            seek({Fd, Rest}, Offset - Length);
+        false ->
+            seek({Fd, [Pos | Rest]}, Offset)
+    end;
+
+seek({Fd, [Pos | Rest]}, Offset) when is_integer(Pos) ->
+    {ok, Bin} = couch_file:pread_binary(Fd, Pos),
+    case iolist_size(Bin) =< Offset of
+        true ->
+            seek({Fd, Rest}, Offset - byte_size(Bin));
+        false ->
+            <<_:Offset/binary, Tail/binary>> = Bin,
+            {ok, {Fd, [Tail | Rest]}}
+    end.
+
+
+write({Fd, Written}, Data) when is_pid(Fd) ->
+    {ok, Pos, _} = couch_file:append_binary(Fd, Data),
+    {ok, {Fd, [{Pos, iolist_size(Data)} | Written]}}.
+
+
+finalize({Fd, Written}) ->
+    {ok, {Fd, lists:reverse(Written)}}.
+
+
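+% A finalized stream's disk term is its list of {Pos, Length} chunk
+% pointers in write order; foldl/3 and seek/2 above walk that list.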
+to_disk_term({_Fd, Written}) ->
+    {ok, Written}.
+
