You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ga...@apache.org on 2019/09/11 15:01:24 UTC
[couchdb] 01/01: rough outline
This is an automated email from the ASF dual-hosted git repository.
garren pushed a commit to branch prototype/builtin-reduce
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 957e2715711f4dff8ce99f1bfab6a553e32c8c7a
Author: Garren Smith <ga...@gmail.com>
AuthorDate: Wed Sep 11 17:01:08 2019 +0200
rough outline
---
src/couch_views/include/couch_views.hrl | 4 +
src/couch_views/src/couch_views.erl | 14 +-
src/couch_views/src/couch_views_fdb.erl | 195 ++++++++++++++++++++-
.../couch_views_fdb_reduce.erl} | 29 +--
src/couch_views/src/couch_views_indexer.erl | 8 +-
src/couch_views/src/couch_views_reader.erl | 128 +++++++++++++-
src/couch_views/src/couch_views_reduce.erl | 70 ++++++++
.../test/exunit/couch_views_reduce_test.exs | 165 +++++++++++++++++
src/couch_views/test/exunit/test_helper.exs | 2 +
9 files changed, 592 insertions(+), 23 deletions(-)
diff --git a/src/couch_views/include/couch_views.hrl b/src/couch_views/include/couch_views.hrl
index 2e443eb..6b029f7 100644
--- a/src/couch_views/include/couch_views.hrl
+++ b/src/couch_views/include/couch_views.hrl
@@ -15,6 +15,7 @@
-define(VIEW_ID_INFO, 1).
-define(VIEW_ID_RANGE, 2).
-define(VIEW_MAP_RANGE, 3).
+-define(VIEW_REDUCE_RANGE, 4).
-define(VIEW_ROW_COUNT, 0).
-define(VIEW_KV_SIZE, 1).
@@ -22,5 +23,8 @@
-define(VIEW_ROW_KEY, 0).
-define(VIEW_ROW_VALUE, 1).
+-define(VIEW_REDUCE_EXACT, 0).
+-define(VIEW_REDUCE_GROUP, 1).
+
% jobs api
-define(INDEX_JOB_TYPE, <<"views">>).
diff --git a/src/couch_views/src/couch_views.erl b/src/couch_views/src/couch_views.erl
index 7c7588c..b7fe4c4 100644
--- a/src/couch_views/src/couch_views.erl
+++ b/src/couch_views/src/couch_views.erl
@@ -38,16 +38,20 @@ query(Db, DDoc, ViewName, Callback, Acc0, Args0) ->
Args1 = to_mrargs(Args0),
Args2 = couch_mrview_util:set_view_type(Args1, ViewName, Views),
Args3 = couch_mrview_util:validate_args(Args2),
+
ok = check_range(Args3),
- case is_reduce_view(Args3) of
- true -> throw({not_implemented});
- false -> ok
- end,
ok = maybe_update_view(Db, Mrst, Args3),
try
- couch_views_reader:read(Db, Mrst, ViewName, Callback, Acc0, Args3)
+ case is_reduce_view(Args3) of
+ true ->
+ couch_views_reader:read_reduce(Db, Mrst, ViewName,
+ Callback, Acc0, Args3);
+ false ->
+ couch_views_reader:read(Db, Mrst, ViewName,
+ Callback, Acc0, Args3)
+ end
after
UpdateAfter = Args3#mrargs.update == lazy,
if UpdateAfter == false -> ok; true ->
diff --git a/src/couch_views/src/couch_views_fdb.erl b/src/couch_views/src/couch_views_fdb.erl
index 60ce300..d13adc1 100644
--- a/src/couch_views/src/couch_views_fdb.erl
+++ b/src/couch_views/src/couch_views_fdb.erl
@@ -20,6 +20,7 @@
get_kv_size/3,
fold_map_idx/6,
+ fold_reduce_idx/6,
write_doc/4
]).
@@ -42,6 +43,21 @@
% View Build Sequence Access
% (<db>, ?DB_VIEWS, Sig, ?VIEW_UPDATE_SEQ) = Sequence
+% Id Range
+% {<db>, ?DB_VIEWS, Sig, ?VIEW_ID_RANGE, DocId, ViewId}
+% = [TotalKeys, TotalSize, UniqueKeys]
+
+% Map Range
+% {<db>, ?DB_VIEWS, Sig, ?VIEW_MAP_RANGE, ViewId, {{Key, DocId}, DupeId, Type}}
+% = Value | UnEncodedKey
+% Type = ?VIEW_ROW_KEY | ?VIEW_ROW_VALUE
+
+% Reduce Range
+% {<db>, ?DB_VIEWS, Sig, ?VIEW_REDUCE_RANGE, ViewId, GroupLevel, Key,
+% ReduceType, RowType} = Value | UnEncodedKey
+% ReduceType = ?VIEW_REDUCE_EXACT | ?VIEW_REDUCE_GROUP
+% RowType = ?VIEW_ROW_KEY | ?VIEW_ROW_VALUE
+
get_update_seq(TxDb, #mrst{sig = Sig}) ->
#{
@@ -124,6 +140,81 @@ fold_map_idx(TxDb, Sig, ViewId, Options, Callback, Acc0) ->
Acc1.
+% Fold over the rows stored under the reduce index prefix of ViewId.
+% The rows are read with fabric2_fdb:fold_range/5 using the caller supplied
+% Options and are fed to reduce_fold_fwd/2, which calls Callback once per
+% key/value row pair. Returns the accumulator produced by the last
+% Callback invocation (or Acc0 when no rows were folded).
+fold_reduce_idx(TxDb, Sig, ViewId, Options, Callback, Acc0) ->
+    #{db_prefix := DbPrefix} = TxDb,
+    Prefix = reduce_idx_prefix(DbPrefix, Sig, ViewId),
+
+    % State threaded through reduce_fold_fwd/2. `next` records whether
+    % the following FDB row is expected to be a key row or a value row.
+    InitState = #{
+        next => key,
+        key => undefined,
+        sort_key => undefined,
+        docid => undefined,
+        group_level => undefined,
+        reduce_type => undefined,
+        prefix => Prefix,
+        callback => Callback,
+        acc => Acc0
+    },
+
+    FinalState = fabric2_fdb:fold_range(TxDb, Prefix,
+            fun reduce_fold_fwd/2, InitState, Options),
+    #{acc := Result} = FinalState,
+    Result.
+
+
+% Fold callback used by fold_reduce_idx/6.
+%
+% The accumulator's `next` field says which kind of row is expected:
+%   key   - a ?VIEW_ROW_KEY row whose value is the encoded original key;
+%           the unpacked key parts are remembered in the accumulator.
+%   value - the matching ?VIEW_ROW_VALUE row; the user callback is invoked
+%           with the remembered key and the decoded value, and the
+%           per-row state is reset.
+reduce_fold_fwd({RowKey, EncodedOriginalKey}, #{next := key} = Acc) ->
+    #{
+        prefix := Prefix
+    } = Acc,
+
+    {GroupLevel, SortKey, ReduceType, ?VIEW_ROW_KEY} =
+            erlfdb_tuple:unpack(RowKey, Prefix),
+    Acc#{
+        next := value,
+        key := couch_views_encoding:decode(EncodedOriginalKey),
+        sort_key := SortKey,
+        group_level := GroupLevel,
+        reduce_type := ReduceType
+    };
+
+reduce_fold_fwd({RowKey, EncodedValue}, #{next := value} = Acc) ->
+    #{
+        prefix := Prefix,
+        key := Key,
+        sort_key := SortKey,
+        group_level := GroupLevel,
+        reduce_type := ReduceType,
+        callback := UserCallback,
+        acc := UserAcc0
+    } = Acc,
+
+    % We're asserting here that this row is paired
+    % correctly with the previous row by relying on
+    % a badmatch if any of these values don't match.
+    {GroupLevel, SortKey, ReduceType, ?VIEW_ROW_VALUE} =
+            erlfdb_tuple:unpack(RowKey, Prefix),
+
+    % TODO: Handle more than uint
+    Value = ?bin2uint(EncodedValue),
+    UserAcc1 = UserCallback(Key, Value, UserAcc0),
+
+    Acc#{
+        next := key,
+        key := undefined,
+        sort_key := undefined,
+        group_level := undefined,
+        reduce_type := undefined,
+        acc := UserAcc1
+    }.
+
+
write_doc(TxDb, Sig, _ViewIds, #{deleted := true} = Doc) ->
#{
id := DocId
@@ -134,6 +225,7 @@ write_doc(TxDb, Sig, _ViewIds, #{deleted := true} = Doc) ->
clear_id_idx(TxDb, Sig, DocId),
lists:foreach(fun({ViewId, TotalKeys, TotalSize, UniqueKeys}) ->
clear_map_idx(TxDb, Sig, ViewId, DocId, UniqueKeys),
+ %clear_reduce_idx
update_row_count(TxDb, Sig, ViewId, -TotalKeys),
update_kv_size(TxDb, Sig, ViewId, -TotalSize)
end, ExistingViewKeys);
@@ -141,14 +233,16 @@ write_doc(TxDb, Sig, _ViewIds, #{deleted := true} = Doc) ->
write_doc(TxDb, Sig, ViewIds, Doc) ->
#{
id := DocId,
- results := Results
+ results := Results,
+ reduce_results := ReduceResults
} = Doc,
ExistingViewKeys = get_view_keys(TxDb, Sig, DocId),
clear_id_idx(TxDb, Sig, DocId),
- lists:foreach(fun({ViewId, NewRows}) ->
+ %% TODO: handle when there is no reduce
+ lists:foreach(fun({ViewId, NewRows, ReduceResult}) ->
update_id_idx(TxDb, Sig, ViewId, DocId, NewRows),
ExistingKeys = case lists:keyfind(ViewId, 1, ExistingViewKeys) of
@@ -165,8 +259,9 @@ write_doc(TxDb, Sig, ViewIds, Doc) ->
update_kv_size(TxDb, Sig, ViewId, SizeChange),
[]
end,
- update_map_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows)
- end, lists:zip(ViewIds, Results)).
+ update_map_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows),
+ update_reduce_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, ReduceResult)
+ end, lists:zip3(ViewIds, Results, ReduceResults)).
% For each row in a map view there are two rows stored in
@@ -338,6 +433,54 @@ update_map_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows) ->
end, KVsToAdd).
+% Store the reduce rows computed for one document in the reduce index of
+% ViewId. Only the grouped rows (?VIEW_REDUCE_GROUP) are written for now;
+% removal of stale keys and the exact rows are still commented out below,
+% which is why DocId, ExistingKeys and the exact KVs are currently unused
+% (underscore prefixed to avoid compiler warnings).
+update_reduce_idx(TxDb, Sig, ViewId, _DocId, _ExistingKeys, NewRows) ->
+    #{
+        tx := Tx,
+        db_prefix := DbPrefix
+    } = TxDb,
+
+%% Unique = lists:usort([K || {K, _V} <- NewRows]),
+
+%% KeysToRem = ExistingKeys -- Unique,
+%% lists:foreach(fun(RemKey) ->
+%% {Start, End} = reduce_idx_range(DbPrefix, Sig, ViewId, RemKey, DocId),
+%% ok = erlfdb:clear_range(Tx, Start, End)
+%% end, KeysToRem),
+%%
+    {_ExactKVsToAdd, GroupKVsToAdd} = process_reduce_rows(NewRows),
+    ReduceIdxPrefix = reduce_idx_prefix(DbPrefix, Sig, ViewId),
+%% add_reduce_kvs(Tx, ReduceIdxPrefix, ExactKVsToAdd, ?VIEW_REDUCE_EXACT),
+    add_reduce_kvs(Tx, ReduceIdxPrefix, GroupKVsToAdd, ?VIEW_REDUCE_GROUP).
+
+
+% Write every {SortKey, OrigKey, Val, GroupLevel} entry of KVsToAdd as a
+% pair of rows: the ?VIEW_ROW_KEY row is set to OrigKey and Val is applied
+% to the ?VIEW_ROW_VALUE row through erlfdb:add/3.
+add_reduce_kvs(Tx, ReduceIdxPrefix, KVsToAdd, ReduceType) ->
+    WriteRow = fun({SortKey, OrigKey, Val, GroupLevel}) ->
+        KeyRow = reduce_idx_key(ReduceIdxPrefix, GroupLevel, SortKey,
+                ReduceType, ?VIEW_ROW_KEY),
+        ValueRow = reduce_idx_key(ReduceIdxPrefix, GroupLevel, SortKey,
+                ReduceType, ?VIEW_ROW_VALUE),
+        ok = erlfdb:set(Tx, KeyRow, OrigKey),
+        ok = erlfdb:add(Tx, ValueRow, Val)
+    end,
+    lists:foreach(WriteRow, KVsToAdd).
+
+
+% Packed key prefix shared by all reduce index rows of ViewId.
+reduce_idx_prefix(DbPrefix, Sig, ViewId) ->
+    erlfdb_tuple:pack({?DB_VIEWS, Sig, ?VIEW_REDUCE_RANGE, ViewId}, DbPrefix).
+
+
+% Build the packed key of a single reduce index row: the tuple
+% {GroupLevel, ReduceKey, ReduceType, RowType} packed under
+% ReduceIdxPrefix.
+reduce_idx_key(ReduceIdxPrefix, GroupLevel, ReduceKey, ReduceType, RowType) ->
+    Key = {GroupLevel, ReduceKey, ReduceType, RowType},
+    erlfdb_tuple:pack(Key, ReduceIdxPrefix).
+
+
+%%reduce_idx_range(DbPrefix, Sig, ViewId, GroupKey, DocId) ->
+%% Encoded = couch_views_encoding:encode(MapKey, key),
+%% Key = {?DB_VIEWS, Sig, ?VIEW_MAP_RANGE, ViewId, {Encoded, DocId}},
+%% erlfdb_tuple:range(Key, DbPrefix).
+
+
get_view_keys(TxDb, Sig, DocId) ->
#{
tx := Tx,
@@ -394,7 +537,6 @@ id_idx_range(DbPrefix, Sig, DocId) ->
Key = {?DB_VIEWS, Sig, ?VIEW_ID_RANGE, DocId},
erlfdb_tuple:range(Key, DbPrefix).
-
map_idx_prefix(DbPrefix, Sig, ViewId) ->
Key = {?DB_VIEWS, Sig, ?VIEW_MAP_RANGE, ViewId},
erlfdb_tuple:pack(Key, DbPrefix).
@@ -432,6 +574,49 @@ process_rows(Rows) ->
end, [], Grouped).
+% Turn the reduce rows of one document into index entries.
+%
+% Returns {ExactKVs, GroupKVs}: ExactKVs are the rows encoded as they are,
+% GroupKVs are the encoded rows after they have been summed per grouping
+% by create_grouping/4. Both lists have the shape produced by
+% encode_reduce_rows/1.
+process_reduce_rows(Rows) ->
+    ReduceExact = encode_reduce_rows(Rows),
+    Grouped = lists:foldl(fun({Key, Val}, Groupings) ->
+        create_grouping(Key, Val, [], Groupings)
+    end, #{}, Rows),
+    ReduceGroups = encode_reduce_rows(maps:to_list(Grouped)),
+    {ReduceExact, ReduceGroups}.
+
+
+% Map each {Key, Val} row to {SortKey, StoredKey, Val, GroupLevel}, where
+% SortKey and StoredKey are Key encoded by couch_views_encoding:encode/2
+% in `key` and `value` mode respectively.
+encode_reduce_rows(Rows) ->
+    Encode = fun({Key, Val}) ->
+        SortKey = couch_views_encoding:encode(Key, key),
+        StoredKey = couch_views_encoding:encode(Key, value),
+        {SortKey, StoredKey, Val, group_level(Key)}
+    end,
+    lists:map(Encode, Rows).
+
+
+% Group level of a key: the number of elements for a list key, 1 for any
+% other term.
+group_level(Key) ->
+    case is_list(Key) of
+        true -> length(Key);
+        false -> 1
+    end.
+
+
+% Add Val to the running totals kept in Groupings.
+%
+% For a list key every non-empty leading sub-list ([A], [A, B], ...) gets
+% Val added; the third argument carries the sub-list built so far. For a
+% non-list key Val is added under the key itself. An empty list leaves
+% Groupings untouched.
+create_grouping([], _Val, _Built, Groupings) ->
+    Groupings;
+
+create_grouping([Next | Remaining], Val, Built, Groupings) ->
+    Extended = Built ++ [Next],
+    Updated = maps:update_with(Extended, fun(Total) ->
+        Total + Val
+    end, Val, Groupings),
+    create_grouping(Remaining, Val, Extended, Updated);
+
+create_grouping(Key, Val, _Built, Groupings) ->
+    maps:update_with(Key, fun(Total) ->
+        Total + Val
+    end, Val, Groupings).
+
+
calculate_row_size(Rows) ->
lists:foldl(fun({K, V}, Acc) ->
Acc + erlang:external_size(K) + erlang:external_size(V)
diff --git a/src/couch_views/include/couch_views.hrl b/src/couch_views/src/couch_views_fdb_reduce.erl
similarity index 55%
copy from src/couch_views/include/couch_views.hrl
copy to src/couch_views/src/couch_views_fdb_reduce.erl
index 2e443eb..d9f2bfd 100644
--- a/src/couch_views/include/couch_views.hrl
+++ b/src/couch_views/src/couch_views_fdb_reduce.erl
@@ -10,17 +10,24 @@
% License for the specific language governing permissions and limitations under
% the License.
-% indexing
--define(VIEW_UPDATE_SEQ, 0).
--define(VIEW_ID_INFO, 1).
--define(VIEW_ID_RANGE, 2).
--define(VIEW_MAP_RANGE, 3).
--define(VIEW_ROW_COUNT, 0).
--define(VIEW_KV_SIZE, 1).
+-module(couch_views_fdb_reduce).
+
+
+-export([
+%% write_doc/4
+]).
+
+% _id keys = {?DB_VIEWS, Sig, ?VIEW_REDUCE_ID_RANGE, DocId, ViewId} = [TotalKeys, TotalSize, UniqueKeys]
+
+%%write_doc(TxDb, Sig, ViewIds, Doc) ->
+%% #{
+%% id := DocId,
+%% reduce_results := ReduceResults
+%% } = Doc,
+%% lists:foreach(fun({ViewId, NewRows}) ->
+%% % update reduce index
+%% ok
+%% end, lists:zip(ViewIds, ReduceResults)).
--define(VIEW_ROW_KEY, 0).
--define(VIEW_ROW_VALUE, 1).
-% jobs api
--define(INDEX_JOB_TYPE, <<"views">>).
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index 60c8194..09d41e0 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -21,6 +21,7 @@
init/0
]).
+
-include("couch_views.hrl").
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_mrview/include/couch_mrview.hrl").
@@ -100,7 +101,8 @@ update(#{} = Db, Mrst0, State0) ->
} = State2,
{Mrst1, MappedDocs} = map_docs(Mrst0, DocAcc),
- write_docs(TxDb, Mrst1, MappedDocs, State2),
+ Results = run_reduce(Mrst1, MappedDocs),
+ write_docs(TxDb, Mrst1, Results, State2),
case Count < Limit of
true ->
@@ -196,6 +198,10 @@ map_docs(Mrst, Docs) ->
{Mrst1, lists:map(MapFun, Docs)}.
+% Thin wrapper: hands the mapped documents to couch_views_reduce:run_reduce/2
+% and returns its result unchanged.
+run_reduce(Mrst, MappedResults) ->
+ couch_views_reduce:run_reduce(Mrst, MappedResults).
+
+
write_docs(TxDb, Mrst, Docs, State) ->
#mrst{
views = Views,
diff --git a/src/couch_views/src/couch_views_reader.erl b/src/couch_views/src/couch_views_reader.erl
index 27671fb..7f1d530 100644
--- a/src/couch_views/src/couch_views_reader.erl
+++ b/src/couch_views/src/couch_views_reader.erl
@@ -13,6 +13,7 @@
-module(couch_views_reader).
-export([
+ read_reduce/6,
read/6
]).
@@ -23,6 +24,114 @@
-include_lib("fabric/include/fabric2.hrl").
+% Stream the rows of a reduce view to UserCallback.
+%
+% Inside one fabric2_fdb transaction this sends the meta row, folds the
+% reduce index once per entry returned by expand_keys_args/1 and finally
+% sends `complete`. Returns {ok, UserAcc}. A thrown {done, Out} is turned
+% into {ok, Out}; presumably maybe_stop/1 raises it when the user callback
+% asks to stop - confirm against maybe_stop/1.
+read_reduce(Db, Mrst, ViewName, UserCallback, UserAcc0, Args) ->
+ #mrst{
+ language = Lang,
+ sig = Sig,
+ views = Views
+ } = Mrst,
+
+ ViewId = get_view_id(Lang, Args, ViewName, Views),
+ Fun = fun handle_reduce_row/3,
+
+ try
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+ Meta = get_meta(TxDb, Mrst, ViewId, Args),
+ UserAcc1 = maybe_stop(UserCallback(Meta, UserAcc0)),
+
+ % Accumulator handed to handle_reduce_row/3 for every row.
+ Acc0 = #{
+ db => TxDb,
+ skip => Args#mrargs.skip,
+ mrargs => undefined,
+ callback => UserCallback,
+ acc => UserAcc1
+ },
+
+ % One index fold per expanded argument set; the accumulator
+ % (including the remaining skip count) carries over between folds.
+ Acc1 = lists:foldl(fun(KeyArgs, KeyAcc0) ->
+ Opts = reduce_mrargs_to_fdb_options(KeyArgs),
+ KeyAcc1 = KeyAcc0#{
+ mrargs := KeyArgs
+ },
+ couch_views_fdb:fold_reduce_idx(
+ TxDb,
+ Sig,
+ ViewId,
+ Opts,
+ Fun,
+ KeyAcc1
+ )
+ end, Acc0, expand_keys_args(Args)),
+
+ #{
+ acc := UserAcc2
+ } = Acc1,
+ {ok, maybe_stop(UserCallback(complete, UserAcc2))}
+ end)
+ catch throw:{done, Out} ->
+ {ok, Out}
+ end.
+
+
+% Translate the #mrargs{} of a reduce query into fold options: direction,
+% row limit, streaming mode and an end key derived from the group level.
+% Start/end key handling is not implemented yet; the draft is kept in the
+% commented-out sections below.
+reduce_mrargs_to_fdb_options(Args) ->
+ #mrargs{
+%% start_key = StartKey0,
+%% start_key_docid = StartKeyDocId,
+%% end_key = EndKey0,
+%% end_key_docid = EndKeyDocId,
+ direction = Direction,
+ limit = Limit,
+ skip = Skip,
+ group_level = GroupLevel
+%% inclusive_end = InclusiveEnd
+ } = Args,
+
+%% GroupLevelEnd = [{end_key, couch_views_encoding:encode(GroupLevel + 1, key)}],
+ % NOTE(review): the arithmetic assumes group_level is an integer -
+ % confirm what value a group=true query carries here.
+ GroupLevelEnd = [{end_key, GroupLevel + 1}],
+
+%% StartKey1 = if StartKey0 == undefined -> undefined; true ->
+%% couch_views_encoding:encode(StartKey0, key)
+%% end,
+%%
+%% StartKeyOpts = case {StartKey1, StartKeyDocId} of
+%% {undefined, _} ->
+%% [];
+%% {StartKey1, StartKeyDocId} ->
+%% [{start_key, {StartKey1, StartKeyDocId}}]
+%% end,
+%%
+%% EndKey1 = if EndKey0 == undefined -> undefined; true ->
+%% couch_views_encoding:encode(EndKey0, key)
+%% end,
+%%
+%% EndKeyOpts = case {EndKey1, EndKeyDocId, Direction} of
+%% {undefined, _, _} ->
+%% [];
+%% {EndKey1, <<>>, rev} when not InclusiveEnd ->
+%% % When we iterate in reverse with
+%% % inclusive_end=false we have to set the
+%% % EndKeyDocId to <<255>> so that we don't
+%% % include matching rows.
+%% [{end_key_gt, {EndKey1, <<255>>}}];
+%% {EndKey1, <<255>>, _} when not InclusiveEnd ->
+%% % When inclusive_end=false we need to
+%% % elide the default end_key_docid so as
+%% % to not sort past the docids with the
+%% % given end key.
+%% [{end_key_gt, {EndKey1}}];
+%% {EndKey1, EndKeyDocId, _} when not InclusiveEnd ->
+%% [{end_key_gt, {EndKey1, EndKeyDocId}}];
+%% {EndKey1, EndKeyDocId, _} when InclusiveEnd ->
+%% [{end_key, {EndKey1, EndKeyDocId}}]
+%% end,
+
+ % Limit and skip are doubled, presumably because every logical row is
+ % stored as a key row plus a value row - confirm against the index
+ % writer.
+ [
+ {dir, Direction},
+ {limit, Limit * 2 + Skip * 2},
+ {streaming_mode, want_all}
+ ] ++ GroupLevelEnd.
+%% ] ++ StartKeyOpts ++ EndKeyOpts.
+
+
read(Db, Mrst, ViewName, UserCallback, UserAcc0, Args) ->
#mrst{
language = Lang,
@@ -113,11 +222,28 @@ handle_row(DocId, Key, Value, Acc) ->
UserAcc1 = maybe_stop(UserCallback({row, Row}, UserAcc0)),
Acc#{acc := UserAcc1}.
+% Per-row callback for reduce reads. While the `skip` counter in the
+% accumulator is positive the row is dropped and the counter decremented;
+% afterwards each row is passed to the user callback as
+% {row, [{key, Key}, {value, Value}]}.
+handle_reduce_row(_Key, _Value, #{skip := Remaining} = Acc)
+        when Remaining > 0 ->
+    Acc#{skip := Remaining - 1};
+
+handle_reduce_row(Key, Value, Acc) ->
+    #{callback := UserCallback, acc := UserAcc0} = Acc,
+    Row = [{key, Key}, {value, Value}],
+    Acc#{acc := maybe_stop(UserCallback({row, Row}, UserAcc0))}.
+
get_view_id(Lang, Args, ViewName, Views) ->
case couch_mrview_util:extract_view(Lang, Args, ViewName, Views) of
{map, View, _Args} -> View#mrview.id_num;
- {red, {_Idx, _Lang, View}} -> View#mrview.id_num
+ {red, {_Idx, _Lang, View}, _Args} -> View#mrview.id_num
end.
diff --git a/src/couch_views/src/couch_views_reduce.erl b/src/couch_views/src/couch_views_reduce.erl
new file mode 100644
index 0000000..9101509
--- /dev/null
+++ b/src/couch_views/src/couch_views_reduce.erl
@@ -0,0 +1,70 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_views_reduce).
+
+
+-export([
+ run_reduce/2
+]).
+
+
+-include("couch_views.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+
+
+% Run each view's reduce function over the map results of every document.
+%
+% MappedResults is a list of maps, each with a `results` list that is
+% zipped with the views of the #mrst{} (so it must hold one entry per
+% view). Every map is returned with a `reduce_results` key added, holding
+% one list of reduced rows per view.
+run_reduce(#mrst{views = Views}, MappedResults) ->
+    ReduceFuns = lists:map(fun(#mrview{reduce_funs = ViewReduceFuns}) ->
+        case ViewReduceFuns of
+            % A view without a reduce function has nothing to reduce
+            % (presumably reduce_funs is [] for map-only views - confirm
+            % against the #mrview{} record definition).
+            [] -> undefined;
+            % Exactly one reduce function per view is supported.
+            [{_, Fun}] -> Fun
+        end
+    end, Views),
+
+    lists:map(fun(MappedResult) ->
+        #{
+            results := Results
+        } = MappedResult,
+
+        ReduceResults = lists:map(fun
+            ({undefined, _Result}) -> [];
+            ({ReduceFun, Result}) -> reduce(ReduceFun, Result)
+        end, lists:zip(ReduceFuns, Results)),
+
+        MappedResult#{
+            reduce_results => ReduceResults
+        }
+    end, MappedResults).
+
+
+% Builtin `_count`: returns a list of {Key, Count} pairs, where Count is
+% the number of rows in Results that carry Key. Row values are ignored.
+reduce(<<"_count">>, Results) ->
+    Counts = lists:foldl(fun({Key, _Value}, Tally) ->
+        maps:update_with(Key, fun(Seen) -> Seen + 1 end, 1, Tally)
+    end, #{}, Results),
+    maps:to_list(Counts).
+
+
+% True when the argument is a binary that starts with an underscore,
+% false for any other term.
+is_builtin(Source) ->
+    case Source of
+        <<"_", _/binary>> -> true;
+        _ -> false
+    end.
+
diff --git a/src/couch_views/test/exunit/couch_views_reduce_test.exs b/src/couch_views/test/exunit/couch_views_reduce_test.exs
new file mode 100644
index 0000000..5f4242b
--- /dev/null
+++ b/src/couch_views/test/exunit/couch_views_reduce_test.exs
@@ -0,0 +1,165 @@
+defmodule CouchViewsReduceTest do
+  @moduledoc """
+  Exercises the builtin `_count` reduce of couch_views against a freshly
+  created database holding three documents and one design document.
+  """
+
+  use Couch.Test.ExUnit.Case
+
+  alias Couch.Test.Utils
+
+  alias Couch.Test.Setup
+
+  alias Couch.Test.Setup.Step
+
+  setup_all do
+    test_ctx = :test_util.start_couch([:fabric, :couch_views, :couch_jobs])
+
+    on_exit(fn ->
+      :test_util.stop_couch(test_ctx)
+    end)
+  end
+
+  setup do
+    db_name = Utils.random_name("db")
+
+    admin_ctx =
+      {:user_ctx,
+       Utils.erlang_record(:user_ctx, "couch/include/couch_db.hrl", roles: ["_admin"])}
+
+    {:ok, db} = :fabric2_db.create(db_name, [admin_ctx])
+
+    docs = create_docs()
+    ddoc = create_ddoc()
+
+    # Match on the :ok atom so a failed update aborts the setup; a bare
+    # `ok` would be a variable and match any two-element tuple.
+    {:ok, _} = :fabric2_db.update_docs(db, [ddoc | docs])
+
+    on_exit(fn ->
+      :fabric2_db.delete(db_name, [admin_ctx])
+    end)
+
+    %{
+      :db_name => db_name,
+      :db => db,
+      :ddoc => ddoc
+    }
+  end
+
+  # test "group=true count reduce", context do
+  # args = %{
+  # :reduce => true,
+  # :group => true,
+  # :limit => 9
+  # }
+  #
+  # {:ok, res} = run_query(context, args)
+  # IO.inspect(res, label: "OUT")
+  #
+  # assert res == [
+  # {:row, [key: 1, value: 2]},
+  # {:row, [key: 2, value: 2]},
+  # {:row, [key: 3, value: 2]},
+  # {:row, [key: [1, 1], value: 1]},
+  # {:row, [key: [1, 2, 6], value: 1]},
+  # {:row, [key: [2, 1], value: 1]},
+  # {:row, [key: [2, 3, 6], value: 1]},
+  # {:row, [key: [3, 1], value: 1]},
+  # {:row, [key: [3, 4, 5], value: 1]}
+  # ]
+  # end
+
+  test "group=1 count reduce", context do
+    args = %{
+      :reduce => true,
+      :group_level => 1
+      # :limit => 6
+    }
+
+    {:ok, res} = run_query(context, args)
+
+    assert res == [
+             {:row, [key: 1, value: 2]},
+             {:row, [key: 2, value: 2]},
+             {:row, [key: 3, value: 2]},
+             {:row, [key: [1], value: 2]},
+             {:row, [key: [2], value: 2]},
+             {:row, [key: [3], value: 2]}
+           ]
+  end
+
+  # Queries the "baz" view of the design document stored in the context.
+  defp run_query(context, args) do
+    db = context[:db]
+    ddoc = context[:ddoc]
+
+    :couch_views.query(db, ddoc, "baz", &__MODULE__.default_cb/2, [], args)
+  end
+
+  # View callback: rows are collected in reverse order and put back in
+  # order once :complete arrives; meta rows are ignored.
+  def default_cb(:complete, acc) do
+    {:ok, Enum.reverse(acc)}
+  end
+
+  def default_cb({:final, info}, []) do
+    {:ok, [info]}
+  end
+
+  def default_cb({:final, _}, acc) do
+    {:ok, acc}
+  end
+
+  def default_cb({:meta, _}, acc) do
+    {:ok, acc}
+  end
+
+  def default_cb(:ok, :ddoc_updated) do
+    {:ok, :ddoc_updated}
+  end
+
+  def default_cb(row, acc) do
+    {:ok, [row | acc]}
+  end
+
+  # Three documents "doc-id-1".."doc-id-3" with value = i; the group field
+  # is "first" for i divisible by 3 and "second" otherwise.
+  defp create_docs() do
+    for i <- 1..3 do
+      group =
+        if rem(i, 3) == 0 do
+          "first"
+        else
+          "second"
+        end
+
+      :couch_doc.from_json_obj(
+        {[
+           {"_id", "doc-id-#{i}"},
+           {"value", i},
+           {"some", "field"},
+           {"group", group}
+         ]}
+      )
+    end
+  end
+
+  # Design document "_design/bar" with two `_count` reduce views.
+  defp create_ddoc() do
+    :couch_doc.from_json_obj(
+      {[
+         {"_id", "_design/bar"},
+         {"views",
+          {[
+             {"baz",
+              {[
+                 {"map",
+                  """
+                  function(doc) {
+                  emit(doc.value, doc.value);
+                  emit(doc.value, doc.value);
+                  emit([doc.value, 1], doc.value);
+                  emit([doc.value, doc.value + 1, doc.group.length], doc.value);
+                  }
+                  """},
+                 {"reduce", "_count"}
+               ]}},
+             {"boom",
+              {[
+                 {"map", "function(doc) {emit([doc._id, doc.value], doc.value);}"},
+                 {"reduce", "_count"}
+               ]}}
+           ]}}
+       ]}
+    )
+  end
+end
diff --git a/src/couch_views/test/exunit/test_helper.exs b/src/couch_views/test/exunit/test_helper.exs
new file mode 100644
index 0000000..3140500
--- /dev/null
+++ b/src/couch_views/test/exunit/test_helper.exs
@@ -0,0 +1,2 @@
+# Register the JUnit and CLI formatters, then start ExUnit.
+ExUnit.configure(formatters: [JUnitFormatter, ExUnit.CLIFormatter])
+ExUnit.start()