Posted to commits@couchdb.apache.org by da...@apache.org on 2019/05/24 02:59:02 UTC

[couchdb] branch prototype/rfc-001-revision-metadata-model updated (bb3ed4c -> 819c9e6)

This is an automated email from the ASF dual-hosted git repository.

davisp pushed a change to branch prototype/rfc-001-revision-metadata-model
in repository https://gitbox.apache.org/repos/asf/couchdb.git.


    from bb3ed4c  Implement get_missing_revs
     new a11ac08  Copy couch_changes to chttpd_changes
     new 819c9e6  BLARGH YE MATEY

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/chttpd/src/chttpd.erl                          |  17 +-
 .../src/chttpd_changes.erl}                        | 331 ++++++-----
 src/chttpd/src/chttpd_db.erl                       | 148 +++--
 src/chttpd/src/chttpd_external.erl                 |  35 +-
 src/couch/src/couch_att.erl                        | 658 ++++++++-------------
 src/couch/src/couch_doc.erl                        |  11 +
 .../src/couch_replicator_api_wrap.erl              |   7 +-
 .../src/couch_replicator_changes_reader.erl        |   1 +
 .../src/couch_replicator_scheduler_job.erl         |   1 +
 src/ddoc_cache/src/ddoc_cache_entry_ddocid.erl     |   2 +-
 src/ddoc_cache/src/ddoc_cache_entry_ddocid_rev.erl |   2 +-
 src/fabric/src/fabric2.hrl                         |   4 +
 src/fabric/src/fabric2_db.erl                      | 133 ++++-
 src/fabric/src/fabric2_events.erl                  |  84 +++
 src/fabric/src/fabric2_fdb.erl                     | 130 ++--
 src/fabric/src/fabric2_util.erl                    |   5 -
 src/fabric/test/fabric2_doc_crud_tests.erl         |  15 +
 17 files changed, 895 insertions(+), 689 deletions(-)
 copy src/{couch/src/couch_changes.erl => chttpd/src/chttpd_changes.erl} (78%)
 create mode 100644 src/fabric/src/fabric2_events.erl


[couchdb] 01/02: Copy couch_changes to chttpd_changes

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch prototype/rfc-001-revision-metadata-model
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit a11ac081bc97cd1e44138a32feb38088d31f18e4
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Mon May 20 13:28:21 2019 -0500

    Copy couch_changes to chttpd_changes
---
 src/chttpd/src/chttpd_changes.erl | 919 ++++++++++++++++++++++++++++++++++++++
 1 file changed, 919 insertions(+)

diff --git a/src/chttpd/src/chttpd_changes.erl b/src/chttpd/src/chttpd_changes.erl
new file mode 100644
index 0000000..2fe824c
--- /dev/null
+++ b/src/chttpd/src/chttpd_changes.erl
@@ -0,0 +1,919 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(chttpd_changes).
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+
+-export([
+    handle_db_changes/3,
+    handle_changes/4,
+    get_changes_timeout/2,
+    wait_updated/3,
+    get_rest_updated/1,
+    configure_filter/4,
+    filter/3,
+    handle_db_event/3,
+    handle_view_event/3,
+    view_filter/3,
+    send_changes_doc_ids/6,
+    send_changes_design_docs/6
+]).
+
+-export([changes_enumerator/2]).
+
+%% Exported so we can use a fully qualified call to facilitate hot-code upgrades
+-export([
+    keep_sending_changes/3
+]).
+
+-record(changes_acc, {
+    db,
+    view_name,
+    ddoc_name,
+    view,
+    seq,
+    prepend,
+    filter,
+    callback,
+    user_acc,
+    resp_type,
+    limit,
+    include_docs,
+    doc_options,
+    conflicts,
+    timeout,
+    timeout_fun,
+    aggregation_kvs,
+    aggregation_results
+}).
+
+handle_db_changes(Args, Req, Db) ->
+    handle_changes(Args, Req, Db, db).
+
+handle_changes(Args1, Req, Db0, Type) ->
+    #changes_args{
+        style = Style,
+        filter = FilterName,
+        feed = Feed,
+        dir = Dir,
+        since = Since
+    } = Args1,
+    Filter = configure_filter(FilterName, Style, Req, Db0),
+    Args = Args1#changes_args{filter_fun = Filter},
+    % The type of changes feed depends on the supplied filter. If the query is
+    % for an optimized view-filtered db changes feed, we need to use the view
+    % sequence tree.
+    {UseViewChanges, DDocName, ViewName} = case {Type, Filter} of
+        {{view, DDocName0, ViewName0}, _} ->
+            {true, DDocName0, ViewName0};
+        {_, {fast_view, _, DDoc, ViewName0}} ->
+            {true, DDoc#doc.id, ViewName0};
+        _ ->
+            {false, undefined, undefined}
+    end,
+    DbName = couch_db:name(Db0),
+    {StartListenerFun, View} = if UseViewChanges ->
+        {ok, {_, View0, _}, _, _} = couch_mrview_util:get_view(
+                DbName, DDocName, ViewName, #mrargs{}),
+        case View0#mrview.seq_btree of
+            #btree{} ->
+                ok;
+            _ ->
+                throw({bad_request, "view changes not enabled"})
+        end,
+        SNFun = fun() ->
+            couch_event:link_listener(
+                 ?MODULE, handle_view_event, {self(), DDocName}, [{dbname, DbName}]
+            )
+        end,
+        {SNFun, View0};
+    true ->
+        SNFun = fun() ->
+            couch_event:link_listener(
+                 ?MODULE, handle_db_event, self(), [{dbname, DbName}]
+            )
+        end,
+        {SNFun, undefined}
+    end,
+    Start = fun() ->
+        {ok, Db} = couch_db:reopen(Db0),
+        StartSeq = case Dir of
+        rev ->
+            couch_db:get_update_seq(Db);
+        fwd ->
+            Since
+        end,
+        View2 = if UseViewChanges ->
+            {ok, {_, View1, _}, _, _} = couch_mrview_util:get_view(
+                    DbName, DDocName, ViewName, #mrargs{}),
+            View1;
+        true ->
+            undefined
+        end,
+        {Db, View2, StartSeq}
+    end,
+    % Begin a timer to deal with heartbeats when the filter function fails
+    case Args#changes_args.heartbeat of
+    undefined ->
+        erlang:erase(last_changes_heartbeat);
+    Val when is_integer(Val); Val =:= true ->
+        put(last_changes_heartbeat, os:timestamp())
+    end,
+
+    case lists:member(Feed, ["continuous", "longpoll", "eventsource"]) of
+    true ->
+        fun(CallbackAcc) ->
+            {Callback, UserAcc} = get_callback_acc(CallbackAcc),
+            {ok, Listener} = StartListenerFun(),
+
+            {Db, View, StartSeq} = Start(),
+            UserAcc2 = start_sending_changes(Callback, UserAcc, Feed),
+            {Timeout, TimeoutFun} = get_changes_timeout(Args, Callback),
+            Acc0 = build_acc(Args, Callback, UserAcc2, Db, StartSeq,
+                             <<"">>, Timeout, TimeoutFun, DDocName, ViewName,
+                             View),
+            try
+                keep_sending_changes(
+                    Args#changes_args{dir=fwd},
+                    Acc0,
+                    true)
+            after
+                couch_event:stop_listener(Listener),
+                get_rest_updated(ok) % clean out any remaining update messages
+            end
+        end;
+    false ->
+        fun(CallbackAcc) ->
+            {Callback, UserAcc} = get_callback_acc(CallbackAcc),
+            UserAcc2 = start_sending_changes(Callback, UserAcc, Feed),
+            {Timeout, TimeoutFun} = get_changes_timeout(Args, Callback),
+            {Db, View, StartSeq} = Start(),
+            Acc0 = build_acc(Args#changes_args{feed="normal"}, Callback,
+                             UserAcc2, Db, StartSeq, <<>>, Timeout, TimeoutFun,
+                             DDocName, ViewName, View),
+            {ok, #changes_acc{seq = LastSeq, user_acc = UserAcc3}} =
+                send_changes(
+                    Acc0,
+                    Dir,
+                    true),
+            end_sending_changes(Callback, UserAcc3, LastSeq, Feed)
+        end
+    end.
+
+
+handle_db_event(_DbName, updated, Parent) ->
+    Parent ! updated,
+    {ok, Parent};
+handle_db_event(_DbName, deleted, Parent) ->
+    Parent ! deleted,
+    {ok, Parent};
+handle_db_event(_DbName, _Event, Parent) ->
+    {ok, Parent}.
+
+
+handle_view_event(_DbName, Msg, {Parent, DDocId}) ->
+    case Msg of
+        {index_commit, DDocId} ->
+            Parent ! updated;
+        {index_delete, DDocId} ->
+            Parent ! deleted;
+        _ ->
+            ok
+    end,
+    {ok, {Parent, DDocId}}.
+
+get_callback_acc({Callback, _UserAcc} = Pair) when is_function(Callback, 3) ->
+    Pair;
+get_callback_acc(Callback) when is_function(Callback, 2) ->
+    {fun(Ev, Data, _) -> Callback(Ev, Data) end, ok}.
+
+
+configure_filter("_doc_ids", Style, Req, _Db) ->
+    {doc_ids, Style, get_doc_ids(Req)};
+configure_filter("_selector", Style, Req, _Db) ->
+    {selector, Style,  get_selector_and_fields(Req)};
+configure_filter("_design", Style, _Req, _Db) ->
+    {design_docs, Style};
+configure_filter("_view", Style, Req, Db) ->
+    ViewName = get_view_qs(Req),
+    if ViewName /= "" -> ok; true ->
+        throw({bad_request, "`view` filter parameter is not provided."})
+    end,
+    ViewNameParts = string:tokens(ViewName, "/"),
+    case [?l2b(couch_httpd:unquote(Part)) || Part <- ViewNameParts] of
+        [DName, VName] ->
+            {ok, DDoc} = open_ddoc(Db, <<"_design/", DName/binary>>),
+            check_member_exists(DDoc, [<<"views">>, VName]),
+            FilterType = try
+                true = couch_util:get_nested_json_value(
+                        DDoc#doc.body,
+                        [<<"options">>, <<"seq_indexed">>]
+                ),
+                fast_view
+            catch _:_ ->
+                view
+            end,
+            case couch_db:is_clustered(Db) of
+                true ->
+                    DIR = fabric_util:doc_id_and_rev(DDoc),
+                    {fetch, FilterType, Style, DIR, VName};
+                false ->
+                    {FilterType, Style, DDoc, VName}
+            end;
+        [] ->
+            Msg = "`view` must be of the form `designname/viewname`",
+            throw({bad_request, Msg})
+    end;
+configure_filter([$_ | _], _Style, _Req, _Db) ->
+    throw({bad_request, "unknown builtin filter name"});
+configure_filter("", main_only, _Req, _Db) ->
+    {default, main_only};
+configure_filter("", all_docs, _Req, _Db) ->
+    {default, all_docs};
+configure_filter(FilterName, Style, Req, Db) ->
+    FilterNameParts = string:tokens(FilterName, "/"),
+    case [?l2b(couch_httpd:unquote(Part)) || Part <- FilterNameParts] of
+        [DName, FName] ->
+            {ok, DDoc} = open_ddoc(Db, <<"_design/", DName/binary>>),
+            check_member_exists(DDoc, [<<"filters">>, FName]),
+            DIR = fabric_util:doc_id_and_rev(DDoc),
+            {fetch, custom, Style, Req, DIR, FName};
+        [] ->
+            {default, Style};
+        _Else ->
+            Msg = "`filter` must be of the form `designname/filtername`",
+            throw({bad_request, Msg})
+    end.
+
+
+filter(Db, #full_doc_info{}=FDI, Filter) ->
+    filter(Db, couch_doc:to_doc_info(FDI), Filter);
+filter(_Db, DocInfo, {default, Style}) ->
+    apply_style(DocInfo, Style);
+filter(_Db, DocInfo, {doc_ids, Style, DocIds}) ->
+    case lists:member(DocInfo#doc_info.id, DocIds) of
+        true ->
+            apply_style(DocInfo, Style);
+        false ->
+            []
+    end;
+filter(Db, DocInfo, {selector, Style, {Selector, _Fields}}) ->
+    Docs = open_revs(Db, DocInfo, Style),
+    Passes = [mango_selector:match(Selector, couch_doc:to_json_obj(Doc, []))
+        || Doc <- Docs],
+    filter_revs(Passes, Docs);
+filter(_Db, DocInfo, {design_docs, Style}) ->
+    case DocInfo#doc_info.id of
+        <<"_design", _/binary>> ->
+            apply_style(DocInfo, Style);
+        _ ->
+            []
+    end;
+filter(Db, DocInfo, {FilterType, Style, DDoc, VName})
+        when FilterType == view; FilterType == fast_view ->
+    Docs = open_revs(Db, DocInfo, Style),
+    {ok, Passes} = couch_query_servers:filter_view(DDoc, VName, Docs),
+    filter_revs(Passes, Docs);
+filter(Db, DocInfo, {custom, Style, Req0, DDoc, FName}) ->
+    Req = case Req0 of
+        {json_req, _} -> Req0;
+        #httpd{} -> {json_req, couch_httpd_external:json_req_obj(Req0, Db)}
+    end,
+    Docs = open_revs(Db, DocInfo, Style),
+    {ok, Passes} = couch_query_servers:filter_docs(Req, Db, DDoc, FName, Docs),
+    filter_revs(Passes, Docs).
+
+fast_view_filter(Db, {{Seq, _}, {ID, _, _}}, {fast_view, Style, _, _}) ->
+    case couch_db:get_doc_info(Db, ID) of
+        {ok, #doc_info{high_seq=Seq}=DocInfo} ->
+            Docs = open_revs(Db, DocInfo, Style),
+            Changes = lists:map(fun(#doc{revs={RevPos, [RevId | _]}}) ->
+                RevStr = couch_doc:rev_to_str({RevPos, RevId}),
+                {[{<<"rev">>, RevStr}]}
+            end, Docs),
+            {DocInfo, Changes};
+        {ok, #doc_info{high_seq=HighSeq}} when Seq > HighSeq ->
+            % If the view seq tree is out of date (or if the view seq tree
+            % was opened before the db), seqs may come from the seq tree
+            % that correspond to a revision other than the most current
+            % revision of a document. The proper thing to do is to not send
+            % this old revision, but to wait until we reopen the up-to-date
+            % view seq tree and continue the fold.
+            % I left the Seq > HighSeq guard in so that if (for some
+            % godforsaken reason) the seq in the view is more current than
+            % the database, we'll throw an error.
+            {undefined, []};
+        {error, not_found} ->
+            {undefined, []}
+    end.
+
+
+
+view_filter(Db, KV, {default, Style}) ->
+    apply_view_style(Db, KV, Style).
+
+
+get_view_qs({json_req, {Props}}) ->
+    {Query} = couch_util:get_value(<<"query">>, Props, {[]}),
+    binary_to_list(couch_util:get_value(<<"view">>, Query, ""));
+get_view_qs(Req) ->
+    couch_httpd:qs_value(Req, "view", "").
+
+get_doc_ids({json_req, {Props}}) ->
+    check_docids(couch_util:get_value(<<"doc_ids">>, Props));
+get_doc_ids(#httpd{method='POST'}=Req) ->
+    couch_httpd:validate_ctype(Req, "application/json"),
+    {Props} = couch_httpd:json_body_obj(Req),
+    check_docids(couch_util:get_value(<<"doc_ids">>, Props));
+get_doc_ids(#httpd{method='GET'}=Req) ->
+    DocIds = ?JSON_DECODE(couch_httpd:qs_value(Req, "doc_ids", "null")),
+    check_docids(DocIds);
+get_doc_ids(_) ->
+    throw({bad_request, no_doc_ids_provided}).
+
+
+get_selector_and_fields({json_req, {Props}}) ->
+    Selector = check_selector(couch_util:get_value(<<"selector">>, Props)),
+    Fields = check_fields(couch_util:get_value(<<"fields">>, Props, nil)),
+    {Selector, Fields};
+get_selector_and_fields(#httpd{method='POST'}=Req) ->
+    couch_httpd:validate_ctype(Req, "application/json"),
+    get_selector_and_fields({json_req,  couch_httpd:json_body_obj(Req)});
+get_selector_and_fields(_) ->
+    throw({bad_request, "Selector must be specified in POST payload"}).
+
+
+check_docids(DocIds) when is_list(DocIds) ->
+    lists:foreach(fun
+        (DocId) when not is_binary(DocId) ->
+            Msg = "`doc_ids` filter parameter is not a list of doc ids.",
+            throw({bad_request, Msg});
+        (_) -> ok
+    end, DocIds),
+    DocIds;
+check_docids(_) ->
+    Msg = "`doc_ids` filter parameter is not a list of doc ids.",
+    throw({bad_request, Msg}).
+
+
+check_selector(Selector={_}) ->
+    try
+        mango_selector:normalize(Selector)
+    catch
+        {mango_error, Mod, Reason0} ->
+            {_StatusCode, _Error, Reason} = mango_error:info(Mod, Reason0),
+            throw({bad_request, Reason})
+    end;
+check_selector(_Selector) ->
+    throw({bad_request, "Selector error: expected a JSON object"}).
+
+
+check_fields(nil) ->
+    nil;
+check_fields(Fields) when is_list(Fields) ->
+    try
+        {ok, Fields1} = mango_fields:new(Fields),
+        Fields1
+    catch
+        {mango_error, Mod, Reason0} ->
+            {_StatusCode, _Error, Reason} = mango_error:info(Mod, Reason0),
+            throw({bad_request, Reason})
+    end;
+check_fields(_Fields) ->
+    throw({bad_request, "Selector error: fields must be JSON array"}).
+
+
+open_ddoc(Db, DDocId) ->
+    case ddoc_cache:open_doc(Db, DDocId) of
+        {ok, _} = Resp -> Resp;
+        Else -> throw(Else)
+    end.
+
+
+check_member_exists(#doc{body={Props}}, Path) ->
+    couch_util:get_nested_json_value({Props}, Path).
+
+
+apply_style(#doc_info{revs=Revs}, main_only) ->
+    [#rev_info{rev=Rev} | _] = Revs,
+    [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}];
+apply_style(#doc_info{revs=Revs}, all_docs) ->
+    [{[{<<"rev">>, couch_doc:rev_to_str(R)}]} || #rev_info{rev=R} <- Revs].
+
+apply_view_style(_Db, {{_Seq, _Key}, {_ID, _Value, Rev}}, main_only) ->
+    [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}];
+apply_view_style(Db, {{_Seq, _Key}, {ID, _Value, _Rev}}, all_docs) ->
+    case couch_db:get_doc_info(Db, ID) of
+        {ok, DocInfo} ->
+            apply_style(DocInfo, all_docs);
+        {error, not_found} ->
+            []
+    end.
+
+
+open_revs(Db, DocInfo, Style) ->
+    DocInfos = case Style of
+        main_only -> [DocInfo];
+        all_docs -> [DocInfo#doc_info{revs=[R]}|| R <- DocInfo#doc_info.revs]
+    end,
+    OpenOpts = [deleted, conflicts],
+    % Relying on list comprehensions to silence errors
+    OpenResults = [couch_db:open_doc(Db, DI, OpenOpts) || DI <- DocInfos],
+    [Doc || {ok, Doc} <- OpenResults].
+
+
+filter_revs(Passes, Docs) ->
+    lists:flatmap(fun
+        ({true, #doc{revs={RevPos, [RevId | _]}}}) ->
+            RevStr = couch_doc:rev_to_str({RevPos, RevId}),
+            Change = {[{<<"rev">>, RevStr}]},
+            [Change];
+        (_) ->
+            []
+    end, lists:zip(Passes, Docs)).
+
+
+get_changes_timeout(Args, Callback) ->
+    #changes_args{
+        heartbeat = Heartbeat,
+        timeout = Timeout,
+        feed = ResponseType
+    } = Args,
+    DefaultTimeout = list_to_integer(
+        config:get("httpd", "changes_timeout", "60000")
+    ),
+    case Heartbeat of
+    undefined ->
+        case Timeout of
+        undefined ->
+            {DefaultTimeout, fun(UserAcc) -> {stop, UserAcc} end};
+        infinity ->
+            {infinity, fun(UserAcc) -> {stop, UserAcc} end};
+        _ ->
+            {lists:min([DefaultTimeout, Timeout]),
+                fun(UserAcc) -> {stop, UserAcc} end}
+        end;
+    true ->
+        {DefaultTimeout,
+            fun(UserAcc) -> {ok, Callback(timeout, ResponseType, UserAcc)} end};
+    _ ->
+        {lists:min([DefaultTimeout, Heartbeat]),
+            fun(UserAcc) -> {ok, Callback(timeout, ResponseType, UserAcc)} end}
+    end.
+
+start_sending_changes(_Callback, UserAcc, ResponseType)
+        when ResponseType =:= "continuous"
+        orelse ResponseType =:= "eventsource" ->
+    UserAcc;
+start_sending_changes(Callback, UserAcc, ResponseType) ->
+    Callback(start, ResponseType, UserAcc).
+
+build_acc(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout, TimeoutFun, DDocName, ViewName, View) ->
+    #changes_args{
+        include_docs = IncludeDocs,
+        doc_options = DocOpts,
+        conflicts = Conflicts,
+        limit = Limit,
+        feed = ResponseType,
+        filter_fun = Filter
+    } = Args,
+    #changes_acc{
+        db = Db,
+        seq = StartSeq,
+        prepend = Prepend,
+        filter = Filter,
+        callback = Callback,
+        user_acc = UserAcc,
+        resp_type = ResponseType,
+        limit = Limit,
+        include_docs = IncludeDocs,
+        doc_options = DocOpts,
+        conflicts = Conflicts,
+        timeout = Timeout,
+        timeout_fun = TimeoutFun,
+        ddoc_name = DDocName,
+        view_name = ViewName,
+        view = View,
+        aggregation_results=[],
+        aggregation_kvs=[]
+    }.
+
+send_changes(Acc, Dir, FirstRound) ->
+    #changes_acc{
+        db = Db,
+        seq = StartSeq,
+        filter = Filter,
+        view = View
+    } = Acc,
+    DbEnumFun = fun changes_enumerator/2,
+    case can_optimize(FirstRound, Filter) of
+        {true, Fun} ->
+            Fun(Db, StartSeq, Dir, DbEnumFun, Acc, Filter);
+        _ ->
+            case {View, Filter}  of
+                {#mrview{}, {fast_view, _, _, _}} ->
+                    couch_mrview:view_changes_since(View, StartSeq, DbEnumFun, [{dir, Dir}], Acc);
+                {undefined, _} ->
+                    Opts = [{dir, Dir}],
+                    couch_db:fold_changes(Db, StartSeq, DbEnumFun, Acc, Opts);
+                {#mrview{}, _} ->
+                    ViewEnumFun = fun view_changes_enumerator/2,
+                    {Go, Acc0} = couch_mrview:view_changes_since(View, StartSeq, ViewEnumFun, [{dir, Dir}], Acc),
+                    case Acc0 of
+                        #changes_acc{aggregation_results=[]} ->
+                            {Go, Acc0};
+                        _ ->
+                            #changes_acc{
+                                aggregation_results = AggResults,
+                                aggregation_kvs = AggKVs,
+                                user_acc = UserAcc,
+                                callback = Callback,
+                                resp_type = ResponseType,
+                                prepend = Prepend
+                            } = Acc0,
+                            ChangesRow = view_changes_row(AggResults, AggKVs, Acc0),
+                            UserAcc0 = Callback({change, ChangesRow, Prepend}, ResponseType, UserAcc),
+                            reset_heartbeat(),
+                            {Go, Acc0#changes_acc{user_acc=UserAcc0}}
+                    end
+            end
+    end.
+
+
+can_optimize(true, {doc_ids, _Style, DocIds}) ->
+    MaxDocIds = config:get_integer("couchdb",
+        "changes_doc_ids_optimization_threshold", 100),
+    if length(DocIds) =< MaxDocIds ->
+        {true, fun send_changes_doc_ids/6};
+    true ->
+        false
+    end;
+can_optimize(true, {design_docs, _Style}) ->
+    {true, fun send_changes_design_docs/6};
+can_optimize(_, _) ->
+    false.
+
+
+send_changes_doc_ids(Db, StartSeq, Dir, Fun, Acc0, {doc_ids, _Style, DocIds}) ->
+    Results = couch_db:get_full_doc_infos(Db, DocIds),
+    FullInfos = lists:foldl(fun
+        (#full_doc_info{}=FDI, Acc) -> [FDI | Acc];
+        (not_found, Acc) -> Acc
+    end, [], Results),
+    send_lookup_changes(FullInfos, StartSeq, Dir, Db, Fun, Acc0).
+
+
+send_changes_design_docs(Db, StartSeq, Dir, Fun, Acc0, {design_docs, _Style}) ->
+    FoldFun = fun(FDI, Acc) -> {ok, [FDI | Acc]} end,
+    Opts = [
+        include_deleted,
+        {start_key, <<"_design/">>},
+        {end_key_gt, <<"_design0">>}
+    ],
+    {ok, FullInfos} = couch_db:fold_docs(Db, FoldFun, [], Opts),
+    send_lookup_changes(FullInfos, StartSeq, Dir, Db, Fun, Acc0).
+
+
+send_lookup_changes(FullDocInfos, StartSeq, Dir, Db, Fun, Acc0) ->
+    FoldFun = case Dir of
+        fwd -> fun lists:foldl/3;
+        rev -> fun lists:foldr/3
+    end,
+    GreaterFun = case Dir of
+        fwd -> fun(A, B) -> A > B end;
+        rev -> fun(A, B) -> A =< B end
+    end,
+    DocInfos = lists:foldl(fun(FDI, Acc) ->
+        DI = couch_doc:to_doc_info(FDI),
+        case GreaterFun(DI#doc_info.high_seq, StartSeq) of
+            true -> [DI | Acc];
+            false -> Acc
+        end
+    end, [], FullDocInfos),
+    SortedDocInfos = lists:keysort(#doc_info.high_seq, DocInfos),
+    FinalAcc = try
+        FoldFun(fun(DocInfo, Acc) ->
+            case Fun(DocInfo, Acc) of
+                {ok, NewAcc} ->
+                    NewAcc;
+                {stop, NewAcc} ->
+                    throw({stop, NewAcc})
+            end
+        end, Acc0, SortedDocInfos)
+    catch
+        {stop, Acc} -> Acc
+    end,
+    case Dir of
+        fwd ->
+            FinalAcc0 = case element(1, FinalAcc) of
+                changes_acc -> % we came here via couch_http or internal call
+                    FinalAcc#changes_acc{seq = couch_db:get_update_seq(Db)};
+                fabric_changes_acc -> % we came here via chttpd / fabric / rexi
+                    FinalAcc#fabric_changes_acc{seq = couch_db:get_update_seq(Db)}
+            end,
+            {ok, FinalAcc0};
+        rev -> {ok, FinalAcc}
+    end.
+
+
+keep_sending_changes(Args, Acc0, FirstRound) ->
+    #changes_args{
+        feed = ResponseType,
+        limit = Limit,
+        db_open_options = DbOptions
+    } = Args,
+
+    {ok, ChangesAcc} = send_changes(Acc0, fwd, FirstRound),
+
+    #changes_acc{
+        db = Db, callback = Callback,
+        timeout = Timeout, timeout_fun = TimeoutFun, seq = EndSeq,
+        prepend = Prepend2, user_acc = UserAcc2, limit = NewLimit,
+        ddoc_name = DDocName, view_name = ViewName
+    } = ChangesAcc,
+
+    couch_db:close(Db),
+    if Limit > NewLimit, ResponseType == "longpoll" ->
+        end_sending_changes(Callback, UserAcc2, EndSeq, ResponseType);
+    true ->
+        case wait_updated(Timeout, TimeoutFun, UserAcc2) of
+        {updated, UserAcc4} ->
+            DbOptions1 = [{user_ctx, couch_db:get_user_ctx(Db)} | DbOptions],
+            case couch_db:open(couch_db:name(Db), DbOptions1) of
+            {ok, Db2} ->
+                ?MODULE:keep_sending_changes(
+                  Args#changes_args{limit=NewLimit},
+                  ChangesAcc#changes_acc{
+                    db = Db2,
+                    view = maybe_refresh_view(Db2, DDocName, ViewName),
+                    user_acc = UserAcc4,
+                    seq = EndSeq,
+                    prepend = Prepend2,
+                    timeout = Timeout,
+                    timeout_fun = TimeoutFun},
+                  false);
+            _Else ->
+                end_sending_changes(Callback, UserAcc2, EndSeq, ResponseType)
+            end;
+        {stop, UserAcc4} ->
+            end_sending_changes(Callback, UserAcc4, EndSeq, ResponseType)
+        end
+    end.
+
+maybe_refresh_view(_, undefined, undefined) ->
+    undefined;
+maybe_refresh_view(Db, DDocName, ViewName) ->
+    DbName = couch_db:name(Db),
+    {ok, {_, View, _}, _, _} = couch_mrview_util:get_view(DbName, DDocName, ViewName, #mrargs{}),
+    View.
+
+end_sending_changes(Callback, UserAcc, EndSeq, ResponseType) ->
+    Callback({stop, EndSeq}, ResponseType, UserAcc).
+
+view_changes_enumerator(Value, Acc) ->
+    #changes_acc{
+        filter = Filter, callback = Callback, prepend = Prepend,
+        user_acc = UserAcc, limit = Limit, resp_type = ResponseType, db = Db,
+        timeout = Timeout, timeout_fun = TimeoutFun, seq = CurrentSeq,
+        aggregation_kvs=AggKVs, aggregation_results=AggResults
+    } = Acc,
+
+    Results0 = view_filter(Db, Value, Filter),
+    Results = [Result || Result <- Results0, Result /= null],
+    {{Seq, _}, _} = Value,
+
+    Go = if (Limit =< 1) andalso Results =/= [] -> stop; true -> ok end,
+
+    if CurrentSeq =:= Seq ->
+        NewAggKVs = case Results of
+            [] -> AggKVs;
+            _ -> [Value|AggKVs]
+        end,
+        {Done, UserAcc2} = maybe_heartbeat(Timeout, TimeoutFun, UserAcc),
+        Acc0 = Acc#changes_acc{
+            seq = Seq,
+            user_acc = UserAcc2,
+            aggregation_kvs=NewAggKVs
+        },
+        case Done of
+            stop -> {stop, Acc0};
+            ok -> {Go, Acc0}
+        end;
+    AggResults =/= [] ->
+        {NewAggKVs, NewAggResults} = case Results of
+            [] -> {[], []};
+            _ -> {[Value], Results}
+        end,
+        if ResponseType =:= "continuous" orelse ResponseType =:= "eventsource" ->
+            ChangesRow = view_changes_row(AggResults, AggKVs, Acc),
+            UserAcc2 = Callback({change, ChangesRow, <<>>}, ResponseType, UserAcc),
+            reset_heartbeat(),
+            {Go, Acc#changes_acc{
+                seq = Seq, user_acc = UserAcc2, limit = Limit - 1,
+                aggregation_kvs=NewAggKVs, aggregation_results=NewAggResults}};
+        true ->
+            ChangesRow = view_changes_row(AggResults, AggKVs, Acc),
+            UserAcc2 = Callback({change, ChangesRow, Prepend}, ResponseType, UserAcc),
+            reset_heartbeat(),
+            {Go, Acc#changes_acc{
+                seq = Seq, prepend = <<",\n">>, user_acc = UserAcc2,
+                limit = Limit - 1, aggregation_kvs=[Value],
+                aggregation_results=Results}}
+        end;
+    true ->
+        {NewAggKVs, NewAggResults} = case Results of
+            [] -> {[], []};
+            _ -> {[Value], Results}
+        end,
+        {Done, UserAcc2} = maybe_heartbeat(Timeout, TimeoutFun, UserAcc),
+        Acc0 = Acc#changes_acc{
+            seq = Seq,
+            user_acc = UserAcc2,
+            aggregation_kvs=NewAggKVs,
+            aggregation_results=NewAggResults
+        },
+        case Done of
+            stop -> {stop, Acc0};
+            ok -> {Go, Acc0}
+        end
+    end.
+
+changes_enumerator(Value0, Acc) ->
+    #changes_acc{
+        filter = Filter, callback = Callback, prepend = Prepend,
+        user_acc = UserAcc, limit = Limit, resp_type = ResponseType, db = Db,
+        timeout = Timeout, timeout_fun = TimeoutFun
+    } = Acc,
+    {Value, Results0} = case Filter of
+        {fast_view, _, _, _} ->
+            fast_view_filter(Db, Value0, Filter);
+        _ ->
+            {Value0, filter(Db, Value0, Filter)}
+    end,
+    Results = [Result || Result <- Results0, Result /= null],
+    Seq = case Value of
+        #full_doc_info{} ->
+            Value#full_doc_info.update_seq;
+        #doc_info{} ->
+            Value#doc_info.high_seq;
+        {{Seq0, _}, _} ->
+            Seq0
+    end,
+    Go = if (Limit =< 1) andalso Results =/= [] -> stop; true -> ok end,
+    case Results of
+    [] ->
+        {Done, UserAcc2} = maybe_heartbeat(Timeout, TimeoutFun, UserAcc),
+        case Done of
+        stop ->
+            {stop, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}};
+        ok ->
+            {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}}
+        end;
+    _ ->
+        if ResponseType =:= "continuous" orelse ResponseType =:= "eventsource" ->
+            ChangesRow = changes_row(Results, Value, Acc),
+            UserAcc2 = Callback({change, ChangesRow, <<>>}, ResponseType, UserAcc),
+            reset_heartbeat(),
+            {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2, limit = Limit - 1}};
+        true ->
+            ChangesRow = changes_row(Results, Value, Acc),
+            UserAcc2 = Callback({change, ChangesRow, Prepend}, ResponseType, UserAcc),
+            reset_heartbeat(),
+            {Go, Acc#changes_acc{
+                seq = Seq, prepend = <<",\n">>,
+                user_acc = UserAcc2, limit = Limit - 1}}
+        end
+    end.
+
+
+
+view_changes_row(Results, KVs, Acc) ->
+    {Add, Remove} = lists:foldl(fun(Row, {AddAcc, RemAcc}) ->
+        {{_Seq, Key}, {_Id, Value, _Rev}} = Row,
+        case Value of
+            removed ->
+                {AddAcc, [Key|RemAcc]};
+            {dups, DupValues} ->
+                AddAcc1 = lists:foldl(fun(DupValue, AddAcc0) ->
+                    [[Key, DupValue]|AddAcc0]
+                end, AddAcc, DupValues),
+                {AddAcc1, RemAcc};
+            _ ->
+                {[[Key, Value]|AddAcc], RemAcc}
+        end
+    end, {[], []}, KVs),
+
+    % Seq, Id, and Rev should be the same for all KVs, since we're aggregating
+    % by seq.
+    [{{Seq, _Key}, {Id, _Value, Rev}}|_] = KVs,
+
+    {[
+        {<<"seq">>, Seq}, {<<"id">>, Id}, {<<"add">>, Add},
+        {<<"remove">>, Remove}, {<<"changes">>, Results}
+    ] ++ maybe_get_changes_doc({Id, Rev}, Acc)}.
+
+
+changes_row(Results, #full_doc_info{} = FDI, Acc) ->
+    changes_row(Results, couch_doc:to_doc_info(FDI), Acc);
+changes_row(Results, DocInfo, Acc) ->
+    #doc_info{
+        id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]
+    } = DocInfo,
+    {[{<<"seq">>, Seq}, {<<"id">>, Id}, {<<"changes">>, Results}] ++
+        deleted_item(Del) ++ maybe_get_changes_doc(DocInfo, Acc)}.
+
+maybe_get_changes_doc(Value, #changes_acc{include_docs=true}=Acc) ->
+    #changes_acc{
+        db = Db,
+        doc_options = DocOpts,
+        conflicts = Conflicts,
+        filter = Filter
+    } = Acc,
+    Opts = case Conflicts of
+               true -> [deleted, conflicts];
+               false -> [deleted]
+           end,
+    load_doc(Db, Value, Opts, DocOpts, Filter);
+
+maybe_get_changes_doc(_Value, _Acc) ->
+    [].
+
+
+load_doc(Db, Value, Opts, DocOpts, Filter) ->
+    case couch_index_util:load_doc(Db, Value, Opts) of
+        null ->
+            [{doc, null}];
+        Doc ->
+            [{doc, doc_to_json(Doc, DocOpts, Filter)}]
+    end.
+
+
+doc_to_json(Doc, DocOpts, {selector, _Style, {_Selector, Fields}})
+    when Fields =/= nil ->
+    mango_fields:extract(couch_doc:to_json_obj(Doc, DocOpts), Fields);
+doc_to_json(Doc, DocOpts, _Filter) ->
+    couch_doc:to_json_obj(Doc, DocOpts).
+
+
+deleted_item(true) -> [{<<"deleted">>, true}];
+deleted_item(_) -> [].
+
+% Waits for an updated msg; if there are multiple msgs, collects them.
+wait_updated(Timeout, TimeoutFun, UserAcc) ->
+    receive
+    updated ->
+        get_rest_updated(UserAcc);
+    deleted ->
+        {stop, UserAcc}
+    after Timeout ->
+        {Go, UserAcc2} = TimeoutFun(UserAcc),
+        case Go of
+        ok ->
+            ?MODULE:wait_updated(Timeout, TimeoutFun, UserAcc2);
+        stop ->
+            {stop, UserAcc2}
+        end
+    end.
+
+get_rest_updated(UserAcc) ->
+    receive
+    updated ->
+        get_rest_updated(UserAcc)
+    after 0 ->
+        {updated, UserAcc}
+    end.
+
+reset_heartbeat() ->
+    case get(last_changes_heartbeat) of
+    undefined ->
+        ok;
+    _ ->
+        put(last_changes_heartbeat, os:timestamp())
+    end.
+
+maybe_heartbeat(Timeout, TimeoutFun, Acc) ->
+    Before = get(last_changes_heartbeat),
+    case Before of
+    undefined ->
+        {ok, Acc};
+    _ ->
+        Now = os:timestamp(),
+        case timer:now_diff(Now, Before) div 1000 >= Timeout of
+        true ->
+            Acc2 = TimeoutFun(Acc),
+            put(last_changes_heartbeat, Now),
+            Acc2;
+        false ->
+            {ok, Acc}
+        end
+    end.
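
For reference, here is a minimal sketch of how the copied module above might
be driven as a one-shot feed. It is not part of the commit: the module name
chttpd_changes_example is hypothetical, the `nil` request argument is a
placeholder (the default filter never inspects the request), and the arity-3
callback shape follows get_callback_acc/1 and changes_enumerator/2 in the
new file.

    -module(chttpd_changes_example).

    -include_lib("couch/include/couch_db.hrl").

    -export([collect/1]).

    %% Collect the current changes rows for DbName into a list using a
    %% one-shot ("normal") feed and the default (unfiltered) filter.
    collect(DbName) ->
        {ok, Db} = couch_db:open_int(DbName, []),
        try
            Args = #changes_args{feed = "normal"},
            FeedFun = chttpd_changes:handle_db_changes(Args, nil, Db),
            FeedFun({fun callback/3, []})
        after
            couch_db:close(Db)
        end.

    %% Arity-3 callback: receives start, {change, Row, Prepend}, timeout,
    %% and {stop, EndSeq} events and threads the accumulator through.
    callback(start, _ResponseType, Acc) ->
        Acc;
    callback({change, Row, _Prepend}, _ResponseType, Acc) ->
        [Row | Acc];
    callback(timeout, _ResponseType, Acc) ->
        Acc;
    callback({stop, _EndSeq}, _ResponseType, Acc) ->
        lists:reverse(Acc).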


[couchdb] 02/02: BLARGH YE MATEY

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch prototype/rfc-001-revision-metadata-model
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 819c9e6037e064a9e7d53b2f57bc86694374e6ce
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Thu May 23 21:58:28 2019 -0500

    BLARGH YE MATEY
    
    Storm rolling through. Pushing to the cloud in case my laptop gets
    fried.
---
 src/chttpd/src/chttpd.erl                          |  17 +-
 src/chttpd/src/chttpd_changes.erl                  | 308 ++++++----
 src/chttpd/src/chttpd_db.erl                       | 148 +++--
 src/chttpd/src/chttpd_external.erl                 |  35 +-
 src/couch/src/couch_att.erl                        | 658 ++++++++-------------
 src/couch/src/couch_doc.erl                        |  11 +
 .../src/couch_replicator_api_wrap.erl              |   7 +-
 .../src/couch_replicator_changes_reader.erl        |   1 +
 .../src/couch_replicator_scheduler_job.erl         |   1 +
 src/ddoc_cache/src/ddoc_cache_entry_ddocid.erl     |   2 +-
 src/ddoc_cache/src/ddoc_cache_entry_ddocid_rev.erl |   2 +-
 src/fabric/src/fabric2.hrl                         |   4 +
 src/fabric/src/fabric2_db.erl                      | 133 ++++-
 src/fabric/src/fabric2_events.erl                  |  84 +++
 src/fabric/src/fabric2_fdb.erl                     | 130 ++--
 src/fabric/src/fabric2_util.erl                    |   5 -
 src/fabric/test/fabric2_doc_crud_tests.erl         |  15 +
 17 files changed, 891 insertions(+), 670 deletions(-)
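
Before the diff, a minimal sketch of the callback protocol this commit moves
chttpd_changes toward: callbacks drop the response-type argument, become
arity 2, and return {Go, NewAcc} so the callback side can stop the feed. This
sketch is not part of the commit; the event shapes follow
start_sending_changes/2, notify_waiting_for_updates/2, changes_enumerator/2,
and end_sending_changes/3 in the diff below.

    %% Hypothetical arity-2 callback for the reworked protocol.
    callback(start, Acc) ->
        {ok, Acc};
    callback({change, Row}, Acc) ->
        %% Returning stop here (instead of ok) ends the feed early.
        {ok, [Row | Acc]};
    callback(waiting_for_updates, Acc) ->
        %% Emitted before the feed blocks waiting for database updates.
        {ok, Acc};
    callback({stop, _EndSeq, _Pending}, Acc) ->
        %% The third element is null at this stage of the prototype.
        {ok, lists:reverse(Acc)}.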

diff --git a/src/chttpd/src/chttpd.erl b/src/chttpd/src/chttpd.erl
index 631eb77..c548e18 100644
--- a/src/chttpd/src/chttpd.erl
+++ b/src/chttpd/src/chttpd.erl
@@ -25,7 +25,7 @@
     error_info/1, parse_form/1, json_body/1, json_body_obj/1, body/1,
     doc_etag/1, make_etag/1, etag_respond/3, etag_match/2,
     partition/1, serve_file/3, serve_file/4,
-    server_header/0, start_chunked_response/3,send_chunk/2,
+    server_header/0, start_chunked_response/3,send_chunk/2,last_chunk/1,
     start_response_length/4, send/2, start_json_response/2,
     start_json_response/3, end_json_response/1, send_response/4,
     send_response_no_cors/4,
@@ -381,7 +381,7 @@ update_stats(#httpd{begin_ts = BeginTime}, #httpd_resp{} = Res) ->
     end,
     Res.
 
-maybe_log(#httpd{} = HttpReq, #httpd_resp{should_log = true} = HttpResp) ->
+maybe_log(#httpd{} = HttpReq, #httpd_resp{should_log = _} = HttpResp) ->
     #httpd{
         mochi_req = MochiReq,
         begin_ts = BeginTime,
@@ -397,9 +397,9 @@ maybe_log(#httpd{} = HttpReq, #httpd_resp{should_log = true} = HttpResp) ->
     Host = MochiReq:get_header_value("Host"),
     RawUri = MochiReq:get(raw_path),
     RequestTime = timer:now_diff(EndTime, BeginTime) / 1000,
-    couch_log:notice("~s ~s ~s ~s ~s ~B ~p ~B", [Host, Peer, User,
+    couch_log:error("~s ~s ~s ~s ~s ~B ~p ~B", [Host, Peer, User,
         Method, RawUri, Code, Status, round(RequestTime)]);
-maybe_log(_HttpReq, #httpd_resp{should_log = false}) ->
+maybe_log(_HttpReq, _) ->
     ok.
 
 
@@ -737,7 +737,14 @@ start_chunked_response(#httpd{mochi_req=MochiReq}=Req, Code, Headers0) ->
     {ok, Resp}.
 
 send_chunk(Resp, Data) ->
-    Resp:write_chunk(Data),
+    case iolist_size(Data) of
+        0 -> ok; % do nothing
+        _ -> Resp:write_chunk(Data)
+    end,
+    {ok, Resp}.
+
+last_chunk(Resp) ->
+    Resp:write_chunk([]),
     {ok, Resp}.
 
 send_response(Req, Code, Headers0, Body) ->
diff --git a/src/chttpd/src/chttpd_changes.erl b/src/chttpd/src/chttpd_changes.erl
index 2fe824c..30caab2 100644
--- a/src/chttpd/src/chttpd_changes.erl
+++ b/src/chttpd/src/chttpd_changes.erl
@@ -60,7 +60,8 @@
 handle_db_changes(Args, Req, Db) ->
     handle_changes(Args, Req, Db, db).
 
-handle_changes(Args1, Req, Db0, Type) ->
+handle_changes(Args1, Req, Db, Type) ->
+    ReqPid = chttpd:header_value(Req, "XKCD", "<unknown>"),
     #changes_args{
         style = Style,
         filter = FilterName,
@@ -68,7 +69,8 @@ handle_changes(Args1, Req, Db0, Type) ->
         dir = Dir,
         since = Since
     } = Args1,
-    Filter = configure_filter(FilterName, Style, Req, Db0),
+    couch_log:error("XKCD: STARTING CHANGES FEED ~p for ~s : ~p", [self(), ReqPid, Since]),
+    Filter = configure_filter(FilterName, Style, Req, Db),
     Args = Args1#changes_args{filter_fun = Filter},
     % The type of changes feed depends on the supplied filter. If the query is
     % for an optimized view-filtered db changes feed, we need to use the view
@@ -81,7 +83,7 @@ handle_changes(Args1, Req, Db0, Type) ->
         _ ->
             {false, undefined, undefined}
     end,
-    DbName = couch_db:name(Db0),
+    DbName = fabric2_db:name(Db),
     {StartListenerFun, View} = if UseViewChanges ->
         {ok, {_, View0, _}, _, _} = couch_mrview_util:get_view(
                 DbName, DDocName, ViewName, #mrargs{}),
@@ -99,17 +101,16 @@ handle_changes(Args1, Req, Db0, Type) ->
         {SNFun, View0};
     true ->
         SNFun = fun() ->
-            couch_event:link_listener(
-                 ?MODULE, handle_db_event, self(), [{dbname, DbName}]
-            )
+            fabric2_events:link_listener(
+                    ?MODULE, handle_db_event, self(), [{dbname, DbName}]
+                )
         end,
         {SNFun, undefined}
     end,
     Start = fun() ->
-        {ok, Db} = couch_db:reopen(Db0),
         StartSeq = case Dir of
         rev ->
-            couch_db:get_update_seq(Db);
+            fabric2_fdb:get_update_seq(Db);
         fwd ->
             Since
         end,
@@ -137,7 +138,7 @@ handle_changes(Args1, Req, Db0, Type) ->
             {ok, Listener} = StartListenerFun(),
 
             {Db, View, StartSeq} = Start(),
-            UserAcc2 = start_sending_changes(Callback, UserAcc, Feed),
+            UserAcc2 = start_sending_changes(Callback, UserAcc),
             {Timeout, TimeoutFun} = get_changes_timeout(Args, Callback),
             Acc0 = build_acc(Args, Callback, UserAcc2, Db, StartSeq,
                              <<"">>, Timeout, TimeoutFun, DDocName, ViewName,
@@ -148,14 +149,14 @@ handle_changes(Args1, Req, Db0, Type) ->
                     Acc0,
                     true)
             after
-                couch_event:stop_listener(Listener),
+                fabric2_events:stop_listener(Listener),
                 get_rest_updated(ok) % clean out any remaining update messages
             end
         end;
     false ->
         fun(CallbackAcc) ->
             {Callback, UserAcc} = get_callback_acc(CallbackAcc),
-            UserAcc2 = start_sending_changes(Callback, UserAcc, Feed),
+            UserAcc2 = start_sending_changes(Callback, UserAcc),
             {Timeout, TimeoutFun} = get_changes_timeout(Args, Callback),
             {Db, View, StartSeq} = Start(),
             Acc0 = build_acc(Args#changes_args{feed="normal"}, Callback,
@@ -166,7 +167,7 @@ handle_changes(Args1, Req, Db0, Type) ->
                     Acc0,
                     Dir,
                     true),
-            end_sending_changes(Callback, UserAcc3, LastSeq, Feed)
+            end_sending_changes(Callback, UserAcc3, LastSeq)
         end
     end.
 
@@ -192,10 +193,10 @@ handle_view_event(_DbName, Msg, {Parent, DDocId}) ->
     end,
     {ok, {Parent, DDocId}}.
 
-get_callback_acc({Callback, _UserAcc} = Pair) when is_function(Callback, 3) ->
+get_callback_acc({Callback, _UserAcc} = Pair) when is_function(Callback, 2) ->
     Pair;
-get_callback_acc(Callback) when is_function(Callback, 2) ->
-    {fun(Ev, Data, _) -> Callback(Ev, Data) end, ok}.
+get_callback_acc(Callback) when is_function(Callback, 1) ->
+    {fun(Ev, _) -> Callback(Ev) end, ok}.
 
 
 configure_filter("_doc_ids", Style, Req, _Db) ->
@@ -223,7 +224,7 @@ configure_filter("_view", Style, Req, Db) ->
             catch _:_ ->
                 view
             end,
-            case couch_db:is_clustered(Db) of
+            case fabric2_db:is_clustered(Db) of
                 true ->
                     DIR = fabric_util:doc_id_and_rev(DDoc),
                     {fetch, FilterType, Style, DIR, VName};
@@ -246,8 +247,7 @@ configure_filter(FilterName, Style, Req, Db) ->
         [DName, FName] ->
             {ok, DDoc} = open_ddoc(Db, <<"_design/", DName/binary>>),
             check_member_exists(DDoc, [<<"filters">>, FName]),
-            DIR = fabric_util:doc_id_and_rev(DDoc),
-            {fetch, custom, Style, Req, DIR, FName};
+            {custom, Style, Req, DDoc, FName};
         [] ->
             {default, Style};
         _Else ->
@@ -256,45 +256,45 @@ configure_filter(FilterName, Style, Req, Db) ->
     end.
 
 
-filter(Db, #full_doc_info{}=FDI, Filter) ->
-    filter(Db, couch_doc:to_doc_info(FDI), Filter);
-filter(_Db, DocInfo, {default, Style}) ->
-    apply_style(DocInfo, Style);
-filter(_Db, DocInfo, {doc_ids, Style, DocIds}) ->
-    case lists:member(DocInfo#doc_info.id, DocIds) of
+filter(Db, Change, {default, Style}) ->
+    apply_style(Db, Change, Style);
+filter(Db, Change, {doc_ids, Style, DocIds}) ->
+    case lists:member(maps:get(id, Change), DocIds) of
         true ->
-            apply_style(DocInfo, Style);
+            apply_style(Db, Change, Style);
         false ->
             []
     end;
-filter(Db, DocInfo, {selector, Style, {Selector, _Fields}}) ->
-    Docs = open_revs(Db, DocInfo, Style),
+filter(Db, Change, {selector, Style, {Selector, _Fields}}) ->
+    Docs = open_revs(Db, Change, Style),
     Passes = [mango_selector:match(Selector, couch_doc:to_json_obj(Doc, []))
         || Doc <- Docs],
     filter_revs(Passes, Docs);
-filter(_Db, DocInfo, {design_docs, Style}) ->
-    case DocInfo#doc_info.id of
+filter(Db, Change, {design_docs, Style}) ->
+    case maps:get(id, Change) of
         <<"_design", _/binary>> ->
-            apply_style(DocInfo, Style);
+            apply_style(Db, Change, Style);
         _ ->
             []
     end;
-filter(Db, DocInfo, {FilterType, Style, DDoc, VName})
+filter(Db, Change, {FilterType, Style, DDoc, VName})
         when FilterType == view; FilterType == fast_view ->
-    Docs = open_revs(Db, DocInfo, Style),
+    Docs = open_revs(Db, Change, Style),
     {ok, Passes} = couch_query_servers:filter_view(DDoc, VName, Docs),
     filter_revs(Passes, Docs);
-filter(Db, DocInfo, {custom, Style, Req0, DDoc, FName}) ->
+filter(Db, Change, {custom, Style, Req0, DDoc, FName}) ->
     Req = case Req0 of
         {json_req, _} -> Req0;
-        #httpd{} -> {json_req, couch_httpd_external:json_req_obj(Req0, Db)}
+        #httpd{} -> {json_req, chttpd_external:json_req_obj(Req0, Db)}
     end,
-    Docs = open_revs(Db, DocInfo, Style),
+    Docs = open_revs(Db, Change, Style),
     {ok, Passes} = couch_query_servers:filter_docs(Req, Db, DDoc, FName, Docs),
-    filter_revs(Passes, Docs).
+    filter_revs(Passes, Docs);
+filter(A, B, C) ->
+    erlang:error({filter_error, A, B, C}).
 
 fast_view_filter(Db, {{Seq, _}, {ID, _, _}}, {fast_view, Style, _, _}) ->
-    case couch_db:get_doc_info(Db, ID) of
+    case fabric2_db:get_doc_info(Db, ID) of
         {ok, #doc_info{high_seq=Seq}=DocInfo} ->
             Docs = open_revs(Db, DocInfo, Style),
             Changes = lists:map(fun(#doc{revs={RevPos, [RevId | _]}}) ->
@@ -404,32 +404,51 @@ check_member_exists(#doc{body={Props}}, Path) ->
     couch_util:get_nested_json_value({Props}, Path).
 
 
-apply_style(#doc_info{revs=Revs}, main_only) ->
-    [#rev_info{rev=Rev} | _] = Revs,
-    [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}];
-apply_style(#doc_info{revs=Revs}, all_docs) ->
-    [{[{<<"rev">>, couch_doc:rev_to_str(R)}]} || #rev_info{rev=R} <- Revs].
+apply_style(_Db, Change, main_only) ->
+    #{rev_id := RevId} = Change,
+    [{[{<<"rev">>, couch_doc:rev_to_str(RevId)}]}];
+apply_style(Db, Change, all_docs) ->
+    % We have to fetch all revs for this row
+    #{id := DocId} = Change,
+    {ok, Resps} = fabric2_db:open_doc_revs(Db, DocId, all, [deleted]),
+    lists:flatmap(fun(Resp) ->
+        case Resp of
+            {ok, #doc{revs = {Pos, [Rev | _]}}} ->
+                [{[{<<"rev">>, couch_doc:rev_to_str({Pos, Rev})}]}];
+            _ ->
+                []
+        end
+    end, Resps);
+apply_style(A, B, C) ->
+    erlang:error({changes_apply_style, A, B, C}).
 
 apply_view_style(_Db, {{_Seq, _Key}, {_ID, _Value, Rev}}, main_only) ->
     [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}];
 apply_view_style(Db, {{_Seq, _Key}, {ID, _Value, _Rev}}, all_docs) ->
     case couch_db:get_doc_info(Db, ID) of
         {ok, DocInfo} ->
-            apply_style(DocInfo, all_docs);
+            apply_style(Db, DocInfo, all_docs);
         {error, not_found} ->
             []
     end.
 
 
-open_revs(Db, DocInfo, Style) ->
-    DocInfos = case Style of
-        main_only -> [DocInfo];
-        all_docs -> [DocInfo#doc_info{revs=[R]}|| R <- DocInfo#doc_info.revs]
-    end,
-    OpenOpts = [deleted, conflicts],
-    % Relying on list comprehensions to silence errors
-    OpenResults = [couch_db:open_doc(Db, DI, OpenOpts) || DI <- DocInfos],
-    [Doc || {ok, Doc} <- OpenResults].
+open_revs(Db, Change, Style) ->
+    #{id := DocId} = Change,
+    Options = [deleted, conflicts],
+    try
+        case Style of
+            main_only ->
+                {ok, Doc} = fabric2_db:open_doc(Db, DocId, Options),
+                [Doc];
+            all_docs ->
+                {ok, Docs} = fabric2_db:open_doc_revs(Db, DocId, all, Options),
+                [Doc || {ok, Doc} <- Docs]
+        end
+    catch _:_ ->
+        % We didn't log this before; should we now?
+        []
+    end.
 
 
 filter_revs(Passes, Docs) ->
@@ -471,12 +490,9 @@ get_changes_timeout(Args, Callback) ->
             fun(UserAcc) -> {ok, Callback(timeout, ResponseType, UserAcc)} end}
     end.
 
-start_sending_changes(_Callback, UserAcc, ResponseType)
-        when ResponseType =:= "continuous"
-        orelse ResponseType =:= "eventsource" ->
-    UserAcc;
-start_sending_changes(Callback, UserAcc, ResponseType) ->
-    Callback(start, ResponseType, UserAcc).
+start_sending_changes(Callback, UserAcc) ->
+    {_, NewUserAcc} = Callback(start, UserAcc),
+    NewUserAcc.
 
 build_acc(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout, TimeoutFun, DDocName, ViewName, View) ->
     #changes_args{
@@ -525,7 +541,7 @@ send_changes(Acc, Dir, FirstRound) ->
                     couch_mrview:view_changes_since(View, StartSeq, DbEnumFun, [{dir, Dir}], Acc);
                 {undefined, _} ->
                     Opts = [{dir, Dir}],
-                    couch_db:fold_changes(Db, StartSeq, DbEnumFun, Acc, Opts);
+                    fabric2_db:fold_changes(Db, StartSeq, DbEnumFun, Acc, Opts);
                 {#mrview{}, _} ->
                     ViewEnumFun = fun view_changes_enumerator/2,
                     {Go, Acc0} = couch_mrview:view_changes_since(View, StartSeq, ViewEnumFun, [{dir, Dir}], Acc),
@@ -565,7 +581,7 @@ can_optimize(_, _) ->
 
 
 send_changes_doc_ids(Db, StartSeq, Dir, Fun, Acc0, {doc_ids, _Style, DocIds}) ->
-    Results = couch_db:get_full_doc_infos(Db, DocIds),
+    Results = fabric2_db:get_full_doc_infos(Db, DocIds),
     FullInfos = lists:foldl(fun
         (#full_doc_info{}=FDI, Acc) -> [FDI | Acc];
         (not_found, Acc) -> Acc
@@ -603,7 +619,21 @@ send_lookup_changes(FullDocInfos, StartSeq, Dir, Db, Fun, Acc0) ->
     SortedDocInfos = lists:keysort(#doc_info.high_seq, DocInfos),
     FinalAcc = try
         FoldFun(fun(DocInfo, Acc) ->
-            case Fun(DocInfo, Acc) of
+            % Kinda gross that we're munging this back to a map
+            % that will then have to re-read and rebuild the FDI
+            % for all_docs style. But c'est la vie.
+            #doc_info{
+                id = DocId,
+                high_seq = Seq,
+                revs = [#rev_info{rev = Rev, deleted = Deleted} | _]
+            } = DocInfo,
+            Change = #{
+                id => DocId,
+                sequence => Seq,
+                rev_id => Rev,
+                deleted => Deleted
+            },
+            case Fun(Change, Acc) of
                 {ok, NewAcc} ->
                     NewAcc;
                 {stop, NewAcc} ->
@@ -617,7 +647,7 @@ send_lookup_changes(FullDocInfos, StartSeq, Dir, Db, Fun, Acc0) ->
         fwd ->
             FinalAcc0 = case element(1, FinalAcc) of
                 changes_acc -> % we came here via couch_http or internal call
-                    FinalAcc#changes_acc{seq = couch_db:get_update_seq(Db)};
+                    FinalAcc#changes_acc{seq = fabric2_db:get_update_seq(Db)};
                 fabric_changes_acc -> % we came here via chttpd / fabric / rexi
                     FinalAcc#fabric_changes_acc{seq = couch_db:get_update_seq(Db)}
             end,
@@ -642,31 +672,34 @@ keep_sending_changes(Args, Acc0, FirstRound) ->
         ddoc_name = DDocName, view_name = ViewName
     } = ChangesAcc,
 
-    couch_db:close(Db),
     if Limit > NewLimit, ResponseType == "longpoll" ->
-        end_sending_changes(Callback, UserAcc2, EndSeq, ResponseType);
+        end_sending_changes(Callback, UserAcc2, EndSeq);
     true ->
-        case wait_updated(Timeout, TimeoutFun, UserAcc2) of
-        {updated, UserAcc4} ->
-            DbOptions1 = [{user_ctx, couch_db:get_user_ctx(Db)} | DbOptions],
-            case couch_db:open(couch_db:name(Db), DbOptions1) of
-            {ok, Db2} ->
-                ?MODULE:keep_sending_changes(
-                  Args#changes_args{limit=NewLimit},
-                  ChangesAcc#changes_acc{
-                    db = Db2,
-                    view = maybe_refresh_view(Db2, DDocName, ViewName),
-                    user_acc = UserAcc4,
-                    seq = EndSeq,
-                    prepend = Prepend2,
-                    timeout = Timeout,
-                    timeout_fun = TimeoutFun},
-                  false);
-            _Else ->
-                end_sending_changes(Callback, UserAcc2, EndSeq, ResponseType)
-            end;
-        {stop, UserAcc4} ->
-            end_sending_changes(Callback, UserAcc4, EndSeq, ResponseType)
+        {Go, UserAcc3} = notify_waiting_for_updates(Callback, UserAcc2),
+        if Go /= ok -> end_sending_changes(Callback, UserAcc3, EndSeq); true ->
+            case wait_updated(Timeout, TimeoutFun, UserAcc3) of
+            {updated, UserAcc4} ->
+                UserCtx = fabric2_db:get_user_ctx(Db),
+                DbOptions1 = [{user_ctx, UserCtx} | DbOptions],
+                case fabric2_db:open(fabric2_db:name(Db), DbOptions1) of
+                {ok, Db2} ->
+                    ?MODULE:keep_sending_changes(
+                      Args#changes_args{limit=NewLimit},
+                      ChangesAcc#changes_acc{
+                        db = Db2,
+                        view = maybe_refresh_view(Db2, DDocName, ViewName),
+                        user_acc = UserAcc4,
+                        seq = EndSeq,
+                        prepend = Prepend2,
+                        timeout = Timeout,
+                        timeout_fun = TimeoutFun},
+                      false);
+                _Else ->
+                    end_sending_changes(Callback, UserAcc3, EndSeq)
+                end;
+            {stop, UserAcc4} ->
+                end_sending_changes(Callback, UserAcc4, EndSeq)
+            end
         end
     end.
 
@@ -677,8 +710,11 @@ maybe_refresh_view(Db, DDocName, ViewName) ->
     {ok, {_, View, _}, _, _} = couch_mrview_util:get_view(DbName, DDocName, ViewName, #mrargs{}),
     View.
 
-end_sending_changes(Callback, UserAcc, EndSeq, ResponseType) ->
-    Callback({stop, EndSeq}, ResponseType, UserAcc).
+notify_waiting_for_updates(Callback, UserAcc) ->
+    Callback(waiting_for_updates, UserAcc).
+
+end_sending_changes(Callback, UserAcc, EndSeq) ->
+    Callback({stop, EndSeq, null}, UserAcc).
 
 view_changes_enumerator(Value, Acc) ->
     #changes_acc{
@@ -748,27 +784,24 @@ view_changes_enumerator(Value, Acc) ->
         end
     end.
 
-changes_enumerator(Value0, Acc) ->
+changes_enumerator(Change0, Acc) ->
     #changes_acc{
-        filter = Filter, callback = Callback, prepend = Prepend,
-        user_acc = UserAcc, limit = Limit, resp_type = ResponseType, db = Db,
-        timeout = Timeout, timeout_fun = TimeoutFun
+        filter = Filter,
+        callback = Callback,
+        user_acc = UserAcc,
+        limit = Limit,
+        db = Db,
+        timeout = Timeout,
+        timeout_fun = TimeoutFun
     } = Acc,
-    {Value, Results0} = case Filter of
+    {Change1, Results0} = case Filter of
         {fast_view, _, _, _} ->
-            fast_view_filter(Db, Value0, Filter);
+            fast_view_filter(Db, Change0, Filter);
         _ ->
-            {Value0, filter(Db, Value0, Filter)}
+            {Change0, filter(Db, Change0, Filter)}
     end,
     Results = [Result || Result <- Results0, Result /= null],
-    Seq = case Value of
-        #full_doc_info{} ->
-            Value#full_doc_info.update_seq;
-        #doc_info{} ->
-            Value#doc_info.high_seq;
-        {{Seq0, _}, _} ->
-            Seq0
-    end,
+    Seq = maps:get(sequence, Change1),
     Go = if (Limit =< 1) andalso Results =/= [] -> stop; true -> ok end,
     case Results of
     [] ->
@@ -780,19 +813,19 @@ changes_enumerator(Value0, Acc) ->
             {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}}
         end;
     _ ->
-        if ResponseType =:= "continuous" orelse ResponseType =:= "eventsource" ->
-            ChangesRow = changes_row(Results, Value, Acc),
-            UserAcc2 = Callback({change, ChangesRow, <<>>}, ResponseType, UserAcc),
-            reset_heartbeat(),
-            {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2, limit = Limit - 1}};
-        true ->
-            ChangesRow = changes_row(Results, Value, Acc),
-            UserAcc2 = Callback({change, ChangesRow, Prepend}, ResponseType, UserAcc),
-            reset_heartbeat(),
-            {Go, Acc#changes_acc{
-                seq = Seq, prepend = <<",\n">>,
-                user_acc = UserAcc2, limit = Limit - 1}}
-        end
+        ChangesRow = changes_row(Results, Change1, Acc),
+        {UserGo, UserAcc2} = Callback({change, ChangesRow}, UserAcc),
+        RealGo = case UserGo of
+            ok -> Go;
+            stop -> stop
+        end,
+        reset_heartbeat(),
+        couch_log:error("XKCD: CHANGE SEQ: ~p", [Seq]),
+        {RealGo, Acc#changes_acc{
+            seq = Seq,
+            user_acc = UserAcc2,
+            limit = Limit - 1
+        }}
     end.
 
 
@@ -823,14 +856,17 @@ view_changes_row(Results, KVs, Acc) ->
     ] ++ maybe_get_changes_doc({Id, Rev}, Acc)}.
 
 
-changes_row(Results, #full_doc_info{} = FDI, Acc) ->
-    changes_row(Results, couch_doc:to_doc_info(FDI), Acc);
-changes_row(Results, DocInfo, Acc) ->
-    #doc_info{
-        id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]
-    } = DocInfo,
-    {[{<<"seq">>, Seq}, {<<"id">>, Id}, {<<"changes">>, Results}] ++
-        deleted_item(Del) ++ maybe_get_changes_doc(DocInfo, Acc)}.
+changes_row(Results, Change, Acc) ->
+    #{
+        id := Id,
+        sequence := Seq,
+        deleted := Del
+    } = Change,
+    {[
+        {<<"seq">>, Seq},
+        {<<"id">>, Id},
+        {<<"changes">>, Results}
+    ] ++ deleted_item(Del) ++ maybe_get_changes_doc(Change, Acc)}.
 
 maybe_get_changes_doc(Value, #changes_acc{include_docs=true}=Acc) ->
     #changes_acc{
@@ -840,9 +876,9 @@ maybe_get_changes_doc(Value, #changes_acc{include_docs=true}=Acc) ->
         filter = Filter
     } = Acc,
     Opts = case Conflicts of
-               true -> [deleted, conflicts];
-               false -> [deleted]
-           end,
+        true -> [deleted, conflicts];
+        false -> [deleted]
+    end,
     load_doc(Db, Value, Opts, DocOpts, Filter);
 
 maybe_get_changes_doc(_Value, _Acc) ->
@@ -850,7 +886,7 @@ maybe_get_changes_doc(_Value, _Acc) ->
 
 
 load_doc(Db, Value, Opts, DocOpts, Filter) ->
-    case couch_index_util:load_doc(Db, Value, Opts) of
+    case load_doc(Db, Value, Opts) of
         null ->
             [{doc, null}];
         Doc ->
@@ -858,6 +894,19 @@ load_doc(Db, Value, Opts, DocOpts, Filter) ->
     end.
 
 
+load_doc(Db, Change, Opts) ->
+    #{
+        id := Id,
+        rev_id := RevId
+    } = Change,
+    case fabric2_db:open_doc_revs(Db, Id, [RevId], Opts) of
+        {ok, [{ok, Doc}]} ->
+            Doc;
+        _ ->
+            null
+    end.
+
+
 doc_to_json(Doc, DocOpts, {selector, _Style, {_Selector, Fields}})
     when Fields =/= nil ->
     mango_fields:extract(couch_doc:to_json_obj(Doc, DocOpts), Fields);
@@ -870,17 +919,22 @@ deleted_item(_) -> [].
 
 % waits for an updated msg, if there are multiple msgs, collects them.
 wait_updated(Timeout, TimeoutFun, UserAcc) ->
+    couch_log:error("XKCD: WAITING FOR UPDATE", []),
     receive
     updated ->
+        couch_log:error("XKCD: GOT UPDATED", []),
         get_rest_updated(UserAcc);
     deleted ->
+        couch_log:error("XKCD: DB DELETED", []),
         {stop, UserAcc}
     after Timeout ->
         {Go, UserAcc2} = TimeoutFun(UserAcc),
         case Go of
         ok ->
+            couch_log:error("XKCD: WAIT UPDATED TIMEOUT, RETRY", []),
             ?MODULE:wait_updated(Timeout, TimeoutFun, UserAcc2);
         stop ->
+            couch_log:error("XKCD: WAIT UPDATED TIMEOUT STOP", []),
             {stop, UserAcc2}
         end
     end.
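
Note on the callback protocol above: the chttpd_changes copy drops the
ResponseType argument, so a user callback now receives plain events
(start, {change, Row}, waiting_for_updates, timeout and
{stop, EndSeq, Pending}) and returns {ok | stop, UserAcc}; the return
value of the {stop, ...} call becomes the result of the whole feed. A
minimal collecting callback satisfying that shape might look like this
sketch (names illustrative, not part of the commit):

    handle_changes_event(start, Acc) ->
        {ok, Acc};
    handle_changes_event({change, Row}, Acc) ->
        {ok, [Row | Acc]};
    handle_changes_event(waiting_for_updates, Acc) ->
        {ok, Acc};
    handle_changes_event(timeout, Acc) ->
        {ok, Acc};
    handle_changes_event({stop, EndSeq, _Pending}, Acc) ->
        {EndSeq, lists:reverse(Acc)}.
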
diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl
index 27c1a8a..76b333e 100644
--- a/src/chttpd/src/chttpd_db.erl
+++ b/src/chttpd/src/chttpd_db.erl
@@ -93,9 +93,9 @@ handle_changes_req(#httpd{path_parts=[_,<<"_changes">>]}=Req, _Db) ->
 handle_changes_req1(#httpd{}=Req, Db) ->
     #changes_args{filter=Raw, style=Style} = Args0 = parse_changes_query(Req),
     ChangesArgs = Args0#changes_args{
-        filter_fun = couch_changes:configure_filter(Raw, Style, Req, Db),
         db_open_options = [{user_ctx, fabric2_db:get_user_ctx(Db)}]
     },
+    ChangesFun = chttpd_changes:handle_db_changes(ChangesArgs, Req, Db),
     Max = chttpd:chunked_response_buffer_size(),
     case ChangesArgs#changes_args.feed of
     "normal" ->
@@ -107,7 +107,7 @@ handle_changes_req1(#httpd{}=Req, Db) ->
                 mochi = Req,
                 threshold = Max
             },
-            fabric2_db:fold_changes(Db, <<>>, fun changes_callback/3, Acc0)
+            ChangesFun({fun changes_callback/2, Acc0})
         end);
     Feed when Feed =:= "continuous"; Feed =:= "longpoll"; Feed =:= "eventsource"  ->
         couch_stats:increment_counter([couchdb, httpd, clients_requesting_changes]),
@@ -117,7 +117,7 @@ handle_changes_req1(#httpd{}=Req, Db) ->
             threshold = Max
         },
         try
-            fabric:changes(Db, fun changes_callback/3, Acc0, ChangesArgs)
+            ChangesFun({fun changes_callback/2, Acc0})
         after
             couch_stats:decrement_counter([couchdb, httpd, clients_requesting_changes])
         end;
@@ -127,15 +127,15 @@ handle_changes_req1(#httpd{}=Req, Db) ->
     end.
 
 % callbacks for continuous feed (newline-delimited JSON Objects)
-changes_callback(_TxDb, start, #cacc{feed = continuous} = Acc) ->
+changes_callback(start, #cacc{feed = continuous} = Acc) ->
     {ok, Resp} = chttpd:start_delayed_json_response(Acc#cacc.mochi, 200),
     {ok, Acc#cacc{mochi = Resp, responding = true}};
-changes_callback(_TxDb, {change, Change}, #cacc{feed = continuous} = Acc) ->
+changes_callback({change, Change}, #cacc{feed = continuous} = Acc) ->
     chttpd_stats:incr_rows(),
     Data = [?JSON_ENCODE(Change) | "\n"],
     Len = iolist_size(Data),
     maybe_flush_changes_feed(Acc, Data, Len);
-changes_callback(_TxDb, {stop, EndSeq, Pending}, #cacc{feed = continuous} = Acc) ->
+changes_callback({stop, EndSeq, Pending}, #cacc{feed = continuous} = Acc) ->
     #cacc{mochi = Resp, buffer = Buf} = Acc,
     Row = {[
         {<<"last_seq">>, EndSeq},
@@ -146,7 +146,7 @@ changes_callback(_TxDb, {stop, EndSeq, Pending}, #cacc{feed = continuous} = Acc)
     chttpd:end_delayed_json_response(Resp1);
 
 % callbacks for eventsource feed (newline-delimited eventsource Objects)
-changes_callback(_TxDb, start, #cacc{feed = eventsource} = Acc) ->
+changes_callback(start, #cacc{feed = eventsource} = Acc) ->
     #cacc{mochi = Req} = Acc,
     Headers = [
         {"Content-Type", "text/event-stream"},
@@ -154,7 +154,7 @@ changes_callback(_TxDb, start, #cacc{feed = eventsource} = Acc) ->
     ],
     {ok, Resp} = chttpd:start_delayed_json_response(Req, 200, Headers),
     {ok, Acc#cacc{mochi = Resp, responding = true}};
-changes_callback(_TxDb, {change, {ChangeProp}=Change}, #cacc{feed = eventsource} = Acc) ->
+changes_callback({change, {ChangeProp}=Change}, #cacc{feed = eventsource} = Acc) ->
     chttpd_stats:incr_rows(),
     Seq = proplists:get_value(seq, ChangeProp),
     Chunk = [
@@ -164,34 +164,34 @@ changes_callback(_TxDb, {change, {ChangeProp}=Change}, #cacc{feed = eventsource}
     ],
     Len = iolist_size(Chunk),
     maybe_flush_changes_feed(Acc, Chunk, Len);
-changes_callback(_TxDb, timeout, #cacc{feed = eventsource} = Acc) ->
+changes_callback(timeout, #cacc{feed = eventsource} = Acc) ->
     #cacc{mochi = Resp} = Acc,
     Chunk = "event: heartbeat\ndata: \n\n",
     {ok, Resp1} = chttpd:send_delayed_chunk(Resp, Chunk),
     {ok, Acc#cacc{mochi = Resp1}};
-changes_callback(_TxDb, {stop, _EndSeq}, #cacc{feed = eventsource} = Acc) ->
+changes_callback({stop, _EndSeq, _Pending}, #cacc{feed = eventsource} = Acc) ->
     #cacc{mochi = Resp, buffer = Buf} = Acc,
     {ok, Resp1} = chttpd:send_delayed_chunk(Resp, Buf),
     chttpd:end_delayed_json_response(Resp1);
 
 % callbacks for longpoll and normal (single JSON Object)
-changes_callback(_TxDb, start, #cacc{feed = normal} = Acc) ->
+changes_callback(start, #cacc{feed = normal} = Acc) ->
     #cacc{etag = Etag, mochi = Req} = Acc,
     FirstChunk = "{\"results\":[\n",
     {ok, Resp} = chttpd:start_delayed_json_response(Req, 200,
         [{"ETag",Etag}], FirstChunk),
     {ok, Acc#cacc{mochi = Resp, responding = true}};
-changes_callback(_TxDb, start, Acc) ->
+changes_callback(start, Acc) ->
     #cacc{mochi = Req} = Acc,
     FirstChunk = "{\"results\":[\n",
     {ok, Resp} = chttpd:start_delayed_json_response(Req, 200, [], FirstChunk),
     {ok, Acc#cacc{mochi = Resp, responding = true}};
-changes_callback(_TxDb, {change, Change}, Acc) ->
+changes_callback({change, Change}, Acc) ->
     chttpd_stats:incr_rows(),
     Data = [Acc#cacc.prepend, ?JSON_ENCODE(Change)],
     Len = iolist_size(Data),
     maybe_flush_changes_feed(Acc, Data, Len);
-changes_callback(_TxDb, {stop, EndSeq, Pending}, Acc) ->
+changes_callback({stop, EndSeq, Pending}, Acc) ->
     #cacc{buffer = Buf, mochi = Resp, threshold = Max} = Acc,
     Terminator = [
         "\n],\n\"last_seq\":",
@@ -203,23 +203,26 @@ changes_callback(_TxDb, {stop, EndSeq, Pending}, Acc) ->
     {ok, Resp1} = chttpd:close_delayed_json_object(Resp, Buf, Terminator, Max),
     chttpd:end_delayed_json_response(Resp1);
 
-changes_callback(_TxDb, waiting_for_updates, #cacc{buffer = []} = Acc) ->
+changes_callback(waiting_for_updates, #cacc{buffer = []} = Acc) ->
     {ok, Acc};
-changes_callback(_TxDb, waiting_for_updates, Acc) ->
+changes_callback(waiting_for_updates, Acc) ->
     #cacc{buffer = Buf, mochi = Resp} = Acc,
     {ok, Resp1} = chttpd:send_delayed_chunk(Resp, Buf),
     {ok, Acc#cacc{buffer = [], bufsize = 0, mochi = Resp1}};
-changes_callback(_TxDb, timeout, Acc) ->
+changes_callback(timeout, Acc) ->
     {ok, Resp1} = chttpd:send_delayed_chunk(Acc#cacc.mochi, "\n"),
     {ok, Acc#cacc{mochi = Resp1}};
-changes_callback(_TxDb, {error, Reason}, #cacc{mochi = #httpd{}} = Acc) ->
+changes_callback({error, Reason}, #cacc{mochi = #httpd{}} = Acc) ->
     #cacc{mochi = Req} = Acc,
     chttpd:send_error(Req, Reason);
-changes_callback(_TxDb, {error, Reason}, #cacc{feed = normal, responding = false} = Acc) ->
+changes_callback({error, Reason}, #cacc{feed = normal, responding = false} = Acc) ->
     #cacc{mochi = Req} = Acc,
     chttpd:send_error(Req, Reason);
-changes_callback(_TxDb, {error, Reason}, Acc) ->
-    chttpd:send_delayed_error(Acc#cacc.mochi, Reason).
+changes_callback({error, Reason}, Acc) ->
+    chttpd:send_delayed_error(Acc#cacc.mochi, Reason);
+
+changes_callback(A, B) ->
+    erlang:error({changes_error, A, B}).
 
 maybe_flush_changes_feed(#cacc{bufsize=Size, threshold=Max} = Acc, Data, Len)
          when Size > 0 andalso (Size + Len) > Max ->
@@ -432,13 +435,10 @@ db_req(#httpd{method='POST', path_parts=[DbName], user_ctx=Ctx}=Req, Db) ->
 db_req(#httpd{path_parts=[_DbName]}=Req, _Db) ->
     send_method_not_allowed(Req, "DELETE,GET,HEAD,POST");
 
-db_req(#httpd{method='POST', path_parts=[DbName, <<"_ensure_full_commit">>],
-        user_ctx=Ctx}=Req, _Db) ->
+db_req(#httpd{method='POST', path_parts=[_DbName, <<"_ensure_full_commit">>],
+        user_ctx=Ctx}=Req, Db) ->
     chttpd:validate_ctype(Req, "application/json"),
-    %% use fabric call to trigger a database_does_not_exist exception
-    %% for missing databases that'd return error 404 from chttpd
-    %% get_security used to prefer shards on the same node over other nodes
-    fabric:get_security(DbName, [{user_ctx, Ctx}]),
+    #{db_prefix := <<_/binary>>} = Db,
     send_json(Req, 201, {[
         {ok, true},
         {instance_start_time, <<"0">>}
@@ -807,8 +807,8 @@ multi_all_docs_view(Req, Db, OP, Queries) ->
         200, [], FirstChunk),
     VAcc1 = VAcc0#vacc{resp=Resp0},
     VAcc2 = lists:foldl(fun(Args, Acc0) ->
-        {ok, Acc1} = fabric:all_docs(Db, Options,
-            fun view_cb/2, Acc0, Args),
+        {ok, Acc1} = fabric2_db:fold_docs(Db, Options,
+            fun view_cb/3, Acc0, Args),
         Acc1
     end, VAcc1, ArgQueries),
     {ok, Resp1} = chttpd:send_delayed_chunk(VAcc2#vacc.resp, "\r\n]}"),
@@ -822,10 +822,10 @@ all_docs_view(Req, Db, _Keys, _OP) ->
     Options = [{user_ctx, Req#httpd.user_ctx}],
     Max = chttpd:chunked_response_buffer_size(),
     VAcc = #vacc{db=Db, req=Req, threshold=Max},
-    {ok, Resp} = fabric2_db:fold_docs(Db, fun view_cb/2, VAcc, Options),
+    {ok, Resp} = fabric2_db:fold_docs(Db, fun view_cb/3, VAcc, Options),
     {ok, Resp#vacc.resp}.
 
-view_cb({row, Row} = Msg, Acc) ->
+view_cb(_TxDb, {row, Row} = Msg, Acc) ->
     case lists:keymember(doc, 1, Row) of
         true -> chttpd_stats:incr_reads();
         false -> ok
@@ -833,7 +833,7 @@ view_cb({row, Row} = Msg, Acc) ->
     chttpd_stats:incr_rows(),
     couch_mrview_http:view_cb(Msg, Acc);
 
-view_cb(Msg, Acc) ->
+view_cb(_TxDb, Msg, Acc) ->
     couch_mrview_http:view_cb(Msg, Acc).
 
 db_doc_req(#httpd{method='DELETE'}=Req, Db, DocId) ->
@@ -974,7 +974,7 @@ db_doc_req(#httpd{method='PUT', user_ctx=Ctx}=Req, Db, DocId) ->
     case couch_util:to_list(couch_httpd:header_value(Req, "Content-Type")) of
     ("multipart/related;" ++ _) = ContentType ->
         couch_httpd:check_max_request_length(Req),
-        couch_httpd_multipart:num_mp_writers(mem3:n(mem3:dbname(DbName), DocId)),
+        couch_httpd_multipart:num_mp_writers(1),
         {ok, Doc0, WaitFun, Parser} = couch_doc:doc_from_multi_part_stream(ContentType,
                 fun() -> receive_request_data(Req) end),
         Doc = couch_doc_from_req(Req, Db, DocId, Doc0),
@@ -1129,7 +1129,7 @@ send_docs_multipart(Req, Results, Options1) ->
     CType = {"Content-Type",
         "multipart/mixed; boundary=\"" ++ ?b2l(OuterBoundary) ++ "\""},
     {ok, Resp} = start_chunked_response(Req, 200, [CType]),
-    couch_httpd:send_chunk(Resp, <<"--", OuterBoundary/binary>>),
+    chttpd:send_chunk(Resp, <<"--", OuterBoundary/binary>>),
     lists:foreach(
         fun({ok, #doc{atts=Atts}=Doc}) ->
             Refs = monitor_attachments(Doc#doc.atts),
@@ -1137,25 +1137,25 @@ send_docs_multipart(Req, Results, Options1) ->
             JsonBytes = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, Options)),
             {ContentType, _Len} = couch_doc:len_doc_to_multi_part_stream(
                     InnerBoundary, JsonBytes, Atts, true),
-            couch_httpd:send_chunk(Resp, <<"\r\nContent-Type: ",
+            chttpd:send_chunk(Resp, <<"\r\nContent-Type: ",
                     ContentType/binary, "\r\n\r\n">>),
             couch_doc:doc_to_multi_part_stream(InnerBoundary, JsonBytes, Atts,
-                    fun(Data) -> couch_httpd:send_chunk(Resp, Data)
+                    fun(Data) -> chttpd:send_chunk(Resp, Data)
                     end, true),
-             couch_httpd:send_chunk(Resp, <<"\r\n--", OuterBoundary/binary>>)
+             chttpd:send_chunk(Resp, <<"\r\n--", OuterBoundary/binary>>)
             after
                 demonitor_refs(Refs)
             end;
         ({{not_found, missing}, RevId}) ->
              RevStr = couch_doc:rev_to_str(RevId),
              Json = ?JSON_ENCODE({[{<<"missing">>, RevStr}]}),
-             couch_httpd:send_chunk(Resp,
+             chttpd:send_chunk(Resp,
                 [<<"\r\nContent-Type: application/json; error=\"true\"\r\n\r\n">>,
                 Json,
                 <<"\r\n--", OuterBoundary/binary>>])
          end, Results),
-    couch_httpd:send_chunk(Resp, <<"--">>),
-    couch_httpd:last_chunk(Resp).
+    chttpd:send_chunk(Resp, <<"--">>),
+    chttpd:last_chunk(Resp).
 
 bulk_get_multipart_headers({0, []}, Id, Boundary) ->
     [
@@ -1438,8 +1438,12 @@ db_attachment_req(#httpd{method='GET',mochi_req=MochiReq}=Req, Db, DocId, FileNa
     end;
 
 
-db_attachment_req(#httpd{method=Method, user_ctx=Ctx}=Req, Db, DocId, FileNameParts)
+db_attachment_req(#httpd{method=Method}=Req, Db, DocId, FileNameParts)
         when (Method == 'PUT') or (Method == 'DELETE') ->
+    #httpd{
+        user_ctx = Ctx,
+        mochi_req = MochiReq
+    } = Req,
     FileName = validate_attachment_name(
                     mochiweb_util:join(
                         lists:map(fun binary_to_list/1,
@@ -1449,16 +1453,45 @@ db_attachment_req(#httpd{method=Method, user_ctx=Ctx}=Req, Db, DocId, FileNamePa
         'DELETE' ->
             [];
         _ ->
-            MimeType = case couch_httpd:header_value(Req,"Content-Type") of
+            MimeType = case chttpd:header_value(Req,"Content-Type") of
                 % We could throw an error here or guess by the FileName.
                 % Currently, just giving it a default.
                 undefined -> <<"application/octet-stream">>;
                 CType -> list_to_binary(CType)
             end,
-            Data = fabric:att_receiver(Req, chttpd:body_length(Req)),
+            Data = case chttpd:body_length(Req) of
+                undefined ->
+                    <<"">>;
+                {unknown_transfer_encoding, Unknown} ->
+                    exit({unknown_transfer_encoding, Unknown});
+                chunked ->
+                    fun(MaxChunkSize, ChunkFun, InitState) ->
+                        chttpd:recv_chunked(
+                            Req, MaxChunkSize, ChunkFun, InitState
+                        )
+                    end;
+                0 ->
+                    <<"">>;
+                Length when is_integer(Length) ->
+                    Expect = case chttpd:header_value(Req, "expect") of
+                        undefined ->
+                            undefined;
+                        Value when is_list(Value) ->
+                            string:to_lower(Value)
+                    end,
+                    case Expect of
+                        "100-continue" ->
+                            MochiReq:start_raw_response({100, gb_trees:empty()});
+                        _Else ->
+                            ok
+                    end,
+                    fun() -> chttpd:recv(Req, 0) end;
+                Length ->
+                    exit({length_not_integer, Length})
+            end,
             ContentLen = case couch_httpd:header_value(Req,"Content-Length") of
                 undefined -> undefined;
-                Length -> list_to_integer(Length)
+                CL -> list_to_integer(CL)
             end,
             ContentEnc = string:to_lower(string:strip(
                 couch_httpd:header_value(Req, "Content-Encoding", "identity")
@@ -1517,7 +1550,7 @@ db_attachment_req(#httpd{method=Method, user_ctx=Ctx}=Req, Db, DocId, FileNamePa
         HttpCode = 202
     end,
     erlang:put(mochiweb_request_recv, true),
-    DbName = couch_db:name(Db),
+    DbName = fabric2_db:name(Db),
 
     {Status, Headers} = case Method of
         'DELETE' ->
@@ -1673,7 +1706,7 @@ parse_changes_query(Req) ->
         {"descending", "true"} ->
             Args#changes_args{dir=rev};
         {"since", _} ->
-            Args#changes_args{since=Value};
+            Args#changes_args{since=parse_since_seq(Value)};
         {"last-event-id", _} ->
             Args#changes_args{since=Value};
         {"limit", _} ->
@@ -1727,6 +1760,27 @@ parse_changes_query(Req) ->
             ChangesArgs
     end.
 
+
+parse_since_seq(Seq) when is_binary(Seq), size(Seq) > 30 ->
+    throw({bad_request, url_encoded_since_seq});
+
+parse_since_seq(Seq) when is_binary(Seq), size(Seq) > 2 ->
+    % We have implicitly allowed the since seq to either be
+    % JSON encoded or a "raw" string. Here we just remove the
+    % surrounding quotes if they exist and are paired.
+    SeqSize = size(Seq) - 2,
+    case Seq of
+        <<"\"", S:SeqSize/binary, "\"">> -> S;
+        S -> S
+    end;
+
+parse_since_seq(Seq) when is_binary(Seq) ->
+    Seq;
+
+parse_since_seq(Seq) when is_list(Seq) ->
+    parse_since_seq(iolist_to_binary(Seq)).
+
+
 extract_header_rev(Req, ExplicitRev) when is_binary(ExplicitRev) or is_list(ExplicitRev)->
     extract_header_rev(Req, couch_doc:parse_rev(ExplicitRev));
 extract_header_rev(Req, ExplicitRev) ->
@@ -1767,6 +1821,8 @@ monitor_attachments(Atts) when is_list(Atts) ->
         case couch_att:fetch(data, Att) of
             {Fd, _} ->
                 [monitor(process, Fd) | Monitors];
+            {loc, _, _, _} ->
+                Monitors;
             stub ->
                 Monitors;
             Else ->
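
For reference, parse_since_seq/1 above normalizes the since parameter
before it reaches the changes machinery; a few illustrative calls
(assuming the function were invoked directly):

    <<"seq-token">> = parse_since_seq(<<"\"seq-token\"">>), % paired quotes stripped
    <<"seq-token">> = parse_since_seq(<<"seq-token">>),     % raw value passes through
    <<"seq-token">> = parse_since_seq("seq-token"),         % lists become binaries
    %% binaries longer than 30 bytes throw {bad_request, url_encoded_since_seq}
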
diff --git a/src/chttpd/src/chttpd_external.erl b/src/chttpd/src/chttpd_external.erl
index fa35c6b..3e59ffe 100644
--- a/src/chttpd/src/chttpd_external.erl
+++ b/src/chttpd/src/chttpd_external.erl
@@ -74,7 +74,7 @@ json_req_obj_fields() ->
      <<"peer">>, <<"form">>, <<"cookie">>, <<"userCtx">>, <<"secObj">>].
 
 json_req_obj_field(<<"info">>, #httpd{}, Db, _DocId) ->
-    {ok, Info} = get_db_info(Db),
+    {ok, Info} = fabric2_db:get_db_info(Db),
     {Info};
 json_req_obj_field(<<"uuid">>, #httpd{}, _Db, _DocId) ->
     couch_uuids:new();
@@ -117,27 +117,18 @@ json_req_obj_field(<<"form">>, #httpd{mochi_req=Req, method=Method}=HttpReq, Db,
 json_req_obj_field(<<"cookie">>, #httpd{mochi_req=Req}, _Db, _DocId) ->
     to_json_terms(Req:parse_cookie());
 json_req_obj_field(<<"userCtx">>, #httpd{}, Db, _DocId) ->
-    couch_util:json_user_ctx(Db);
-json_req_obj_field(<<"secObj">>, #httpd{user_ctx=UserCtx}, Db, _DocId) ->
-    get_db_security(Db, UserCtx).
-
-
-get_db_info(Db) ->
-    case couch_db:is_clustered(Db) of
-        true ->
-            fabric:get_db_info(Db);
-        false ->
-            couch_db:get_db_info(Db)
-    end.
-
-
-get_db_security(Db, #user_ctx{}) ->
-    case couch_db:is_clustered(Db) of
-        true ->
-            fabric:get_security(Db);
-        false ->
-            couch_db:get_security(Db)
-    end.
+    json_user_ctx(Db);
+json_req_obj_field(<<"secObj">>, #httpd{user_ctx = #user_ctx{}}, Db, _DocId) ->
+    fabric2_db:get_security(Db).
+
+
+json_user_ctx(Db) ->
+    Ctx = fabric2_db:get_user_ctx(Db),
+    {[
+        {<<"db">>, fabric2_db:name(Db)},
+        {<<"name">>, Ctx#user_ctx.name},
+        {<<"roles">>, Ctx#user_ctx.roles}
+    ]}.
 
 
 to_json_terms(Data) ->
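
With the clustered/local split gone, the inlined json_user_ctx/1 above
builds the userCtx object directly from the fabric2 handle; for an
admin request it would yield EJSON along these lines (db name
hypothetical):

    {[
        {<<"db">>, <<"mydb">>},
        {<<"name">>, <<"admin">>},
        {<<"roles">>, [<<"_admin">>]}
    ]}
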
diff --git a/src/couch/src/couch_att.erl b/src/couch/src/couch_att.erl
index a24de21..924b58d 100644
--- a/src/couch/src/couch_att.erl
+++ b/src/couch/src/couch_att.erl
@@ -29,7 +29,7 @@
 -export([
     size_info/1,
     to_disk_term/1,
-    from_disk_term/2
+    from_disk_term/3
 ]).
 
 -export([
@@ -38,7 +38,7 @@
 ]).
 
 -export([
-    flush/2,
+    flush/3,
     foldl/3,
     range_foldl/5,
     foldl_decode/3,
@@ -46,11 +46,6 @@
 ]).
 
 -export([
-    upgrade/1,
-    downgrade/1
-]).
-
--export([
     max_attachment_size/0,
     validate_attachment_size/3
 ]).
@@ -58,137 +53,61 @@
 -compile(nowarn_deprecated_type).
 -export_type([att/0]).
 
--include_lib("couch/include/couch_db.hrl").
-
-
-%% Legacy attachment record. This is going to be phased out by the new proplist
-%% based structure. It's needed for now to allow code to perform lazy upgrades
-%% while the patch is rolled out to the cluster. Attachments passed as records
-%% will remain so until they are required to be represented as property lists.
-%% Once this has been widely deployed, this record will be removed entirely and
-%% property lists will be the main format.
--record(att, {
-    name :: binary(),
-    type :: binary(),
-    att_len :: non_neg_integer(),
-
-    %% length of the attachment in its identity form
-    %% (that is, without a content encoding applied to it)
-    %% differs from att_len when encoding /= identity
-    disk_len :: non_neg_integer(),
-
-    md5 = <<>> :: binary(),
-    revpos = 0 :: non_neg_integer(),
-    data :: stub | follows | binary() | {any(), any()} |
-            {follows, pid(), reference()} | fun(() -> binary()),
-
-    %% Encoding of the attachment
-    %% currently supported values are:
-    %%     identity, gzip
-    %% additional values to support in the future:
-    %%     deflate, compress
-    encoding = identity :: identity | gzip
-}).
-
-
-%% Extensible Attachment Type
-%%
-%% The following types describe the known properties for attachment fields
-%% encoded as property lists to allow easier upgrades. Values not in this list
-%% should be accepted at runtime but should be treated as opaque data as might
-%% be used by upgraded code. If you plan on operating on new data, please add
-%% an entry here as documentation.
-
-
-%% The name of the attachment is also used as the mime-part name for file
-%% downloads. These must be unique per document.
--type name_prop() :: {name, binary()}.
-
-
-%% The mime type of the attachment. This does affect compression of certain
-%% attachments if the type is found to be configured as a compressable type.
-%% This is commonly reserved for text/* types but could include other custom
-%% cases as well. See definition and use of couch_util:compressable_att_type/1.
--type type_prop() :: {type, binary()}.
-
-
-%% The attachment length is similar to disk-length but ignores additional
-%% encoding that may have occurred.
--type att_len_prop() :: {att_len, non_neg_integer()}.
-
-
-%% The size of the attachment as stored in a disk stream.
--type disk_len_prop() :: {disk_len, non_neg_integer()}.
-
-
-%% This is a digest of the original attachment data as uploaded by the client.
-%% it's useful for checking validity of contents against other attachment data
-%% as well as quick digest computation of the enclosing document.
--type md5_prop() :: {md5, binary()}.
-
 
--type revpos_prop() :: {revpos, 0}.
+-include_lib("couch/include/couch_db.hrl").
 
 
-%% This field is currently overloaded with just about everything. The
-%% {any(), any()} type is just there until I have time to check the actual
-%% values expected. Over time this should be split into more than one property
-%% to allow simpler handling.
--type data_prop() :: {
-    data, stub | follows | binary() | {any(), any()} |
-    {follows, pid(), reference()} | fun(() -> binary())
-}.
+-define(CURRENT_ATT_FORMAT, 0).
 
 
-%% We will occasionally compress our data. See type_prop() for more information
-%% on when this happens.
--type encoding_prop() :: {encoding, identity | gzip}.
+-type prop_name() ::
+    name |
+    type |
+    att_len |
+    disk_len |
+    md5 |
+    revpos |
+    data |
+    encoding.
 
 
--type attachment() :: [
-    name_prop() | type_prop() |
-    att_len_prop() | disk_len_prop() |
-    md5_prop() | revpos_prop() |
-    data_prop() | encoding_prop()
-].
+-type data_prop_type() ::
+    {loc, #{}, binary(), binary()} |
+    stub |
+    follows |
+    binary() |
+    {follows, pid(), reference()} |
+    fun(() -> binary()).
 
--type disk_att_v1() :: {
-    Name :: binary(),
-    Type :: binary(),
-    Sp :: any(),
-    AttLen :: non_neg_integer(),
-    RevPos :: non_neg_integer(),
-    Md5 :: binary()
-}.
 
--type disk_att_v2() :: {
-    Name :: binary(),
-    Type :: binary(),
-    Sp :: any(),
-    AttLen :: non_neg_integer(),
-    DiskLen :: non_neg_integer(),
-    RevPos :: non_neg_integer(),
-    Md5 :: binary(),
-    Enc :: identity | gzip
+-type att() :: #{
+    name := binary(),
+    type := binary(),
+    att_len := non_neg_integer() | undefined,
+    disk_len := non_neg_integer() | undefined,
+    md5 := binary() | undefined,
+    revpos := non_neg_integer(),
+    data := data_prop_type(),
+    encoding := identity | gzip | undefined,
+    headers := [{binary(), binary()}] | undefined
 }.
 
--type disk_att_v3() :: {Base :: tuple(), Extended :: list()}.
-
--type disk_att() :: disk_att_v1() | disk_att_v2() | disk_att_v3().
-
--type att() :: #att{} | attachment() | disk_att().
 
 new() ->
-    %% We construct a record by default for compatability. This will be
-    %% upgraded on demand. A subtle effect this has on all attachments
-    %% constructed via new is that it will pick up the proper defaults
-    %% from the #att record definition given above. Newer properties do
-    %% not support special default values and will all be treated as
-    %% undefined.
-    #att{}.
+    #{
+        name => <<>>,
+        type => <<>>,
+        att_len => undefined,
+        disk_len => undefined,
+        md5 => undefined,
+        revpos => 0,
+        data => undefined,
+        encoding => undefined,
+        headers => undefined
+    }.
 
 
--spec new([{atom(), any()}]) -> att().
+-spec new([{prop_name(), any()}]) -> att().
 new(Props) ->
     store(Props, new()).
 
@@ -197,71 +116,28 @@ new(Props) ->
            (atom(), att()) -> any().
 fetch(Fields, Att) when is_list(Fields) ->
     [fetch(Field, Att) || Field <- Fields];
-fetch(Field, Att) when is_list(Att) ->
-    case lists:keyfind(Field, 1, Att) of
-        {Field, Value} -> Value;
-        false -> undefined
-    end;
-fetch(name, #att{name = Name}) ->
-    Name;
-fetch(type, #att{type = Type}) ->
-    Type;
-fetch(att_len, #att{att_len = AttLen}) ->
-    AttLen;
-fetch(disk_len, #att{disk_len = DiskLen}) ->
-    DiskLen;
-fetch(md5, #att{md5 = Digest}) ->
-    Digest;
-fetch(revpos, #att{revpos = RevPos}) ->
-    RevPos;
-fetch(data, #att{data = Data}) ->
-    Data;
-fetch(encoding, #att{encoding = Encoding}) ->
-    Encoding;
-fetch(_, _) ->
-    undefined.
+fetch(Field, Att) ->
+    maps:get(Field, Att).
 
 
 -spec store([{atom(), any()}], att()) -> att().
 store(Props, Att0) ->
     lists:foldl(fun({Field, Value}, Att) ->
-        store(Field, Value, Att)
+        maps:update(Field, Value, Att)
     end, Att0, Props).
 
 
--spec store(atom(), any(), att()) -> att().
-store(Field, undefined, Att) when is_list(Att) ->
-    lists:keydelete(Field, 1, Att);
-store(Field, Value, Att) when is_list(Att) ->
-    lists:keystore(Field, 1, Att, {Field, Value});
-store(name, Name, Att) ->
-    Att#att{name = Name};
-store(type, Type, Att) ->
-    Att#att{type = Type};
-store(att_len, AttLen, Att) ->
-    Att#att{att_len = AttLen};
-store(disk_len, DiskLen, Att) ->
-    Att#att{disk_len = DiskLen};
-store(md5, Digest, Att) ->
-    Att#att{md5 = Digest};
-store(revpos, RevPos, Att) ->
-    Att#att{revpos = RevPos};
-store(data, Data, Att) ->
-    Att#att{data = Data};
-store(encoding, Encoding, Att) ->
-    Att#att{encoding = Encoding};
 store(Field, Value, Att) ->
-    store(Field, Value, upgrade(Att)).
+    maps:update(Field, Value, Att).
 
 
 -spec transform(atom(), fun(), att()) -> att().
 transform(Field, Fun, Att) ->
-    NewValue = Fun(fetch(Field, Att)),
-    store(Field, NewValue, Att).
+    maps:update_with(Field, Fun, Att).
 
 
-is_stub(Att) ->
-    stub == fetch(data, Att).
+is_stub(#{data := stub}) -> true;
+is_stub(#{}) -> false.
 
 
 %% merge_stubs takes all stub attachments and replaces them with on disk
@@ -275,8 +151,7 @@ merge_stubs(MemAtts, DiskAtts) ->
     merge_stubs(MemAtts, OnDisk, []).
 
 
-%% restore spec when R14 support is dropped
-%% -spec merge_stubs([att()], dict:dict(), [att()]) -> [att()].
+-spec merge_stubs([att()], dict:dict(), [att()]) -> [att()].
 merge_stubs([Att | Rest], OnDisk, Merged) ->
     case fetch(data, Att) of
         stub ->
@@ -308,14 +183,8 @@ size_info([]) ->
     {ok, []};
 size_info(Atts) ->
     Info = lists:map(fun(Att) ->
-        AttLen = fetch(att_len, Att),
-        case fetch(data, Att) of
-             {stream, StreamEngine} ->
-                 {ok, SPos} = couch_stream:to_disk_term(StreamEngine),
-                 {SPos, AttLen};
-             {_, SPos} ->
-                 {SPos, AttLen}
-        end
+        [{loc, _Db, _DocId, AttId}, AttLen] = fetch([data, att_len], Att),
+        {AttId, AttLen}
     end, Atts),
     {ok, lists:usort(Info)}.
 
@@ -324,89 +193,44 @@ size_info(Atts) ->
 %% old format when possible. This should help make the attachment lazy upgrade
 %% as safe as possible, avoiding the need for complicated disk versioning
 %% schemes.
-to_disk_term(#att{} = Att) ->
-    {stream, StreamEngine} = fetch(data, Att),
-    {ok, Sp} = couch_stream:to_disk_term(StreamEngine),
-    {
+to_disk_term(Att) ->
+    {loc, #{}, _DocId, AttId} = fetch(data, Att),
+    {?CURRENT_ATT_FORMAT, {
         fetch(name, Att),
         fetch(type, Att),
-        Sp,
+        AttId,
         fetch(att_len, Att),
         fetch(disk_len, Att),
         fetch(revpos, Att),
         fetch(md5, Att),
-        fetch(encoding, Att)
-    };
-to_disk_term(Att) ->
-    BaseProps = [name, type, data, att_len, disk_len, revpos, md5, encoding],
-    {Extended, Base} = lists:foldl(
-        fun
-            (data, {Props, Values}) ->
-                case lists:keytake(data, 1, Props) of
-                    {value, {_, {stream, StreamEngine}}, Other} ->
-                        {ok, Sp} = couch_stream:to_disk_term(StreamEngine),
-                        {Other, [Sp | Values]};
-                    {value, {_, Value}, Other} ->
-                        {Other, [Value | Values]};
-                    false ->
-                        {Props, [undefined | Values]}
-                end;
-            (Key, {Props, Values}) ->
-                case lists:keytake(Key, 1, Props) of
-                    {value, {_, Value}, Other} -> {Other, [Value | Values]};
-                    false -> {Props, [undefined | Values]}
-                end
-        end,
-        {Att, []},
-        BaseProps
-    ),
-    {list_to_tuple(lists:reverse(Base)), Extended}.
-
-
-%% The new disk term format is a simple wrapper around the legacy format. Base
-%% properties will remain in a tuple while the new fields and possibly data from
-%% future extensions will be stored in a list of atom/value pairs. While this is
-%% slightly less efficient, future work should be able to make use of
-%% compression to remove these sorts of common bits (block level compression
-%% with something like a shared dictionary that is checkpointed every now and
-%% then).
-from_disk_term(StreamSrc, {Base, Extended})
-        when is_tuple(Base), is_list(Extended) ->
-    store(Extended, from_disk_term(StreamSrc, Base));
-from_disk_term(StreamSrc, {Name,Type,Sp,AttLen,DiskLen,RevPos,Md5,Enc}) ->
-    {ok, Stream} = open_stream(StreamSrc, Sp),
-    #att{
-        name=Name,
-        type=Type,
-        att_len=AttLen,
-        disk_len=DiskLen,
-        md5=Md5,
-        revpos=RevPos,
-        data={stream, Stream},
-        encoding=upgrade_encoding(Enc)
-    };
-from_disk_term(StreamSrc, {Name,Type,Sp,AttLen,RevPos,Md5}) ->
-    {ok, Stream} = open_stream(StreamSrc, Sp),
-    #att{
-        name=Name,
-        type=Type,
-        att_len=AttLen,
-        disk_len=AttLen,
-        md5=Md5,
-        revpos=RevPos,
-        data={stream, Stream}
-    };
-from_disk_term(StreamSrc, {Name,{Type,Sp,AttLen}}) ->
-    {ok, Stream} = open_stream(StreamSrc, Sp),
-    #att{
-        name=Name,
-        type=Type,
-        att_len=AttLen,
-        disk_len=AttLen,
-        md5= <<>>,
-        revpos=0,
-        data={stream, Stream}
-    }.
+        fetch(encoding, Att),
+        fetch(headers, Att)
+    }}.
+
+
+from_disk_term(#{} = Db, DocId, {?CURRENT_ATT_FORMAT, Props}) ->
+    {
+        Name,
+        Type,
+        AttId,
+        AttLen,
+        DiskLen,
+        RevPos,
+        Md5,
+        Encoding,
+        Headers
+    } = Props,
+    new([
+        {name, Name},
+        {type, Type},
+        {data, {loc, Db#{tx := undefined}, DocId, AttId}},
+        {att_len, AttLen},
+        {disk_len, DiskLen},
+        {revpos, RevPos},
+        {md5, Md5},
+        {encoding, Encoding},
+        {headers, Headers}
+    ]).
 
 
 %% from_json reads in embedded JSON attachments and creates usable attachment
@@ -433,8 +257,12 @@ stub_from_json(Att, Props) ->
     %% json object. See merge_stubs/3 for the stub check.
     RevPos = couch_util:get_value(<<"revpos">>, Props),
     store([
-        {md5, Digest}, {revpos, RevPos}, {data, stub}, {disk_len, DiskLen},
-        {att_len, EncodedLen}, {encoding, Encoding}
+        {data, stub},
+        {disk_len, DiskLen},
+        {att_len, EncodedLen},
+        {revpos, RevPos},
+        {md5, Digest},
+        {encoding, Encoding}
     ], Att).
 
 
@@ -443,8 +271,12 @@ follow_from_json(Att, Props) ->
     Digest = digest_from_json(Props),
     RevPos = couch_util:get_value(<<"revpos">>, Props, 0),
     store([
-        {md5, Digest}, {revpos, RevPos}, {data, follows}, {disk_len, DiskLen},
-        {att_len, EncodedLen}, {encoding, Encoding}
+        {data, follows},
+        {disk_len, DiskLen},
+        {att_len, EncodedLen},
+        {revpos, RevPos},
+        {md5, Digest},
+        {encoding, Encoding}
     ], Att).
 
 
@@ -455,8 +287,10 @@ inline_from_json(Att, Props) ->
             Length = size(Data),
             RevPos = couch_util:get_value(<<"revpos">>, Props, 0),
             store([
-                {data, Data}, {revpos, RevPos}, {disk_len, Length},
-                {att_len, Length}
+                {data, Data},
+                {disk_len, Length},
+                {att_len, Length},
+                {revpos, RevPos}
             ], Att)
     catch
         _:_ ->
@@ -466,7 +300,6 @@ inline_from_json(Att, Props) ->
     end.
 
 
-
 encoded_lengths_from_json(Props) ->
     Len = couch_util:get_value(<<"length">>, Props),
     case couch_util:get_value(<<"encoding">>, Props) of
@@ -488,9 +321,17 @@ digest_from_json(Props) ->
 
 
 to_json(Att, OutputData, DataToFollow, ShowEncoding) ->
-    [Name, Data, DiskLen, AttLen, Enc, Type, RevPos, Md5] = fetch(
-        [name, data, disk_len, att_len, encoding, type, revpos, md5], Att
-    ),
+    #{
+        name := Name,
+        type := Type,
+        data := Data,
+        disk_len := DiskLen,
+        att_len := AttLen,
+        revpos := RevPos,
+        md5 := Md5,
+        encoding := Encoding,
+        headers := Headers
+    } = Att,
     Props = [
         {<<"content_type">>, Type},
         {<<"revpos">>, RevPos}
@@ -505,71 +346,71 @@ to_json(Att, OutputData, DataToFollow, ShowEncoding) ->
         DataToFollow ->
             [{<<"length">>, DiskLen}, {<<"follows">>, true}];
         true ->
-            AttData = case Enc of
+            AttData = case Encoding of
                 gzip -> zlib:gunzip(to_binary(Att));
                 identity -> to_binary(Att)
             end,
             [{<<"data">>, base64:encode(AttData)}]
     end,
     EncodingProps = if
-        ShowEncoding andalso Enc /= identity ->
+        ShowEncoding andalso Encoding /= identity ->
             [
-                {<<"encoding">>, couch_util:to_binary(Enc)},
+                {<<"encoding">>, couch_util:to_binary(Encoding)},
                 {<<"encoded_length">>, AttLen}
             ];
         true ->
             []
     end,
-    HeadersProp = case fetch(headers, Att) of
+    HeadersProp = case Headers of
         undefined -> [];
         Headers -> [{<<"headers">>, Headers}]
     end,
     {Name, {Props ++ DigestProp ++ DataProps ++ EncodingProps ++ HeadersProp}}.
 
 
-flush(Db, Att) ->
-    flush_data(Db, fetch(data, Att), Att).
+flush(Db, DocId, Att1) ->
+    Att2 = read_data(fetch(data, Att1), Att1),
+    [
+        Data,
+        AttLen,
+        DiskLen,
+        ReqMd5,
+        Encoding
+    ] = fetch([data, att_len, disk_len, md5, encoding], Att2),
+
+    % Eventually, we'll check if we can compress this
+    % attachment here and do so if possible.
+
+    % If we were sent a gzip'ed attachment with no
+    % length data, we have to set it here.
+    Att3 = case AttLen of
+        undefined -> store(att_len, DiskLen, Att2);
+        _ -> Att2
+    end,
 
+    % If no encoding has been set, default to
+    % identity
+    Att4 = case Encoding of
+        undefined -> store(encoding, identity, Att3);
+        _ -> Att3
+    end,
 
-flush_data(Db, Data, Att) when is_binary(Data) ->
-    couch_db:with_stream(Db, Att, fun(OutputStream) ->
-        couch_stream:write(OutputStream, Data)
-    end);
-flush_data(Db, Fun, Att) when is_function(Fun) ->
-    AttName = fetch(name, Att),
-    MaxAttSize = max_attachment_size(),
-    case fetch(att_len, Att) of
-        undefined ->
-            couch_db:with_stream(Db, Att, fun(OutputStream) ->
-                % Fun(MaxChunkSize, WriterFun) must call WriterFun
-                % once for each chunk of the attachment,
-                Fun(4096,
-                    % WriterFun({Length, Binary}, State)
-                    % WriterFun({0, _Footers}, State)
-                    % Called with Length == 0 on the last time.
-                    % WriterFun returns NewState.
-                    fun({0, Footers}, _Total) ->
-                        F = mochiweb_headers:from_binary(Footers),
-                        case mochiweb_headers:get_value("Content-MD5", F) of
-                        undefined ->
-                            ok;
-                        Md5 ->
-                            {md5, base64:decode(Md5)}
-                        end;
-                    ({Length, Chunk}, Total0) ->
-                        Total = Total0 + Length,
-                        validate_attachment_size(AttName, Total, MaxAttSize),
-                        couch_stream:write(OutputStream, Chunk),
-                        Total
-                    end, 0)
-            end);
-        AttLen ->
-            validate_attachment_size(AttName, AttLen, MaxAttSize),
-            couch_db:with_stream(Db, Att, fun(OutputStream) ->
-                write_streamed_attachment(OutputStream, Fun, AttLen)
-            end)
-    end;
-flush_data(Db, {follows, Parser, Ref}, Att) ->
+    case Data of
+        {loc, _, _, _} ->
+            % Already flushed
+            Att1;
+        _ when is_binary(Data) ->
+            IdentityMd5 = get_identity_md5(Data, fetch(encoding, Att4)),
+            couch_util:check_md5(IdentityMd5, ReqMd5),
+            fabric2_db:write_attachment(Db, DocId, Att4)
+    end.
+
+
+read_data({loc, #{}, _DocId, _AttId}, Att) ->
+    % Attachment already written to fdb
+    Att;
+
+read_data({follows, Parser, Ref}, Att) ->
     ParserRef = erlang:monitor(process, Parser),
     Fun = fun() ->
         Parser ! {get_bytes, Ref, self()},
@@ -583,41 +424,72 @@ flush_data(Db, {follows, Parser, Ref}, Att) ->
         end
     end,
     try
-        flush_data(Db, Fun, store(data, Fun, Att))
+        read_data(Fun, store(data, Fun, Att))
     after
         erlang:demonitor(ParserRef, [flush])
     end;
-flush_data(Db, {stream, StreamEngine}, Att) ->
-    case couch_db:is_active_stream(Db, StreamEngine) of
-        true ->
-            % Already written
-            Att;
-        false ->
-            NewAtt = couch_db:with_stream(Db, Att, fun(OutputStream) ->
-                couch_stream:copy(StreamEngine, OutputStream)
-            end),
-            InMd5 = fetch(md5, Att),
-            OutMd5 = fetch(md5, NewAtt),
-            couch_util:check_md5(OutMd5, InMd5),
-            NewAtt
+
+read_data(Data, Att) when is_binary(Data) ->
+    Att;
+
+read_data(Fun, Att) when is_function(Fun) ->
+    [AttName, AttLen, InMd5] = fetch([name, att_len, md5], Att),
+    MaxAttSize = max_attachment_size(),
+    case AttLen of
+        undefined ->
+            % Fun(MaxChunkSize, WriterFun) must call WriterFun
+            % once for each chunk of the attachment,
+            WriterFun = fun
+                ({0, Footers}, {Len, Acc}) ->
+                    F = mochiweb_headers:from_binary(Footers),
+                    Md5 = case mochiweb_headers:get_value("Content-MD5", F) of
+                        undefined -> undefined;
+                        Value -> base64:decode(Value)
+                    end,
+                    Props0 = [
+                        {data, iolist_to_binary(lists:reverse(Acc))},
+                        {disk_len, Len}
+                    ],
+                    Props1 = if InMd5 /= md5_in_footer -> Props0; true ->
+                        [{md5, Md5} | Props0]
+                    end,
+                    store(Props1, Att);
+                ({ChunkLen, Chunk}, {Len, Acc}) ->
+                    NewLen = Len + ChunkLen,
+                    validate_attachment_size(AttName, NewLen, MaxAttSize),
+                    {NewLen, [Chunk | Acc]}
+            end,
+            Fun(8192, WriterFun, {0, []});
+        AttLen ->
+            validate_attachment_size(AttName, AttLen, MaxAttSize),
+            read_streamed_attachment(Att, Fun, AttLen, [])
     end.
 
 
-write_streamed_attachment(_Stream, _F, 0) ->
-    ok;
-write_streamed_attachment(_Stream, _F, LenLeft) when LenLeft < 0 ->
+read_streamed_attachment(Att, _F, 0, Acc) ->
+    Bin = iolist_to_binary(lists:reverse(Acc)),
+    store([
+        {data, Bin},
+        {disk_len, size(Bin)}
+    ], Att);
+
+read_streamed_attachment(_Att, _F, LenLeft, _Acc) when LenLeft < 0 ->
     throw({bad_request, <<"attachment longer than expected">>});
-write_streamed_attachment(Stream, F, LenLeft) when LenLeft > 0 ->
-    Bin = try read_next_chunk(F, LenLeft)
+
+read_streamed_attachment(Att, F, LenLeft, Acc) when LenLeft > 0 ->
+    Bin = try
+        read_next_chunk(F, LenLeft)
     catch
         {mp_parser_died, normal} ->
             throw({bad_request, <<"attachment shorter than expected">>})
     end,
-    ok = couch_stream:write(Stream, Bin),
-    write_streamed_attachment(Stream, F, LenLeft - iolist_size(Bin)).
+    Size = iolist_size(Bin),
+    read_streamed_attachment(Att, F, LenLeft - Size, [Bin | Acc]).
+
 
 read_next_chunk(F, _) when is_function(F, 0) ->
     F();
+
 read_next_chunk(F, LenLeft) when is_function(F, 1) ->
     F(lists:min([LenLeft, 16#2000])).
 
@@ -626,14 +498,17 @@ foldl(Att, Fun, Acc) ->
     foldl(fetch(data, Att), Att, Fun, Acc).
 
 
+foldl({loc, Db, DocId, AttId}, _Att, Fun, Acc) ->
+    Bin = fabric2_db:read_attachment(Db#{tx := undefined}, DocId, AttId),
+    Fun(Bin, Acc);
+
 foldl(Bin, _Att, Fun, Acc) when is_binary(Bin) ->
     Fun(Bin, Acc);
-foldl({stream, StreamEngine}, Att, Fun, Acc) ->
-    Md5 = fetch(md5, Att),
-    couch_stream:foldl(StreamEngine, Md5, Fun, Acc);
+
 foldl(DataFun, Att, Fun, Acc) when is_function(DataFun) ->
     Len = fetch(att_len, Att),
     fold_streamed_data(DataFun, Len, Fun, Acc);
+
 foldl({follows, Parser, Ref}, Att, Fun, Acc) ->
     ParserRef = erlang:monitor(process, Parser),
     DataFun = fun() ->
@@ -654,19 +529,26 @@ foldl({follows, Parser, Ref}, Att, Fun, Acc) ->
     end.
 
 
+range_foldl(Bin1, From, To, Fun, Acc) when is_binary(Bin1) ->
+    ReadLen = To - From,
+    Bin2 = case Bin1 of
+        _ when size(Bin1) < From -> <<>>;
+        <<_:From/binary, B2/binary>> -> B2
+    end,
+    Bin3 = case Bin2 of
+        _ when size(Bin2) < ReadLen -> Bin2;
+        <<B3:ReadLen/binary, _/binary>> -> B3
+    end,
+    Fun(Bin3, Acc);
+
 range_foldl(Att, From, To, Fun, Acc) ->
-    {stream, StreamEngine} = fetch(data, Att),
-    couch_stream:range_foldl(StreamEngine, From, To, Fun, Acc).
+    {loc, Db, DocId, AttId} = fetch(data, Att),
+    Bin = fabric2_db:read_attachment(Db, DocId, AttId),
+    range_foldl(Bin, From, To, Fun, Acc).
 
 
-foldl_decode(Att, Fun, Acc) ->
-    case fetch([data, encoding], Att) of
-        [{stream, StreamEngine}, Enc] ->
-            couch_stream:foldl_decode(
-                    StreamEngine, fetch(md5, Att), Enc, Fun, Acc);
-        [Fun2, identity] ->
-            fold_streamed_data(Fun2, fetch(att_len, Att), Fun, Acc)
-    end.
+foldl_decode(_Att, _Fun, _Acc) ->
+    erlang:error(not_supported).
 
 
 to_binary(Att) ->
@@ -677,10 +559,8 @@ to_binary(Bin, _Att) when is_binary(Bin) ->
     Bin;
 to_binary(Iolist, _Att) when is_list(Iolist) ->
     iolist_to_binary(Iolist);
-to_binary({stream, _StreamEngine}, Att) ->
-    iolist_to_binary(
-        lists:reverse(foldl(Att, fun(Bin,Acc) -> [Bin|Acc] end, []))
-    );
+to_binary({loc, Db, DocId, AttId}, _Att) ->
+    fabric2_db:read_attachment(Db, DocId, AttId);
 to_binary(DataFun, Att) when is_function(DataFun)->
     Len = fetch(att_len, Att),
     iolist_to_binary(
@@ -695,46 +575,22 @@ to_binary(DataFun, Att) when is_function(DataFun)->
 
 fold_streamed_data(_RcvFun, 0, _Fun, Acc) ->
     Acc;
+
 fold_streamed_data(RcvFun, LenLeft, Fun, Acc) when LenLeft > 0->
     Bin = RcvFun(),
     ResultAcc = Fun(Bin, Acc),
     fold_streamed_data(RcvFun, LenLeft - size(Bin), Fun, ResultAcc).
 
 
-%% Upgrade an attachment record to a property list on demand. This is a one-way
-%% operation as downgrading potentially truncates fields with important data.
--spec upgrade(#att{}) -> attachment().
-upgrade(#att{} = Att) ->
-    Map = lists:zip(
-        record_info(fields, att),
-        lists:seq(2, record_info(size, att))
-    ),
-    %% Don't store undefined elements since that is default
-    [{F, element(I, Att)} || {F, I} <- Map, element(I, Att) /= undefined];
-upgrade(Att) ->
-    Att.
-
-
-%% Downgrade is exposed for interactive convenience. In practice, unless done
-%% manually, upgrades are always one-way.
-downgrade(#att{} = Att) ->
-    Att;
-downgrade(Att) ->
-    #att{
-        name = fetch(name, Att),
-        type = fetch(type, Att),
-        att_len = fetch(att_len, Att),
-        disk_len = fetch(disk_len, Att),
-        md5 = fetch(md5, Att),
-        revpos = fetch(revpos, Att),
-        data = fetch(data, Att),
-        encoding = fetch(encoding, Att)
-    }.
-
-
-upgrade_encoding(true) -> gzip;
-upgrade_encoding(false) -> identity;
-upgrade_encoding(Encoding) -> Encoding.
+get_identity_md5(Bin, gzip) ->
+    Z = zlib:open(),
+    ok = zlib:inflateInit(Z, 16 + 15),
+    Inflated = zlib:inflate(Z, Bin),
+    ok = zlib:inflateEnd(Z),
+    ok = zlib:close(Z),
+    couch_hash:md5_hash(Inflated);
+get_identity_md5(Bin, _) ->
+    couch_hash:md5_hash(Bin).
 
 
 max_attachment_size() ->
@@ -753,18 +609,22 @@ validate_attachment_size(_AttName, _AttSize, _MAxAttSize) ->
     ok.
 
 
-open_stream(StreamSrc, Data) ->
-    case couch_db:is_db(StreamSrc) of
-        true ->
-            couch_db:open_read_stream(StreamSrc, Data);
-        false ->
-            case is_function(StreamSrc, 1) of
-                true ->
-                    StreamSrc(Data);
-                false ->
-                    erlang:error({invalid_stream_source, StreamSrc})
-            end
-    end.
+%% is_compressible(Type) when is_binary(Type) ->
+%%     is_compressible(binary_to_list(Type));
+%% is_compressible(Type) ->
+%%     TypeExpList = re:split(
+%%         config:get("attachments", "compressible_types", ""),
+%%         "\\s*,\\s*",
+%%         [{return, list}]
+%%     ),
+%%     lists:any(
+%%         fun(TypeExp) ->
+%%             Regexp = ["^\\s*", re:replace(TypeExp, "\\*", ".*"),
+%%                 "(?:\\s*;.*?)?\\s*", $$],
+%%             re:run(Type, Regexp, [caseless]) =/= nomatch
+%%         end,
+%%         [T || T <- TypeExpList, T /= []]
+%%     ).
 
 
 -ifdef(TEST).
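
With the #att{} record and its upgrade/downgrade machinery removed, an
attachment is now a plain map manipulated through the same new/store/
fetch API. A small sketch of the constructor round-trip (values
illustrative):

    Att = couch_att:new([
        {name, <<"readme.txt">>},
        {type, <<"text/plain">>},
        {data, <<"hello">>},
        {att_len, 5},
        {disk_len, 5},
        {revpos, 1}
    ]),
    false = couch_att:is_stub(Att),
    [<<"readme.txt">>, <<"hello">>] = couch_att:fetch([name, data], Att).

Once flush/3 has written the body to FoundationDB, fetch(data, Att)
returns a {loc, Db, DocId, AttId} tuple instead of the raw binary.
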
diff --git a/src/couch/src/couch_doc.erl b/src/couch/src/couch_doc.erl
index 4a49372..d33325e 100644
--- a/src/couch/src/couch_doc.erl
+++ b/src/couch/src/couch_doc.erl
@@ -374,6 +374,17 @@ rev_info({#doc{} = Doc, {Pos, [RevId | _]}}) ->
         body_sp = undefined,
         seq = undefined,
         rev = {Pos, RevId}
+    };
+rev_info({#{} = RevInfo, {Pos, [RevId | _]}}) ->
+    #{
+        deleted := Deleted,
+        sequence := Sequence
+    } = RevInfo,
+    #rev_info{
+        deleted = Deleted,
+        body_sp = undefined,
+        seq = Sequence,
+        rev = {Pos, RevId}
     }.
 
 is_deleted(#full_doc_info{rev_tree=Tree}) ->
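
The added clause lets rev_info/1 consume the fabric2 map-based revision
metadata directly; as a notational example (the helper is internal to
couch_doc):

    %% rev_info({#{deleted => false, sequence => SeqVS}, {1, [RevId]}})
    %%     =:= #rev_info{deleted = false, body_sp = undefined,
    %%                   seq = SeqVS, rev = {1, RevId}}
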
diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl b/src/couch_replicator/src/couch_replicator_api_wrap.erl
index 44c290d..f73141d 100644
--- a/src/couch_replicator/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl
@@ -99,13 +99,13 @@ db_open(#httpdb{} = Db1, _Options, Create, CreateParams) ->
                     _ ->
                         {ok, Db}
                 end;
-            (200, _, _Body) ->
+            (200, _H, _Body) ->
                  throw({db_not_found, ?l2b(db_uri(Db))});
             (401, _, _) ->
                 throw({unauthorized, ?l2b(db_uri(Db))});
             (403, _, _) ->
                 throw({forbidden, ?l2b(db_uri(Db))});
-            (_, _, _) ->
+            (_A, _B, _C) ->
                 throw({db_not_found, ?l2b(db_uri(Db))})
             end)
     catch
@@ -501,11 +501,12 @@ changes_since(#httpdb{headers = Headers1, timeout = InactiveTimeout} = HttpDb,
         JsonDocIds = ?JSON_ENCODE({[{<<"doc_ids">>, DocIds}]}),
         {[{"filter", "_doc_ids"} | BaseQArgs], post, JsonDocIds, Headers2}
     end,
+    Me = lists:flatten(io_lib:format("~p", [self()])),
     try
         send_req(
             HttpDb,
             [{method, Method}, {path, "_changes"}, {qs, QArgs},
-                {headers, Headers}, {body, Body},
+                {headers, Headers ++ [{"XKCD", Me}]}, {body, Body},
                 {ibrowse_options, [{stream_to, {self(), once}}]}],
             fun(200, _, DataStreamFun) ->
                     parse_changes_feed(Options, UserFun, DataStreamFun);
diff --git a/src/couch_replicator/src/couch_replicator_changes_reader.erl b/src/couch_replicator/src/couch_replicator_changes_reader.erl
index 2e4df53..9911f48 100644
--- a/src/couch_replicator/src/couch_replicator_changes_reader.erl
+++ b/src/couch_replicator/src/couch_replicator_changes_reader.erl
@@ -98,6 +98,7 @@ process_change(#doc_info{id = Id} = DocInfo, {Parent, Db, ChangesQueue, _}) ->
             Stats = couch_replicator_stats:new([{doc_write_failures, 1}]),
             ok = gen_server:call(Parent, {add_stats, Stats}, infinity);
         false ->
+            couch_log:error("XKCD: REPL CHANGE: ~p", [DocInfo#doc_info.high_seq]),
             ok = couch_work_queue:queue(ChangesQueue, DocInfo),
             put(last_seq, DocInfo#doc_info.high_seq)
     end;
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index 412ff7d..7786a7d 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -977,6 +977,7 @@ update_task(State) ->
         current_through_seq = {_, ThroughSeq},
         highest_seq_done = {_, HighestSeq}
     } = State,
+    couch_log:error("XKCD: UPDATE REPL TASK: ~p : ~p", [ThroughSeq, HighestSeq]),
     update_scheduler_job_stats(State),
     couch_task_status:update(
         rep_stats(State) ++ [
diff --git a/src/ddoc_cache/src/ddoc_cache_entry_ddocid.erl b/src/ddoc_cache/src/ddoc_cache_entry_ddocid.erl
index 5248469..7c3dc67 100644
--- a/src/ddoc_cache/src/ddoc_cache_entry_ddocid.erl
+++ b/src/ddoc_cache/src/ddoc_cache_entry_ddocid.erl
@@ -33,7 +33,7 @@ ddocid({_, DDocId}) ->
 
 
 recover({DbName, DDocId}) ->
-    fabric:open_doc(DbName, DDocId, [ejson_body, ?ADMIN_CTX]).
+    fabric2_db:open_doc(DbName, DDocId, [ejson_body, ?ADMIN_CTX]).
 
 
 insert({DbName, DDocId}, {ok, #doc{revs = Revs} = DDoc}) ->
diff --git a/src/ddoc_cache/src/ddoc_cache_entry_ddocid_rev.erl b/src/ddoc_cache/src/ddoc_cache_entry_ddocid_rev.erl
index 868fa77..38445af 100644
--- a/src/ddoc_cache/src/ddoc_cache_entry_ddocid_rev.erl
+++ b/src/ddoc_cache/src/ddoc_cache_entry_ddocid_rev.erl
@@ -34,7 +34,7 @@ ddocid({_, DDocId, _}) ->
 
 recover({DbName, DDocId, Rev}) ->
     Opts = [ejson_body, ?ADMIN_CTX],
-    {ok, [Resp]} = fabric:open_revs(DbName, DDocId, [Rev], Opts),
+    {ok, [Resp]} = fabric2_db:open_doc_revs(DbName, DDocId, [Rev], Opts),
     Resp.
 
 
diff --git a/src/fabric/src/fabric2.hrl b/src/fabric/src/fabric2.hrl
index e8d0b13..7ea0577 100644
--- a/src/fabric/src/fabric2.hrl
+++ b/src/fabric/src/fabric2.hrl
@@ -40,6 +40,7 @@
 -define(DB_REVS, 20).
 -define(DB_DOCS, 21).
 -define(DB_LOCAL_DOCS, 22).
+-define(DB_ATTS, 23).
 
 
 % Versions
@@ -54,3 +55,6 @@
 -define(PDICT_TX_ID_KEY, '$fabric_tx_id').
 -define(PDICT_TX_RES_KEY, '$fabric_tx_result').
 -define(COMMIT_UNKNOWN_RESULT, 1021).
+
+
+-define(ATTACHMENT_CHUNK_SIZE, 100000).
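
?ATTACHMENT_CHUNK_SIZE bounds each value written under the new ?DB_ATTS
keyspace, keeping single FoundationDB values below the backend's 100 KB
value-size limit. The kind of splitting this implies might be sketched
as follows (the actual write path is fabric2_fdb:write_attachment/3,
which is not part of this hunk):

    chunkify(Bin) when is_binary(Bin) ->
        chunkify(Bin, []).

    chunkify(Bin, Acc) when byte_size(Bin) =< ?ATTACHMENT_CHUNK_SIZE ->
        lists:reverse([Bin | Acc]);
    chunkify(Bin, Acc) ->
        <<Chunk:?ATTACHMENT_CHUNK_SIZE/binary, Rest/binary>> = Bin,
        chunkify(Rest, [Chunk | Acc]).
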
diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl
index a9c17c9..230cec0 100644
--- a/src/fabric/src/fabric2_db.erl
+++ b/src/fabric/src/fabric2_db.erl
@@ -70,9 +70,9 @@
     open_doc/3,
     open_doc_revs/4,
     %% open_doc_int/3,
-    %% get_doc_info/2,
-    %% get_full_doc_info/2,
-    %% get_full_doc_infos/2,
+    get_doc_info/2,
+    get_full_doc_info/2,
+    get_full_doc_infos/2,
     get_missing_revs/2,
     %% get_design_doc/2,
     %% get_design_docs/1,
@@ -94,10 +94,8 @@
     %% purge_docs/2,
     %% purge_docs/3,
 
-    %% with_stream/3,
-    %% open_write_stream/2,
-    %% open_read_stream/2,
-    %% is_active_stream/2,
+    read_attachment/3,
+    write_attachment/3,
 
     fold_docs/3,
     fold_docs/4,
@@ -461,7 +459,43 @@ open_doc_revs(Db, DocId, Revs, Options) ->
     end).
 
 
-get_missing_revs(Db, IdRevs) ->
+get_doc_info(Db, DocId) ->
+    case get_full_doc_info(Db, DocId) of
+        not_found -> not_found;
+        FDI -> couch_doc:to_doc_info(FDI)
+    end.
+
+
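+% Rebuild a legacy #full_doc_info{} by folding each revision's path
+% back into a key tree; get_all_revs sorts the winning rev last.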
+get_full_doc_info(Db, DocId) ->
+    RevInfos = fabric2_fdb:transactional(Db, fun(TxDb) ->
+        fabric2_fdb:get_all_revs(TxDb, DocId)
+    end),
+    if RevInfos == [] -> not_found; true ->
+        #{winner := true} = Winner = lists:last(RevInfos),
+        RevTree = lists:foldl(fun(RI, TreeAcc) ->
+            RIPath = fabric2_util:revinfo_to_path(RI),
+            {Merged, _} = couch_key_tree:merge(TreeAcc, RIPath),
+            Merged
+        end, [], RevInfos),
+        #full_doc_info{
+            id = DocId,
+            update_seq = fabric2_fdb:vs_to_seq(maps:get(sequence, Winner)),
+            deleted = maps:get(deleted, Winner),
+            rev_tree = RevTree
+        }
+    end.
+
+
+get_full_doc_infos(Db, DocIds) ->
+    fabric2_fdb:transactional(Db, fun(TxDb) ->
+        lists:map(fun(DocId) ->
+            get_full_doc_info(TxDb, DocId)
+        end, DocIds)
+    end).
+
+
+get_missing_revs(Db, JsonIdRevs) ->
+    IdRevs = [idrevs(IdR) || IdR <- JsonIdRevs],
     AllRevInfos = fabric2_fdb:transactional(Db, fun(TxDb) ->
         lists:foldl(fun({Id, _Revs}, Acc) ->
             case maps:is_key(Id, Acc) of
@@ -542,6 +576,22 @@ update_docs(Db, Docs, Options) ->
     {Status, Resps1}.
 
 
+read_attachment(Db, DocId, AttId) ->
+    fabric2_fdb:transactional(Db, fun(TxDb) ->
+        fabric2_fdb:read_attachment(TxDb, DocId, AttId)
+    end).
+
+
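+% Write the attachment body first, then repoint the att record at the
+% resulting {loc, ...} so the doc update stores only the reference.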
+write_attachment(Db, DocId, Att) ->
+    Data = couch_att:fetch(data, Att),
+    {ok, AttId} = fabric2_fdb:transactional(Db, fun(TxDb) ->
+        fabric2_fdb:write_attachment(TxDb, DocId, Data)
+    end),
+    couch_att:store(data, {loc, Db, DocId, AttId}, Att).
+
+
 fold_docs(Db, UserFun, UserAcc) ->
     fold_docs(Db, UserFun, UserAcc, []).
 
@@ -718,8 +766,8 @@ apply_open_doc_opts(Doc, Revs, Options) ->
     end,
 
     Meta4 = if not IncludeLocalSeq -> []; true ->
-        #{winner := true, sequence := Seq} = lists:last(Revs),
-        [{local_seq, erlfdb_tuple:pack({Seq})}]
+        #{winner := true, sequence := SeqVS} = lists:last(Revs),
+        [{local_seq, fabric2_fdb:vs_to_seq(SeqVS)}]
     end,
 
     case Doc#doc.deleted and not ReturnDeleted of
@@ -907,6 +955,8 @@ update_doc_interactive(Db, Doc0, Future, _Options) ->
         revs = {NewRevPos, [NewRev | NewRevPath]}
     } = Doc3 = new_revid(Doc2),
 
+    Doc4 = update_attachment_revpos(Doc3),
+
     NewRevInfo = #{
         winner => undefined,
         deleted => NewDeleted,
@@ -918,9 +968,9 @@ update_doc_interactive(Db, Doc0, Future, _Options) ->
 
     % Gather the list of possible winning revisions
     Possible = case Target == Winner of
-        true when not Doc3#doc.deleted ->
+        true when not Doc4#doc.deleted ->
             [NewRevInfo];
-        true when Doc3#doc.deleted ->
+        true when Doc4#doc.deleted ->
             case SecondPlace of
                 #{} -> [NewRevInfo, SecondPlace];
                 not_found -> [NewRevInfo]
@@ -945,7 +995,7 @@ update_doc_interactive(Db, Doc0, Future, _Options) ->
 
     ok = fabric2_fdb:write_doc(
             Db,
-            Doc3,
+            Doc4,
             NewWinner,
             Winner,
             ToUpdate,
@@ -1049,6 +1099,21 @@ update_local_doc(Db, Doc0, _Options) ->
     {ok, {0, integer_to_binary(Rev)}}.
 
 
+update_attachment_revpos(#doc{revs = {RevPos, _Revs}, atts = Atts0} = Doc) ->
+    Atts = lists:map(fun(Att) ->
+        case couch_att:fetch(data, Att) of
+            {loc, _Db, _DocId, _AttId} ->
+                % Attachment was already on disk
+                Att;
+            _ ->
+                % We will write this attachment with this update
+                % so mark it with the RevPos that will be written
+                couch_att:store(revpos, RevPos, Att)
+        end
+    end, Atts0),
+    Doc#doc{atts = Atts}.
+
+
 get_winning_rev_futures(Db, Docs) ->
     lists:foldl(fun(Doc, Acc) ->
         #doc{
@@ -1068,29 +1133,30 @@ get_winning_rev_futures(Db, Docs) ->
     end, #{}, Docs).
 
 
-prep_and_validate(Db, Doc, PrevRevInfo) ->
-    HasStubs = couch_doc:has_stubs(Doc),
+prep_and_validate(Db, NewDoc, PrevRevInfo) ->
+    HasStubs = couch_doc:has_stubs(NewDoc),
     HasVDUs = [] /= maps:get(validate_doc_update_funs, Db),
-    IsDDoc = case Doc#doc.id of
+    IsDDoc = case NewDoc#doc.id of
         <<?DESIGN_DOC_PREFIX, _/binary>> -> true;
         _ -> false
     end,
 
     PrevDoc = case HasStubs orelse (HasVDUs and not IsDDoc) of
         true when PrevRevInfo /= not_found ->
-            case fabric2_fdb:get_doc_body(Db, Doc#doc.id, PrevRevInfo) of
-                #doc{} = Doc -> Doc;
+            case fabric2_fdb:get_doc_body(Db, NewDoc#doc.id, PrevRevInfo) of
+                #doc{} = PDoc -> PDoc;
                 {not_found, _} -> nil
             end;
         _ ->
             nil
     end,
 
-    MergedDoc = if not HasStubs -> Doc; true ->
+    MergedDoc = if not HasStubs -> NewDoc; true ->
         % This will throw an error if we have any
         % attachment stubs missing data
-        couch_doc:merge_stubs(Doc, PrevDoc)
+        couch_doc:merge_stubs(NewDoc, PrevDoc)
     end,
+    check_duplicate_attachments(MergedDoc),
     validate_doc_update(Db, MergedDoc, PrevDoc),
     MergedDoc.
 
@@ -1140,6 +1206,18 @@ validate_ddoc(Db, DDoc) ->
     end.
 
 
+check_duplicate_attachments(#doc{atts = Atts}) ->
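+    % Attachment names must be unique per document; fold an ordset of
+    % seen names and throw on the first duplicate.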
+    lists:foldl(fun(Att, Names) ->
+        Name = couch_att:fetch(name, Att),
+        case ordsets:is_element(Name, Names) of
+            true -> throw({bad_request, <<"Duplicate attachments">>});
+            false -> ordsets:add_element(Name, Names)
+        end
+    end, ordsets:new(), Atts).
+
+
 get_leaf_path(Pos, Rev, [{Pos, [{Rev, _RevInfo} | LeafPath]} | _]) ->
     LeafPath;
 get_leaf_path(Pos, Rev, [_WrongLeaf | RestLeafs]) ->
@@ -1187,3 +1263,22 @@ tag_docs([#doc{meta = Meta} = Doc | Rest]) ->
 
 doc_tag(#doc{meta = Meta}) ->
     fabric2_util:get_value(ref, Meta).
+
+
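+% Normalize replicator-style inputs: ids and revs may arrive as lists,
+% binaries, or already-parsed {Pos, Hash} tuples.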
+idrevs({Id, Revs}) when is_list(Revs) ->
+    {docid(Id), [rev(R) || R <- Revs]}.
+
+
+docid(DocId) when is_list(DocId) ->
+    list_to_binary(DocId);
+docid(DocId) ->
+    DocId.
+
+
+rev(Rev) when is_list(Rev); is_binary(Rev) ->
+    couch_doc:parse_rev(Rev);
+rev({Seq, Hash} = Rev) when is_integer(Seq), is_binary(Hash) ->
+    Rev.
+
diff --git a/src/fabric/src/fabric2_events.erl b/src/fabric/src/fabric2_events.erl
new file mode 100644
index 0000000..a571714
--- /dev/null
+++ b/src/fabric/src/fabric2_events.erl
@@ -0,0 +1,86 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric2_events).
+
+
+-export([
+    link_listener/4,
+    stop_listener/1
+]).
+
+-export([
+    init/5,
+    poll/5
+]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+
+
+link_listener(Mod, Fun, St, Options) ->
+    DbName = fabric2_util:get_value(dbname, Options),
+    Pid = spawn_link(?MODULE, init, [self(), DbName, Mod, Fun, St]),
+    receive
+        {Pid, initialized} -> ok
+    end,
+    {ok, Pid}.
+
+
+stop_listener(Pid) ->
+    Pid ! stop_listening.
+
+
+init(Parent, DbName, Mod, Fun, St) ->
+    {ok, Db} = fabric2_db:open(DbName, [?ADMIN_CTX]),
+    Since = fabric2_db:get_update_seq(Db),
+    couch_log:error("XKCD: START LISTENER: ~s : ~p for ~p", [DbName, Since, Parent]),
+    erlang:monitor(process, Parent),
+    Parent ! {self(), initialized},
+    poll(DbName, Since, Mod, Fun, St),
+    couch_log:error("XKCD: STOP LISTENER for ~p", [Parent]).
+
+
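+% Prototype change notifications: poll the db's update seq about once
+% a second and call Mod:Fun(DbName, updated | deleted, St) on changes.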
+poll(DbName, Since, Mod, Fun, St) ->
+    {Resp, NewSince} = try
+        case fabric2_db:open(DbName, [?ADMIN_CTX]) of
+            {ok, Db} ->
+                case fabric2_db:get_update_seq(Db) of
+                    Since ->
+                        couch_log:error("XKCD: NO UPDATE: ~s :: ~p", [DbName, Since]),
+                        {{ok, St}, Since};
+                    Other ->
+                        couch_log:error("XKCD: UPDATED: ~s :: ~p -> ~p", [DbName, Since, Other]),
+                        {Mod:Fun(DbName, updated, St), Other}
+                end;
+            Error ->
+                exit(Error)
+        end
+    catch error:database_does_not_exist ->
+        {Mod:Fun(DbName, deleted, St), Since}
+    end,
+    receive
+        stop_listening ->
+            ok;
+        {'DOWN', _, _, _, _} ->
+            ok
+    after 0 ->
+        case Resp of
+            {ok, NewSt} ->
+                timer:sleep(1000),
+                ?MODULE:poll(DbName, NewSince, Mod, Fun, NewSt);
+            {stop, _} ->
+                ok
+        end
+    end.
diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl
index d57ad3e..666a20b 100644
--- a/src/fabric/src/fabric2_fdb.erl
+++ b/src/fabric/src/fabric2_fdb.erl
@@ -45,10 +45,15 @@
     write_doc/6,
     write_local_doc/2,
 
+    read_attachment/3,
+    write_attachment/3,
+
     fold_docs/4,
     fold_changes/5,
     get_last_change/1,
 
+    vs_to_seq/1,
+
     debug_cluster/0,
     debug_cluster/2
 ]).
@@ -260,13 +265,12 @@ get_info(#{} = Db) ->
 
     RawSeq = case erlfdb:wait(ChangesFuture) of
         [] ->
-            fabric2_util:seq_zero();
+            vs_to_seq(fabric2_util:seq_zero_vs());
         [{SeqKey, _}] ->
             {?DB_CHANGES, SeqVS} = erlfdb_tuple:unpack(SeqKey, DbPrefix),
-            <<51:8, SeqBin:12/binary>> = erlfdb_tuple:pack({SeqVS}),
-            SeqBin
+            vs_to_seq(SeqVS)
     end,
-    CProp = {update_seq, fabric2_util:to_hex(RawSeq)},
+    CProp = {update_seq, RawSeq},
 
     MProps = lists:flatmap(fun({K, V}) ->
         case erlfdb_tuple:unpack(K, DbPrefix) of
@@ -565,13 +569,39 @@ write_local_doc(#{} = Db0, Doc) ->
     ok.
 
 
-write_doc_body(#{} = Db0, #doc{} = Doc) ->
+read_attachment(#{} = Db, DocId, AttId) ->
     #{
-        tx := Tx
-    } = Db = ensure_current(Db0),
+        tx := Tx,
+        db_prefix := DbPrefix
+    } = ensure_current(Db),
 
-    {NewDocKey, NewDocVal} = doc_to_fdb(Db, Doc),
-    erlfdb:set(Tx, NewDocKey, NewDocVal).
+    AttKey = erlfdb_tuple:pack({?DB_ATTS, DocId, AttId}, DbPrefix),
+    case erlfdb:wait(erlfdb:get_range_startswith(Tx, AttKey)) of
+        [] ->
+            throw({not_found, missing});
+        KVs ->
+            Vs = [V || {_K, V} <- KVs],
+            iolist_to_binary(Vs)
+    end.
+
+
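+% Chunks are stored under {?DB_ATTS, DocId, AttId, ChunkId}; a fresh
+% UUID per write means existing attachment rows are never overwritten.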
+write_attachment(#{} = Db, DocId, Data) when is_binary(Data) ->
+    #{
+        tx := Tx,
+        db_prefix := DbPrefix
+    } = ensure_current(Db),
+
+    AttId = fabric2_util:uuid(),
+    Chunks = chunkify_attachment(Data),
+
+    lists:foldl(fun(Chunk, ChunkId) ->
+        AttKey = erlfdb_tuple:pack({?DB_ATTS, DocId, AttId, ChunkId}, DbPrefix),
+        ok = erlfdb:set(Tx, AttKey, Chunk),
+        ChunkId + 1
+    end, 0, Chunks),
+    {ok, AttId}.
 
 
 fold_docs(#{} = Db, UserFun, UserAcc0, Options) ->
@@ -634,38 +662,23 @@ fold_changes(#{} = Db, SinceSeq0, UserFun, UserAcc0, Options) ->
     end,
 
     try
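+        % Emit changes as bare maps and return {ok, Acc}; callers such
+        % as the new chttpd_changes take over JSON shaping and last_seq.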
-        % We have to track this to return last_seq
-        <<51:8, FirstSeq:12/binary>> = erlfdb_tuple:pack({SinceSeq1}),
-        put('$last_changes_seq', fabric2_util:to_hex(FirstSeq)),
-
-        UserAcc1 = maybe_stop(UserFun(Db, start, UserAcc0)),
-
-        UserAcc2 = erlfdb:fold_range(Tx, Start, End, fun({K, V}, UserAccIn) ->
-            {?DB_CHANGES, UpdateSeq} = erlfdb_tuple:unpack(K, DbPrefix),
+        {ok, erlfdb:fold_range(Tx, Start, End, fun({K, V}, UserAccIn) ->
+            {?DB_CHANGES, SeqVS} = erlfdb_tuple:unpack(K, DbPrefix),
             {DocId, Deleted, RevId} = erlfdb_tuple:unpack(V),
 
-            % This comes back as a versionstamp so we have
-            % to pack it to get a binary.
-            <<51:8, SeqBin:12/binary>> = erlfdb_tuple:pack({UpdateSeq}),
-            SeqHex = fabric2_util:to_hex(SeqBin),
-            put('$last_changes_seq', SeqHex),
+            Change = #{
+                id => DocId,
+                sequence => vs_to_seq(SeqVS),
+                rev_id => RevId,
+                deleted => Deleted
+            },
 
-            DelMember = if not Deleted -> []; true ->
-                [{deleted, true}]
-            end,
-
-            maybe_stop(UserFun(Db, {change, {[
-                {seq, SeqHex},
-                {id, DocId},
-                {changes, [{[{rev, couch_doc:rev_to_str(RevId)}]}]}
-            ] ++ DelMember}}, UserAccIn))
-        end, UserAcc1, [{reverse, Reverse}] ++ Options),
-
-        UserFun(Db, {stop, get('$last_changes_seq'), null}, UserAcc2)
+            maybe_stop(UserFun(Change, UserAccIn))
+        end, UserAcc0, [{reverse, Reverse}] ++ Options)}
     catch throw:{stop, FinalUserAcc} ->
         {ok, FinalUserAcc}
-    after
-        erase('$last_changes_seq')
     end.
 
 
@@ -682,8 +693,7 @@ get_last_change(#{} = Db) ->
-            fabric2_util:to_hex(fabric2_util:seq_zero());
+            vs_to_seq(fabric2_util:seq_zero_vs());
         [{K, _V}] ->
             {?DB_CHANGES, SeqVS} = erlfdb_tuple:unpack(K, DbPrefix),
-            <<51:8, SeqBin:12/binary>> = erlfdb_tuple:pack({SeqVS}),
-            fabric2_util:to_hex(SeqBin)
+            vs_to_seq(SeqVS)
     end.
 
 
@@ -693,6 +703,13 @@
     throw({stop, Acc}).
 
 
+vs_to_seq(VS) ->
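+    % The tuple layer packs a versionstamp as <<51:8, VS:12/binary>>,
+    % 51 (16#33) being the versionstamp type tag; strip it and hex encode.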
+    <<51:8, SeqBin:12/binary>> = erlfdb_tuple:pack({VS}),
+    fabric2_util:to_hex(SeqBin).
+
+
 debug_cluster() ->
     debug_cluster(<<>>, <<16#FE, 16#FF, 16#FF>>).
 
@@ -730,6 +745,15 @@ bump_metadata_version(Tx) ->
     erlfdb:set_versionstamped_value(Tx, ?METADATA_VERSION_KEY, <<0:112>>).
 
 
+write_doc_body(#{} = Db0, #doc{} = Doc) ->
+    #{
+        tx := Tx
+    } = Db = ensure_current(Db0),
+
+    {NewDocKey, NewDocVal} = doc_to_fdb(Db, Doc),
+    erlfdb:set(Tx, NewDocKey, NewDocVal).
+
+
 revinfo_to_fdb(Tx, DbPrefix, DocId, #{winner := true} = RevId) ->
     #{
         deleted := Deleted,
@@ -793,7 +817,7 @@ doc_to_fdb(Db, #doc{} = Doc) ->
         body = Body,
         atts = Atts,
         deleted = Deleted
-    } = Doc,
+    } = doc_flush_atts(Db, Doc),
 
     Key = erlfdb_tuple:pack({?DB_DOCS, Id, Start, Rev}, DbPrefix),
     Val = {Body, Atts, Deleted},
@@ -846,6 +870,26 @@
     {not_found, missing}.
 
 
+doc_flush_atts(Db, Doc) ->
+    Atts = lists:map(fun(Att) ->
+        couch_att:flush(Db, Doc#doc.id, Att)
+    end, Doc#doc.atts),
+    Doc#doc{atts = Atts}.
+
+
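+% Split a binary into ?ATTACHMENT_CHUNK_SIZE pieces: 250000 bytes, for
+% example, becomes chunks of 100000, 100000 and 50000.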
+chunkify_attachment(Data) ->
+    case Data of
+        <<>> ->
+            [];
+        <<Head:?ATTACHMENT_CHUNK_SIZE/binary, Rest/binary>> ->
+            [Head | chunkify_attachment(Rest)];
+        <<_/binary>> when size(Data) < ?ATTACHMENT_CHUNK_SIZE ->
+            [Data]
+    end.
+
+
 get_dir_and_bounds(DbPrefix, Options) ->
     Reverse = case fabric2_util:get_value(dir, Options, fwd) of
         fwd -> false;
@@ -906,7 +948,7 @@ get_dir_and_bounds(DbPrefix, Options) ->
     {Reverse, StartKey4, EndKey4}.
 
 
-get_since_seq(Seq) when Seq == 0; Seq == <<"0">>; Seq == <<>> ->
+get_since_seq(Seq) when Seq == <<>>; Seq == <<"0">>; Seq == 0 ->
     fabric2_util:seq_zero_vs();
 
 get_since_seq(Seq) when Seq == now; Seq == <<"now">> ->
@@ -916,7 +958,14 @@ get_since_seq(Seq) when is_binary(Seq), size(Seq) == 24 ->
     Seq1 = fabric2_util:from_hex(Seq),
     Seq2 = <<51:8, Seq1/binary>>,
     {SeqVS} = erlfdb_tuple:unpack(Seq2),
-    SeqVS.
+    SeqVS;
+
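+% Seqs from query strings may arrive as lists; anything else is an error.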
+get_since_seq(List) when is_list(List) ->
+    get_since_seq(list_to_binary(List));
+
+get_since_seq(Seq) ->
+    erlang:error({invalid_since_seq, Seq}).
 
 
 get_db_handle() ->
diff --git a/src/fabric/src/fabric2_util.erl b/src/fabric/src/fabric2_util.erl
index 1696f06..6e2df67 100644
--- a/src/fabric/src/fabric2_util.erl
+++ b/src/fabric/src/fabric2_util.erl
@@ -17,7 +17,6 @@
     revinfo_to_path/1,
     sort_revinfos/1,
 
-    seq_zero/0,
     seq_zero_vs/0,
     seq_max_vs/0,
 
@@ -66,10 +65,6 @@ rev_sort_key(#{} = RevInfo) ->
     {not Deleted, RevPos, Rev}.
 
 
-seq_zero() ->
-    <<0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0>>.
-
-
 seq_zero_vs() ->
     {versionstamp, 0, 0, 0}.
 
diff --git a/src/fabric/test/fabric2_doc_crud_tests.erl b/src/fabric/test/fabric2_doc_crud_tests.erl
index 17e8c36..85b2766 100644
--- a/src/fabric/test/fabric2_doc_crud_tests.erl
+++ b/src/fabric/test/fabric2_doc_crud_tests.erl
@@ -53,6 +53,7 @@ doc_crud_test_() ->
                 fun open_doc_revs_all/1,
                 fun open_doc_revs_latest/1,
                 fun get_missing_revs_basic/1,
+                fun get_missing_revs_on_missing_doc/1,
                 fun open_missing_local_doc/1,
                 fun create_local_doc_basic/1,
                 fun update_local_doc_basic/1,
@@ -615,6 +616,20 @@ get_missing_revs_basic({Db, _}) ->
         ).
 
 
+get_missing_revs_on_missing_doc({Db, _}) ->
+    Revs = lists:sort([
+            couch_doc:rev_to_str({1, fabric2_util:uuid()}),
+            couch_doc:rev_to_str({2, fabric2_util:uuid()}),
+            couch_doc:rev_to_str({800, fabric2_util:uuid()})
+        ]),
+    DocId = fabric2_util:uuid(),
+    {ok, Resp} = fabric2_db:get_missing_revs(Db, [{DocId, Revs}]),
+    ?assertMatch([{DocId, [_ | _], []}], Resp),
+    [{DocId, Missing, _}] = Resp,
+    MissingStrs = [couch_doc:rev_to_str(Rev) || Rev <- Missing],
+    ?assertEqual(Revs, lists:sort(MissingStrs)).
+
+
 open_missing_local_doc({Db, _}) ->
     ?assertEqual(
             {not_found, missing},