You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ga...@apache.org on 2019/09/19 14:57:38 UTC

[couchdb] branch prototype/builtin-reduce updated: basic skip list setup

This is an automated email from the ASF dual-hosted git repository.

garren pushed a commit to branch prototype/builtin-reduce
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/prototype/builtin-reduce by this push:
     new c094548  basic skip list setup
c094548 is described below

commit c09454804fa90813e4d47450b582d7c267c89da8
Author: Garren Smith <ga...@gmail.com>
AuthorDate: Thu Sep 19 16:57:25 2019 +0200

    basic skip list setup
---
 src/couch_views/include/couch_views.hrl            |   4 +
 src/couch_views/src/couch_views_fdb.erl            |   7 +-
 src/couch_views/src/couch_views_reader.erl         |  90 +++----
 src/couch_views/src/couch_views_reduce.erl         | 293 ++++++++++++++++++++-
 .../test/exunit/couch_views_reduce_test.exs        | 293 +++++++++++----------
 5 files changed, 500 insertions(+), 187 deletions(-)

diff --git a/src/couch_views/include/couch_views.hrl b/src/couch_views/include/couch_views.hrl
index aa272ce..9a6c2fc 100644
--- a/src/couch_views/include/couch_views.hrl
+++ b/src/couch_views/include/couch_views.hrl
@@ -16,6 +16,7 @@
 -define(VIEW_ID_RANGE, 2).
 -define(VIEW_MAP_RANGE, 3).
 -define(VIEW_REDUCE_RANGE, 4).
+-define(VIEW_REDUCE_SK_RANGE, 5).
 
 -define(VIEW_ROW_COUNT, 0).
 -define(VIEW_KV_SIZE, 1).
@@ -28,3 +29,6 @@
 
 % jobs api
 -define(INDEX_JOB_TYPE, <<"views">>).
+
+
+-define(MAX_SKIP_LIST_LEVELS, 6).
diff --git a/src/couch_views/src/couch_views_fdb.erl b/src/couch_views/src/couch_views_fdb.erl
index 2eff4bd..7eef208 100644
--- a/src/couch_views/src/couch_views_fdb.erl
+++ b/src/couch_views/src/couch_views_fdb.erl
@@ -220,8 +220,6 @@ reduce_fold_fwd({RowKey, EncodedValue}, #{next := value} = Acc) ->
 
     % TODO: Handle more than uint
     Value = ?bin2uint(EncodedValue),
-    io:format("FWD VAL ~p ~p ~p ~p ~n", [Key, RowGroupLevel, Value, ReduceType]),
-    io:format("GROUP SETTINGS ~p ~p ~n", [Group, GroupLevel]),
     UserAcc1 = case should_return_row(PrevGroupKey, Key, Group, GroupLevel, RowGroupLevel, ReduceType) of
         true ->
             UserCallback(Key, Value, UserAcc0);
@@ -299,6 +297,7 @@ write_doc(TxDb, Sig, ViewIds, Doc) ->
     clear_id_idx(TxDb, Sig, DocId),
 
     %% TODO: handle when there is no reduce
+    io:format("REDUCE RESULTS ~p ~n", [ReduceResults]),
     lists:foreach(fun({ViewId, NewRows, ReduceResult}) ->
         update_id_idx(TxDb, Sig, ViewId, DocId, NewRows),
 
@@ -317,6 +316,8 @@ write_doc(TxDb, Sig, ViewIds, Doc) ->
                 []
         end,
         update_map_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows),
+        couch_views_reduce:update_reduce_idx(TxDb, Sig, ViewId, DocId,
+            ExistingKeys, ReduceResult),
         update_reduce_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, ReduceResult)
     end, lists:zip3(ViewIds, Results, ReduceResults)).
 
@@ -634,10 +635,8 @@ process_reduce_rows(Rows) ->
     ReduceExact = encode_reduce_rows(Rows),
     ReduceGroups = lists:foldl(fun({Key, Val}, Groupings) ->
         Out = create_grouping(Key, Val, [], Groupings),
-        io:format("ROW G ~p ~p ~p ~n", [Key, Val, Out]),
         Out
     end, #{}, Rows),
-    io:format("INPUT ROWS ~n Groups ~p ~n Exact ~p ~n", [maps:to_list(ReduceGroups), Rows]),
     ReduceGroups1 = encode_reduce_rows(maps:to_list(ReduceGroups)),
     {ReduceExact, ReduceGroups1}.
 
diff --git a/src/couch_views/src/couch_views_reader.erl b/src/couch_views/src/couch_views_reader.erl
index 359193a..d08515c 100644
--- a/src/couch_views/src/couch_views_reader.erl
+++ b/src/couch_views/src/couch_views_reader.erl
@@ -32,50 +32,52 @@ read_reduce(Db, Mrst, ViewName, UserCallback, UserAcc0, Args) ->
     } = Mrst,
 
     ViewId = get_view_id(Lang, Args, ViewName, Views),
-    Fun = fun handle_reduce_row/3,
-
-    try
-        fabric2_fdb:transactional(Db, fun(TxDb) ->
-            Meta = get_meta(TxDb, Mrst, ViewId, Args),
-            UserAcc1 = maybe_stop(UserCallback(Meta, UserAcc0)),
-
-            #mrargs{
-                limit = Limit
-            } = Args,
-
-            Acc0 = #{
-                db => TxDb,
-                skip => Args#mrargs.skip,
-                mrargs => undefined,
-                callback => UserCallback,
-                acc => UserAcc1,
-                row_count => 0,
-                limit => Limit
-            },
-
-            Acc1 = lists:foldl(fun(KeyArgs, KeyAcc0) ->
-                Opts = reduce_mrargs_to_fdb_options(KeyArgs),
-                KeyAcc1 = KeyAcc0#{
-                    mrargs := KeyArgs
-                },
-                couch_views_fdb:fold_reduce_idx(
-                    TxDb,
-                    Sig,
-                    ViewId,
-                    Opts,
-                    Fun,
-                    KeyAcc1
-                )
-            end, Acc0, expand_keys_args(Args)),
-
-            #{
-                acc := UserAcc2
-            } = Acc1,
-            {ok, maybe_stop(UserCallback(complete, UserAcc2))}
-        end)
-    catch throw:{done, Out} ->
-        {ok, Out}
-    end.
+    couch_views_reduce:read_reduce(Db, Sig, ViewId, UserCallback,
+        UserAcc0, Args).
+%%    Fun = fun handle_reduce_row/3,
+%%
+%%    try
+%%        fabric2_fdb:transactional(Db, fun(TxDb) ->
+%%            Meta = get_meta(TxDb, Mrst, ViewId, Args),
+%%            UserAcc1 = maybe_stop(UserCallback(Meta, UserAcc0)),
+%%
+%%            #mrargs{
+%%                limit = Limit
+%%            } = Args,
+%%
+%%            Acc0 = #{
+%%                db => TxDb,
+%%                skip => Args#mrargs.skip,
+%%                mrargs => undefined,
+%%                callback => UserCallback,
+%%                acc => UserAcc1,
+%%                row_count => 0,
+%%                limit => Limit
+%%            },
+%%
+%%            Acc1 = lists:foldl(fun(KeyArgs, KeyAcc0) ->
+%%                Opts = reduce_mrargs_to_fdb_options(KeyArgs),
+%%                KeyAcc1 = KeyAcc0#{
+%%                    mrargs := KeyArgs
+%%                },
+%%                couch_views_fdb:fold_reduce_idx(
+%%                    TxDb,
+%%                    Sig,
+%%                    ViewId,
+%%                    Opts,
+%%                    Fun,
+%%                    KeyAcc1
+%%                )
+%%            end, Acc0, expand_keys_args(Args)),
+%%
+%%            #{
+%%                acc := UserAcc2
+%%            } = Acc1,
+%%            {ok, maybe_stop(UserCallback(complete, UserAcc2))}
+%%        end)
+%%    catch throw:{done, Out} ->
+%%        {ok, Out}
+%%    end.
 
 
 reduce_mrargs_to_fdb_options(Args) ->
diff --git a/src/couch_views/src/couch_views_reduce.erl b/src/couch_views/src/couch_views_reduce.erl
index 9101509..759d956 100644
--- a/src/couch_views/src/couch_views_reduce.erl
+++ b/src/couch_views/src/couch_views_reduce.erl
@@ -14,13 +14,125 @@
 
 
 -export([
-    run_reduce/2
+    run_reduce/2,
+    update_reduce_idx/6,
+    read_reduce/6
 ]).
 
 
 -include("couch_views.hrl").
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("couch_mrview/include/couch_mrview.hrl").
+-include_lib("fabric/include/fabric2.hrl").
+
+-define(LEVEL_FAN_POW, 4).
+
+log_levels(Db, Sig, ViewId) ->
+    #{
+        db_prefix := DbPrefix
+    } = Db,
+
+    Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
+    ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+    Opts = [{streaming_mode, want_all}],
+
+    fabric2_fdb:transactional(Db, fun(#{tx := Tx} = TxDb) ->
+        lists:foreach(fun (Level) ->
+            {StartKey, EndKey} = erlfdb_tuple:range({Level},
+                ReduceIdxPrefix),
+
+            Acc0 = #{
+                sig => Sig,
+                view_id => ViewId,
+                reduce_idx_prefix => ReduceIdxPrefix,
+                next => key,
+                key => undefined,
+                rows => []
+            },
+
+            Fun = fun fold_fwd_cb/2,
+            Acc = erlfdb:fold_range(Tx, StartKey, EndKey, Fun, Acc0, Opts),
+            #{
+                rows := Rows
+            } = Acc,
+            io:format("~n ~n LEVEL ~p rows ~p ~n", [Level, Rows]),
+            {ok, Rows}
+        end, Levels),
+        {ok, []}
+    end).
+
+
+read_reduce(Db, Sig, ViewId, UserCallback, UserAcc0, Args) ->
+    #{
+        db_prefix := DbPrefix
+    } = Db,
+
+    Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
+    ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+    fabric2_fdb:transactional(Db, fun(#{tx := Tx} = TxDb) ->
+        log_levels(TxDb, Sig, ViewId),
+%%        Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
+
+
+        Acc0 = #{
+            sig => Sig,
+            view_id => ViewId,
+            user_acc => UserAcc0,
+            args => Args,
+            callback => UserCallback,
+            reduce_idx_prefix => ReduceIdxPrefix,
+            next => key,
+            rows => []
+        },
+
+
+%%        Opts = [{limit, 2}, {streaming_mode, want_all}],
+%%        EK = couch_views_encoding:encode(0, key),
+%%        {StartKey, EndKey} = erlfdb_tuple:range({?MAX_SKIP_LIST_LEVELS, EK},
+%%            ReduceIdxPrefix),
+%%
+%%        Fun = fun fold_fwd_cb/2,
+%%        Acc = erlfdb:fold_range(Tx, StartKey, EndKey, Fun, Acc0, Opts),
+        #{
+            rows := Rows
+        } = Acc0,
+        {ok, Rows}
+    end).
+
+args_to_fdb_opts(#mrargs{} = Args) ->
+    #mrargs{
+        limit = Limit,
+        start_key = StartKey,
+        end_key = EndKey
+    } = Args,
+    ok.
+
+
+fold_fwd_cb({FullEncodedKey, EV}, #{next := key} = Acc) ->
+    #{
+        reduce_idx_prefix := ReduceIdxPrefix
+    } = Acc,
+
+    {Level, EK, ?VIEW_ROW_KEY}
+        = erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
+
+%%    Key = couch_views_encoding:decode(EV),
+    Val = couch_views_encoding:decode(EV),
+    Acc#{next := value, key := Val};
+
+fold_fwd_cb({FullEncodedKey, EV}, #{next := value} = Acc) ->
+    #{
+        reduce_idx_prefix := ReduceIdxPrefix,
+        rows := Rows,
+        key := Key
+    } = Acc,
+
+    {Level, EK, ?VIEW_ROW_VALUE}
+        = erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
+
+%%    Key = couch_views_encoding:decode(EV),
+    Val = couch_views_encoding:decode(EV),
+    Acc#{next := key, key := undefined, rows := Rows ++ [{Key, Val}]}.
 
 
 run_reduce(#mrst{views = Views } = Mrst, MappedResults) ->
@@ -59,6 +171,28 @@ reduce(<<"_count">>, Results) ->
                 Acc#{Key => 1}
         end
     end, #{}, Results),
+    maps:to_list(ReduceResults);
+
+% This is not CouchDB's real _stats reduce: it only keeps the maximum value
+% per key. I want a basic reduce function where updating the index requires
+% re-reading multiple rows instead of being able to do a single atomic
+% update.
+reduce(<<"_stats">>, Results) ->
+    ReduceResults = lists:foldl(fun ({Key, Val}, Acc) ->
+        io:format("MAX ~p ~p ~n", [Key, Val]),
+        case maps:is_key(Key, Acc) of
+            true ->
+                #{Key := Max} = Acc,
+                case Max >= Val of
+                    true ->
+                        Acc;
+                    false ->
+                        Acc#{Key := Val}
+                end;
+            false ->
+                Acc#{Key => Val}
+        end
+    end, #{}, Results),
     maps:to_list(ReduceResults).
 
 
@@ -68,3 +202,160 @@ is_builtin(<<"_", _/binary>>) ->
 is_builtin(_) ->
     false.
 
+
+update_reduce_idx(TxDb, Sig, ViewId, _DocId, _ExistingKeys, ReduceResult) ->
+    #{
+        db_prefix := DbPrefix
+    } = TxDb,
+
+    ViewOpts = #{
+        db_prefix => DbPrefix,
+        sig => Sig,
+        view_id => ViewId
+    },
+    create_skip_list(TxDb, ?MAX_SKIP_LIST_LEVELS, ViewOpts),
+
+    lists:foreach(fun ({Key, Val}) ->
+        io:format("RESULTS KV ~p ~p ~n", [Key, Val]),
+        add_kv_to_skip_list(TxDb, ?MAX_SKIP_LIST_LEVELS, ViewOpts, Key, Val)
+    end, ReduceResult).
+
+
+create_skip_list(Db, MaxLevel, #{} = ViewOpts) ->
+    #{
+        db_prefix := DbPrefix,
+        sig := Sig,
+        view_id := ViewId
+    } = ViewOpts,
+
+    Levels = lists:seq(0, MaxLevel),
+    ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+
+    fabric2_fdb:transactional(Db, fun(TxDb) ->
+
+        lists:foreach(fun(Level) ->
+            add_kv(TxDb, ReduceIdxPrefix, Level, 0, 0)
+        end, Levels)
+    end).
+
+%% This sucks, but it's simple for now.
+should_add_key_to_level(0, _, _) ->
+    true;
+
+should_add_key_to_level(?MAX_SKIP_LIST_LEVELS, _, _) ->
+    false;
+
+should_add_key_to_level(_, _, false) ->
+    false;
+
+should_add_key_to_level(_, _Key, _Prev) ->
+    crypto:rand_uniform(0, 2) == 0.
+
+%%should_add_key_to_level(Level, Key) ->
+%%    erlang:phash2(Key) band ((1 bsl (Level * ?LEVEL_FAN_POW)) -1) == 0.
+%%    keyHash & ((1 << (level * LEVEL_FAN_POW)) - 1)) != 0
+
+
+add_kv_to_skip_list(Db, MaxLevel, #{} = ViewOpts, Key, Val) ->
+    #{
+        db_prefix := DbPrefix,
+        sig := Sig,
+        view_id := ViewId
+    } = ViewOpts,
+
+    Levels = lists:seq(0, MaxLevel),
+    ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+
+    fabric2_fdb:transactional(Db, fun(TxDb) ->
+        lists:foldl(fun(Level, PrevCoinFlip) ->
+            io:format("PROCESS ~p ~p ~p ~n", [Level, Key, Val]),
+            {PrevKey, PrevVal} = get_previous_key(TxDb, ReduceIdxPrefix, Level, Key),
+            io:format("PREV VALS ~p ~p ~n", [PrevKey, PrevVal]),
+            case should_add_key_to_level(Level, Key, PrevCoinFlip) of
+                true ->
+                    io:format("Adding ~p ~p ~n", [Level, Key]),
+                    add_kv(Db, ReduceIdxPrefix, Level, Key, Val),
+                    true;
+                false ->
+                    {PrevKey, NewVal} = rereduce(<<"_stats">>, {PrevKey, PrevVal}, {Key, Val}),
+                    io:format("RE_REDUCE ~p ~p ~p ~p ~n", [Level, Key, PrevKey, NewVal]),
+                    add_kv(Db, ReduceIdxPrefix, Level, PrevKey, NewVal),
+                    false
+            end
+        end, true, Levels)
+    end).
+
+
+rereduce(<<"_stats">>, {PrevKey, PrevVal}, {_Key, Val}) ->
+    case PrevVal >= Val of
+        true -> {PrevKey, PrevVal};
+        false -> {PrevKey, Val}
+    end.
+
+
+reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId) ->
+    Key = {?DB_VIEWS, Sig, ?VIEW_REDUCE_SK_RANGE, ViewId},
+    erlfdb_tuple:pack(Key, DbPrefix).
+
+
+reduce_idx_key(ReduceIdxPrefix, SkipLevel, ReduceKey, RowType) ->
+    Key = {SkipLevel, ReduceKey, RowType},
+    erlfdb_tuple:pack(Key, ReduceIdxPrefix).
+
+
+add_kv(TxDb, ReduceIdxPrefix, Level, Key, Val) ->
+    #{
+        tx := Tx
+    } = TxDb,
+
+    EK = couch_views_encoding:encode(Key, key),
+    EVK = couch_views_encoding:encode(Key),
+    EV = couch_views_encoding:encode(Val),
+
+    KK = reduce_idx_key(ReduceIdxPrefix, Level, EK, ?VIEW_ROW_KEY),
+    VK = reduce_idx_key(ReduceIdxPrefix, Level, EK, ?VIEW_ROW_VALUE),
+    ok = erlfdb:set(Tx, KK, EVK),
+    ok = erlfdb:set(Tx, VK, EV).
+
+
+get_previous_key(TxDb, ReduceIdxPrefix, Level, Key) ->
+    #{
+        tx := Tx
+    } = TxDb,
+
+    % TODO: see if we need to add in conflict ranges for this for level=0
+    Opts = [{limit, 2}, {reverse, true}, {streaming_mode, want_all}],
+%%    LevelPrefix = erlfdb_tuple:pack({Level}, ReduceIdxPrefix),
+
+    EK = couch_views_encoding:encode(Key, key),
+    EndKey0 = erlfdb_tuple:pack({Level, EK}, ReduceIdxPrefix),
+
+    {StartKey, EndKey1} = erlfdb_tuple:range({Level}, ReduceIdxPrefix),
+%%    EndKey1 = erlfdb_key:first_greater_than(EndKey0),
+
+    Callback = fun row_cb/2,
+    Out = erlfdb:fold_range(Tx, StartKey, EndKey1, Callback, {val, ReduceIdxPrefix, {}}, Opts),
+    io:format("OUT PRV ~p ~p ~p ~n", [Level, Key, Out]),
+    Out.
+
+
+row_cb({FullEncodedKey, EV}, {val, ReduceIdxPrefix, Acc}) ->
+    io:format("ROW VAL ~p ~n", [erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix)]),
+    {_Level, EK, _VIEW_ROW_VALUE}
+        = erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
+    Val = couch_views_encoding:decode(EV),
+%%    io:format("WW ~p ~p ~n", [couch_views_encoding:decode(EK), Val]),
+
+    {key, {EK, ReduceIdxPrefix, Val}};
+
+row_cb({FullEncodedKey, EVK}, {key, {EK, ReduceIdxPrefix, Val}}) ->
+    io:format("ROW KEY ~p ~n", [erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix)]),
+    {_Level, EK, ?VIEW_ROW_KEY}
+        = erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
+    Key = couch_views_encoding:decode(EVK),
+
+    {Key, Val}.
+
+
+
+
diff --git a/src/couch_views/test/exunit/couch_views_reduce_test.exs b/src/couch_views/test/exunit/couch_views_reduce_test.exs
index f6efc0c..696eb5e 100644
--- a/src/couch_views/test/exunit/couch_views_reduce_test.exs
+++ b/src/couch_views/test/exunit/couch_views_reduce_test.exs
@@ -40,115 +40,130 @@ defmodule CouchViewsReduceTest do
     }
   end
 
-  test "group=true count reduce", context do
-    args = %{
-      :reduce => true,
-      :group => true
-      #            :limit => 9
-    }
+  #  test "group=true count reduce", context do
+  #    args = %{
+  #      :reduce => true,
+  #      :group => true
+  #      #            :limit => 9
+  #    }
+  #
+  #    {:ok, res} = run_query(context, args, "baz")
+  #    IO.inspect(res, label: "OUT")
+  #
+  #    assert res == [
+  #             {:row, [key: 1, value: 2]},
+  #             {:row, [key: 2, value: 2]},
+  #             {:row, [key: 3, value: 2]},
+  #             {:row, [key: [1, 1], value: 1]},
+  #             {:row, [key: [1, 1, 5], value: 1]},
+  #             {:row, [key: [1, 2, 6], value: 1]},
+  #             {:row, [key: [2, 1], value: 1]},
+  #             {:row, [key: [2, 3, 6], value: 1]},
+  #             {:row, [key: [3, 1], value: 1]},
+  #             {:row, [key: [3, 1, 5], value: 1]},
+  #             {:row, [key: [3, 4, 5], value: 1]}
+  #           ]
+  #  end
 
-    {:ok, res} = run_query(context, args, "baz")
-    IO.inspect(res, label: "OUT")
-
-    assert res == [
-             {:row, [key: 1, value: 2]},
-             {:row, [key: 2, value: 2]},
-             {:row, [key: 3, value: 2]},
-             {:row, [key: [1, 1], value: 1]},
-             {:row, [key: [1, 1, 5], value: 1]},
-             {:row, [key: [1, 2, 6], value: 1]},
-             {:row, [key: [2, 1], value: 1]},
-             {:row, [key: [2, 3, 6], value: 1]},
-             {:row, [key: [3, 1], value: 1]},
-             {:row, [key: [3, 1, 5], value: 1]},
-             {:row, [key: [3, 4, 5], value: 1]}
-           ]
-  end
+  #  test "group=1 count reduce", context do
+  #    args = %{
+  #      :reduce => true,
+  #      :group_level => 1
+  #      #          :limit => 6
+  #    }
+  #
+  #    {:ok, res} = run_query(context, args, "baz")
+  #    IO.inspect(res, label: "OUT")
+  #
+  #    assert res == [
+  #             {:row, [key: 1, value: 2]},
+  #             {:row, [key: 2, value: 2]},
+  #             {:row, [key: 3, value: 2]},
+  #             {:row, [key: [1], value: 2]},
+  #             {:row, [key: [2], value: 2]},
+  #             {:row, [key: [3], value: 2]}
+  #           ]
+  #  end
+  #
+  #  test "group=2 count reduce", context do
+  #    args = %{
+  #      :reduce => true,
+  #      :group_level => 2,
+  #      :limit => 9
+  #    }
+  #
+  #    {:ok, res} = run_query(context, args, "baz")
+  #    IO.inspect(res, label: "OUT")
+  #
+  #    assert res == [
+  #             {:row, [key: 1, value: 2]},
+  #             {:row, [key: 2, value: 2]},
+  #             {:row, [key: 3, value: 2]},
+  #             {:row, [key: [1, 1], value: 2]},
+  #             {:row, [key: [1, 2], value: 1]},
+  #             {:row, [key: [2, 1], value: 1]},
+  #             {:row, [key: [2, 3], value: 1]},
+  #             {:row, [key: [3, 1], value: 2]},
+  #             {:row, [key: [3, 4], value: 1]}
+  #           ]
+  #  end
+  #
+  #  test "group=2 count reduce with limit = 3", context do
+  #    args = %{
+  #      :reduce => true,
+  #      :group_level => 2,
+  #      :limit => 4
+  #    }
+  #
+  #    {:ok, res} = run_query(context, args, "baz")
+  #    IO.inspect(res, label: "OUT")
+  #
+  #    assert res == [
+  #             {:row, [key: 1, value: 2]},
+  #             {:row, [key: 2, value: 2]},
+  #             {:row, [key: 3, value: 2]},
+  #             {:row, [key: [1, 1], value: 1]}
+  #           ]
+  #  end
+  #
+  #  # [
+  #  #  row: [key: [2019, 1, 2], value: 1],
+  #  #  row: [key: [2019, 1, 4], value: 1],
+  #  #  row: [key: [2019, 2, 1], value: 1],
+  #  #  row: [key: [2019, 2, 3], value: 1]
+  #  # ]
+  #
+  #  test "group=2 count reduce with startkey", context do
+  #    args = %{
+  #      #          :reduce => true,
+  #      #          :group_level => 2,
+  #      :start_key => [2019, 1, 4]
+  #      #          :limit => 4
+  #    }
+  #
+  #    {:ok, res} = run_query(context, args, "boom")
+  #    IO.inspect(res, label: "OUT")
+  #
+  #    assert res == [
+  #             {:row, [key: [2019, 1], value: 1]},
+  #             {:row, [key: [2019, 2], value: 2]}
+  #           ]
+  #  end
 
-#  test "group=1 count reduce", context do
-#    args = %{
-#      :reduce => true,
-#      :group_level => 1
-#      #          :limit => 6
-#    }
-#
-#    {:ok, res} = run_query(context, args, "baz")
-#    IO.inspect(res, label: "OUT")
-#
-#    assert res == [
-#             {:row, [key: 1, value: 2]},
-#             {:row, [key: 2, value: 2]},
-#             {:row, [key: 3, value: 2]},
-#             {:row, [key: [1], value: 2]},
-#             {:row, [key: [2], value: 2]},
-#             {:row, [key: [3], value: 2]}
-#           ]
-#  end
-#
-  test "group=2 count reduce", context do
+  test "group_level=0 _max reduce", context do
     args = %{
       :reduce => true,
-      :group_level => 2,
-      :limit => 9
+      :group_level => 0
+      #            :limit => 9
     }
 
-    {:ok, res} = run_query(context, args, "baz")
+    {:ok, res} = run_query(context, args, "max")
     IO.inspect(res, label: "OUT")
 
     assert res == [
-             {:row, [key: 1, value: 2]},
-             {:row, [key: 2, value: 2]},
-             {:row, [key: 3, value: 2]},
-             {:row, [key: [1, 1], value: 2]},
-             {:row, [key: [1, 2], value: 1]},
-             {:row, [key: [2, 1], value: 1]},
-             {:row, [key: [2, 3], value: 1]},
-             {:row, [key: [3, 1], value: 2]},
-             {:row, [key: [3, 4], value: 1]}
+             {:row, [key: :null, value: 3]}
            ]
   end
-#
-#  test "group=2 count reduce with limit = 3", context do
-#    args = %{
-#      :reduce => true,
-#      :group_level => 2,
-#      :limit => 4
-#    }
-#
-#    {:ok, res} = run_query(context, args, "baz")
-#    IO.inspect(res, label: "OUT")
-#
-#    assert res == [
-#             {:row, [key: 1, value: 2]},
-#             {:row, [key: 2, value: 2]},
-#             {:row, [key: 3, value: 2]},
-#             {:row, [key: [1, 1], value: 1]}
-#           ]
-#  end
-#
-#  # [
-#  #  row: [key: [2019, 1, 2], value: 1],
-#  #  row: [key: [2019, 1, 4], value: 1],
-#  #  row: [key: [2019, 2, 1], value: 1],
-#  #  row: [key: [2019, 2, 3], value: 1]
-#  # ]
-#
-#  test "group=2 count reduce with startkey", context do
-#    args = %{
-#      #          :reduce => true,
-#      #          :group_level => 2,
-#      :start_key => [2019, 1, 4]
-#      #          :limit => 4
-#    }
-#
-#    {:ok, res} = run_query(context, args, "boom")
-#    IO.inspect(res, label: "OUT")
-#
-#    assert res == [
-#             {:row, [key: [2019, 1], value: 1]},
-#             {:row, [key: [2019, 2], value: 2]}
-#           ]
-#  end
 
   defp run_query(context, args, view) do
     db = context[:db]
@@ -182,7 +197,7 @@ defmodule CouchViewsReduceTest do
   end
 
   defp create_docs() do
-    for i <- 1..3 do
+    for i <- 1..1 do
       group =
         if rem(i, 3) == 0 do
           "first"
@@ -207,54 +222,56 @@ defmodule CouchViewsReduceTest do
          {"_id", "_design/bar"},
          {"views",
           {[
-             {"baz",
-              {[
-                 {"map",
-                  """
-                  function(doc) {
-                    emit(doc.value, doc.value);
-                    emit(doc.value, doc.value);
-                    emit([doc.value, 1], doc.value);
-                    emit([doc.value, doc.value + 1, doc.group.length], doc.value);
-
-                    if (doc.value === 3) {
-                      emit([1, 1, 5], 1);
-                      emit([doc.value, 1, 5], 1);
-                    }
-                   }
-                  """},
-                 {"reduce", "_count"}
-               ]}},
-             {"boom",
-              {[
-                 {"map",
-                  """
-                  function(doc) {
-                      var month = 1;
-                      if (doc.value % 2) {
-                          month = 2;
-                      }
-                      emit([2019, month, doc.value], doc.value);
-                  }
-                  """},
-                 {"reduce", "_count"}
-               ]}},
+#             {"baz",
+#              {[
+#                 {"map",
+#                  """
+#                  function(doc) {
+#                    emit(doc.value, doc.value);
+#                    emit(doc.value, doc.value);
+#                    emit([doc.value, 1], doc.value);
+#                    emit([doc.value, doc.value + 1, doc.group.length], doc.value);
+#
+#                    if (doc.value === 3) {
+#                      emit([1, 1, 5], 1);
+#                      emit([doc.value, 1, 5], 1);
+#                    }
+#                   }
+#                  """},
+#                 {"reduce", "_count"}
+#               ]}},
+#             {"boom",
+#              {[
+#                 {"map",
+#                  """
+#                  function(doc) {
+#                      var month = 1;
+#                      if (doc.value % 2) {
+#                          month = 2;
+#                      }
+#                      emit([2019, month, doc.value], doc.value);
+#                  }
+#                  """},
+#                 {"reduce", "_count"}
+#               ]}},
              {"max",
               {[
                  {"map",
                   """
                   function(doc) {
-                      emit(doc.value, doc.value);
-                      emit(doc.value, doc.value);
-                      emit([doc.value, 1], doc.value);
-                      emit([doc.value, doc.value + 1, doc.group.length], doc.value);
-
+                      //emit(doc.value, doc.value);
+                      //emit([doc.value, 1], doc.value);
+                      //emit([doc.value, doc.value + 1, doc.group.length], doc.value);
+                        emit(1, 1);
+                        emit(2, 2);
+                        emit(3, 3);
+                        emit(4, 4);
                       if (doc.value === 3) {
-                        emit([doc.value, 1, 5], 1);
+                       //emit([doc.value, 1, 5], 1);
                       }
                   }
                   """},
-                 {"reduce", "_count"}
+                 {"reduce", "_stats"}
                ]}}
            ]}}
        ]}