Posted to commits@couchdb.apache.org by ga...@apache.org on 2019/10/31 14:43:00 UTC

[couchdb] 01/05: Initial work

This is an automated email from the ASF dual-hosted git repository.

garren pushed a commit to branch prototype/builtin-reduce
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 03cbc41c15a3e239e4c5f4e820d19593cc2b8dc4
Author: Garren Smith <ga...@gmail.com>
AuthorDate: Wed Sep 11 17:01:08 2019 +0200

    Initial work
---
 src/couch_views/include/couch_views.hrl            |   1 +
 src/couch_views/src/couch_views.erl                |  14 +-
 src/couch_views/src/couch_views_fdb.erl            | 116 ++++++-
 src/couch_views/src/couch_views_indexer.erl        |   5 +
 src/couch_views/src/couch_views_reader.erl         | 155 ++++++++-
 src/couch_views/src/couch_views_reduce.erl         | 364 +++++++++++++++++++++
 .../couch_views_reduce_fdb.erl}                    |  29 +-
 .../test/exunit/couch_views_reduce_test.exs        | 286 ++++++++++++++++
 src/couch_views/test/exunit/test_helper.exs        |   2 +
 9 files changed, 950 insertions(+), 22 deletions(-)

diff --git a/src/couch_views/include/couch_views.hrl b/src/couch_views/include/couch_views.hrl
index 2e443eb..e97d777 100644
--- a/src/couch_views/include/couch_views.hrl
+++ b/src/couch_views/include/couch_views.hrl
@@ -15,6 +15,7 @@
 -define(VIEW_ID_INFO, 1).
 -define(VIEW_ID_RANGE, 2).
 -define(VIEW_MAP_RANGE, 3).
+-define(VIEW_REDUCE_RANGE, 4).
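+
+% The macros below are referenced by the new reduce code in this diff but
+% are not defined anywhere in it; prototype values are assumed here so the
+% code compiles.
+-define(VIEW_REDUCE_SK_RANGE, 5).
+-define(VIEW_REDUCE_EXACT, 0).
+-define(VIEW_REDUCE_GROUP, 1).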
 
 -define(VIEW_ROW_COUNT, 0).
 -define(VIEW_KV_SIZE, 1).
diff --git a/src/couch_views/src/couch_views.erl b/src/couch_views/src/couch_views.erl
index 7c7588c..b7fe4c4 100644
--- a/src/couch_views/src/couch_views.erl
+++ b/src/couch_views/src/couch_views.erl
@@ -38,16 +38,20 @@ query(Db, DDoc, ViewName, Callback, Acc0, Args0) ->
     Args1 = to_mrargs(Args0),
     Args2 = couch_mrview_util:set_view_type(Args1, ViewName, Views),
     Args3 = couch_mrview_util:validate_args(Args2),
+
     ok = check_range(Args3),
-    case is_reduce_view(Args3) of
-        true -> throw({not_implemented});
-        false -> ok
-    end,
 
     ok = maybe_update_view(Db, Mrst, Args3),
 
     try
-        couch_views_reader:read(Db, Mrst, ViewName, Callback, Acc0, Args3)
+        case is_reduce_view(Args3) of
+            true ->
+                couch_views_reader:read_reduce(Db, Mrst, ViewName,
+                    Callback, Acc0, Args3);
+            false ->
+                couch_views_reader:read(Db, Mrst, ViewName,
+                    Callback, Acc0, Args3)
+        end
     after
         UpdateAfter = Args3#mrargs.update == lazy,
         if UpdateAfter == false -> ok; true ->
diff --git a/src/couch_views/src/couch_views_fdb.erl b/src/couch_views/src/couch_views_fdb.erl
index 60ce300..07241dd 100644
--- a/src/couch_views/src/couch_views_fdb.erl
+++ b/src/couch_views/src/couch_views_fdb.erl
@@ -20,6 +20,7 @@
     get_kv_size/3,
 
     fold_map_idx/6,
+    fold_reduce_idx/6,
 
     write_doc/4
 ]).
@@ -42,6 +43,15 @@
 % View Build Sequence Access
 % (<db>, ?DB_VIEWS, Sig, ?VIEW_UPDATE_SEQ) = Sequence
 
+% Id Range
+% {<db>, ?DB_VIEWS, Sig, ?VIEW_ID_RANGE, DocId, ViewId}
+%   = [TotalKeys, TotalSize, UniqueKeys]
+
+% Map Range
+% {<db>, ?DB_VIEWS, Sig, ?VIEW_MAP_RANGE, ViewId, {{Key, DocId}, DupeId, Type}}
+%   = Value | UnEncodedKey
+% Type = ?VIEW_KEY | ?VIEW_ROW
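+
+% Reduce Range (prototype; see reduce_idx_prefix/3 and reduce_idx_key/5)
+% {<db>, ?DB_VIEWS, Sig, ?VIEW_REDUCE_RANGE, ViewId,
+%     {ReduceKey, GroupLevel, ReduceType, RowType}} = UnEncodedKey | Value
+% ReduceType = ?VIEW_REDUCE_EXACT | ?VIEW_REDUCE_GROUP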
+
 
 get_update_seq(TxDb, #mrst{sig = Sig}) ->
     #{
@@ -124,6 +134,8 @@ fold_map_idx(TxDb, Sig, ViewId, Options, Callback, Acc0) ->
     Acc1.
 
 
 write_doc(TxDb, Sig, _ViewIds, #{deleted := true} = Doc) ->
     #{
         id := DocId
@@ -134,6 +146,7 @@ write_doc(TxDb, Sig, _ViewIds, #{deleted := true} = Doc) ->
     clear_id_idx(TxDb, Sig, DocId),
     lists:foreach(fun({ViewId, TotalKeys, TotalSize, UniqueKeys}) ->
         clear_map_idx(TxDb, Sig, ViewId, DocId, UniqueKeys),
+        %clear_reduce_idx
         update_row_count(TxDb, Sig, ViewId, -TotalKeys),
         update_kv_size(TxDb, Sig, ViewId, -TotalSize)
     end, ExistingViewKeys);
@@ -141,14 +154,17 @@ write_doc(TxDb, Sig, _ViewIds, #{deleted := true} = Doc) ->
 write_doc(TxDb, Sig, ViewIds, Doc) ->
     #{
         id := DocId,
-        results := Results
+        results := Results,
+        reduce_results := ReduceResults
     } = Doc,
 
     ExistingViewKeys = get_view_keys(TxDb, Sig, DocId),
 
     clear_id_idx(TxDb, Sig, DocId),
 
-    lists:foreach(fun({ViewId, NewRows}) ->
+    %% TODO: handle when there is no reduce
+    io:format("REDUCE RESULTS ~p ~n", [ReduceResults]),
+    lists:foreach(fun({ViewId, NewRows, ReduceResult}) ->
         update_id_idx(TxDb, Sig, ViewId, DocId, NewRows),
 
         ExistingKeys = case lists:keyfind(ViewId, 1, ExistingViewKeys) of
@@ -165,8 +181,11 @@ write_doc(TxDb, Sig, ViewIds, Doc) ->
                 update_kv_size(TxDb, Sig, ViewId, SizeChange),
                 []
         end,
-        update_map_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows)
-    end, lists:zip(ViewIds, Results)).
+        update_map_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows),
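+        %% Prototype note: reduce rows are currently written twice, via the
+        %% skip-list index in couch_views_reduce and the flat index below.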
+        couch_views_reduce:update_reduce_idx(TxDb, Sig, ViewId, DocId,
+            ExistingKeys, ReduceResult),
+        update_reduce_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, ReduceResult)
+    end, lists:zip3(ViewIds, Results, ReduceResults)).
 
 
 % For each row in a map view there are two rows stored in
@@ -338,6 +357,53 @@ update_map_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows) ->
     end, KVsToAdd).
 
 
+update_reduce_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows) ->
+    #{
+        tx := Tx,
+        db_prefix := DbPrefix
+    } = TxDb,
+
+%%    Unique = lists:usort([K || {K, _V} <- NewRows]),
+
+%%    KeysToRem = ExistingKeys -- Unique,
+%%    lists:foreach(fun(RemKey) ->
+%%        {Start, End} = reduce_idx_range(DbPrefix, Sig, ViewId, RemKey, DocId),
+%%        ok = erlfdb:clear_range(Tx, Start, End)
+%%    end, KeysToRem),
+%%
+    {ExactKVsToAdd, GroupKVsToAdd} = process_reduce_rows(NewRows),
+    ReduceIdxPrefix = reduce_idx_prefix(DbPrefix, Sig, ViewId),
+    add_reduce_kvs(Tx, ReduceIdxPrefix, ExactKVsToAdd, ?VIEW_REDUCE_EXACT),
+    add_reduce_kvs(Tx, ReduceIdxPrefix, GroupKVsToAdd, ?VIEW_REDUCE_GROUP).
+
+
+add_reduce_kvs(Tx, ReduceIdxPrefix, KVsToAdd, ReduceType) ->
+    lists:foreach(fun({Key1, Key2, Val, GroupLevel}) ->
+        KK = reduce_idx_key(ReduceIdxPrefix, Key1, GroupLevel,
+            ReduceType, ?VIEW_ROW_KEY),
+        VK = reduce_idx_key(ReduceIdxPrefix, Key1, GroupLevel,
+            ReduceType, ?VIEW_ROW_VALUE),
+        ok = erlfdb:set(Tx, KK, Key2),
+        ok = erlfdb:add(Tx, VK, Val)
+    end, KVsToAdd).
+
+
+reduce_idx_prefix(DbPrefix, Sig, ViewId) ->
+    Key = {?DB_VIEWS, Sig, ?VIEW_REDUCE_RANGE, ViewId},
+    erlfdb_tuple:pack(Key, DbPrefix).
+
+
+reduce_idx_key(ReduceIdxPrefix, ReduceKey, GroupLevel, ReduceType, RowType) ->
+    Key = {ReduceKey, GroupLevel, ReduceType, RowType},
+    erlfdb_tuple:pack(Key, ReduceIdxPrefix).
+
+
+%%reduce_idx_range(DbPrefix, Sig, ViewId, GroupKey, DocId) ->
+%%    Encoded = couch_views_encoding:encode(MapKey, key),
+%%    Key = {?DB_VIEWS, Sig, ?VIEW_MAP_RANGE, ViewId, {Encoded, DocId}},
+%%    erlfdb_tuple:range(Key, DbPrefix).
+
+
 get_view_keys(TxDb, Sig, DocId) ->
     #{
         tx := Tx,
@@ -394,7 +460,6 @@ id_idx_range(DbPrefix, Sig, DocId) ->
     Key = {?DB_VIEWS, Sig, ?VIEW_ID_RANGE, DocId},
     erlfdb_tuple:range(Key, DbPrefix).
 
-
 map_idx_prefix(DbPrefix, Sig, ViewId) ->
     Key = {?DB_VIEWS, Sig, ?VIEW_MAP_RANGE, ViewId},
     erlfdb_tuple:pack(Key, DbPrefix).
@@ -432,6 +497,47 @@ process_rows(Rows) ->
     end, [], Grouped).
 
 
+process_reduce_rows(Rows) ->
+    ReduceExact = encode_reduce_rows(Rows),
+    ReduceGroups = lists:foldl(fun({Key, Val}, Groupings) ->
+        Out = create_grouping(Key, Val, [], Groupings),
+        Out
+    end, #{}, Rows),
+    ReduceGroups1 = encode_reduce_rows(maps:to_list(ReduceGroups)),
+    {ReduceExact, ReduceGroups1}.
+
+
+encode_reduce_rows(Rows) ->
+    lists:map(fun({K, V}) ->
+        EK1 = couch_views_encoding:encode(K, key),
+        EK2 = couch_views_encoding:encode(K, value),
+        {EK1, EK2, V, group_level(K)}
+    end, Rows).
+
+
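+% Group level of a reduce key: the length for array keys, 1 for everything
+% else, e.g. group_level([2019, 3, 3]) =:= 3 and group_level(2019) =:= 1.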
+group_level(Key) when is_list(Key) ->
+    length(Key);
+
+group_level(_Key) ->
+    1.
+
+
+create_grouping([], _Val, _, Groupings) ->
+    Groupings;
+
+create_grouping([Head | Rest], Val, Key, Groupings) ->
+    Key1 = Key ++ [Head],
+    Groupings1 = maps:update_with(Key1, fun(OldVal) ->
+        OldVal + Val
+        end, Val, Groupings),
+    create_grouping(Rest, Val, Key1, Groupings1);
+
+create_grouping(Key, Val, _, Groupings) ->
+    maps:update_with(Key, fun(OldVal) ->
+        OldVal + Val
+    end, Val, Groupings).
+
+
 calculate_row_size(Rows) ->
     lists:foldl(fun({K, V}, Acc) ->
         Acc + erlang:external_size(K) + erlang:external_size(V)
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index 55ce063..2f38ac1 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -101,6 +101,7 @@ update(#{} = Db, Mrst0, State0) ->
 
         DocAcc1 = fetch_docs(TxDb, DocAcc),
         {Mrst1, MappedDocs} = map_docs(Mrst0, DocAcc1),
+        Results = run_reduce(Mrst1, MappedDocs),
         write_docs(TxDb, Mrst1, MappedDocs, State2),
 
         case Count < Limit of
@@ -209,6 +210,10 @@ map_docs(Mrst, Docs) ->
     {Mrst1, MappedDocs}.
 
 
+run_reduce(Mrst, MappedResults) ->
+    couch_views_reduce:run_reduce(Mrst, MappedResults).
+
+
 write_docs(TxDb, Mrst, Docs, State) ->
     #mrst{
         views = Views,
diff --git a/src/couch_views/src/couch_views_reader.erl b/src/couch_views/src/couch_views_reader.erl
index 27671fb..d08515c 100644
--- a/src/couch_views/src/couch_views_reader.erl
+++ b/src/couch_views/src/couch_views_reader.erl
@@ -13,6 +13,7 @@
 -module(couch_views_reader).
 
 -export([
+    read_reduce/6,
     read/6
 ]).
 
@@ -23,6 +24,128 @@
 -include_lib("fabric/include/fabric2.hrl").
 
 
+read_reduce(Db, Mrst, ViewName, UserCallback, UserAcc0, Args) ->
+    #mrst{
+        language = Lang,
+        sig = Sig,
+        views = Views
+    } = Mrst,
+
+    ViewId = get_view_id(Lang, Args, ViewName, Views),
+    couch_views_reduce:read_reduce(Db, Sig, ViewId, UserCallback,
+        UserAcc0, Args).
+%%    Fun = fun handle_reduce_row/3,
+%%
+%%    try
+%%        fabric2_fdb:transactional(Db, fun(TxDb) ->
+%%            Meta = get_meta(TxDb, Mrst, ViewId, Args),
+%%            UserAcc1 = maybe_stop(UserCallback(Meta, UserAcc0)),
+%%
+%%            #mrargs{
+%%                limit = Limit
+%%            } = Args,
+%%
+%%            Acc0 = #{
+%%                db => TxDb,
+%%                skip => Args#mrargs.skip,
+%%                mrargs => undefined,
+%%                callback => UserCallback,
+%%                acc => UserAcc1,
+%%                row_count => 0,
+%%                limit => Limit
+%%            },
+%%
+%%            Acc1 = lists:foldl(fun(KeyArgs, KeyAcc0) ->
+%%                Opts = reduce_mrargs_to_fdb_options(KeyArgs),
+%%                KeyAcc1 = KeyAcc0#{
+%%                    mrargs := KeyArgs
+%%                },
+%%                couch_views_fdb:fold_reduce_idx(
+%%                    TxDb,
+%%                    Sig,
+%%                    ViewId,
+%%                    Opts,
+%%                    Fun,
+%%                    KeyAcc1
+%%                )
+%%            end, Acc0, expand_keys_args(Args)),
+%%
+%%            #{
+%%                acc := UserAcc2
+%%            } = Acc1,
+%%            {ok, maybe_stop(UserCallback(complete, UserAcc2))}
+%%        end)
+%%    catch throw:{done, Out} ->
+%%        {ok, Out}
+%%    end.
+
+
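+% Maps view args to FDB range options. Only direction, streaming mode and
+% the group-level end bound are wired up; start/end key handling is still
+% commented out below.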
+reduce_mrargs_to_fdb_options(Args) ->
+    #mrargs{
+%%        start_key = StartKey0,
+%%        start_key_docid = StartKeyDocId,
+%%        end_key = EndKey0,
+%%        end_key_docid = EndKeyDocId,
+        direction = Direction,
+        limit = Limit,
+        skip = Skip,
+        group_level = GroupLevel,
+        group = Group
+%%        inclusive_end = InclusiveEnd
+    } = Args,
+
+    GroupExact = Group == true andalso GroupLevel == 0,
+
+    GroupLevelEnd = case GroupExact of
+        true -> [];
+        false -> [{end_key, {<<255>>, GroupLevel + 1}}]
+    end,
+
+%%    StartKey1 = if StartKey0 == undefined -> undefined; true ->
+%%        couch_views_encoding:encode(StartKey0, key)
+%%    end,
+%%
+%%    StartKeyOpts = case {StartKey1, StartKeyDocId} of
+%%        {undefined, _} ->
+%%            [];
+%%        {StartKey1, StartKeyDocId} ->
+%%            [{start_key, {StartKey1, StartKeyDocId}}]
+%%    end,
+%%
+%%    EndKey1 = if EndKey0 == undefined -> undefined; true ->
+%%        couch_views_encoding:encode(EndKey0, key)
+%%    end,
+%%
+%%    EndKeyOpts = case {EndKey1, EndKeyDocId, Direction} of
+%%        {undefined, _, _} ->
+%%            [];
+%%        {EndKey1, <<>>, rev} when not InclusiveEnd ->
+%%            % When we iterate in reverse with
+%%            % inclusive_end=false we have to set the
+%%            % EndKeyDocId to <<255>> so that we don't
+%%            % include matching rows.
+%%            [{end_key_gt, {EndKey1, <<255>>}}];
+%%        {EndKey1, <<255>>, _} when not InclusiveEnd ->
+%%            % When inclusive_end=false we need to
+%%            % elide the default end_key_docid so as
+%%            % to not sort past the docids with the
+%%            % given end key.
+%%            [{end_key_gt, {EndKey1}}];
+%%        {EndKey1, EndKeyDocId, _} when not InclusiveEnd ->
+%%            [{end_key_gt, {EndKey1, EndKeyDocId}}];
+%%        {EndKey1, EndKeyDocId, _} when InclusiveEnd ->
+%%            [{end_key, {EndKey1, EndKeyDocId}}]
+%%    end,
+
+    [
+        {dir, Direction},
+%%        {limit, Limit * 2 + Skip * 2},
+        {streaming_mode, large}
+%%        {streaming_mode, want_all}
+    ] ++ GroupLevelEnd.
+%%    ] ++ StartKeyOpts ++ EndKeyOpts.
+
+
 read(Db, Mrst, ViewName, UserCallback, UserAcc0, Args) ->
     #mrst{
         language = Lang,
@@ -113,11 +236,41 @@ handle_row(DocId, Key, Value, Acc) ->
     UserAcc1 = maybe_stop(UserCallback({row, Row}, UserAcc0)),
     Acc#{acc := UserAcc1}.
 
+handle_reduce_row(_Key, _Value, #{skip := Skip} = Acc) when Skip > 0 ->
+    Acc#{skip := Skip - 1};
+
+handle_reduce_row(Key, Value, Acc) ->
+    io:format("ACC ~p ~n", [Acc]),
+    #{
+        callback := UserCallback,
+        acc := UserAcc0,
+        row_count := RowCount,
+        limit := Limit
+    } = Acc,
+
+    Row = [
+        {key, Key},
+        {value, Value}
+    ],
+
+    RowCountNext = RowCount + 1,
+
+    UserAcc1 = maybe_stop(UserCallback({row, Row}, UserAcc0)),
+    Acc1 = Acc#{acc := UserAcc1, row_count := RowCountNext},
+
+    case RowCountNext == Limit of
+        true ->
+            UserAcc2 = maybe_stop(UserCallback(complete, UserAcc1)),
+            maybe_stop({stop, UserAcc2});
+        false ->
+            Acc1
+    end.
+
 
 get_view_id(Lang, Args, ViewName, Views) ->
     case couch_mrview_util:extract_view(Lang, Args, ViewName, Views) of
         {map, View, _Args} -> View#mrview.id_num;
-        {red, {_Idx, _Lang, View}} -> View#mrview.id_num
+        {red, {_Idx, _Lang, View}, _Args} -> View#mrview.id_num
     end.
 
 
diff --git a/src/couch_views/src/couch_views_reduce.erl b/src/couch_views/src/couch_views_reduce.erl
new file mode 100644
index 0000000..1502f38
--- /dev/null
+++ b/src/couch_views/src/couch_views_reduce.erl
@@ -0,0 +1,364 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_views_reduce).
+
+
+-export([
+    run_reduce/2,
+    update_reduce_idx/6,
+    read_reduce/6
+]).
+
+
+-include("couch_views.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+-include_lib("fabric/include/fabric2.hrl").
+
+
+-define(LEVEL_FAN_POW, 4).
+-define(MAX_SKIP_LIST_LEVELS, 6).
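+
+% Skip-list sketch: level 0 stores every reduce key; each higher level up
+% to ?MAX_SKIP_LIST_LEVELS stores a sampled subset of the level below (a
+% coin flip for now; ?LEVEL_FAN_POW is only used by the commented-out
+% hash-based scheme further down).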
+
+
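+% Debug helper: folds each skip-list level in full and prints its rows.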
+log_levels(Db, Sig, ViewId) ->
+    #{
+        db_prefix := DbPrefix
+    } = Db,
+
+    Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
+    ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+    Opts = [{streaming_mode, want_all}],
+
+    fabric2_fdb:transactional(Db, fun(#{tx := Tx} = TxDb) ->
+        lists:foreach(fun (Level) ->
+            {StartKey, EndKey} = erlfdb_tuple:range({Level},
+                ReduceIdxPrefix),
+
+            Acc0 = #{
+                sig => Sig,
+                view_id => ViewId,
+                reduce_idx_prefix => ReduceIdxPrefix,
+                next => key,
+                key => undefined,
+                rows => []
+            },
+
+            Fun = fun fold_fwd_cb/2,
+            Acc = erlfdb:fold_range(Tx, StartKey, EndKey, Fun, Acc0, Opts),
+            #{
+                rows := Rows
+            } = Acc,
+            io:format("~n ~n LEVEL ~p rows ~p ~n", [Level, Rows]),
+            {ok, Rows}
+        end, Levels),
+        {ok, []}
+    end).
+
+
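+% Prototype: this only logs the skip-list levels and returns the (empty)
+% rows from Acc0; the real fold over the index is still commented out.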
+read_reduce(Db, Sig, ViewId, UserCallback, UserAcc0, Args) ->
+    #{
+        db_prefix := DbPrefix
+    } = Db,
+
+    Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
+    ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+    fabric2_fdb:transactional(Db, fun(#{tx := Tx} = TxDb) ->
+        log_levels(TxDb, Sig, ViewId),
+%%        Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
+
+
+        Acc0 = #{
+            sig => Sig,
+            view_id => ViewId,
+            user_acc => UserAcc0,
+            args => Args,
+            callback => UserCallback,
+            reduce_idx_prefix => ReduceIdxPrefix,
+            next => key,
+            rows => []
+        },
+
+
+%%        Opts = [{limit, 2}, {streaming_mode, want_all}],
+%%        EK = couch_views_encoding:encode(0, key),
+%%        {StartKey, EndKey} = erlfdb_tuple:range({?MAX_SKIP_LIST_LEVELS, EK},
+%%            ReduceIdxPrefix),
+%%
+%%        Fun = fun fold_fwd_cb/2,
+%%        Acc = erlfdb:fold_range(Tx, StartKey, EndKey, Fun, Acc0, Opts),
+        #{
+            rows := Rows
+        } = Acc0,
+        {ok, Rows}
+    end).
+
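+% Stub: translating #mrargs{} into FDB options for reduce reads is not
+% implemented yet.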
+args_to_fdb_opts(#mrargs{} = Args) ->
+    #mrargs{
+        limit = Limit,
+        start_key = StartKey,
+        end_key = EndKey
+    } = Args,
+    ok.
+
+
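+% Each logical row is stored as two adjacent entries (?VIEW_ROW_KEY, then
+% ?VIEW_ROW_VALUE); the fold toggles `next` to stitch the pairs back
+% together.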
+fold_fwd_cb({FullEncodedKey, EV}, #{next := key} = Acc) ->
+    #{
+        reduce_idx_prefix := ReduceIdxPrefix
+    } = Acc,
+
+    {Level, EK, ?VIEW_ROW_KEY}
+        = erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
+
+%%    Key = couch_views_encoding:decode(EV),
+    Val = couch_views_encoding:decode(EV),
+    Acc#{next := value, key := Val};
+
+fold_fwd_cb({FullEncodedKey, EV}, #{next := value} = Acc) ->
+    #{
+        reduce_idx_prefix := ReduceIdxPrefix,
+        rows := Rows,
+        key := Key
+    } = Acc,
+
+    {Level, EK, ?VIEW_ROW_VALUE}
+        = erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
+
+%%    Key = couch_views_encoding:decode(EV),
+    Val = couch_views_encoding:decode(EV),
+    Acc#{next := key, key := undefined, rows := Rows ++ [{Key, Val}]}.
+
+
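+% For every mapped doc, run each view's reduce function over that view's
+% map output and attach the results as reduce_results. Note the prototype
+% assumes exactly one reduce fun per view (the [{_, Fun}] match below).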
+run_reduce(#mrst{views = Views} = Mrst, MappedResults) ->
+    ReduceFuns = lists:map(fun(View) ->
+        #mrview{
+            id_num = Id,
+            reduce_funs = ViewReduceFuns
+        } = View,
+
+        [{_, Fun}] = ViewReduceFuns,
+        Fun
+    end, Views),
+
+    lists:map(fun (MappedResult) ->
+        #{
+            results := Results
+        } = MappedResult,
+
+        ReduceResults = lists:map(fun ({ReduceFun, Result}) ->
+            reduce(ReduceFun, Result)
+        end, lists:zip(ReduceFuns, Results)),
+
+        MappedResult#{
+            reduce_results => ReduceResults
+        }
+    end, MappedResults).
+
+
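+% Builtin _count: number of map rows per exact key, e.g. rows with keys
+% [1, 1, 2] reduce to [{1, 2}, {2, 1}].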
+reduce(<<"_count">>, Results) ->
+    ReduceResults = lists:foldl(fun ({Key, _}, Acc) ->
+        case maps:is_key(Key, Acc) of
+            true ->
+                #{Key := Val} = Acc,
+                Acc#{Key := Val + 1};
+            false ->
+                Acc#{Key => 1}
+        end
+    end, #{}, Results),
+    maps:to_list(ReduceResults);
+
+% This isn't a real supported reduce function in CouchDB, but it gives us
+% a basic reduce for which updating the index forces us to re-read
+% multiple rows instead of performing a single atomic update
+reduce(<<"_stats">>, Results) ->
+    ReduceResults = lists:foldl(fun ({Key, Val}, Acc) ->
+        io:format("MAX ~p ~p ~n", [Key, Val]),
+        case maps:is_key(Key, Acc) of
+            true ->
+                #{Key := Max} = Acc,
+                case Max >= Val of
+                    true ->
+                        Acc;
+                    false ->
+                        Acc#{Key := Val}
+                end;
+            false ->
+                Acc#{Key => Val}
+        end
+    end, #{}, Results),
+    maps:to_list(ReduceResults).
+
+
+is_builtin(<<"_", _/binary>>) ->
+    true;
+
+is_builtin(_) ->
+    false.
+
+
+update_reduce_idx(TxDb, Sig, ViewId, _DocId, _ExistingKeys, ReduceResult) ->
+    #{
+        db_prefix := DbPrefix
+    } = TxDb,
+
+    ViewOpts = #{
+        db_prefix => DbPrefix,
+        sig => Sig,
+        view_id => ViewId
+    },
+    create_skip_list(TxDb, ?MAX_SKIP_LIST_LEVELS, ViewOpts),
+
+    lists:foreach(fun ({Key, Val}) ->
+        io:format("RESULTS KV ~p ~p ~n", [Key, Val]),
+        add_kv_to_skip_list(TxDb, ?MAX_SKIP_LIST_LEVELS, ViewOpts, Key, Val)
+    end, ReduceResult).
+
+
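+% Seeds every level of the skip list with a {0, 0} sentinel row,
+% presumably so that get_previous_key/4 always finds a predecessor.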
+create_skip_list(Db, MaxLevel, #{} = ViewOpts) ->
+    #{
+        db_prefix := DbPrefix,
+        sig := Sig,
+        view_id := ViewId
+    } = ViewOpts,
+
+    Levels = lists:seq(0, MaxLevel),
+    ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+
+    fabric2_fdb:transactional(Db, fun(TxDb) ->
+
+        lists:foreach(fun(Level) ->
+            add_kv(TxDb, ReduceIdxPrefix, Level, 0, 0)
+        end, Levels)
+    end).
+
+%% Crude, but it's simple for now
+should_add_key_to_level(0, _, _) ->
+    true;
+
+should_add_key_to_level(?MAX_SKIP_LIST_LEVELS, _, _) ->
+    false;
+
+should_add_key_to_level(_, _, false) ->
+    false;
+
+should_add_key_to_level(_, _Key, _Prev) ->
+    % crypto:rand_uniform/2 is deprecated; rand:uniform/1 is equivalent here.
+    rand:uniform(2) == 1.
+
+%%should_add_key_to_level(Level, Key) ->
+%%    erlang:phash2(Key) band ((1 bsl (Level * ?LEVEL_FAN_POW)) -1) == 0.
+%%    keyHash & ((1 << (level * LEVEL_FAN_POW)) - 1)) != 0
+
+
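+% Insert a key bottom-up. Level 0 always gets the exact KV; at each higher
+% level the key is either inserted (coin flip, and only if it was inserted
+% one level down) or re-reduced into the nearest preceding key at that
+% level.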
+add_kv_to_skip_list(Db, MaxLevel, #{} = ViewOpts, Key, Val) ->
+    #{
+        db_prefix := DbPrefix,
+        sig := Sig,
+        view_id := ViewId
+    } = ViewOpts,
+
+    Levels = lists:seq(0, MaxLevel),
+    ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+
+    fabric2_fdb:transactional(Db, fun(TxDb) ->
+        lists:foldl(fun(Level, PrevCoinFlip) ->
+            io:format("PROCESS ~p ~p ~p ~n", [Level, Key, Val]),
+            {PrevKey, PrevVal} = get_previous_key(TxDb, ReduceIdxPrefix, Level, Key),
+            io:format("PREV VALS ~p ~p ~n", [PrevKey, PrevVal]),
+            case should_add_key_to_level(Level, Key, PrevCoinFlip) of
+                true ->
+                    io:format("Adding ~p ~p ~n", [Level, Key]),
+                    add_kv(Db, ReduceIdxPrefix, Level, Key, Val),
+                    true;
+                false ->
+                    {PrevKey, NewVal} = rereduce(<<"_stats">>, {PrevKey, PrevVal}, {Key, Val}),
+                    io:format("RE_REDUCE ~p ~p ~p ~p ~n", [Level, Key, PrevKey, NewVal]),
+                    add_kv(Db, ReduceIdxPrefix, Level, PrevKey, NewVal),
+                    false
+            end
+        end, true, Levels)
+    end).
+
+
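+% Rereduce for the stand-in "_stats": keep the larger of the two values
+% under the previous key (effectively a max).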
+rereduce(<<"_stats">>, {PrevKey, PrevVal}, {_Key, Val}) ->
+    case PrevVal >= Val of
+        true -> {PrevKey, PrevVal};
+        false -> {PrevKey, Val}
+    end.
+
+
+reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId) ->
+    Key = {?DB_VIEWS, Sig, ?VIEW_REDUCE_SK_RANGE, ViewId},
+    erlfdb_tuple:pack(Key, DbPrefix).
+
+
+reduce_idx_key(ReduceIdxPrefix, SkipLevel, ReduceKey, RowType) ->
+    Key = {SkipLevel, ReduceKey, RowType},
+    erlfdb_tuple:pack(Key, ReduceIdxPrefix).
+
+
+add_kv(TxDb, ReduceIdxPrefix, Level, Key, Val) ->
+    #{
+        tx := Tx
+    } = TxDb,
+
+    EK = couch_views_encoding:encode(Key, key),
+    EVK = couch_views_encoding:encode(Key),
+    EV = couch_views_encoding:encode(Val),
+
+    KK = reduce_idx_key(ReduceIdxPrefix, Level, EK, ?VIEW_ROW_KEY),
+    VK = reduce_idx_key(ReduceIdxPrefix, Level, EK, ?VIEW_ROW_VALUE),
+    ok = erlfdb:set(Tx, KK, EVK),
+    ok = erlfdb:set(Tx, VK, EV).
+
+
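+% Intended to find the nearest entry preceding Key at this level: reverse
+% scan with limit 2 so row_cb/2 sees a VALUE row and then its KEY row.
+% Note the live scan currently ends at the level's range end (EndKey0 is
+% computed but unused), so it really returns the level's last entry.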
+get_previous_key(TxDb, ReduceIdxPrefix, Level, Key) ->
+    #{
+        tx := Tx
+    } = TxDb,
+
+    % TODO: see if we need to add in conflict ranges for this for level=0
+    Opts = [{limit, 2}, {reverse, true}, {streaming_mode, want_all}],
+%%    LevelPrefix = erlfdb_tuple:pack({Level}, ReduceIdxPrefix),
+
+    EK = couch_views_encoding:encode(Key, key),
+    EndKey0 = erlfdb_tuple:pack({Level, EK}, ReduceIdxPrefix),
+
+    {StartKey, EndKey1} = erlfdb_tuple:range({Level}, ReduceIdxPrefix),
+%%    EndKey1 = erlfdb_key:first_greater_than(EndKey0),
+
+    Callback = fun row_cb/2,
+    Out = erlfdb:fold_range(Tx, StartKey, EndKey1, Callback, {val, ReduceIdxPrefix, {}}, Opts),
+    io:format("OUT PRV ~p ~p ~p ~n", [Level, Key, Out]),
+    Out.
+
+
+row_cb({FullEncodedKey, EV}, {val, ReduceIdxPrefix, Acc}) ->
+    io:format("ROW VAL ~p ~n", [erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix)]),
+    {_Level, EK, _VIEW_ROW_VALUE}
+        = erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
+    Val = couch_views_encoding:decode(EV),
+%%    io:format("WW ~p ~p ~n", [couch_views_encoding:decode(EK), Val]),
+
+    {key, {EK, ReduceIdxPrefix, Val}};
+
+row_cb({FullEncodedKey, EVK}, {key, {EK, ReduceIdxPrefix, Val}}) ->
+    io:format("ROW KEY ~p ~n", [erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix)]),
+    {_Level, EK, ?VIEW_ROW_KEY}
+        = erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
+    Key = couch_views_encoding:decode(EVK),
+
+    {Key, Val}.
+
diff --git a/src/couch_views/include/couch_views.hrl b/src/couch_views/src/couch_views_reduce_fdb.erl
similarity index 55%
copy from src/couch_views/include/couch_views.hrl
copy to src/couch_views/src/couch_views_reduce_fdb.erl
index 2e443eb..bcaaa30 100644
--- a/src/couch_views/include/couch_views.hrl
+++ b/src/couch_views/src/couch_views_reduce_fdb.erl
@@ -10,17 +10,24 @@
 % License for the specific language governing permissions and limitations under
 % the License.
 
-% indexing
--define(VIEW_UPDATE_SEQ, 0).
--define(VIEW_ID_INFO, 1).
--define(VIEW_ID_RANGE, 2).
--define(VIEW_MAP_RANGE, 3).
 
--define(VIEW_ROW_COUNT, 0).
--define(VIEW_KV_SIZE, 1).
+-module(couch_views_reduce_fdb).
+
+
+-export([
+%%    write_doc/4
+]).
+
+% _id keys = {?DB_VIEWS, Sig, ?VIEW_REDUCE_ID_RANGE, DocId, ViewId} = [TotalKeys, TotalSize, UniqueKeys]
+
+%%write_doc(TxDb, Sig, ViewIds, Doc) ->
+%%    #{
+%%        id := DocId,
+%%        reduce_results := ReduceResults
+%%    } = Doc,
+%%    lists:foreach(fun({ViewId, NewRows}) ->
+%%        % update reduce index
+%%        ok
+%%    end, lists:zip(ViewIds, ReduceResults)).
 
--define(VIEW_ROW_KEY, 0).
--define(VIEW_ROW_VALUE, 1).
 
-% jobs api
--define(INDEX_JOB_TYPE, <<"views">>).
diff --git a/src/couch_views/test/exunit/couch_views_reduce_test.exs b/src/couch_views/test/exunit/couch_views_reduce_test.exs
new file mode 100644
index 0000000..3f7a173
--- /dev/null
+++ b/src/couch_views/test/exunit/couch_views_reduce_test.exs
@@ -0,0 +1,286 @@
+defmodule CouchViewsReduceTest do
+  use Couch.Test.ExUnit.Case
+
+  alias Couch.Test.Utils
+
+  alias Couch.Test.Setup
+
+  alias Couch.Test.Setup.Step
+
+  setup_all do
+    test_ctx = :test_util.start_couch([:fabric, :couch_views, :couch_jobs])
+
+    on_exit(fn ->
+      :test_util.stop_couch(test_ctx)
+    end)
+  end
+
+  setup do
+    db_name = Utils.random_name("db")
+
+    admin_ctx =
+      {:user_ctx,
+       Utils.erlang_record(:user_ctx, "couch/include/couch_db.hrl", roles: ["_admin"])}
+
+    {:ok, db} = :fabric2_db.create(db_name, [admin_ctx])
+
+    docs = create_docs()
+    ddoc = create_ddoc()
+
+    {:ok, _} = :fabric2_db.update_docs(db, [ddoc | docs])
+
+    on_exit(fn ->
+      :fabric2_db.delete(db_name, [admin_ctx])
+    end)
+
+    %{
+      :db_name => db_name,
+      :db => db,
+      :ddoc => ddoc
+    }
+  end
+
+  #  test "group=true count reduce", context do
+  #    args = %{
+  #      :reduce => true,
+  #      :group => true
+  #      #            :limit => 9
+  #    }
+  #
+  #    {:ok, res} = run_query(context, args, "baz")
+  #    IO.inspect(res, label: "OUT")
+  #
+  #    assert res == [
+  #             {:row, [key: 1, value: 2]},
+  #             {:row, [key: 2, value: 2]},
+  #             {:row, [key: 3, value: 2]},
+  #             {:row, [key: [1, 1], value: 1]},
+  #             {:row, [key: [1, 1, 5], value: 1]},
+  #             {:row, [key: [1, 2, 6], value: 1]},
+  #             {:row, [key: [2, 1], value: 1]},
+  #             {:row, [key: [2, 3, 6], value: 1]},
+  #             {:row, [key: [3, 1], value: 1]},
+  #             {:row, [key: [3, 1, 5], value: 1]},
+  #             {:row, [key: [3, 4, 5], value: 1]}
+  #           ]
+  #  end
+
+  #  test "group=1 count reduce", context do
+  #    args = %{
+  #      :reduce => true,
+  #      :group_level => 1
+  #      #          :limit => 6
+  #    }
+  #
+  #    {:ok, res} = run_query(context, args, "baz")
+  #    IO.inspect(res, label: "OUT")
+  #
+  #    assert res == [
+  #             {:row, [key: 1, value: 2]},
+  #             {:row, [key: 2, value: 2]},
+  #             {:row, [key: 3, value: 2]},
+  #             {:row, [key: [1], value: 2]},
+  #             {:row, [key: [2], value: 2]},
+  #             {:row, [key: [3], value: 2]}
+  #           ]
+  #  end
+  #
+  #  test "group=2 count reduce", context do
+  #    args = %{
+  #      :reduce => true,
+  #      :group_level => 2,
+  #      :limit => 9
+  #    }
+  #
+  #    {:ok, res} = run_query(context, args, "baz")
+  #    IO.inspect(res, label: "OUT")
+  #
+  #    assert res == [
+  #             {:row, [key: 1, value: 2]},
+  #             {:row, [key: 2, value: 2]},
+  #             {:row, [key: 3, value: 2]},
+  #             {:row, [key: [1, 1], value: 2]},
+  #             {:row, [key: [1, 2], value: 1]},
+  #             {:row, [key: [2, 1], value: 1]},
+  #             {:row, [key: [2, 3], value: 1]},
+  #             {:row, [key: [3, 1], value: 2]},
+  #             {:row, [key: [3, 4], value: 1]}
+  #           ]
+  #  end
+  #
+  #  test "group=2 count reduce with limit = 3", context do
+  #    args = %{
+  #      :reduce => true,
+  #      :group_level => 2,
+  #      :limit => 4
+  #    }
+  #
+  #    {:ok, res} = run_query(context, args, "baz")
+  #    IO.inspect(res, label: "OUT")
+  #
+  #    assert res == [
+  #             {:row, [key: 1, value: 2]},
+  #             {:row, [key: 2, value: 2]},
+  #             {:row, [key: 3, value: 2]},
+  #             {:row, [key: [1, 1], value: 1]}
+  #           ]
+  #  end
+  #
+  #  # [
+  #  #  row: [key: [2019, 1, 2], value: 1],
+  #  #  row: [key: [2019, 1, 4], value: 1],
+  #  #  row: [key: [2019, 2, 1], value: 1],
+  #  #  row: [key: [2019, 2, 3], value: 1]
+  #  # ]
+  #
+  #  test "group=2 count reduce with startkey", context do
+  #    args = %{
+  #      #          :reduce => true,
+  #      #          :group_level => 2,
+  #      :start_key => [2019, 1, 4]
+  #      #          :limit => 4
+  #    }
+  #
+  #    {:ok, res} = run_query(context, args, "boom")
+  #    IO.inspect(res, label: "OUT")
+  #
+  #    assert res == [
+  #             {:row, [key: [2019, 1], value: 1]},
+  #             {:row, [key: [2019, 2], value: 2]}
+  #           ]
+  #  end
+
+  test "group_level=0 _max reduce", context do
+    args = %{
+      :reduce => true,
+      :group_level => 0
+      #            :limit => 9
+    }
+
+    {:ok, res} = run_query(context, args, "max")
+    IO.inspect(res, label: "OUT")
+
+    assert res == [
+             {:row, [key: :null, value: 3]}
+           ]
+  end
+
+  defp run_query(context, args, view) do
+    db = context[:db]
+    ddoc = context[:ddoc]
+
+    :couch_views.query(db, ddoc, view, &__MODULE__.default_cb/2, [], args)
+  end
+
+  def default_cb(:complete, acc) do
+    {:ok, Enum.reverse(acc)}
+  end
+
+  def default_cb({:final, info}, []) do
+    {:ok, [info]}
+  end
+
+  def default_cb({:final, _}, acc) do
+    {:ok, acc}
+  end
+
+  def default_cb({:meta, _}, acc) do
+    {:ok, acc}
+  end
+
+  def default_cb(:ok, :ddoc_updated) do
+    {:ok, :ddoc_updated}
+  end
+
+  def default_cb(row, acc) do
+    {:ok, [row | acc]}
+  end
+
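+  # Only a single doc for now (the 1..1 range); the "max" view below emits
+  # mostly fixed rows, so the doc contents barely matter yet.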
+  defp create_docs() do
+    for i <- 1..1 do
+      group =
+        if rem(i, 3) == 0 do
+          "first"
+        else
+          "second"
+        end
+
+      :couch_doc.from_json_obj(
+        {[
+           {"_id", "doc-id-#{i}"},
+           {"value", i},
+           {"some", "field"},
+           {"group", group}
+         ]}
+      )
+    end
+  end
+
+  defp create_ddoc() do
+    :couch_doc.from_json_obj(
+      {[
+         {"_id", "_design/bar"},
+         {"views",
+          {[
+#             {"baz",
+#              {[
+#                 {"map",
+#                  """
+#                  function(doc) {
+#                    emit(doc.value, doc.value);
+#                    emit(doc.value, doc.value);
+#                    emit([doc.value, 1], doc.value);
+#                    emit([doc.value, doc.value + 1, doc.group.length], doc.value);
+#
+#                    if (doc.value === 3) {
+#                      emit([1, 1, 5], 1);
+#                      emit([doc.value, 1, 5], 1);
+#                    }
+#                   }
+#                  """},
+#                 {"reduce", "_count"}
+#               ]}},
+#             {"boom",
+#              {[
+#                 {"map",
+#                  """
+#                  function(doc) {
+#                      var month = 1;
+#                      if (doc.value % 2) {
+#                          month = 2;
+#                      }
+#                      emit([2019, month, doc.value], doc.value);
+#                  }
+#                  """},
+#                 {"reduce", "_count"}
+#               ]}},
+             {"max",
+              {[
+                 {"map",
+                  """
+                  function(doc) {
+                      //emit(doc.value, doc.value);
+                      //emit([doc.value, 1], doc.value);
+                      //emit([doc.value, doc.value + 1, doc.group.length], doc.value);
+                        emit(1, 1);
+                        emit(2, 2);
+                        emit(3, 3);
+                        emit(4, 4);
+
+                       emit([2019, 2, 2], 1);
+                       emit([2019, 3, 3], 2);
+                       emit([2019, 3, 3], 3);
+                       emit([2019, 4, 3], 4);
+                       emit([2019, 5, 3], 6);
+                      if (doc.value === 3) {
+                       //emit([doc.value, 1, 5], 1);
+                      }
+                  }
+                  """},
+                 {"reduce", "_stats"}
+               ]}}
+           ]}}
+       ]}
+    )
+  end
+end
diff --git a/src/couch_views/test/exunit/test_helper.exs b/src/couch_views/test/exunit/test_helper.exs
new file mode 100644
index 0000000..3140500
--- /dev/null
+++ b/src/couch_views/test/exunit/test_helper.exs
@@ -0,0 +1,2 @@
+ExUnit.configure(formatters: [JUnitFormatter, ExUnit.CLIFormatter])
+ExUnit.start()