Posted to commits@couchdb.apache.org by ga...@apache.org on 2019/12/05 17:03:41 UTC

[couchdb] 03/08: progress with reading level 0

This is an automated email from the ASF dual-hosted git repository.

garren pushed a commit to branch prototype/builtin-reduce
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 0076f15d191bd2fb71a0837e474ce12019be0c8f
Author: Garren Smith <ga...@gmail.com>
AuthorDate: Wed Oct 30 11:37:36 2019 +0200

    progress with reading level 0
---
 src/couch_views/src/couch_views_fdb.erl            |   4 +-
 src/couch_views/src/couch_views_indexer.erl        |   8 +-
 src/couch_views/src/couch_views_reader.erl         |  30 --
 src/couch_views/src/couch_views_reduce.erl         | 306 +++++++--------------
 src/couch_views/src/couch_views_reduce_fdb.erl     | 274 +++++++++++++++++-
 .../test/exunit/couch_views_reduce_test.exs        |  94 +++++--
 6 files changed, 428 insertions(+), 288 deletions(-)
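
The reduce index touched by this commit is a FoundationDB-backed skip list: every key/value row lands in level 0, and a key is additionally promoted to level N when the low N * LEVEL_FAN_POW bits of its hash are zero, so with LEVEL_FAN_POW = 1 each level above 0 holds roughly half as many rows as the one below it. Below is a minimal sketch of that promotion rule as used by should_add_key_to_level/2 in this commit; the module name and the levels_for_key/1 helper are illustrative only and are not part of the commit.

    -module(skip_level_sketch).
    -export([levels_for_key/1]).

    -define(LEVEL_FAN_POW, 1).
    -define(MAX_SKIP_LIST_LEVELS, 6).

    %% Return every level at which Key would be stored. Level 0 uses a mask of
    %% zero bits, so it matches every key.
    levels_for_key(Key) ->
        KeyHash = erlang:phash2(Key),
        [Level || Level <- lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
                  (KeyHash band ((1 bsl (Level * ?LEVEL_FAN_POW)) - 1)) == 0].

For example, skip_level_sketch:levels_for_key([2017, 3, 1]) always includes 0, and includes higher levels only when the key's hash happens to end in enough zero bits.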

diff --git a/src/couch_views/src/couch_views_fdb.erl b/src/couch_views/src/couch_views_fdb.erl
index 479a707..8999d76 100644
--- a/src/couch_views/src/couch_views_fdb.erl
+++ b/src/couch_views/src/couch_views_fdb.erl
@@ -133,8 +133,6 @@ fold_map_idx(TxDb, Sig, ViewId, Options, Callback, Acc0) ->
     Acc1.
 
 
-
-
 write_doc(TxDb, Sig, _ViewIds, #{deleted := true} = Doc) ->
     #{
         id := DocId
@@ -181,7 +179,7 @@ write_doc(TxDb, Sig, ViewIds, Doc) ->
                 []
         end,
         update_map_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows),
-        couch_views_reduce:update_reduce_idx(TxDb, Sig, ViewId, DocId,
+        couch_views_reduce_fdb:update_reduce_idx(TxDb, Sig, ViewId, DocId,
             ExistingKeys, ReduceResult)
     end, lists:zip3(ViewIds, Results, ReduceResults)).
 
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index 4c430c1..cff15b0 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -298,11 +298,17 @@ write_docs(TxDb, Mrst, Docs, State) ->
     } = Mrst,
 
     #{
-        last_seq := LastSeq
+        last_seq := LastSeq,
+        view_seq := ViewSeq
     } = State,
 
     ViewIds = [View#mrview.id_num || View <- Views],
 
+    %%  First build of the view
+    if ViewSeq /= <<>> -> ok; true ->
+        couch_views_reduce:setup_reduce_indexes(TxDb, Sig, ViewIds)
+    end,
+
     lists:foreach(fun(Doc) ->
         couch_views_fdb:write_doc(TxDb, Sig, ViewIds, Doc)
     end, Docs),
diff --git a/src/couch_views/src/couch_views_reader.erl b/src/couch_views/src/couch_views_reader.erl
index d08515c..394b3cf 100644
--- a/src/couch_views/src/couch_views_reader.erl
+++ b/src/couch_views/src/couch_views_reader.erl
@@ -236,36 +236,6 @@ handle_row(DocId, Key, Value, Acc) ->
     UserAcc1 = maybe_stop(UserCallback({row, Row}, UserAcc0)),
     Acc#{acc := UserAcc1}.
 
-handle_reduce_row(_Key, _Value, #{skip := Skip} = Acc) when Skip > 0 ->
-    Acc#{skip := Skip - 1};
-
-handle_reduce_row(Key, Value, Acc) ->
-    io:format("ACC ~p ~n", [Acc]),
-    #{
-        callback := UserCallback,
-        acc := UserAcc0,
-        row_count := RowCount,
-        limit := Limit
-    } = Acc,
-
-    Row = [
-        {key, Key},
-        {value, Value}
-    ],
-
-    RowCountNext = RowCount + 1,
-
-    UserAcc1 = maybe_stop(UserCallback({row, Row}, UserAcc0)),
-    Acc1 = Acc#{acc := UserAcc1, row_count := RowCountNext},
-
-    case RowCountNext == Limit of
-        true ->
-            UserAcc2 = maybe_stop(UserCallback(complete, UserAcc1)),
-            maybe_stop({stop, UserAcc2});
-        false ->
-            Acc1
-    end.
-
 
 get_view_id(Lang, Args, ViewName, Views) ->
     case couch_mrview_util:extract_view(Lang, Args, ViewName, Views) of
diff --git a/src/couch_views/src/couch_views_reduce.erl b/src/couch_views/src/couch_views_reduce.erl
index 4cb7416..ebd2f47 100644
--- a/src/couch_views/src/couch_views_reduce.erl
+++ b/src/couch_views/src/couch_views_reduce.erl
@@ -15,8 +15,8 @@
 
 -export([
     run_reduce/2,
-    update_reduce_idx/6,
-    read_reduce/6
+    read_reduce/6,
+    setup_reduce_indexes/3
 ]).
 
 
@@ -30,99 +30,120 @@
 -define(MAX_SKIP_LIST_LEVELS, 6).
 
 
-log_levels(Db, Sig, ViewId) ->
+read_reduce(Db, Sig, ViewId, UserCallback, UserAcc0, Args) ->
     #{
         db_prefix := DbPrefix
     } = Db,
 
-    Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
+%%    Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
     ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
-    Opts = [{streaming_mode, want_all}],
+    #mrargs{
+        limit = Limit
+    } = Args,
 
-    fabric2_fdb:transactional(Db, fun(#{tx := Tx} = TxDb) ->
-        lists:foreach(fun (Level) ->
-            {StartKey, EndKey} = erlfdb_tuple:range({Level},
-                ReduceIdxPrefix),
+    Opts = args_to_fdb_opts(Args, ReduceIdxPrefix),
+
+    try
+        fabric2_fdb:transactional(Db, fun(TxDb) ->
+    %%        Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
 
             Acc0 = #{
                 sig => Sig,
                 view_id => ViewId,
+                user_acc => UserAcc0,
+                args => Args,
+                callback => UserCallback,
                 reduce_idx_prefix => ReduceIdxPrefix,
-                next => key,
-                key => undefined,
-                rows => []
+                limit => Limit,
+                row_count => 0
             },
 
-            Fun = fun fold_fwd_cb/2,
-            Acc = erlfdb:fold_range(Tx, StartKey, EndKey, Fun, Acc0, Opts),
+            Fun = fun handle_row/3,
+            Acc1 = couch_views_reduce_fdb:fold_level0(TxDb, Sig, ViewId, Opts, Fun, Acc0),
             #{
-                rows := Rows
-            } = Acc,
-            io:format("~n ~n LEVEL ~p rows ~p ~n", [Level, Rows]),
-            {ok, Rows}
-        end, Levels),
-        {ok, []}
-    end).
-
+                user_acc := UserAcc1
+            } = Acc1,
+            {ok, maybe_stop(UserCallback(complete, UserAcc1))}
+        end)
+    catch throw:{done, Out} ->
+        {ok, Out}
+    end.
 
-read_reduce(Db, Sig, ViewId, UserCallback, UserAcc0, Args) ->
-    #{
-        db_prefix := DbPrefix
-    } = Db,
 
-    Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
-    ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
-    fabric2_fdb:transactional(Db, fun(#{tx := Tx} = TxDb) ->
-        log_levels(TxDb, Sig, ViewId),
-%%        Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
-
-
-        Acc0 = #{
-            sig => Sig,
-            view_id => ViewId,
-            user_acc => UserAcc0,
-            args => Args,
-            callback => UserCallback,
-            reduce_idx_prefix => ReduceIdxPrefix,
-            rows => []
-        },
-
-
-%%        Opts = [{limit, 2}, {streaming_mode, want_all}],
-%%        EK = couch_views_encoding:encode(0, key),
-%%        {StartKey, EndKey} = erlfdb_tuple:range({?MAX_SKIP_LIST_LEVELS, EK},
-%%            ReduceIdxPrefix),
-%%
-%%        Fun = fun fold_fwd_cb/2,
-%%        Acc = erlfdb:fold_range(Tx, StartKey, EndKey, Fun, Acc0, Opts),
-        #{
-            rows := Rows
-        } = Acc0,
-        {ok, Rows}
-    end).
-
-args_to_fdb_opts(#mrargs{} = Args) ->
+args_to_fdb_opts(#mrargs{} = Args, ReduceIdxPrefix) ->
     #mrargs{
-        limit = Limit,
-        start_key = StartKey,
-        end_key = EndKey
+%%        limit = Limit,
+%%        start_key = StartKey,
+%%        end_key = EndKey,
+        group = Group,
+        group_level = GroupLevel
     } = Args,
-    ok.
 
+    {UStartKey0, EndKey0} = erlfdb_tuple:range({0},
+        ReduceIdxPrefix),
+
+    StartKey0 = erlfdb_tuple:pack({0, couch_views_encoding:encode(0, key)}, ReduceIdxPrefix),
+
+%%    StartKey1 = case StartKey of
+%%        undefined -> erlfdb_key:first_greater_than(StartKey0);
+%%        StartKey -> create_key(StartKey, 0, Red)
+%%    end,
+
+    StartKey1 = erlfdb_key:first_greater_than(StartKey0),
 
-fold_fwd_cb({FullEncodedKey, EV}, Acc) ->
+    [{streaming_mode, want_all}, {startkey, StartKey1}, {endkey, EndKey0}].
+
+
+encode_key(Key, Level) ->
+    {Level, couch_views_encoding:encode(Key, key)}.
+
+
+handle_row(Key, Value, Acc) ->
     #{
-        reduce_idx_prefix := ReduceIdxPrefix,
-        rows := Rows
+        callback := UserCallback,
+        user_acc := UserAcc0,
+        row_count := RowCount,
+        limit := Limit
     } = Acc,
 
-    {_Level, _EK}
-        = erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
-    {EK, EV1} = erlfdb_tuple:unpack(EV),
-    Key = couch_views_encoding:decode(EK),
-    Val = couch_views_encoding:decode(EV1),
+    Row = [
+        {key, Key},
+        {value, Value}
+    ],
 
-    Acc#{key := Val, rows := Rows ++ [{Key, Val}]}.
+    RowCountNext = RowCount + 1,
+
+    UserAcc1 = maybe_stop(UserCallback({row, Row}, UserAcc0)),
+    Acc1 = Acc#{user_acc := UserAcc1, row_count := RowCountNext},
+
+    case RowCountNext == Limit of
+        true ->
+            UserAcc2 = maybe_stop(UserCallback(complete, UserAcc1)),
+            maybe_stop({stop, UserAcc2});
+        false ->
+            Acc1
+    end.
+
+
+maybe_stop({ok, Acc}) -> Acc;
+maybe_stop({stop, Acc}) -> throw({done, Acc}).
+
+setup_reduce_indexes(Db, Sig, ViewIds) ->
+    #{
+        db_prefix := DbPrefix
+    } = Db,
+
+    fabric2_fdb:transactional(Db, fun(TxDb) ->
+        lists:foreach(fun (ViewId) ->
+            ViewOpts = #{
+                db_prefix => DbPrefix,
+                sig => Sig,
+                view_id => ViewId
+            },
+            couch_views_reduce_fdb:create_skip_list(TxDb,
+                ?MAX_SKIP_LIST_LEVELS, ViewOpts)
+        end, ViewIds)
+    end).
 
 
 run_reduce(#mrst{views = Views } = Mrst, MappedResults) ->
@@ -192,149 +213,6 @@ is_builtin(<<"_", _/binary>>) ->
 is_builtin(_) ->
     false.
 
-
-update_reduce_idx(TxDb, Sig, ViewId, _DocId, _ExistingKeys, ReduceResult) ->
-    #{
-        db_prefix := DbPrefix
-    } = TxDb,
-
-    ViewOpts = #{
-        db_prefix => DbPrefix,
-        sig => Sig,
-        view_id => ViewId
-    },
-    create_skip_list(TxDb, ?MAX_SKIP_LIST_LEVELS, ViewOpts),
-
-    lists:foreach(fun ({Key, Val}) ->
-        io:format("RESULTS KV ~p ~p ~n", [Key, Val])
-%%        add_kv_to_skip_list(TxDb, ?MAX_SKIP_LIST_LEVELS, ViewOpts, Key, Val)
-    end, ReduceResult).
-
-
-create_skip_list(Db, MaxLevel, #{} = ViewOpts) ->
-    #{
-        db_prefix := DbPrefix,
-        sig := Sig,
-        view_id := ViewId
-    } = ViewOpts,
-
-    Levels = lists:seq(0, MaxLevel),
-    ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
-
-    fabric2_fdb:transactional(Db, fun(TxDb) ->
-
-        lists:foreach(fun(Level) ->
-            add_kv(TxDb, ReduceIdxPrefix, Level, 0, 0)
-        end, Levels)
-    end).
-
-
-should_add_key_to_level(Level, Key) ->
-    (erlang:phash2(Key) band ((1 bsl (Level * ?LEVEL_FAN_POW)) -1)) == 0.
-%%    keyHash & ((1 << (level * LEVEL_FAN_POW)) - 1)) != 0
-
-
-add_kv_to_skip_list(Db, MaxLevel, #{} = ViewOpts, Key, Val) ->
-    #{
-        db_prefix := DbPrefix,
-        sig := Sig,
-        view_id := ViewId
-    } = ViewOpts,
-
-    Levels = lists:seq(0, MaxLevel),
-    ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
-
-    fabric2_fdb:transactional(Db, fun(TxDb) ->
-        lists:foldl(fun(Level) ->
-            io:format("PROCESS ~p ~p ~p ~n", [Level, Key, Val]),
-            {PrevKey, PrevVal} = get_previous_key(TxDb, ReduceIdxPrefix, Level, Key),
-            io:format("PREV VALS ~p ~p ~n", [PrevKey, PrevVal]),
-            case should_add_key_to_level(Level, Key) of
-                true ->
-                    io:format("Adding ~p ~p ~n", [Level, Key]),
-                    add_kv(Db, ReduceIdxPrefix, Level, Key, Val);
-                false ->
-                    {PrevKey, NewVal} = rereduce(<<"_stats">>, {PrevKey, PrevVal}, {Key, Val}),
-                    io:format("RE_REDUCE ~p ~p ~p ~p ~n", [Level, Key, PrevKey, NewVal]),
-                    add_kv(Db, ReduceIdxPrefix, Level, PrevKey, NewVal)
-            end
-        end, true, Levels)
-    end).
-
-
-rereduce(<<"_stats">>, {PrevKey, PrevVal}, {_Key, Val}) ->
-    case PrevVal >= Val of
-        true -> {PrevKey, PrevVal};
-        false -> {PrevKey, Val}
-    end.
-
-
 reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId) ->
     Key = {?DB_VIEWS, Sig, ?VIEW_REDUCE_RANGE, ViewId},
     erlfdb_tuple:pack(Key, DbPrefix).
-
-
-create_key(ReduceIdxPrefix, SkipLevel, Key) ->
-    EK = couch_views_encoding:encode(Key, key),
-    LevelKey = {SkipLevel, EK},
-    erlfdb_tuple:pack(LevelKey, ReduceIdxPrefix).
-
-
-create_value(Key, Val) ->
-    EK = couch_views_encoding:encode(Key),
-    EV = couch_views_encoding:encode(Val),
-    erlfdb_tuple:pack({EK, EV}).
-
-
-add_kv(TxDb, ReduceIdxPrefix, Level, Key, Val) ->
-    #{
-        tx := Tx
-    } = TxDb,
-
-    LevelKey = create_key(ReduceIdxPrefix, Level, Key),
-    EV = create_value(Key, Val),
-
-    ok = erlfdb:set(Tx, LevelKey, EV).
-
-
-get_previous_key(TxDb, ReduceIdxPrefix, Level, Key) ->
-    #{
-        tx := Tx
-    } = TxDb,
-
-    % TODO: see if we need to add in conflict ranges for this for level=0
-    Opts = [{limit, 2}, {reverse, true}, {streaming_mode, want_all}],
-%%    LevelPrefix = erlfdb_tuple:pack({Level}, ReduceIdxPrefix),
-
-    EK = couch_views_encoding:encode(Key, key),
-    EndKey0 = erlfdb_tuple:pack({Level, EK}, ReduceIdxPrefix),
-
-    {StartKey, EndKey1} = erlfdb_tuple:range({Level}, ReduceIdxPrefix),
-%%    EndKey1 = erlfdb_key:first_greater_than(EndKey0),
-
-    Callback = fun row_cb/2,
-    Out = erlfdb:fold_range(Tx, StartKey, EndKey1, Callback, {val, ReduceIdxPrefix, {}}, Opts),
-    io:format("OUT PRV ~p ~p ~p ~n", [Level, Key, Out]),
-    Out.
-
-
-row_cb({FullEncodedKey, EV}, {val, ReduceIdxPrefix, Acc}) ->
-    io:format("ROW VAL ~p ~n", [erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix)]),
-    {_Level, EK, _VIEW_ROW_VALUE}
-        = erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
-    Val = couch_views_encoding:decode(EV),
-%%    io:format("WW ~p ~p ~n", [couch_views_encoding:decode(EK), Val]),
-
-    {key, {EK, ReduceIdxPrefix, Val}};
-
-row_cb({FullEncodedKey, EVK}, {key, {EK, ReduceIdxPrefix, Val}}) ->
-    io:format("ROW KEY ~p ~n", [erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix)]),
-    {_Level, EK, ?VIEW_ROW_KEY}
-        = erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
-    Key = couch_views_encoding:decode(EVK),
-
-    {Key, Val}.
-
-
-
-
diff --git a/src/couch_views/src/couch_views_reduce_fdb.erl b/src/couch_views/src/couch_views_reduce_fdb.erl
index bcaaa30..9683265 100644
--- a/src/couch_views/src/couch_views_reduce_fdb.erl
+++ b/src/couch_views/src/couch_views_reduce_fdb.erl
@@ -15,19 +15,273 @@
 
 
 -export([
-%%    write_doc/4
+    fold_level0/6,
+    create_skip_list/3,
+    update_reduce_idx/6
 ]).
 
-% _id keys = {?DB_VIEWS, Sig, ?VIEW_REDUCE_ID_RANGE, DocId, ViewId} = [TotalKeys, TotalSize, UniqueKeys]
 
-%%write_doc(TxDb, Sig, ViewIds, Doc) ->
+-include("couch_views.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+-include_lib("fabric/include/fabric2.hrl").
+
+-define(MAX_SKIP_LIST_LEVELS, 6).
+-define(LEVEL_FAN_POW, 1).
+
+log_levels(Db, Sig, ViewId) ->
+    #{
+        db_prefix := DbPrefix
+    } = Db,
+
+    Levels = lists:seq(0, 6),
+    ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+    Opts = [{streaming_mode, want_all}],
+
+    fabric2_fdb:transactional(Db, fun(#{tx := Tx} = TxDb) ->
+        lists:foreach(fun (Level) ->
+            {StartKey, EndKey} = erlfdb_tuple:range({Level},
+                ReduceIdxPrefix),
+
+            Acc0 = #{
+                sig => Sig,
+                view_id => ViewId,
+                reduce_idx_prefix => ReduceIdxPrefix,
+                user_acc => [],
+                callback => fun handle_log_levels/3
+            },
+
+            Fun = fun fold_fwd_cb/2,
+            Acc = erlfdb:fold_range(Tx, StartKey, EndKey, Fun, Acc0, Opts),
+            #{
+                user_acc := Rows
+            } = Acc,
+            io:format("~n LEVEL ~p rows ~p ~n", [Level, Rows])
+        end, Levels)
+    end).
+
+handle_log_levels(Key, Value, Acc) ->
+    Acc ++ [{Key, Value}].
+
+%%fold(Db, Sig, ViewId, Options, Callback, Acc0) ->
 %%    #{
-%%        id := DocId,
-%%        reduce_results := ReduceResults
-%%    } = Doc,
-%%    lists:foreach(fun({ViewId, NewRows}) ->
-%%        % update reduce index
-%%        ok
-%%    end, lists:zip(ViewIds, ReduceResults)).
+%%        db_prefix := DbPrefix
+%%    } = Db,
+%%
+%%%%    Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
+%%    ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+%%    #mrargs{
+%%        limit = Limit
+%%    } = Args,
+%%
+%%    fabric2_fdb:transactional(Db, fun(TxDb) ->
+%%
+%%        Acc0 = #{
+%%            sig => Sig,
+%%            view_id => ViewId,
+%%            user_acc => UserAcc0,
+%%            args => Args,
+%%            callback => UserCallback,
+%%            reduce_idx_prefix => ReduceIdxPrefix,
+%%            limit => Limit,
+%%            row_count => 0
+%%
+%%        },
+%%
+%%        Acc1 = read_level0_only(TxDb, Acc0, Callback),
+%%        #{
+%%            user_acc := UserAcc1
+%%        } = Acc1,
+%%        {ok, UserAcc1}
+%%    end).
+
+fold_level0(Db, Sig, ViewId, Opts, UserCallback, UserAcc0) ->
+    #{
+        db_prefix := DbPrefix
+    } = Db,
+
+    Level = 0,
+    ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+    Acc = #{
+        sig => Sig,
+        view_id => ViewId,
+        user_acc => UserAcc0,
+        %%            args := Args,
+        callback => UserCallback,
+        reduce_idx_prefix => ReduceIdxPrefix
+    },
+
+    fabric2_fdb:transactional(Db, fun(TxDb) ->
+        log_levels(TxDb, Sig, ViewId),
+        #{
+            tx := Tx
+        } = TxDb,
+
+
+%%        {StartKey, EndKey} = erlfdb_tuple:range({0},
+%%            ReduceIdxPrefix),
+        {startkey, StartKey} = lists:keyfind(startkey, 1, Opts),
+        {endkey, EndKey} = lists:keyfind(endkey, 1, Opts),
+
+%%        ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+        Fun = fun fold_fwd_cb/2,
+        Acc1 = erlfdb:fold_range(Tx, StartKey, EndKey, Fun, Acc, Opts),
+        #{
+            user_acc := UserAcc1
+        } = Acc1,
+        UserAcc1
+    end).
+
+
+fold_fwd_cb({FullEncodedKey, EV}, Acc) ->
+    #{
+        reduce_idx_prefix := ReduceIdxPrefix,
+        callback := Callback,
+        user_acc := UserAcc
+    } = Acc,
+
+    {_Level, _EK}
+        = erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
+    {EK, EV1} = erlfdb_tuple:unpack(EV),
+    Key = couch_views_encoding:decode(EK),
+    Val = couch_views_encoding:decode(EV1),
+
+    UserAcc1 = Callback(Key, Val, UserAcc),
+    Acc#{user_acc := UserAcc1}.
+
+
+reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId) ->
+    Key = {?DB_VIEWS, Sig, ?VIEW_REDUCE_RANGE, ViewId},
+    erlfdb_tuple:pack(Key, DbPrefix).
+
+
+%% Inserting
+update_reduce_idx(TxDb, Sig, ViewId, _DocId, _ExistingKeys, ReduceResult) ->
+    #{
+        db_prefix := DbPrefix
+    } = TxDb,
+
+    ViewOpts = #{
+        db_prefix => DbPrefix,
+        sig => Sig,
+        view_id => ViewId
+    },
+
+    lists:foreach(fun ({Key, Val}) ->
+        io:format("RESULTS KV ~p ~p ~n", [Key, Val]),
+        add_kv_to_skip_list(TxDb, ?MAX_SKIP_LIST_LEVELS, ViewOpts, Key, Val)
+    end, ReduceResult).
+
+
+create_skip_list(Db, MaxLevel, #{} = ViewOpts) ->
+    #{
+        db_prefix := DbPrefix,
+        sig := Sig,
+        view_id := ViewId
+    } = ViewOpts,
+
+    Levels = lists:seq(0, MaxLevel),
+    ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+
+    fabric2_fdb:transactional(Db, fun(TxDb) ->
+
+        lists:foreach(fun(Level) ->
+            add_kv(TxDb, ReduceIdxPrefix, Level, 0, 0)
+        end, Levels)
+    end).
+
+
+add_kv_to_skip_list(Db, MaxLevel, #{} = ViewOpts, Key, Val) ->
+    #{
+        db_prefix := DbPrefix,
+        sig := Sig,
+        view_id := ViewId
+    } = ViewOpts,
+
+    Levels = lists:seq(0, MaxLevel),
+    ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+    KeyHash = hash_key(Key),
+
+    fabric2_fdb:transactional(Db, fun(TxDb) ->
+        lists:foreach(fun(Level) ->
+            {PrevKey, PrevVal} = get_previous_key(TxDb, ReduceIdxPrefix, Level, Key),
+            io:format("Process ~p ~p ~p PREV VALS ~p ~p ~n", [Level, Key, Val, PrevKey, PrevVal]),
+            case should_add_key_to_level(Level, KeyHash) of
+                true ->
+                    io:format("Adding ~p ~p ~n", [Level, Key]),
+                    add_kv(Db, ReduceIdxPrefix, Level, Key, Val);
+                false ->
+                    {PrevKey, NewVal} = rereduce(<<"_stats">>, {PrevKey, PrevVal}, {Key, Val}),
+                    io:format("RE_REDUCE ~p ~p ~p ~p ~n", [Level, Key, PrevKey, NewVal]),
+                    add_kv(Db, ReduceIdxPrefix, Level, PrevKey, NewVal)
+            end
+        end, Levels)
+    end).
+
+
+get_previous_key(TxDb, ReduceIdxPrefix, Level, Key) ->
+    #{
+        tx := Tx
+    } = TxDb,
+
+    % TODO: see if we need to add in conflict ranges for this for level=0
+    Opts = [{limit, 1}, {streaming_mode, want_all}],
+
+    EK = couch_views_encoding:encode(Key, key),
+    StartKey = erlfdb_tuple:pack({Level, EK}, ReduceIdxPrefix),
+    StartKeySel = erlfdb_key:last_less_or_equal(StartKey),
+    EndKeySel = erlfdb_key:first_greater_or_equal(StartKey),
+
+    Future = erlfdb:get_range(Tx, StartKeySel, EndKeySel, Opts),
+    [{_FullEncodedKey, PackedValue}] = erlfdb:wait(Future),
+    get_key_value(PackedValue).
+
+
+hash_key(Key) ->
+    erlang:phash2(Key).
+
+
+should_add_key_to_level(Level, KeyHash) ->
+    (KeyHash band ((1 bsl (Level * ?LEVEL_FAN_POW)) -1)) == 0.
+%%    keyHash & ((1 << (level * LEVEL_FAN_POW)) - 1)) != 0
+
+
+create_key(ReduceIdxPrefix, SkipLevel, Key) ->
+    EK = couch_views_encoding:encode(Key, key),
+    LevelKey = {SkipLevel, EK},
+    erlfdb_tuple:pack(LevelKey, ReduceIdxPrefix).
+
+
+create_value(Key, Val) ->
+    EK = couch_views_encoding:encode(Key),
+    EV = couch_views_encoding:encode(Val),
+    erlfdb_tuple:pack({EK, EV}).
+
+
+get_key_value(PackedValue) ->
+    {EncodedKey, EncodedValue}
+        = erlfdb_tuple:unpack(PackedValue),
+    Key = couch_views_encoding:decode(EncodedKey),
+    Value = couch_views_encoding:decode(EncodedValue),
+    {Key, Value}.
+
+
+add_kv(TxDb, ReduceIdxPrefix, Level, Key, Val) ->
+    #{
+        tx := Tx
+    } = TxDb,
+
+    LevelKey = create_key(ReduceIdxPrefix, Level, Key),
+    EV = create_value(Key, Val),
+
+    ok = erlfdb:set(Tx, LevelKey, EV).
+
+
+rereduce(<<"_stats">>, {PrevKey, PrevVal}, {_Key, Val}) ->
+    case PrevVal >= Val of
+        true -> {PrevKey, PrevVal};
+        false -> {PrevKey, Val}
+    end.
+
 
 
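A rough in-memory sketch of the per-level insert logic in add_kv_to_skip_list/5 above, assuming a plain map of level -> rows in place of the FoundationDB ranges. The {0, 0} sentinel per level and the placeholder rereduce mirror this commit (create_skip_list/3 and rereduce/3); the module name and the new/0, add/3 and previous/2 helpers are made up for illustration, and the sketch assumes keys sort after the sentinel.

    -module(reduce_skip_sketch).
    -export([new/0, add/3]).

    -define(LEVEL_FAN_POW, 1).
    -define(MAX_SKIP_LIST_LEVELS, 6).

    %% Mimics create_skip_list/3: every level starts with a {0, 0} sentinel row.
    new() ->
        maps:from_list([{L, [{0, 0}]} || L <- lists:seq(0, ?MAX_SKIP_LIST_LEVELS)]).

    add(Key, Val, Levels) ->
        KeyHash = erlang:phash2(Key),
        lists:foldl(fun(Level, Acc) ->
            Rows = maps:get(Level, Acc),
            Mask = (1 bsl (Level * ?LEVEL_FAN_POW)) - 1,
            case KeyHash band Mask of
                0 ->
                    %% Key is promoted to this level: write its own row.
                    Acc#{Level := lists:keystore(Key, 1, Rows, {Key, Val})};
                _ ->
                    %% Not promoted: fold the value into the closest preceding row.
                    {PrevKey, PrevVal} = previous(Key, Rows),
                    NewRow = rereduce({PrevKey, PrevVal}, {Key, Val}),
                    Acc#{Level := lists:keystore(PrevKey, 1, Rows, NewRow)}
            end
        end, Levels, lists:seq(0, ?MAX_SKIP_LIST_LEVELS)).

    %% Largest stored key that is =< Key; the sentinel guarantees a match.
    previous(Key, Rows) ->
        lists:max([{K, V} || {K, V} <- Rows, K =< Key]).

    %% Placeholder rereduce, mirroring the <<"_stats">> stub in this commit.
    rereduce({PrevKey, PrevVal}, {_Key, Val}) when PrevVal >= Val -> {PrevKey, PrevVal};
    rereduce({PrevKey, _PrevVal}, {_Key, Val}) -> {PrevKey, Val}.
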
diff --git a/src/couch_views/test/exunit/couch_views_reduce_test.exs b/src/couch_views/test/exunit/couch_views_reduce_test.exs
index a526658..c1b35e2 100644
--- a/src/couch_views/test/exunit/couch_views_reduce_test.exs
+++ b/src/couch_views/test/exunit/couch_views_reduce_test.exs
@@ -40,29 +40,37 @@ defmodule CouchViewsReduceTest do
     }
   end
 
-  test "group=true count reduce", context do
-    args = %{
-      :reduce => true,
-      :group => true
-      #            :limit => 9
-    }
+#  test "group=true count reduce with limit", context do
+#    args = %{
+#      :reduce => true,
+#      :group => true,
+#      :limit => 3
+#    }
+#
+#    {:ok, res} = run_query(context, args, "dates")
+#    IO.inspect(res, label: "OUT")
+#
+#    assert res == [
+#             {:row, [key: [2017, 3, 1], value: 1]},
+#             {:row, [key: [2017, 4, 1], value: 1]},
+#             {:row, [key: [2017, 4, 15], value: 1]}
+#           ]
+#  end
+
+  test "group_level=1 count reduce", context do
+      args = %{
+          :reduce => true,
+          :group => true,
+      }
 
-    {:ok, res} = run_query(context, args, "baz")
-    IO.inspect(res, label: "OUT")
+      {:ok, res} = run_query(context, args, "dates")
+      IO.inspect(res, label: "OUT")
 
-    assert res == [
-             {:row, [key: 1, value: 2]},
-             {:row, [key: 2, value: 2]},
-             {:row, [key: 3, value: 2]},
-             {:row, [key: [1, 1], value: 1]},
-             {:row, [key: [1, 1, 5], value: 1]},
-             {:row, [key: [1, 2, 6], value: 1]},
-             {:row, [key: [2, 1], value: 1]},
-             {:row, [key: [2, 3, 6], value: 1]},
-             {:row, [key: [3, 1], value: 1]},
-             {:row, [key: [3, 1, 5], value: 1]},
-             {:row, [key: [3, 4, 5], value: 1]}
-           ]
+      assert res == [
+                 {:row, [key: [2017], value: 1]},
+                 {:row, [key: [2018], value: 1]},
+                 {:row, [key: [2019], value: 1]}
+             ]
   end
 
   #  test "group=1 count reduce", context do
@@ -173,6 +181,7 @@ defmodule CouchViewsReduceTest do
   end
 
   def default_cb(:complete, acc) do
+    IO.inspect(acc, label: "complete")
     {:ok, Enum.reverse(acc)}
   end
 
@@ -197,7 +206,22 @@ defmodule CouchViewsReduceTest do
   end
 
   defp create_docs() do
-    for i <- 1..1 do
+    dates = [
+      [2017, 3, 1],
+      [2017, 4, 1],
+      # out of order check
+      [2019, 3, 1],
+      [2017, 4, 15],
+      [2018, 4, 1],
+      [2017, 5, 1],
+      [2018, 3, 1],
+      # duplicate check
+      [2018, 4, 1],
+      [2018, 5, 1],
+      [2019, 4, 1]
+    ]
+
+    for i <- 1..4 do
       group =
         if rem(i, 3) == 0 do
           "first"
@@ -205,14 +229,14 @@ defmodule CouchViewsReduceTest do
           "second"
         end
 
-      :couch_doc.from_json_obj(
-        {[
-           {"_id", "doc-id-#{i}"},
-           {"value", i},
-           {"some", "field"},
-           {"group", group}
-         ]}
-      )
+      :couch_doc.from_json_obj({[
+         {"_id", "doc-id-#{i}"},
+         {"value", i},
+         {"some", "field"},
+         {"group", group},
+         {"date", Enum.at(dates, i - 1)}
+         #           {"timestamp", Enum.at(timestamps, i - 1)}
+       ]})
     end
   end
 
@@ -221,6 +245,16 @@ defmodule CouchViewsReduceTest do
        {"_id", "_design/bar"},
        {"views",
         {[
+           {"dates",
+            {[
+               {"map",
+                """
+                function(doc) {
+                  emit(doc.date, doc.value);
+                 }
+                """},
+               {"reduce", "_count"}
+             ]}},
            {"baz",
             {[
                {"map",