You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2020/07/24 18:53:22 UTC

[couchdb] 04/04: Views on ebtree

This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch prototype/fdb-layer-ebtree-views
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 9859427ecc5c4c0350b01f628c7e6e141557821a
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Fri Jul 24 10:59:05 2020 -0500

    Views on ebtree
---
 src/couch_views/include/couch_views.hrl     |   5 +
 src/couch_views/src/couch_views_fdb.erl     | 510 +++++++++++++++-------------
 src/couch_views/src/couch_views_indexer.erl |  46 ++-
 src/couch_views/src/couch_views_reader.erl  | 111 +++---
 4 files changed, 341 insertions(+), 331 deletions(-)

diff --git a/src/couch_views/include/couch_views.hrl b/src/couch_views/include/couch_views.hrl
index 3d0110f..3882191 100644
--- a/src/couch_views/include/couch_views.hrl
+++ b/src/couch_views/include/couch_views.hrl
@@ -13,6 +13,7 @@
 % Index info/data subspaces
 -define(VIEW_INFO, 0).
 -define(VIEW_DATA, 1).
+-define(VIEW_TREES, 3).
 
 % Index info keys
 -define(VIEW_UPDATE_SEQ, 0).
@@ -25,6 +26,10 @@
 -define(VIEW_ID_RANGE, 0).
 -define(VIEW_MAP_RANGE, 1).
 
+% Tree keys
+-define(VIEW_ID_TREE, 0).
+-define(VIEW_ROW_TREES, 1).
+
 % jobs api
 -define(INDEX_JOB_TYPE, <<"views">>).
 
diff --git a/src/couch_views/src/couch_views_fdb.erl b/src/couch_views/src/couch_views_fdb.erl
index c957222..7fdea78 100644
--- a/src/couch_views/src/couch_views_fdb.erl
+++ b/src/couch_views/src/couch_views_fdb.erl
@@ -22,12 +22,14 @@
     get_update_seq/2,
     set_update_seq/3,
 
-    get_row_count/3,
-    get_kv_size/3,
+    set_trees/2,
 
-    fold_map_idx/6,
+    get_row_count/2,
+    get_kv_size/2,
 
-    write_doc/4,
+    fold_map_idx/5,
+
+    write_doc/3,
 
     list_signatures/1,
     clear_index/2
@@ -126,92 +128,175 @@ set_update_seq(TxDb, Sig, Seq) ->
     ok = erlfdb:set(Tx, seq_key(DbPrefix, Sig), Seq).
 
 
-get_row_count(TxDb, #mrst{sig = Sig}, ViewId) ->
-    #{
-        tx := Tx,
-        db_prefix := DbPrefix
-    } = TxDb,
-
-    case erlfdb:wait(erlfdb:get(Tx, row_count_key(DbPrefix, Sig, ViewId))) of
-        not_found -> 0; % Can this happen?
-        CountBin -> ?bin2uint(CountBin)
-    end.
+set_trees(TxDb, Mrst0) ->
+    Mrst1 = open_id_tree(TxDb, Mrst0),
+    Views = lists:map(fun(View) ->
+        open_view_tree(TxDb, Mrst1#mrst.sig, View)
+    end, Mrst1#mrst.views),
+    Mrst1#mrst{
+        views = Views
+    }.
 
 
-get_kv_size(TxDb, #mrst{sig = Sig}, ViewId) ->
+get_row_count(TxDb, View) ->
     #{
-        tx := Tx,
-        db_prefix := DbPrefix
+        tx := Tx
     } = TxDb,
-
-    case erlfdb:wait(erlfdb:get(Tx, kv_size_key(DbPrefix, Sig, ViewId))) of
-        not_found -> 0; % Can this happen?
-        SizeBin -> ?bin2uint(SizeBin)
-    end.
+    {Count, _} = ebtree:full_reduce(Tx, View#mrview.btree),
+    Count.
 
 
-fold_map_idx(TxDb, Sig, ViewId, Options, Callback, Acc0) ->
+get_kv_size(TxDb, View) ->
     #{
-        db_prefix := DbPrefix
+        tx := Tx
     } = TxDb,
+    {_, TotalSize} = ebtree:full_reduce(Tx, View#mrview.btree),
+    TotalSize.
 
-    MapIdxPrefix = map_idx_prefix(DbPrefix, Sig, ViewId),
-    FoldAcc = #{
-        prefix => MapIdxPrefix,
-        callback => Callback,
-        acc => Acc0
-        },
-    Fun = aegis:wrap_fold_fun(TxDb, fun fold_fwd/2),
 
+fold_map_idx(TxDb, View, Options, Callback, Acc0) ->
     #{
-        acc := Acc1
-    } = fabric2_fdb:fold_range(TxDb, MapIdxPrefix, Fun, FoldAcc, Options),
-
-    Acc1.
+        tx := Tx
+    } = TxDb,
+    #mrview{
+        btree = Btree
+    } = View,
+
+    CollateFun = collate_fun(View),
+
+    Dir = case lists:keyfind(dir, 1, Options) of
+        {dir, D} -> D;
+        _ -> fwd
+    end,
+
+    InclusiveEnd = case lists:keyfind(inclusive_end, 1, Options) of
+        {inclusive_end, IE} -> IE;
+        _ -> true
+    end,
+
+    StartKey = case lists:keyfind(start_key, 1, Options) of
+        {start_key, SK} -> SK;
+        false when Dir == fwd -> ebtree:min();
+        false when Dir == rev -> ebtree:max()
+    end,
+
+    EndKey = case lists:keyfind(end_key, 1, Options) of
+        {end_key, EK} -> EK;
+        false when Dir == fwd -> ebtree:max();
+        false when Dir == rev -> ebtree:min()
+    end,
+
+    Wrapper = fun(KVs0, WAcc) ->
+        % When inclusive_end is false, remove any keys that
+        % collate equal to (or beyond) EndKey for this direction
+        KVs1 = case InclusiveEnd of
+            true ->
+                KVs0;
+            false when Dir == fwd ->
+                lists:filter(fun({K, _V}) ->
+                    case CollateFun(K, EndKey) of
+                        true ->
+                            % K =< EndKey
+                            case CollateFun(EndKey, K) of
+                                true ->
+                                    % K == EndKey, so reject
+                                    false;
+                                false ->
+                                    % K < EndKey, so include
+                                    true
+                            end;
+                        false when Dir == fwd ->
+                            % K > EndKey, should never happen, but reject
+                            false
+                    end
+                end, KVs0);
+            false when Dir == rev ->
+                lists:filter(fun({K, _V}) ->
+                    % In reverse, if K =< EndKey, we drop it
+                    not CollateFun(K, EndKey)
+                end, KVs0)
+        end,
+        % Expand dups
+        KVs2 = lists:flatmap(fun({K, V}) ->
+            case V of
+                {dups, Dups} when Dir == fwd ->
+                    [{K, D} || D <- Dups];
+                {dups, Dups} when Dir == rev ->
+                    [{K, D} || D <- lists:reverse(Dups)];
+                _ ->
+                    [{K, V}]
+            end
+        end, KVs1),
+        lists:foldl(fun({{Key, DocId}, Value}, WAccInner) ->
+            Callback(DocId, Key, Value, WAccInner)
+        end, WAcc, KVs2)
+    end,
+
+    case Dir of
+        fwd ->
+            ebtree:range(Tx, Btree, StartKey, EndKey, Wrapper, Acc0);
+        rev ->
+            % Keys swapped on purpose: ebtree:reverse_range takes (Low, High)
+            ebtree:reverse_range(Tx, Btree, EndKey, StartKey, Wrapper, Acc0)
+    end.
 
 
-write_doc(TxDb, Sig, _ViewIds, #{deleted := true} = Doc) ->
+write_doc(TxDb, Mrst, #{deleted := true} = Doc) ->
+    #{
+        tx := Tx
+    } = TxDb,
     #{
         id := DocId
     } = Doc,
 
-    ExistingViewKeys = get_view_keys(TxDb, Sig, DocId),
+    ExistingViewKeys = get_view_keys(TxDb, Mrst, DocId),
 
-    clear_id_idx(TxDb, Sig, DocId),
-    lists:foreach(fun({ViewId, TotalKeys, TotalSize, UniqueKeys}) ->
-        clear_map_idx(TxDb, Sig, ViewId, DocId, UniqueKeys),
-        update_row_count(TxDb, Sig, ViewId, -TotalKeys),
-        update_kv_size(TxDb, Sig, ViewId, -TotalSize)
-    end, ExistingViewKeys);
+    ebtree:delete(Tx, Mrst#mrst.id_btree, DocId),
+    lists:foreach(fun(#mrview{id_num = ViewId, btree = Btree}) ->
+        ViewKeys = case lists:keyfind(ViewId, 1, ExistingViewKeys) of
+            {ViewId, Keys} -> Keys;
+            false -> []
+        end,
+        lists:foreach(fun(Key) ->
+            ebtree:delete(Tx, Btree, {Key, DocId})
+        end, ViewKeys)
+    end, Mrst#mrst.views);
 
-write_doc(TxDb, Sig, ViewIds, Doc) ->
+write_doc(TxDb, Mrst, Doc) ->
+    #{
+        tx := Tx
+    } = TxDb,
     #{
         id := DocId,
-        results := Results,
-        kv_sizes := KVSizes
+        results := Results
     } = Doc,
 
-    ExistingViewKeys = get_view_keys(TxDb, Sig, DocId),
-
-    clear_id_idx(TxDb, Sig, DocId),
+    ExistingViewKeys = get_view_keys(TxDb, Mrst, DocId),
 
-    lists:foreach(fun({ViewId, NewRows, KVSize}) ->
-        update_id_idx(TxDb, Sig, ViewId, DocId, NewRows, KVSize),
+    NewIdKeys = lists:foldl(fun({View, RawNewRows}, IdKeyAcc) ->
+        #mrview{
+            id_num = ViewId
+        } = View,
 
+        % Remove old keys in the view
         ExistingKeys = case lists:keyfind(ViewId, 1, ExistingViewKeys) of
-            {ViewId, TotalRows, TotalSize, EKeys} ->
-                RowChange = length(NewRows) - TotalRows,
-                update_row_count(TxDb, Sig, ViewId, RowChange),
-                update_kv_size(TxDb, Sig, ViewId, KVSize - TotalSize),
-                EKeys;
-            false ->
-                RowChange = length(NewRows),
-                update_row_count(TxDb, Sig, ViewId, RowChange),
-                update_kv_size(TxDb, Sig, ViewId, KVSize),
-                []
+            {ViewId, Keys} -> Keys;
+            false -> []
         end,
-        update_map_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows)
-    end, lists:zip3(ViewIds, Results, KVSizes)).
+        lists:foreach(fun(K) ->
+            ebtree:delete(Tx, View#mrview.btree, {K, DocId})
+        end, ExistingKeys),
+
+        % Insert new rows
+        NewRows = dedupe_rows(View, RawNewRows),
+        lists:foreach(fun({K, V}) ->
+            ebtree:insert(Tx, View#mrview.btree, {K, DocId}, V)
+        end, NewRows),
+        ViewKeys = {View#mrview.id_num, lists:usort([K || {K, _V} <- NewRows])},
+        [ViewKeys | IdKeyAcc]
+    end, [], lists:zip(Mrst#mrst.views, Results)),
+
+    ebtree:insert(Tx, Mrst#mrst.id_btree, DocId, NewIdKeys).
 
 
 list_signatures(Db) ->
@@ -244,200 +329,152 @@ clear_index(Db, Signature) ->
     end, Keys),
 
     % Clear index data
-    RangeTuple = {?DB_VIEWS, ?VIEW_DATA, Signature},
-    RangePrefix = erlfdb_tuple:pack(RangeTuple, DbPrefix),
-    erlfdb:clear_range_startswith(Tx, RangePrefix).
-
-
-% For each row in a map view we store the the key/value
-% in FoundationDB:
-%
-%   `(EncodedSortKey, (EncodedKey, EncodedValue))`
-%
-% The difference between `EncodedSortKey` and `EndcodedKey` is
-% the use of `couch_util:get_sort_key/1` which turns UTF-8
-% strings into binaries that are byte comparable. Given a sort
-% key binary we cannot recover the input so to return unmodified
-% user data we are forced to store the original.
-
-fold_fwd({RowKey, PackedKeyValue}, Acc) ->
-    #{
-        prefix := Prefix,
-        callback := UserCallback,
-        acc := UserAcc0
-    } = Acc,
-
-    {{_SortKey, DocId}, _DupeId} =
-            erlfdb_tuple:unpack(RowKey, Prefix),
-
-    {EncodedOriginalKey, EncodedValue} = erlfdb_tuple:unpack(PackedKeyValue),
-    Value = couch_views_encoding:decode(EncodedValue),
-    Key = couch_views_encoding:decode(EncodedOriginalKey),
-
-    UserAcc1 = UserCallback(DocId, Key, Value, UserAcc0),
+    DataTuple = {?DB_VIEWS, ?VIEW_DATA, Signature},
+    DataPrefix = erlfdb_tuple:pack(DataTuple, DbPrefix),
+    erlfdb:clear_range_startswith(Tx, DataPrefix),
 
-    Acc#{
-        acc := UserAcc1
-    }.
+    % Clear tree data
+    TreeTuple = {?DB_VIEWS, ?VIEW_TREES, Signature},
+    TreePrefix = erlfdb_tuple:pack(TreeTuple, DbPrefix),
+    erlfdb:clear_range_startswith(Tx, TreePrefix).
 
 
-clear_id_idx(TxDb, Sig, DocId) ->
+get_view_keys(TxDb, Mrst, DocId) ->
     #{
-        tx := Tx,
-        db_prefix := DbPrefix
-    } = TxDb,
-
-    {Start, End} = id_idx_range(DbPrefix, Sig, DocId),
-    ok = erlfdb:clear_range(Tx, Start, End).
-
-
-clear_map_idx(TxDb, Sig, ViewId, DocId, ViewKeys) ->
-    #{
-        tx := Tx,
-        db_prefix := DbPrefix
-    } = TxDb,
-
-    lists:foreach(fun(ViewKey) ->
-        {Start, End} = map_idx_range(DbPrefix, Sig, ViewId, ViewKey, DocId),
-        ok = erlfdb:clear_range(Tx, Start, End)
-    end, ViewKeys).
-
-
-update_id_idx(TxDb, Sig, ViewId, DocId, [], _KVSize) ->
-    #{
-        tx := Tx,
-        db_prefix := DbPrefix
-    } = TxDb,
-    Key = id_idx_key(DbPrefix, Sig, DocId, ViewId),
-    ok = erlfdb:clear(Tx, Key);
-
-update_id_idx(TxDb, Sig, ViewId, DocId, NewRows, KVSize) ->
-    #{
-        tx := Tx,
-        db_prefix := DbPrefix
-    } = TxDb,
-
-    Unique = lists:usort([K || {K, _V} <- NewRows]),
-
-    Key = id_idx_key(DbPrefix, Sig, DocId, ViewId),
-    Val = couch_views_encoding:encode([length(NewRows), KVSize, Unique]),
-    ok = erlfdb:set(Tx, Key, aegis:encrypt(TxDb, Key, Val)).
-
-
-update_map_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows) ->
-    #{
-        tx := Tx,
-        db_prefix := DbPrefix
-    } = TxDb,
-
-    lists:foreach(fun(RemKey) ->
-        {Start, End} = map_idx_range(DbPrefix, Sig, ViewId, RemKey, DocId),
-        ok = erlfdb:clear_range(Tx, Start, End)
-    end, ExistingKeys),
-
-    KVsToAdd = process_rows(NewRows),
-    MapIdxPrefix = map_idx_prefix(DbPrefix, Sig, ViewId),
-
-    lists:foreach(fun({DupeId, Key1, Key2, EV}) ->
-        KK = map_idx_key(MapIdxPrefix, {Key1, DocId}, DupeId),
-        Val = erlfdb_tuple:pack({Key2, EV}),
-        ok = erlfdb:set(Tx, KK, aegis:encrypt(TxDb, KK, Val))
-    end, KVsToAdd).
-
-
-get_view_keys(TxDb, Sig, DocId) ->
-    #{
-        tx := Tx,
-        db_prefix := DbPrefix
+        tx := Tx
     } = TxDb,
-    {Start, End} = id_idx_range(DbPrefix, Sig, DocId),
-    lists:map(fun({K, V}) ->
-        {?DB_VIEWS, ?VIEW_DATA, Sig, ?VIEW_ID_RANGE, DocId, ViewId} =
-                erlfdb_tuple:unpack(K, DbPrefix),
-        [TotalKeys, TotalSize, UniqueKeys] = couch_views_encoding:decode(V),
-        {ViewId, TotalKeys, TotalSize, UniqueKeys}
-    end, aegis:decrypt(TxDb, erlfdb:get_range(Tx, Start, End, []))).
+    #mrst{
+        id_btree = IdTree
+    } = Mrst,
+    case ebtree:lookup(Tx, IdTree, DocId) of
+        {DocId, ViewKeys} -> ViewKeys;
+        false -> []
+    end.
 
 
-update_row_count(TxDb, Sig, ViewId, Increment) ->
+open_id_tree(TxDb, #mrst{sig = Sig} = Mrst) ->
     #{
         tx := Tx,
         db_prefix := DbPrefix
     } = TxDb,
-    Key = row_count_key(DbPrefix, Sig, ViewId),
-    erlfdb:add(Tx, Key, Increment).
+    Prefix = id_tree_prefix(DbPrefix, Sig),
+    Mrst#mrst{
+        id_btree = ebtree:open(Tx, Prefix, 10, [])
+    }.
 
 
-update_kv_size(TxDb, Sig, ViewId, Increment) ->
+open_view_tree(TxDb, Sig, View) ->
     #{
         tx := Tx,
         db_prefix := DbPrefix
     } = TxDb,
+    #mrview{
+        id_num = ViewId
+    } = View,
+    Prefix = view_tree_prefix(DbPrefix, Sig, ViewId),
+    TreeOpts = [
+        {collate_fun, collate_fun(View)},
+        {reduce_fun, make_reduce_fun(View)}
+    ],
+    View#mrview{
+        btree = ebtree:open(Tx, Prefix, 10, TreeOpts)
+    }.
 
-    % Track a view specific size for calls to
-    % GET /dbname/_design/doc/_info`
-    IdxKey = kv_size_key(DbPrefix, Sig, ViewId),
-    erlfdb:add(Tx, IdxKey, Increment),
 
-    % Track a database level rollup for calls to
-    % GET /dbname
-    DbKey = db_kv_size_key(DbPrefix),
-    erlfdb:add(Tx, DbKey, Increment).
+collate_fun(View) ->
+    #mrview{
+        options = Options
+    } = View,
+    case couch_util:get_value(<<"collation">>, Options) of
+        <<"raw">> -> fun erlang:'=<'/2;
+        _ -> fun couch_ejson_compare:lteq_json_ids/2
+    end.
 
 
-seq_key(DbPrefix, Sig) ->
-    Key = {?DB_VIEWS, ?VIEW_INFO, ?VIEW_UPDATE_SEQ, Sig},
-    erlfdb_tuple:pack(Key, DbPrefix).
+make_reduce_fun(#mrview{}) ->
+    fun
+        (KVs, _ReReduce = false) ->
+            TotalSize = lists:foldl(fun({K, V}, Acc) ->
+                KSize = couch_ejson_size:encoded_size(K),
+                VSize = case V of
+                    {dups, Dups} ->
+                        lists:foldl(fun(D, DAcc) ->
+                            DAcc + couch_ejson_size:encoded_size(D)
+                        end, 0, Dups);
+                    _ ->
+                        couch_ejson_size:encoded_size(V)
+                end,
+                KSize + VSize + Acc
+            end, 0, KVs),
+            {length(KVs), TotalSize};
+        (KRs, _ReReduce = true) ->
+            lists:foldl(fun({Count, Size}, {CountAcc, SizeAcc}) ->
+                {Count + CountAcc, Size + SizeAcc}
+            end, {0, 0}, KRs)
+    end.
 
 
-row_count_key(DbPrefix, Sig, ViewId) ->
-    Key = {?DB_VIEWS, ?VIEW_INFO, ?VIEW_ROW_COUNT, Sig, ViewId},
-    erlfdb_tuple:pack(Key, DbPrefix).
+dedupe_rows(View, KVs0) ->
+    CollateFun = collate_fun(View),
+    KVs1 = [{{K, <<>>}, V} || {K, V} <- KVs0],
+    KVs2 = lists:sort(CollateFun, KVs1),
+    KVs3 = dedupe_rows_int(CollateFun, KVs2),
+    [{K, V} || {{K, _FakeDocId}, V} <- KVs3].
+
+
+dedupe_rows_int(_CollateFun, []) ->
+    [];
+
+dedupe_rows_int(_CollateFun, [KV]) ->
+    [KV];
+
+dedupe_rows_int(CollateFun, [{K1, V1} | RestKVs]) ->
+    RestDeduped = dedupe_rows_int(CollateFun, RestKVs),
+    case RestDeduped of
+        [{K2, V2} | RestRestDeduped] ->
+            Equal = case CollateFun(K1, K2) of
+                true ->
+                    case CollateFun(K2, K1) of
+                        true ->
+                            true;
+                        false ->
+                            false
+                    end;
+                false ->
+                    false
+            end,
+            case Equal of
+                true ->
+                    [{K1, combine_vals(V1, V2)} | RestRestDeduped];
+                false ->
+                    [{K1, V1} | RestDeduped]
+            end;
+        [] ->
+            [{K1, V1}]
+    end.
 
 
-kv_size_key(DbPrefix, Sig, ViewId) ->
-    Key = {?DB_VIEWS, ?VIEW_INFO, ?VIEW_KV_SIZE, Sig, ViewId},
-    erlfdb_tuple:pack(Key, DbPrefix).
+combine_vals(V1, {dups, V2}) ->
+    {dups, [V1 | V2]};
+combine_vals(V1, V2) ->
+    {dups, [V1, V2]}.
 
 
-db_kv_size_key(DbPrefix) ->
-    Key = {?DB_STATS, <<"sizes">>, <<"views">>},
+id_tree_prefix(DbPrefix, Sig) ->
+    Key = {?DB_VIEWS, ?VIEW_TREES, Sig, ?VIEW_ID_TREE},
     erlfdb_tuple:pack(Key, DbPrefix).
 
 
-id_idx_key(DbPrefix, Sig, DocId, ViewId) ->
-    Key = {?DB_VIEWS, ?VIEW_DATA, Sig, ?VIEW_ID_RANGE, DocId, ViewId},
+view_tree_prefix(DbPrefix, Sig, ViewId) ->
+    Key = {?DB_VIEWS, ?VIEW_TREES, Sig, ?VIEW_ROW_TREES, ViewId},
     erlfdb_tuple:pack(Key, DbPrefix).
 
 
-id_idx_range(DbPrefix, Sig, DocId) ->
-    Key = {?DB_VIEWS, ?VIEW_DATA, Sig, ?VIEW_ID_RANGE, DocId},
-    erlfdb_tuple:range(Key, DbPrefix).
-
-
-map_idx_prefix(DbPrefix, Sig, ViewId) ->
-    Key = {?DB_VIEWS, ?VIEW_DATA, Sig, ?VIEW_MAP_RANGE, ViewId},
+seq_key(DbPrefix, Sig) ->
+    Key = {?DB_VIEWS, ?VIEW_INFO, ?VIEW_UPDATE_SEQ, Sig},
     erlfdb_tuple:pack(Key, DbPrefix).
 
 
-map_idx_key(MapIdxPrefix, MapKey, DupeId) ->
-    Key = {MapKey, DupeId},
-    erlfdb_tuple:pack(Key, MapIdxPrefix).
-
-
-map_idx_range(DbPrefix, Sig, ViewId, MapKey, DocId) ->
-    Encoded = couch_views_encoding:encode(MapKey, key),
-    Key = {
-        ?DB_VIEWS,
-        ?VIEW_DATA,
-        Sig,
-        ?VIEW_MAP_RANGE,
-        ViewId,
-        {Encoded, DocId}
-    },
-    erlfdb_tuple:range(Key, DbPrefix).
-
-
 creation_vs_key(Db, Sig) ->
     #{
         db_prefix := DbPrefix
@@ -454,22 +491,15 @@ build_status_key(Db, Sig) ->
     erlfdb_tuple:pack(Key, DbPrefix).
 
 
-process_rows(Rows) ->
-    Encoded = lists:map(fun({K, V}) ->
-        EK1 = couch_views_encoding:encode(K, key),
-        EK2 = couch_views_encoding:encode(K, value),
-        EV = couch_views_encoding:encode(V, value),
-        {EK1, EK2, EV}
-    end, Rows),
-
-    Grouped = lists:foldl(fun({K1, K2, V}, Acc) ->
-        dict:append(K1, {K2, V}, Acc)
-    end, dict:new(), Encoded),
-
-    dict:fold(fun(K1, Vals, DAcc) ->
-        Vals1 = lists:keysort(2, Vals),
-        {_, Labeled} = lists:foldl(fun({K2, V}, {Count, Acc}) ->
-            {Count + 1, [{Count, K1, K2, V} | Acc]}
-        end, {0, []}, Vals1),
-        Labeled ++ DAcc
-    end, [], Grouped).
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
+dedupe_basic_test() ->
+    View = #mrview{},
+    ?assertEqual([{1, 1}], dedupe_rows(View, [{1, 1}])).
+
+dedupe_simple_test() ->
+    View = #mrview{},
+    ?assertEqual([{1, {dups, [1, 2]}}], dedupe_rows(View, [{1, 1}, {1, 2}])).
+
+-endif.
\ No newline at end of file
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index 31868d9..4f8e405 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -102,7 +102,8 @@ init() ->
             ok;
         error:database_does_not_exist ->
             fail_job(Job, Data, db_deleted, "Database was deleted");
-        Error:Reason  ->
+        Error:Reason:Stack  ->
+            couch_log:error("~p : ~p~n~p~n", [Error, Reason, Stack]),
             couch_rate:failure(Limiter),
             NewRetry = Retries + 1,
             RetryLimit = retry_limit(),
@@ -181,6 +182,7 @@ update(#{} = Db, Mrst0, State0) ->
 
 do_update(Db, Mrst0, State0) ->
     fabric2_fdb:transactional(Db, fun(TxDb) ->
+        Mrst1 = couch_views_fdb:set_trees(TxDb, Mrst0),
         State1 = get_update_start_state(TxDb, Mrst0, State0),
 
         {ok, State2} = fold_changes(State1),
@@ -196,20 +198,20 @@ do_update(Db, Mrst0, State0) ->
         DocAcc1 = fetch_docs(TxDb, DocAcc),
         couch_rate:in(Limiter, Count),
 
-        {Mrst1, MappedDocs} = map_docs(Mrst0, DocAcc1),
-        WrittenDocs = write_docs(TxDb, Mrst1, MappedDocs, State2),
+        {Mrst2, MappedDocs} = map_docs(Mrst1, DocAcc1),
+        WrittenDocs = write_docs(TxDb, Mrst2, MappedDocs, State2),
 
         couch_rate:success(Limiter, WrittenDocs),
 
         case Count < Limit of
             true ->
-                maybe_set_build_status(TxDb, Mrst1, ViewVS,
+                maybe_set_build_status(TxDb, Mrst2, ViewVS,
                     ?INDEX_READY),
                 report_progress(State2, finished),
-                {Mrst1, finished};
+                {Mrst2, finished};
             false ->
                 State3 = report_progress(State2, update),
-                {Mrst1, State3#{
+                {Mrst2, State3#{
                     tx_db := undefined,
                     count := 0,
                     doc_acc := [],
@@ -347,7 +349,6 @@ map_docs(Mrst, Docs) ->
 
 write_docs(TxDb, Mrst, Docs, State) ->
     #mrst{
-        views = Views,
         sig = Sig
     } = Mrst,
 
@@ -355,20 +356,19 @@ write_docs(TxDb, Mrst, Docs, State) ->
         last_seq := LastSeq
     } = State,
 
-    ViewIds = [View#mrview.id_num || View <- Views],
     KeyLimit = key_size_limit(),
     ValLimit = value_size_limit(),
 
-    DocsNumber = lists:foldl(fun(Doc0, N) ->
-        Doc1 = calculate_kv_sizes(Mrst, Doc0, KeyLimit, ValLimit),
-        couch_views_fdb:write_doc(TxDb, Sig, ViewIds, Doc1),
-        N + 1
-    end, 0, Docs),
+    lists:foreach(fun(Doc0) ->
+        Doc1 = check_kv_size_limit(Mrst, Doc0, KeyLimit, ValLimit),
+        couch_views_fdb:write_doc(TxDb, Mrst, Doc1)
+    end, Docs),
 
     if LastSeq == false -> ok; true ->
         couch_views_fdb:set_update_seq(TxDb, Sig, LastSeq)
     end,
-    DocsNumber.
+
+    length(Docs).
 
 
 fetch_docs(Db, Changes) ->
@@ -443,7 +443,7 @@ start_query_server(#mrst{} = Mrst) ->
     Mrst.
 
 
-calculate_kv_sizes(Mrst, Doc, KeyLimit, ValLimit) ->
+check_kv_size_limit(Mrst, Doc, KeyLimit, ValLimit) ->
     #mrst{
         db_name = DbName,
         idx_name = IdxName
@@ -452,10 +452,10 @@ calculate_kv_sizes(Mrst, Doc, KeyLimit, ValLimit) ->
         results := Results
     } = Doc,
     try
-        KVSizes = lists:map(fun(ViewRows) ->
-            lists:foldl(fun({K, V}, Acc) ->
-                KeySize = erlang:external_size(K),
-                ValSize = erlang:external_size(V),
+        lists:foreach(fun(ViewRows) ->
+            lists:foreach(fun({K, V}) ->
+                KeySize = couch_ejson_size:encoded_size(K),
+                ValSize = couch_ejson_size:encoded_size(V),
 
                 if KeySize =< KeyLimit -> ok; true ->
                     throw({size_error, key})
@@ -463,12 +463,10 @@ calculate_kv_sizes(Mrst, Doc, KeyLimit, ValLimit) ->
 
                 if ValSize =< ValLimit -> ok; true ->
                     throw({size_error, value})
-                end,
-
-                Acc + KeySize + ValSize
-            end, 0, ViewRows)
+                end
+            end, ViewRows)
         end, Results),
-        Doc#{kv_sizes => KVSizes}
+        Doc
     catch throw:{size_error, Type} ->
         #{id := DocId} = Doc,
         Fmt = "View ~s size error for docid `~s`, excluded from indexing "
diff --git a/src/couch_views/src/couch_views_reader.erl b/src/couch_views/src/couch_views_reader.erl
index ce7f163..545b91a 100644
--- a/src/couch_views/src/couch_views_reader.erl
+++ b/src/couch_views/src/couch_views_reader.erl
@@ -23,24 +23,24 @@
 -include_lib("fabric/include/fabric2.hrl").
 
 
-read(Db, Mrst, ViewName, UserCallback, UserAcc0, Args) ->
-    #mrst{
-        language = Lang,
-        sig = Sig,
-        views = Views
-    } = Mrst,
-
-    ViewId = get_view_id(Lang, Args, ViewName, Views),
-    Fun = fun handle_row/4,
-
+read(Db, Mrst0, ViewName, UserCallback, UserAcc0, Args) ->
     try
         fabric2_fdb:transactional(Db, fun(TxDb) ->
-            Meta = get_meta(TxDb, Mrst, ViewId, Args),
+            #mrst{
+                language = Lang,
+                views = Views
+            } = Mrst = couch_views_fdb:set_trees(TxDb, Mrst0),
+
+            View = get_view(Lang, Args, ViewName, Views),
+            Fun = fun handle_row/4,
+
+            Meta = get_meta(TxDb, Mrst, View, Args),
             UserAcc1 = maybe_stop(UserCallback(Meta, UserAcc0)),
 
             Acc0 = #{
                 db => TxDb,
                 skip => Args#mrargs.skip,
+                limit => Args#mrargs.limit,
                 mrargs => undefined,
                 callback => UserCallback,
                 acc => UserAcc1
@@ -51,14 +51,7 @@ read(Db, Mrst, ViewName, UserCallback, UserAcc0, Args) ->
                 KeyAcc1 = KeyAcc0#{
                     mrargs := KeyArgs
                 },
-                couch_views_fdb:fold_map_idx(
-                        TxDb,
-                        Sig,
-                        ViewId,
-                        Opts,
-                        Fun,
-                        KeyAcc1
-                    )
+                couch_views_fdb:fold_map_idx(TxDb, View, Opts, Fun, KeyAcc1)
             end, Acc0, expand_keys_args(Args)),
 
             #{
@@ -66,27 +59,35 @@ read(Db, Mrst, ViewName, UserCallback, UserAcc0, Args) ->
             } = Acc1,
             {ok, maybe_stop(UserCallback(complete, UserAcc2))}
         end)
-    catch throw:{done, Out} ->
-        {ok, Out}
+    catch
+        throw:{complete, Out} ->
+            {_, Final} = UserCallback(complete, Out),
+            {ok, Final};
+        throw:{done, Out} ->
+            {ok, Out}
     end.
 
 
-get_meta(TxDb, Mrst, ViewId, #mrargs{update_seq = true}) ->
-    TotalRows = couch_views_fdb:get_row_count(TxDb, Mrst, ViewId),
+get_meta(TxDb, Mrst, View, #mrargs{update_seq = true}) ->
+    TotalRows = couch_views_fdb:get_row_count(TxDb, View),
     ViewSeq = couch_views_fdb:get_update_seq(TxDb, Mrst),
     {meta,  [{update_seq, ViewSeq}, {total, TotalRows}, {offset, null}]};
 
-get_meta(TxDb, Mrst, ViewId, #mrargs{}) ->
-    TotalRows = couch_views_fdb:get_row_count(TxDb, Mrst, ViewId),
+get_meta(TxDb, _Mrst, View, #mrargs{}) ->
+    TotalRows = couch_views_fdb:get_row_count(TxDb, View),
     {meta, [{total, TotalRows}, {offset, null}]}.
 
 
 handle_row(_DocId, _Key, _Value, #{skip := Skip} = Acc) when Skip > 0 ->
     Acc#{skip := Skip - 1};
 
+handle_row(_DocID, _Key, _Value, #{limit := 0, acc := UserAcc}) ->
+    throw({complete, UserAcc});
+
 handle_row(DocId, Key, Value, Acc) ->
     #{
         db := TxDb,
+        limit := Limit,
         mrargs := Args,
         callback := UserCallback,
         acc := UserAcc0
@@ -111,13 +112,13 @@ handle_row(DocId, Key, Value, Acc) ->
     end,
 
     UserAcc1 = maybe_stop(UserCallback({row, Row}, UserAcc0)),
-    Acc#{acc := UserAcc1}.
+    Acc#{limit := Limit - 1, acc := UserAcc1}.
 
 
-get_view_id(Lang, Args, ViewName, Views) ->
+get_view(Lang, Args, ViewName, Views) ->
     case couch_mrview_util:extract_view(Lang, Args, ViewName, Views) of
-        {map, View, _Args} -> View#mrview.id_num;
-        {red, {_Idx, _Lang, View}} -> View#mrview.id_num
+        {map, View, _Args} -> View;
+        {red, {_Idx, _Lang, View}} -> View
     end.
 
 
@@ -135,57 +136,33 @@ expand_keys_args(#mrargs{keys = Keys} = Args) ->
 
 mrargs_to_fdb_options(Args) ->
     #mrargs{
-        start_key = StartKey0,
+        start_key = StartKey,
         start_key_docid = StartKeyDocId,
-        end_key = EndKey0,
-        end_key_docid = EndKeyDocId,
+        end_key = EndKey,
+        end_key_docid = EndKeyDocId0,
         direction = Direction,
-        limit = Limit,
-        skip = Skip,
         inclusive_end = InclusiveEnd
     } = Args,
 
-    StartKey1 = if StartKey0 == undefined -> undefined; true ->
-        couch_views_encoding:encode(StartKey0, key)
-    end,
-
-    StartKeyOpts = case {StartKey1, StartKeyDocId} of
-        {undefined, _} ->
-            [];
-        {StartKey1, StartKeyDocId} ->
-            [{start_key, {StartKey1, StartKeyDocId}}]
+    StartKeyOpts = if StartKey == undefined -> []; true ->
+        [{start_key, {StartKey, StartKeyDocId}}]
     end,
 
-    EndKey1 = if EndKey0 == undefined -> undefined; true ->
-        couch_views_encoding:encode(EndKey0, key)
+    EndKeyDocId = case {Direction, EndKeyDocId0} of
+        {fwd, <<255>>} when InclusiveEnd -> <<255>>;
+        {fwd, <<255>>} when not InclusiveEnd -> <<>>;
+        {rev, <<>>} when InclusiveEnd -> <<>>;
+        {rev, <<>>} when not InclusiveEnd -> <<255>>;
+        _ -> EndKeyDocId0
     end,
 
-    EndKeyOpts = case {EndKey1, EndKeyDocId, Direction} of
-        {undefined, _, _} ->
-            [];
-        {EndKey1, <<>>, rev} when not InclusiveEnd ->
-            % When we iterate in reverse with
-            % inclusive_end=false we have to set the
-            % EndKeyDocId to <<255>> so that we don't
-            % include matching rows.
-            [{end_key_gt, {EndKey1, <<255>>}}];
-        {EndKey1, <<255>>, _} when not InclusiveEnd ->
-            % When inclusive_end=false we need to
-            % elide the default end_key_docid so as
-            % to not sort past the docids with the
-            % given end key.
-            [{end_key_gt, {EndKey1}}];
-        {EndKey1, EndKeyDocId, _} when not InclusiveEnd ->
-            [{end_key_gt, {EndKey1, EndKeyDocId}}];
-        {EndKey1, EndKeyDocId, _} when InclusiveEnd ->
-            [{end_key, {EndKey1, EndKeyDocId}}]
+    EndKeyOpts = if EndKey == undefined -> []; true ->
+        [{end_key, {EndKey, EndKeyDocId}}]
     end,
 
     [
         {dir, Direction},
-        {limit, Limit + Skip},
-        {streaming_mode, want_all},
-        {restart_tx, true}
+        {inclusive_end, InclusiveEnd}
     ] ++ StartKeyOpts ++ EndKeyOpts.