You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ga...@apache.org on 2019/09/11 15:01:23 UTC

[couchdb] branch prototype/builtin-reduce created (now 957e271)

This is an automated email from the ASF dual-hosted git repository.

garren pushed a change to branch prototype/builtin-reduce
in repository https://gitbox.apache.org/repos/asf/couchdb.git.


      at 957e271  rough outline

This branch includes the following new commits:

     new 957e271  rough outline

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[couchdb] 01/01: rough outline

Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

garren pushed a commit to branch prototype/builtin-reduce
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 957e2715711f4dff8ce99f1bfab6a553e32c8c7a
Author: Garren Smith <ga...@gmail.com>
AuthorDate: Wed Sep 11 17:01:08 2019 +0200

    rough outline
---
 src/couch_views/include/couch_views.hrl            |   4 +
 src/couch_views/src/couch_views.erl                |  14 +-
 src/couch_views/src/couch_views_fdb.erl            | 195 ++++++++++++++++++++-
 .../couch_views_fdb_reduce.erl}                    |  29 +--
 src/couch_views/src/couch_views_indexer.erl        |   8 +-
 src/couch_views/src/couch_views_reader.erl         | 128 +++++++++++++-
 src/couch_views/src/couch_views_reduce.erl         |  70 ++++++++
 .../test/exunit/couch_views_reduce_test.exs        | 165 +++++++++++++++++
 src/couch_views/test/exunit/test_helper.exs        |   2 +
 9 files changed, 592 insertions(+), 23 deletions(-)

diff --git a/src/couch_views/include/couch_views.hrl b/src/couch_views/include/couch_views.hrl
index 2e443eb..6b029f7 100644
--- a/src/couch_views/include/couch_views.hrl
+++ b/src/couch_views/include/couch_views.hrl
@@ -15,6 +15,7 @@
 -define(VIEW_ID_INFO, 1).
 -define(VIEW_ID_RANGE, 2).
 -define(VIEW_MAP_RANGE, 3).
+-define(VIEW_REDUCE_RANGE, 4).
 
 -define(VIEW_ROW_COUNT, 0).
 -define(VIEW_KV_SIZE, 1).
@@ -22,5 +23,8 @@
 -define(VIEW_ROW_KEY, 0).
 -define(VIEW_ROW_VALUE, 1).
 
+-define(VIEW_REDUCE_EXACT, 0).
+-define(VIEW_REDUCE_GROUP, 1).
+
 % jobs api
 -define(INDEX_JOB_TYPE, <<"views">>).
diff --git a/src/couch_views/src/couch_views.erl b/src/couch_views/src/couch_views.erl
index 7c7588c..b7fe4c4 100644
--- a/src/couch_views/src/couch_views.erl
+++ b/src/couch_views/src/couch_views.erl
@@ -38,16 +38,20 @@ query(Db, DDoc, ViewName, Callback, Acc0, Args0) ->
     Args1 = to_mrargs(Args0),
     Args2 = couch_mrview_util:set_view_type(Args1, ViewName, Views),
     Args3 = couch_mrview_util:validate_args(Args2),
+
     ok = check_range(Args3),
-    case is_reduce_view(Args3) of
-        true -> throw({not_implemented});
-        false -> ok
-    end,
 
     ok = maybe_update_view(Db, Mrst, Args3),
 
     try
-        couch_views_reader:read(Db, Mrst, ViewName, Callback, Acc0, Args3)
+        case is_reduce_view(Args3) of
+            true ->
+                couch_views_reader:read_reduce(Db, Mrst, ViewName,
+                    Callback, Acc0, Args3);
+            false ->
+                couch_views_reader:read(Db, Mrst, ViewName,
+                    Callback, Acc0, Args3)
+        end
     after
         UpdateAfter = Args3#mrargs.update == lazy,
         if UpdateAfter == false -> ok; true ->
diff --git a/src/couch_views/src/couch_views_fdb.erl b/src/couch_views/src/couch_views_fdb.erl
index 60ce300..d13adc1 100644
--- a/src/couch_views/src/couch_views_fdb.erl
+++ b/src/couch_views/src/couch_views_fdb.erl
@@ -20,6 +20,7 @@
     get_kv_size/3,
 
     fold_map_idx/6,
+    fold_reduce_idx/6,
 
     write_doc/4
 ]).
@@ -42,6 +43,21 @@
 % View Build Sequence Access
 % (<db>, ?DB_VIEWS, Sig, ?VIEW_UPDATE_SEQ) = Sequence
 
+% Id Range
+% {<db>, ?DB_VIEWS, Sig, ?VIEW_ID_RANGE, DocId, ViewId}
+%   = [TotalKeys, TotalSize, UniqueKeys]
+
+% Map Range
+%{<db>, ?DB_VIEWS, Sig, ?VIEW_MAP_RANGE, ViewId, {{Key, DocId}, DupeId, Type}}
+%   = Value | UnEncodedKey
+% Type = ?VIEW_ROW_KEY | ?VIEW_ROW_VALUE
+
+% Reduce Range
+%{<db>, ?DB_VIEWS, Sig, ?VIEW_REDUCE_RANGE, ViewId, GroupLevel, Key,
+%   ReduceType, RowType} = Value | UnEncodedKey
+% ReduceType = ?VIEW_REDUCE_EXACT | ?VIEW_REDUCE_GROUP
+% RowType = ?VIEW_ROW_KEY | ?VIEW_ROW_VALUE
+
 
 get_update_seq(TxDb, #mrst{sig = Sig}) ->
     #{
@@ -124,6 +140,81 @@ fold_map_idx(TxDb, Sig, ViewId, Options, Callback, Acc0) ->
     Acc1.
 
 
+fold_reduce_idx(TxDb, Sig, ViewId, Options, Callback, Acc0) ->
+    #{
+        db_prefix := DbPrefix
+    } = TxDb,
+
+    ReduceIdxPrefix = reduce_idx_prefix(DbPrefix, Sig, ViewId),
+
+    FoldAcc = #{
+        prefix => ReduceIdxPrefix,
+        sort_key => undefined,
+        docid => undefined,
+        callback => Callback,
+        acc => Acc0,
+        group_level => undefined,
+        reduce_type => undefined,
+
+        next => key,
+        key => undefined
+    },
+
+    Fun = fun reduce_fold_fwd/2,
+
+    #{
+        acc := Acc1
+    } = fabric2_fdb:fold_range(TxDb, ReduceIdxPrefix, Fun, FoldAcc, Options),
+    Acc1.
+
+
+reduce_fold_fwd({RowKey, EncodedOriginalKey}, #{next := key} = Acc) ->
+    #{
+        prefix := Prefix
+    } = Acc,
+
+    {GroupLevel, SortKey, ReduceType, ?VIEW_ROW_KEY} =
+        erlfdb_tuple:unpack(RowKey, Prefix),
+    Acc#{
+        next := value,
+        key := couch_views_encoding:decode(EncodedOriginalKey),
+        sort_key := SortKey,
+        group_level := GroupLevel,
+        reduce_type := ReduceType
+    };
+
+reduce_fold_fwd({RowKey, EncodedValue}, #{next := value} = Acc) ->
+    #{
+        prefix := Prefix,
+        key := Key,
+        sort_key := SortKey,
+        group_level := GroupLevel,
+        reduce_type := ReduceType,
+        callback := UserCallback,
+        acc := UserAcc0
+    } = Acc,
+
+    % We're asserting here that this row is paired
+    % correctly with the previous row by relying on
+    % a badmatch if any of these values don't match.
+    {GroupLevel, SortKey, ReduceType, ?VIEW_ROW_VALUE} =
+        erlfdb_tuple:unpack(RowKey, Prefix),
+
+    % TODO: Handle more than uint
+    Value = ?bin2uint(EncodedValue),
+    io:format("FWD VAL ~p ~p ~p ~n", [Key, GroupLevel, Value]),
+    UserAcc1 = UserCallback(Key, Value, UserAcc0),
+
+    Acc#{
+        next := key,
+        key := undefined,
+        sort_key := undefined,
+        group_level := undefined,
+        reduce_type := undefined,
+        acc := UserAcc1
+    }.
+
+
 write_doc(TxDb, Sig, _ViewIds, #{deleted := true} = Doc) ->
     #{
         id := DocId
@@ -134,6 +225,7 @@ write_doc(TxDb, Sig, _ViewIds, #{deleted := true} = Doc) ->
     clear_id_idx(TxDb, Sig, DocId),
     lists:foreach(fun({ViewId, TotalKeys, TotalSize, UniqueKeys}) ->
         clear_map_idx(TxDb, Sig, ViewId, DocId, UniqueKeys),
+        %clear_reduce_idx
         update_row_count(TxDb, Sig, ViewId, -TotalKeys),
         update_kv_size(TxDb, Sig, ViewId, -TotalSize)
     end, ExistingViewKeys);
@@ -141,14 +233,16 @@ write_doc(TxDb, Sig, _ViewIds, #{deleted := true} = Doc) ->
 write_doc(TxDb, Sig, ViewIds, Doc) ->
     #{
         id := DocId,
-        results := Results
+        results := Results,
+        reduce_results := ReduceResults
     } = Doc,
 
     ExistingViewKeys = get_view_keys(TxDb, Sig, DocId),
 
     clear_id_idx(TxDb, Sig, DocId),
 
-    lists:foreach(fun({ViewId, NewRows}) ->
+    %% TODO: handle when there is no reduce
+    lists:foreach(fun({ViewId, NewRows, ReduceResult}) ->
         update_id_idx(TxDb, Sig, ViewId, DocId, NewRows),
 
         ExistingKeys = case lists:keyfind(ViewId, 1, ExistingViewKeys) of
@@ -165,8 +259,9 @@ write_doc(TxDb, Sig, ViewIds, Doc) ->
                 update_kv_size(TxDb, Sig, ViewId, SizeChange),
                 []
         end,
-        update_map_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows)
-    end, lists:zip(ViewIds, Results)).
+        update_map_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows),
+        update_reduce_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, ReduceResult)
+    end, lists:zip3(ViewIds, Results, ReduceResults)).
 
 
 % For each row in a map view there are two rows stored in
@@ -338,6 +433,54 @@ update_map_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows) ->
     end, KVsToAdd).
 
 
+update_reduce_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows) ->
+    #{
+        tx := Tx,
+        db_prefix := DbPrefix
+    } = TxDb,
+
+%%    Unique = lists:usort([K || {K, _V} <- NewRows]),
+
+%%    KeysToRem = ExistingKeys -- Unique,
+%%    lists:foreach(fun(RemKey) ->
+%%        {Start, End} = reduce_idx_range(DbPrefix, Sig, ViewId, RemKey, DocId),
+%%        ok = erlfdb:clear_range(Tx, Start, End)
+%%    end, KeysToRem),
+%%
+    {ExactKVsToAdd, GroupKVsToAdd} = process_reduce_rows(NewRows),
+    ReduceIdxPrefix = reduce_idx_prefix(DbPrefix, Sig, ViewId),
+%%    add_reduce_kvs(Tx, ReduceIdxPrefix, ExactKVsToAdd, ?VIEW_REDUCE_EXACT),
+    add_reduce_kvs(Tx, ReduceIdxPrefix, GroupKVsToAdd, ?VIEW_REDUCE_GROUP).
+
+
+add_reduce_kvs(Tx, ReduceIdxPrefix, KVsToAdd, ReduceType) ->
+    lists:foreach(fun({Key1, Key2, Val, GroupLevel}) ->
+        KK = reduce_idx_key(ReduceIdxPrefix, GroupLevel, Key1,
+            ReduceType, ?VIEW_ROW_KEY),
+        VK = reduce_idx_key(ReduceIdxPrefix, GroupLevel, Key1,
+            ReduceType, ?VIEW_ROW_VALUE),
+        ok = erlfdb:set(Tx, KK, Key2),
+        ok = erlfdb:add(Tx, VK, Val)
+    end, KVsToAdd).
+
+
+reduce_idx_prefix(DbPrefix, Sig, ViewId) ->
+    Key = {?DB_VIEWS, Sig, ?VIEW_REDUCE_RANGE, ViewId},
+    erlfdb_tuple:pack(Key, DbPrefix).
+
+
+reduce_idx_key(ReduceIdxPrefix, GroupLevel, ReduceKey, ReduceType, RowType) ->
+    io:format("GROUP LEVEL ~p ~n", [GroupLevel]),
+    Key = {GroupLevel, ReduceKey, ReduceType, RowType},
+    erlfdb_tuple:pack(Key, ReduceIdxPrefix).
+
+
+%%reduce_idx_range(DbPrefix, Sig, ViewId, GroupKey, DocId) ->
+%%    Encoded = couch_views_encoding:encode(GroupKey, key),
+%%    Key = {?DB_VIEWS, Sig, ?VIEW_REDUCE_RANGE, ViewId, {Encoded, DocId}},
+%%    erlfdb_tuple:range(Key, DbPrefix).
+
+
 get_view_keys(TxDb, Sig, DocId) ->
     #{
         tx := Tx,
@@ -394,7 +537,6 @@ id_idx_range(DbPrefix, Sig, DocId) ->
     Key = {?DB_VIEWS, Sig, ?VIEW_ID_RANGE, DocId},
     erlfdb_tuple:range(Key, DbPrefix).
 
-
 map_idx_prefix(DbPrefix, Sig, ViewId) ->
     Key = {?DB_VIEWS, Sig, ?VIEW_MAP_RANGE, ViewId},
     erlfdb_tuple:pack(Key, DbPrefix).
@@ -432,6 +574,49 @@ process_rows(Rows) ->
     end, [], Grouped).
 
 
+process_reduce_rows(Rows) ->
+    ReduceExact = encode_reduce_rows(Rows),
+    ReduceGroups = lists:foldl(fun({Key, Val}, Groupings) ->
+        Out = create_grouping(Key, Val, [], Groupings),
+        io:format("ROW G ~p ~p ~p ~n", [Key, Val, Out]),
+        Out
+    end, #{}, Rows),
+    io:format("INPUT ROWS ~n Groups ~p ~n Exact ~p ~n", [maps:to_list(ReduceGroups), Rows]),
+    ReduceGroups1 = encode_reduce_rows(maps:to_list(ReduceGroups)),
+    {ReduceExact, ReduceGroups1}.
+
+
+encode_reduce_rows(Rows) ->
+    lists:map(fun({K, V}) ->
+        EK1 = couch_views_encoding:encode(K, key),
+        EK2 = couch_views_encoding:encode(K, value),
+        {EK1, EK2, V, group_level(K)}
+    end, Rows).
+
+
+group_level(Key) when is_list(Key) ->
+    length(Key);
+
+group_level(_Key) ->
+    1.
+
+
+create_grouping([], _Val, _, Groupings) ->
+    Groupings;
+
+create_grouping([Head | Rest], Val, Key, Groupings) ->
+    Key1 = Key ++ [Head],
+    Groupings1 = maps:update_with(Key1, fun(OldVal) ->
+        OldVal + Val
+        end, Val, Groupings),
+    create_grouping(Rest, Val, Key1, Groupings1);
+
+create_grouping(Key, Val, _, Groupings) ->
+    maps:update_with(Key, fun(OldVal) ->
+        OldVal + Val
+    end, Val, Groupings).
+
+
 calculate_row_size(Rows) ->
     lists:foldl(fun({K, V}, Acc) ->
         Acc + erlang:external_size(K) + erlang:external_size(V)
diff --git a/src/couch_views/include/couch_views.hrl b/src/couch_views/src/couch_views_fdb_reduce.erl
similarity index 55%
copy from src/couch_views/include/couch_views.hrl
copy to src/couch_views/src/couch_views_fdb_reduce.erl
index 2e443eb..d9f2bfd 100644
--- a/src/couch_views/include/couch_views.hrl
+++ b/src/couch_views/src/couch_views_fdb_reduce.erl
@@ -10,17 +10,24 @@
 % License for the specific language governing permissions and limitations under
 % the License.
 
-% indexing
--define(VIEW_UPDATE_SEQ, 0).
--define(VIEW_ID_INFO, 1).
--define(VIEW_ID_RANGE, 2).
--define(VIEW_MAP_RANGE, 3).
 
--define(VIEW_ROW_COUNT, 0).
--define(VIEW_KV_SIZE, 1).
+-module(couch_views_fdb_reduce).
+
+
+-export([
+%%    write_doc/4
+]).
+
+% _id keys = {?DB_VIEWS, Sig, ?VIEW_REDUCE_ID_RANGE, DocId, ViewId} = [TotalKeys, TotalSize, UniqueKeys]
+
+%%write_doc(TxDb, Sig, ViewIds, Doc) ->
+%%    #{
+%%        id := DocId,
+%%        reduce_results := ReduceResults
+%%    } = Doc,
+%%    lists:foreach(fun({ViewId, NewRows}) ->
+%%        % update reduce index
+%%        ok
+%%    end, lists:zip(ViewIds, ReduceResults)).
 
--define(VIEW_ROW_KEY, 0).
--define(VIEW_ROW_VALUE, 1).
 
-% jobs api
--define(INDEX_JOB_TYPE, <<"views">>).
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index 60c8194..09d41e0 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -21,6 +21,7 @@
     init/0
 ]).
 
+
 -include("couch_views.hrl").
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("couch_mrview/include/couch_mrview.hrl").
@@ -100,7 +101,8 @@ update(#{} = Db, Mrst0, State0) ->
         } = State2,
 
         {Mrst1, MappedDocs} = map_docs(Mrst0, DocAcc),
-        write_docs(TxDb, Mrst1, MappedDocs, State2),
+        Results = run_reduce(Mrst1, MappedDocs),
+        write_docs(TxDb, Mrst1, Results, State2),
 
         case Count < Limit of
             true ->
@@ -196,6 +198,10 @@ map_docs(Mrst, Docs) ->
     {Mrst1, lists:map(MapFun, Docs)}.
 
 
+run_reduce(Mrst, MappedResults) ->
+    couch_views_reduce:run_reduce(Mrst, MappedResults).
+
+
 write_docs(TxDb, Mrst, Docs, State) ->
     #mrst{
         views = Views,
diff --git a/src/couch_views/src/couch_views_reader.erl b/src/couch_views/src/couch_views_reader.erl
index 27671fb..7f1d530 100644
--- a/src/couch_views/src/couch_views_reader.erl
+++ b/src/couch_views/src/couch_views_reader.erl
@@ -13,6 +13,7 @@
 -module(couch_views_reader).
 
 -export([
+    read_reduce/6,
     read/6
 ]).
 
@@ -23,6 +24,114 @@
 -include_lib("fabric/include/fabric2.hrl").
 
 
+read_reduce(Db, Mrst, ViewName, UserCallback, UserAcc0, Args) ->
+    #mrst{
+        language = Lang,
+        sig = Sig,
+        views = Views
+    } = Mrst,
+
+    ViewId = get_view_id(Lang, Args, ViewName, Views),
+    Fun = fun handle_reduce_row/3,
+
+    try
+        fabric2_fdb:transactional(Db, fun(TxDb) ->
+            Meta = get_meta(TxDb, Mrst, ViewId, Args),
+            UserAcc1 = maybe_stop(UserCallback(Meta, UserAcc0)),
+
+            Acc0 = #{
+                db => TxDb,
+                skip => Args#mrargs.skip,
+                mrargs => undefined,
+                callback => UserCallback,
+                acc => UserAcc1
+            },
+
+            Acc1 = lists:foldl(fun(KeyArgs, KeyAcc0) ->
+                Opts = reduce_mrargs_to_fdb_options(KeyArgs),
+                KeyAcc1 = KeyAcc0#{
+                    mrargs := KeyArgs
+                },
+                couch_views_fdb:fold_reduce_idx(
+                    TxDb,
+                    Sig,
+                    ViewId,
+                    Opts,
+                    Fun,
+                    KeyAcc1
+                )
+            end, Acc0, expand_keys_args(Args)),
+
+            #{
+                acc := UserAcc2
+            } = Acc1,
+            {ok, maybe_stop(UserCallback(complete, UserAcc2))}
+        end)
+    catch throw:{done, Out} ->
+        {ok, Out}
+    end.
+
+
+reduce_mrargs_to_fdb_options(Args) ->
+    #mrargs{
+%%        start_key = StartKey0,
+%%        start_key_docid = StartKeyDocId,
+%%        end_key = EndKey0,
+%%        end_key_docid = EndKeyDocId,
+        direction = Direction,
+        limit = Limit,
+        skip = Skip,
+        group_level = GroupLevel
+%%        inclusive_end = InclusiveEnd
+    } = Args,
+
+%%    GroupLevelEnd = [{end_key, couch_views_encoding:encode(GroupLevel + 1, key)}],
+    GroupLevelEnd = [{end_key, GroupLevel + 1}],
+
+%%    StartKey1 = if StartKey0 == undefined -> undefined; true ->
+%%        couch_views_encoding:encode(StartKey0, key)
+%%    end,
+%%
+%%    StartKeyOpts = case {StartKey1, StartKeyDocId} of
+%%        {undefined, _} ->
+%%            [];
+%%        {StartKey1, StartKeyDocId} ->
+%%            [{start_key, {StartKey1, StartKeyDocId}}]
+%%    end,
+%%
+%%    EndKey1 = if EndKey0 == undefined -> undefined; true ->
+%%        couch_views_encoding:encode(EndKey0, key)
+%%    end,
+%%
+%%    EndKeyOpts = case {EndKey1, EndKeyDocId, Direction} of
+%%        {undefined, _, _} ->
+%%            [];
+%%        {EndKey1, <<>>, rev} when not InclusiveEnd ->
+%%            % When we iterate in reverse with
+%%            % inclusive_end=false we have to set the
+%%            % EndKeyDocId to <<255>> so that we don't
+%%            % include matching rows.
+%%            [{end_key_gt, {EndKey1, <<255>>}}];
+%%        {EndKey1, <<255>>, _} when not InclusiveEnd ->
+%%            % When inclusive_end=false we need to
+%%            % elide the default end_key_docid so as
+%%            % to not sort past the docids with the
+%%            % given end key.
+%%            [{end_key_gt, {EndKey1}}];
+%%        {EndKey1, EndKeyDocId, _} when not InclusiveEnd ->
+%%            [{end_key_gt, {EndKey1, EndKeyDocId}}];
+%%        {EndKey1, EndKeyDocId, _} when InclusiveEnd ->
+%%            [{end_key, {EndKey1, EndKeyDocId}}]
+%%    end,
+
+    [
+        {dir, Direction},
+        {limit, Limit * 2 + Skip * 2},
+        {streaming_mode, want_all}
+    ] ++ GroupLevelEnd.
+%%    ] ++ StartKeyOpts ++ EndKeyOpts.
+
+
 read(Db, Mrst, ViewName, UserCallback, UserAcc0, Args) ->
     #mrst{
         language = Lang,
@@ -113,11 +222,28 @@ handle_row(DocId, Key, Value, Acc) ->
     UserAcc1 = maybe_stop(UserCallback({row, Row}, UserAcc0)),
     Acc#{acc := UserAcc1}.
 
+handle_reduce_row(_Key, _Value, #{skip := Skip} = Acc) when Skip > 0 ->
+    Acc#{skip := Skip - 1};
+
+handle_reduce_row(Key, Value, Acc) ->
+    #{
+        callback := UserCallback,
+        acc := UserAcc0
+    } = Acc,
+
+    Row = [
+        {key, Key},
+        {value, Value}
+    ],
+
+    UserAcc1 = maybe_stop(UserCallback({row, Row}, UserAcc0)),
+    Acc#{acc := UserAcc1}.
+
 
 get_view_id(Lang, Args, ViewName, Views) ->
     case couch_mrview_util:extract_view(Lang, Args, ViewName, Views) of
         {map, View, _Args} -> View#mrview.id_num;
-        {red, {_Idx, _Lang, View}} -> View#mrview.id_num
+        {red, {_Idx, _Lang, View}, _Args} -> View#mrview.id_num
     end.
 
 
diff --git a/src/couch_views/src/couch_views_reduce.erl b/src/couch_views/src/couch_views_reduce.erl
new file mode 100644
index 0000000..9101509
--- /dev/null
+++ b/src/couch_views/src/couch_views_reduce.erl
@@ -0,0 +1,70 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_views_reduce).
+
+
+-export([
+    run_reduce/2
+]).
+
+
+-include("couch_views.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+
+
+run_reduce(#mrst{views = Views } = Mrst, MappedResults) ->
+    ReduceFuns = lists:map(fun(View) ->
+        #mrview{
+            id_num = Id,
+            reduce_funs = ViewReduceFuns
+        } = View,
+
+        [{_, Fun}] = ViewReduceFuns,
+        Fun
+    end, Views),
+
+    lists:map(fun (MappedResult) ->
+        #{
+            results := Results
+        } = MappedResult,
+
+        ReduceResults = lists:map(fun ({ReduceFun, Result}) ->
+            reduce(ReduceFun, Result)
+        end, lists:zip(ReduceFuns, Results)),
+
+        MappedResult#{
+            reduce_results => ReduceResults
+        }
+    end, MappedResults).
+
+
+reduce(<<"_count">>, Results) ->
+    ReduceResults = lists:foldl(fun ({Key, _}, Acc) ->
+        case maps:is_key(Key, Acc) of
+            true ->
+                #{Key := Val} = Acc,
+                Acc#{Key := Val + 1};
+            false ->
+                Acc#{Key => 1}
+        end
+    end, #{}, Results),
+    maps:to_list(ReduceResults).
+
+
+is_builtin(<<"_", _/binary>>) ->
+    true;
+
+is_builtin(_) ->
+    false.
+
diff --git a/src/couch_views/test/exunit/couch_views_reduce_test.exs b/src/couch_views/test/exunit/couch_views_reduce_test.exs
new file mode 100644
index 0000000..5f4242b
--- /dev/null
+++ b/src/couch_views/test/exunit/couch_views_reduce_test.exs
@@ -0,0 +1,165 @@
+defmodule CouchViewsReduceTest do
+  use Couch.Test.ExUnit.Case
+
+  alias Couch.Test.Utils
+
+  alias Couch.Test.Setup
+
+  alias Couch.Test.Setup.Step
+
+  setup_all do
+    test_ctx = :test_util.start_couch([:fabric, :couch_views, :couch_jobs])
+
+    on_exit(fn ->
+      :test_util.stop_couch(test_ctx)
+    end)
+  end
+
+  setup do
+    db_name = Utils.random_name("db")
+
+    admin_ctx =
+      {:user_ctx,
+       Utils.erlang_record(:user_ctx, "couch/include/couch_db.hrl", roles: ["_admin"])}
+
+    {:ok, db} = :fabric2_db.create(db_name, [admin_ctx])
+
+    docs = create_docs()
+    ddoc = create_ddoc()
+
+    {ok, _} = :fabric2_db.update_docs(db, [ddoc | docs])
+
+    on_exit(fn ->
+      :fabric2_db.delete(db_name, [admin_ctx])
+    end)
+
+    %{
+      :db_name => db_name,
+      :db => db,
+      :ddoc => ddoc
+    }
+  end
+
+#  test "group=true count reduce", context do
+#        args = %{
+#            :reduce => true,
+#            :group => true,
+#            :limit => 9
+#        }
+#
+#        {:ok, res} = run_query(context, args)
+#        IO.inspect(res, label: "OUT")
+#
+#        assert res == [
+#                   {:row, [key: 1, value: 2]},
+#                   {:row, [key: 2, value: 2]},
+#                   {:row, [key: 3, value: 2]},
+#                   {:row, [key: [1, 1], value: 1]},
+#                   {:row, [key: [1, 2, 6], value: 1]},
+#                   {:row, [key: [2, 1], value: 1]},
+#                   {:row, [key: [2, 3, 6], value: 1]},
+#                   {:row, [key: [3, 1], value: 1]},
+#                   {:row, [key: [3, 4, 5], value: 1]}
+#               ]
+#    end
+
+  test "group=1 count reduce", context do
+      args = %{
+          :reduce => true,
+          :group_level => 1
+#          :limit => 6
+      }
+
+      {:ok, res} = run_query(context, args)
+      IO.inspect(res, label: "OUT")
+
+      assert res == [
+                 {:row, [key: 1, value: 2]},
+                 {:row, [key: 2, value: 2]},
+                 {:row, [key: 3, value: 2]},
+                 {:row, [key: [1], value: 2]},
+                 {:row, [key: [2], value: 2]},
+                 {:row, [key: [3], value: 2]}
+             ]
+  end
+
+  defp run_query(context, args) do
+    db = context[:db]
+    ddoc = context[:ddoc]
+
+    :couch_views.query(db, ddoc, "baz", &__MODULE__.default_cb/2, [], args)
+  end
+
+  def default_cb(:complete, acc) do
+    {:ok, Enum.reverse(acc)}
+  end
+
+  def default_cb({:final, info}, []) do
+    {:ok, [info]}
+  end
+
+  def default_cb({:final, _}, acc) do
+    {:ok, acc}
+  end
+
+  def default_cb({:meta, _}, acc) do
+    {:ok, acc}
+  end
+
+  def default_cb(:ok, :ddoc_updated) do
+    {:ok, :ddoc_updated}
+  end
+
+  def default_cb(row, acc) do
+    {:ok, [row | acc]}
+  end
+
+  defp create_docs() do
+    for i <- 1..3 do
+      group =
+        if rem(i, 3) == 0 do
+          "first"
+        else
+          "second"
+        end
+
+      :couch_doc.from_json_obj(
+        {[
+           {"_id", "doc-id-#{i}"},
+           {"value", i},
+           {"some", "field"},
+           {"group", group}
+         ]}
+      )
+    end
+  end
+
+  defp create_ddoc() do
+    :couch_doc.from_json_obj(
+      {[
+         {"_id", "_design/bar"},
+         {"views",
+          {[
+             {"baz",
+              {[
+                 {"map",
+                  """
+                  function(doc) {
+                    emit(doc.value, doc.value);
+                    emit(doc.value, doc.value);
+                    emit([doc.value, 1], doc.value);
+                    emit([doc.value, doc.value + 1, doc.group.length], doc.value);
+                   }
+                  """},
+                 {"reduce", "_count"}
+               ]}},
+             {"boom",
+              {[
+                 {"map", "function(doc) {emit([doc._id, doc.value], doc.value);}"},
+                 {"reduce", "_count"}
+               ]}}
+           ]}}
+       ]}
+    )
+  end
+end
diff --git a/src/couch_views/test/exunit/test_helper.exs b/src/couch_views/test/exunit/test_helper.exs
new file mode 100644
index 0000000..3140500
--- /dev/null
+++ b/src/couch_views/test/exunit/test_helper.exs
@@ -0,0 +1,2 @@
+ExUnit.configure(formatters: [JUnitFormatter, ExUnit.CLIFormatter])
+ExUnit.start()