Posted to commits@couchdb.apache.org by ga...@apache.org on 2019/10/31 14:43:00 UTC
[couchdb] 01/05: Initial work
This is an automated email from the ASF dual-hosted git repository.
garren pushed a commit to branch prototype/builtin-reduce
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 03cbc41c15a3e239e4c5f4e820d19593cc2b8dc4
Author: Garren Smith <ga...@gmail.com>
AuthorDate: Wed Sep 11 17:01:08 2019 +0200
Initial work
---
src/couch_views/include/couch_views.hrl | 1 +
src/couch_views/src/couch_views.erl | 14 +-
src/couch_views/src/couch_views_fdb.erl | 116 ++++++-
src/couch_views/src/couch_views_indexer.erl | 5 +
src/couch_views/src/couch_views_reader.erl | 155 ++++++++-
src/couch_views/src/couch_views_reduce.erl | 364 +++++++++++++++++++++
.../couch_views_reduce_fdb.erl} | 29 +-
.../test/exunit/couch_views_reduce_test.exs | 286 ++++++++++++++++
src/couch_views/test/exunit/test_helper.exs | 2 +
9 files changed, 950 insertions(+), 22 deletions(-)
diff --git a/src/couch_views/include/couch_views.hrl b/src/couch_views/include/couch_views.hrl
index 2e443eb..e97d777 100644
--- a/src/couch_views/include/couch_views.hrl
+++ b/src/couch_views/include/couch_views.hrl
@@ -15,6 +15,7 @@
-define(VIEW_ID_INFO, 1).
-define(VIEW_ID_RANGE, 2).
-define(VIEW_MAP_RANGE, 3).
+-define(VIEW_REDUCE_RANGE, 4).
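+% The new reduce code also references these macros; they are not defined
+% anywhere else in this commit, so the values below are placeholders:
+-define(VIEW_REDUCE_SK_RANGE, 5).
+-define(VIEW_REDUCE_EXACT, 0).
+-define(VIEW_REDUCE_GROUP, 1).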
-define(VIEW_ROW_COUNT, 0).
-define(VIEW_KV_SIZE, 1).
diff --git a/src/couch_views/src/couch_views.erl b/src/couch_views/src/couch_views.erl
index 7c7588c..b7fe4c4 100644
--- a/src/couch_views/src/couch_views.erl
+++ b/src/couch_views/src/couch_views.erl
@@ -38,16 +38,20 @@ query(Db, DDoc, ViewName, Callback, Acc0, Args0) ->
Args1 = to_mrargs(Args0),
Args2 = couch_mrview_util:set_view_type(Args1, ViewName, Views),
Args3 = couch_mrview_util:validate_args(Args2),
+
ok = check_range(Args3),
- case is_reduce_view(Args3) of
- true -> throw({not_implemented});
- false -> ok
- end,
ok = maybe_update_view(Db, Mrst, Args3),
try
- couch_views_reader:read(Db, Mrst, ViewName, Callback, Acc0, Args3)
+ case is_reduce_view(Args3) of
+ true ->
+ couch_views_reader:read_reduce(Db, Mrst, ViewName,
+ Callback, Acc0, Args3);
+ false ->
+ couch_views_reader:read(Db, Mrst, ViewName,
+ Callback, Acc0, Args3)
+ end
after
UpdateAfter = Args3#mrargs.update == lazy,
if UpdateAfter == false -> ok; true ->
diff --git a/src/couch_views/src/couch_views_fdb.erl b/src/couch_views/src/couch_views_fdb.erl
index 60ce300..07241dd 100644
--- a/src/couch_views/src/couch_views_fdb.erl
+++ b/src/couch_views/src/couch_views_fdb.erl
@@ -20,6 +20,7 @@
get_kv_size/3,
fold_map_idx/6,
+ fold_reduce_idx/6,
write_doc/4
]).
@@ -42,6 +43,15 @@
% View Build Sequence Access
% (<db>, ?DB_VIEWS, Sig, ?VIEW_UPDATE_SEQ) = Sequence
+% Id Range
+% {<db>, ?DB_VIEWS, Sig, ?VIEW_ID_RANGE, DocId, ViewId}
+% = [TotalKeys, TotalSize, UniqueKeys]
+
+% Map Range
+% {<db>, ?DB_VIEWS, Sig, ?VIEW_MAP_RANGE, ViewId, {{Key, DocId}, DupeId, Type}}
+% = Value | UnEncodedKey
+% Type = ?VIEW_ROW_KEY | ?VIEW_ROW_VALUE
+
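+% Reduce Range (new; layout per reduce_idx_prefix/3 and reduce_idx_key/5)
+% {<db>, ?DB_VIEWS, Sig, ?VIEW_REDUCE_RANGE, ViewId,
+%     {ReduceKey, GroupLevel, ReduceType, RowType}} = Value | UnEncodedKey
+% ReduceType = ?VIEW_REDUCE_EXACT | ?VIEW_REDUCE_GROUP
+% RowType = ?VIEW_ROW_KEY | ?VIEW_ROW_VALUE
+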
get_update_seq(TxDb, #mrst{sig = Sig}) ->
#{
@@ -124,6 +134,8 @@ fold_map_idx(TxDb, Sig, ViewId, Options, Callback, Acc0) ->
Acc1.
+
+
write_doc(TxDb, Sig, _ViewIds, #{deleted := true} = Doc) ->
#{
id := DocId
@@ -134,6 +146,7 @@ write_doc(TxDb, Sig, _ViewIds, #{deleted := true} = Doc) ->
clear_id_idx(TxDb, Sig, DocId),
lists:foreach(fun({ViewId, TotalKeys, TotalSize, UniqueKeys}) ->
clear_map_idx(TxDb, Sig, ViewId, DocId, UniqueKeys),
+ % TODO: clear reduce index entries for this doc
update_row_count(TxDb, Sig, ViewId, -TotalKeys),
update_kv_size(TxDb, Sig, ViewId, -TotalSize)
end, ExistingViewKeys);
@@ -141,14 +154,17 @@ write_doc(TxDb, Sig, _ViewIds, #{deleted := true} = Doc) ->
write_doc(TxDb, Sig, ViewIds, Doc) ->
#{
id := DocId,
- results := Results
+ results := Results,
+ reduce_results := ReduceResults
} = Doc,
ExistingViewKeys = get_view_keys(TxDb, Sig, DocId),
clear_id_idx(TxDb, Sig, DocId),
- lists:foreach(fun({ViewId, NewRows}) ->
+ %% TODO: handle when there is no reduce
+ io:format("REDUCE RESULTS ~p ~n", [ReduceResults]),
+ lists:foreach(fun({ViewId, NewRows, ReduceResult}) ->
update_id_idx(TxDb, Sig, ViewId, DocId, NewRows),
ExistingKeys = case lists:keyfind(ViewId, 1, ExistingViewKeys) of
@@ -165,8 +181,11 @@ write_doc(TxDb, Sig, ViewIds, Doc) ->
update_kv_size(TxDb, Sig, ViewId, SizeChange),
[]
end,
- update_map_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows)
- end, lists:zip(ViewIds, Results)).
+ update_map_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows),
+ couch_views_reduce:update_reduce_idx(TxDb, Sig, ViewId, DocId,
+ ExistingKeys, ReduceResult),
+ update_reduce_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, ReduceResult)
+ end, lists:zip3(ViewIds, Results, ReduceResults)).
% For each row in a map view there are two rows stored in
@@ -338,6 +357,53 @@ update_map_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows) ->
end, KVsToAdd).
+update_reduce_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows) ->
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = TxDb,
+
+%% Unique = lists:usort([K || {K, _V} <- NewRows]),
+
+%% KeysToRem = ExistingKeys -- Unique,
+%% lists:foreach(fun(RemKey) ->
+%% {Start, End} = reduce_idx_range(DbPrefix, Sig, ViewId, RemKey, DocId),
+%% ok = erlfdb:clear_range(Tx, Start, End)
+%% end, KeysToRem),
+%%
+ {ExactKVsToAdd, GroupKVsToAdd} = process_reduce_rows(NewRows),
+ ReduceIdxPrefix = reduce_idx_prefix(DbPrefix, Sig, ViewId),
+ add_reduce_kvs(Tx, ReduceIdxPrefix, ExactKVsToAdd, ?VIEW_REDUCE_EXACT),
+ add_reduce_kvs(Tx, ReduceIdxPrefix, GroupKVsToAdd, ?VIEW_REDUCE_GROUP).
+
+
+add_reduce_kvs(Tx, ReduceIdxPrefix, KVsToAdd, ReduceType) ->
+ lists:foreach(fun({Key1, Key2, Val, GroupLevel}) ->
+ KK = reduce_idx_key(ReduceIdxPrefix, Key1, GroupLevel,
+ ReduceType, ?VIEW_ROW_KEY),
+ VK = reduce_idx_key(ReduceIdxPrefix, Key1, GroupLevel,
+ ReduceType, ?VIEW_ROW_VALUE),
+ ok = erlfdb:set(Tx, KK, Key2),
+ ok = erlfdb:add(Tx, VK, Val)
+ end, KVsToAdd).
+
+
+reduce_idx_prefix(DbPrefix, Sig, ViewId) ->
+ Key = {?DB_VIEWS, Sig, ?VIEW_REDUCE_RANGE, ViewId},
+ erlfdb_tuple:pack(Key, DbPrefix).
+
+
+reduce_idx_key(ReduceIdxPrefix, ReduceKey, GroupLevel, ReduceType, RowType) ->
+ Key = {ReduceKey, GroupLevel, ReduceType, RowType},
+ erlfdb_tuple:pack(Key, ReduceIdxPrefix).
+
+
+%%reduce_idx_range(DbPrefix, Sig, ViewId, GroupKey, DocId) ->
+%% Encoded = couch_views_encoding:encode(MapKey, key),
+%% Key = {?DB_VIEWS, Sig, ?VIEW_MAP_RANGE, ViewId, {Encoded, DocId}},
+%% erlfdb_tuple:range(Key, DbPrefix).
+
+
get_view_keys(TxDb, Sig, DocId) ->
#{
tx := Tx,
@@ -394,7 +460,6 @@ id_idx_range(DbPrefix, Sig, DocId) ->
Key = {?DB_VIEWS, Sig, ?VIEW_ID_RANGE, DocId},
erlfdb_tuple:range(Key, DbPrefix).
-
map_idx_prefix(DbPrefix, Sig, ViewId) ->
Key = {?DB_VIEWS, Sig, ?VIEW_MAP_RANGE, ViewId},
erlfdb_tuple:pack(Key, DbPrefix).
@@ -432,6 +497,47 @@ process_rows(Rows) ->
end, [], Grouped).
+process_reduce_rows(Rows) ->
+ ReduceExact = encode_reduce_rows(Rows),
+ ReduceGroups = lists:foldl(fun({Key, Val}, Groupings) ->
+ create_grouping(Key, Val, [], Groupings)
+ end, #{}, Rows),
+ ReduceGroups1 = encode_reduce_rows(maps:to_list(ReduceGroups)),
+ {ReduceExact, ReduceGroups1}.
+
+
+encode_reduce_rows(Rows) ->
+ lists:map(fun({K, V}) ->
+ EK1 = couch_views_encoding:encode(K, key),
+ EK2 = couch_views_encoding:encode(K, value),
+ {EK1, EK2, V, group_level(K)}
+ end, Rows).
+
+
+group_level(Key) when is_list(Key) ->
+ length(Key);
+
+group_level(_Key) ->
+ 1.
+
+
+create_grouping([], _Val, _, Groupings) ->
+ Groupings;
+
+create_grouping([Head | Rest], Val, Key, Groupings) ->
+ Key1 = Key ++ [Head],
+ Groupings1 = maps:update_with(Key1, fun(OldVal) ->
+ OldVal + Val
+ end, Val, Groupings),
+ create_grouping(Rest, Val, Key1, Groupings1);
+
+create_grouping(Key, Val, _, Groupings) ->
+ maps:update_with(Key, fun(OldVal) ->
+ OldVal + Val
+ end, Val, Groupings).
+
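+% For example, a map row {[1, 2, 6], 1} produces the exact KV
+% {[1, 2, 6], 1} plus a group KV per key prefix: {[1], 1}, {[1, 2], 1}
+% and {[1, 2, 6], 1}, each tagged with its group level via group_level/1.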
+
calculate_row_size(Rows) ->
lists:foldl(fun({K, V}, Acc) ->
Acc + erlang:external_size(K) + erlang:external_size(V)
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index 55ce063..2f38ac1 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -101,6 +101,7 @@ update(#{} = Db, Mrst0, State0) ->
DocAcc1 = fetch_docs(TxDb, DocAcc),
{Mrst1, MappedDocs} = map_docs(Mrst0, DocAcc1),
+ Results = run_reduce(Mrst1, MappedDocs),
write_docs(TxDb, Mrst1, MappedDocs, State2),
case Count < Limit of
@@ -209,6 +210,10 @@ map_docs(Mrst, Docs) ->
{Mrst1, MappedDocs}.
+run_reduce(Mrst, MappedResults) ->
+ couch_views_reduce:run_reduce(Mrst, MappedResults).
+
+
write_docs(TxDb, Mrst, Docs, State) ->
#mrst{
views = Views,
diff --git a/src/couch_views/src/couch_views_reader.erl b/src/couch_views/src/couch_views_reader.erl
index 27671fb..d08515c 100644
--- a/src/couch_views/src/couch_views_reader.erl
+++ b/src/couch_views/src/couch_views_reader.erl
@@ -13,6 +13,7 @@
-module(couch_views_reader).
-export([
+ read_reduce/6,
read/6
]).
@@ -23,6 +24,128 @@
-include_lib("fabric/include/fabric2.hrl").
+read_reduce(Db, Mrst, ViewName, UserCallback, UserAcc0, Args) ->
+ #mrst{
+ language = Lang,
+ sig = Sig,
+ views = Views
+ } = Mrst,
+
+ ViewId = get_view_id(Lang, Args, ViewName, Views),
+ couch_views_reduce:read_reduce(Db, Sig, ViewId, UserCallback,
+ UserAcc0, Args).
+%% Fun = fun handle_reduce_row/3,
+%%
+%% try
+%% fabric2_fdb:transactional(Db, fun(TxDb) ->
+%% Meta = get_meta(TxDb, Mrst, ViewId, Args),
+%% UserAcc1 = maybe_stop(UserCallback(Meta, UserAcc0)),
+%%
+%% #mrargs{
+%% limit = Limit
+%% } = Args,
+%%
+%% Acc0 = #{
+%% db => TxDb,
+%% skip => Args#mrargs.skip,
+%% mrargs => undefined,
+%% callback => UserCallback,
+%% acc => UserAcc1,
+%% row_count => 0,
+%% limit => Limit
+%% },
+%%
+%% Acc1 = lists:foldl(fun(KeyArgs, KeyAcc0) ->
+%% Opts = reduce_mrargs_to_fdb_options(KeyArgs),
+%% KeyAcc1 = KeyAcc0#{
+%% mrargs := KeyArgs
+%% },
+%% couch_views_fdb:fold_reduce_idx(
+%% TxDb,
+%% Sig,
+%% ViewId,
+%% Opts,
+%% Fun,
+%% KeyAcc1
+%% )
+%% end, Acc0, expand_keys_args(Args)),
+%%
+%% #{
+%% acc := UserAcc2
+%% } = Acc1,
+%% {ok, maybe_stop(UserCallback(complete, UserAcc2))}
+%% end)
+%% catch throw:{done, Out} ->
+%% {ok, Out}
+%% end.
+
+
+reduce_mrargs_to_fdb_options(Args) ->
+ #mrargs{
+%% start_key = StartKey0,
+%% start_key_docid = StartKeyDocId,
+%% end_key = EndKey0,
+%% end_key_docid = EndKeyDocId,
+ direction = Direction,
+ limit = Limit,
+ skip = Skip,
+ group_level = GroupLevel,
+ group = Group
+%% inclusive_end = InclusiveEnd
+ } = Args,
+
+ GroupExact = Group == true andalso GroupLevel == 0,
+
+ GroupLevelEnd = case GroupExact of
+ true -> [];
+ false -> [{end_key, {<<255>>, GroupLevel + 1}}]
+ end,
+
+%% StartKey1 = if StartKey0 == undefined -> undefined; true ->
+%% couch_views_encoding:encode(StartKey0, key)
+%% end,
+%%
+%% StartKeyOpts = case {StartKey1, StartKeyDocId} of
+%% {undefined, _} ->
+%% [];
+%% {StartKey1, StartKeyDocId} ->
+%% [{start_key, {StartKey1, StartKeyDocId}}]
+%% end,
+%%
+%% EndKey1 = if EndKey0 == undefined -> undefined; true ->
+%% couch_views_encoding:encode(EndKey0, key)
+%% end,
+%%
+%% EndKeyOpts = case {EndKey1, EndKeyDocId, Direction} of
+%% {undefined, _, _} ->
+%% [];
+%% {EndKey1, <<>>, rev} when not InclusiveEnd ->
+%% % When we iterate in reverse with
+%% % inclusive_end=false we have to set the
+%% % EndKeyDocId to <<255>> so that we don't
+%% % include matching rows.
+%% [{end_key_gt, {EndKey1, <<255>>}}];
+%% {EndKey1, <<255>>, _} when not InclusiveEnd ->
+%% % When inclusive_end=false we need to
+%% % elide the default end_key_docid so as
+%% % to not sort past the docids with the
+%% % given end key.
+%% [{end_key_gt, {EndKey1}}];
+%% {EndKey1, EndKeyDocId, _} when not InclusiveEnd ->
+%% [{end_key_gt, {EndKey1, EndKeyDocId}}];
+%% {EndKey1, EndKeyDocId, _} when InclusiveEnd ->
+%% [{end_key, {EndKey1, EndKeyDocId}}]
+%% end,
+
+ [
+ {dir, Direction},
+%% {limit, Limit * 2 + Skip * 2},
+ {streaming_mode, large}
+%% {streaming_mode, want_all}
+ ] ++ GroupLevelEnd.
+%% ] ++ StartKeyOpts ++ EndKeyOpts.
+
+
read(Db, Mrst, ViewName, UserCallback, UserAcc0, Args) ->
#mrst{
language = Lang,
@@ -113,11 +236,41 @@ handle_row(DocId, Key, Value, Acc) ->
UserAcc1 = maybe_stop(UserCallback({row, Row}, UserAcc0)),
Acc#{acc := UserAcc1}.
+handle_reduce_row(_Key, _Value, #{skip := Skip} = Acc) when Skip > 0 ->
+ Acc#{skip := Skip - 1};
+
+handle_reduce_row(Key, Value, Acc) ->
+ io:format("ACC ~p ~n", [Acc]),
+ #{
+ callback := UserCallback,
+ acc := UserAcc0,
+ row_count := RowCount,
+ limit := Limit
+ } = Acc,
+
+ Row = [
+ {key, Key},
+ {value, Value}
+ ],
+
+ RowCountNext = RowCount + 1,
+
+ UserAcc1 = maybe_stop(UserCallback({row, Row}, UserAcc0)),
+ Acc1 = Acc#{acc := UserAcc1, row_count := RowCountNext},
+
+ case RowCountNext == Limit of
+ true ->
+ UserAcc2 = maybe_stop(UserCallback(complete, UserAcc1)),
+ maybe_stop({stop, UserAcc2});
+ false ->
+ Acc1
+ end.
+
get_view_id(Lang, Args, ViewName, Views) ->
case couch_mrview_util:extract_view(Lang, Args, ViewName, Views) of
{map, View, _Args} -> View#mrview.id_num;
- {red, {_Idx, _Lang, View}} -> View#mrview.id_num
+ {red, {_Idx, _Lang, View}, _Args} -> View#mrview.id_num
end.
diff --git a/src/couch_views/src/couch_views_reduce.erl b/src/couch_views/src/couch_views_reduce.erl
new file mode 100644
index 0000000..1502f38
--- /dev/null
+++ b/src/couch_views/src/couch_views_reduce.erl
@@ -0,0 +1,364 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_views_reduce).
+
+
+-export([
+ run_reduce/2,
+ update_reduce_idx/6,
+ read_reduce/6
+]).
+
+
+-include("couch_views.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+-include_lib("fabric/include/fabric2.hrl").
+
+
+-define(LEVEL_FAN_POW, 4).
+-define(MAX_SKIP_LIST_LEVELS, 6).
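+
+% The reduce index is stored as a skip list with ?MAX_SKIP_LIST_LEVELS + 1
+% levels. Level 0 always gets every exact group key (see
+% should_add_key_to_level/3); each higher level keeps a progressively
+% sparser subset so reads can skip over runs of keys. ?LEVEL_FAN_POW is
+% currently only referenced by the commented-out hash-based level check.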
+
+
+log_levels(Db, Sig, ViewId) ->
+ #{
+ db_prefix := DbPrefix
+ } = Db,
+
+ Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
+ ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+ Opts = [{streaming_mode, want_all}],
+
+ fabric2_fdb:transactional(Db, fun(#{tx := Tx} = TxDb) ->
+ lists:foreach(fun (Level) ->
+ {StartKey, EndKey} = erlfdb_tuple:range({Level},
+ ReduceIdxPrefix),
+
+ Acc0 = #{
+ sig => Sig,
+ view_id => ViewId,
+ reduce_idx_prefix => ReduceIdxPrefix,
+ next => key,
+ key => undefined,
+ rows => []
+ },
+
+ Fun = fun fold_fwd_cb/2,
+ Acc = erlfdb:fold_range(Tx, StartKey, EndKey, Fun, Acc0, Opts),
+ #{
+ rows := Rows
+ } = Acc,
+ io:format("~n ~n LEVEL ~p rows ~p ~n", [Level, Rows]),
+ {ok, Rows}
+ end, Levels),
+ {ok, []}
+ end).
+
+
+read_reduce(Db, Sig, ViewId, UserCallback, UserAcc0, Args) ->
+ #{
+ db_prefix := DbPrefix
+ } = Db,
+
+ Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
+ ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+ fabric2_fdb:transactional(Db, fun(#{tx := Tx} = TxDb) ->
+ log_levels(TxDb, Sig, ViewId),
+%% Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
+
+
+ Acc0 = #{
+ sig => Sig,
+ view_id => ViewId,
+ user_acc => UserAcc0,
+ args => Args,
+ callback => UserCallback,
+ reduce_idx_prefix => ReduceIdxPrefix,
+ next => key,
+ rows => []
+ },
+
+
+%% Opts = [{limit, 2}, {streaming_mode, want_all}],
+%% EK = couch_views_encoding:encode(0, key),
+%% {StartKey, EndKey} = erlfdb_tuple:range({?MAX_SKIP_LIST_LEVELS, EK},
+%% ReduceIdxPrefix),
+%%
+%% Fun = fun fold_fwd_cb/2,
+%% Acc = erlfdb:fold_range(Tx, StartKey, EndKey, Fun, Acc0, Opts),
+ #{
+ rows := Rows
+ } = Acc0,
+ {ok, Rows}
+ end).
+
+args_to_fdb_opts(#mrargs{} = Args) ->
+ #mrargs{
+ limit = Limit,
+ start_key = StartKey,
+ end_key = EndKey
+ } = Args,
+ ok.
+
+
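+% Each logical reduce row is stored as two adjacent FDB rows: a
+% ?VIEW_ROW_KEY entry carrying the row key, then a ?VIEW_ROW_VALUE entry
+% carrying the value, so this fold alternates next := key / next := value
+% and emits {Key, Val} once it has seen both halves.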
+fold_fwd_cb({FullEncodedKey, EV}, #{next := key} = Acc) ->
+ #{
+ reduce_idx_prefix := ReduceIdxPrefix
+ } = Acc,
+
+ {Level, EK, ?VIEW_ROW_KEY}
+ = erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
+
+%% Key = couch_views_encoding:decode(EV),
+ Val = couch_views_encoding:decode(EV),
+ Acc#{next := value, key := Val};
+
+fold_fwd_cb({FullEncodedKey, EV}, #{next := value} = Acc) ->
+ #{
+ reduce_idx_prefix := ReduceIdxPrefix,
+ rows := Rows,
+ key := Key
+ } = Acc,
+
+ {Level, EK, ?VIEW_ROW_VALUE}
+ = erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
+
+%% Key = couch_views_encoding:decode(EV),
+ Val = couch_views_encoding:decode(EV),
+ Acc#{next := key, key := undefined, rows := Rows ++ [{Key, Val}]}.
+
+
+run_reduce(#mrst{views = Views}, MappedResults) ->
+ ReduceFuns = lists:map(fun(View) ->
+ #mrview{
+ id_num = Id,
+ reduce_funs = ViewReduceFuns
+ } = View,
+
+ [{_, Fun}] = ViewReduceFuns,
+ Fun
+ end, Views),
+
+ lists:map(fun (MappedResult) ->
+ #{
+ results := Results
+ } = MappedResult,
+
+ ReduceResults = lists:map(fun ({ReduceFun, Result}) ->
+ reduce(ReduceFun, Result)
+ end, lists:zip(ReduceFuns, Results)),
+
+ MappedResult#{
+ reduce_results => ReduceResults
+ }
+ end, MappedResults).
+
+
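+% e.g. reduce(<<"_count">>, [{1, x}, {1, y}, {2, z}]) returns
+% [{1, 2}, {2, 1}] (row order comes from maps:to_list/1).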
+reduce(<<"_count">>, Results) ->
+ ReduceResults = lists:foldl(fun ({Key, _}, Acc) ->
+ case maps:is_key(Key, Acc) of
+ true ->
+ #{Key := Val} = Acc,
+ Acc#{Key := Val + 1};
+ false ->
+ Acc#{Key => 1}
+ end
+ end, #{}, Results),
+ maps:to_list(ReduceResults);
+
+% This isn't a real supported reduce function in CouchDB. It is a
+% stand-in max-style reduce: a basic reduce for which updating the index
+% forces us to re-read multiple rows, rather than one we could maintain
+% with a single atomic update.
+reduce(<<"_stats">>, Results) ->
+ ReduceResults = lists:foldl(fun ({Key, Val}, Acc) ->
+ io:format("MAX ~p ~p ~n", [Key, Val]),
+ case maps:is_key(Key, Acc) of
+ true ->
+ #{Key := Max} = Acc,
+ case Max >= Val of
+ true ->
+ Acc;
+ false ->
+ Acc#{Key := Val}
+ end;
+ false ->
+ Acc#{Key => Val}
+ end
+ end, #{}, Results),
+ maps:to_list(ReduceResults).
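+% e.g. reduce(<<"_stats">>, [{a, 1}, {a, 5}, {b, 2}]) returns
+% [{a, 5}, {b, 2}]: the max value seen per key.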
+
+
+is_builtin(<<"_", _/binary>>) ->
+ true;
+
+is_builtin(_) ->
+ false.
+
+
+update_reduce_idx(TxDb, Sig, ViewId, _DocId, _ExistingKeys, ReduceResult) ->
+ #{
+ db_prefix := DbPrefix
+ } = TxDb,
+
+ ViewOpts = #{
+ db_prefix => DbPrefix,
+ sig => Sig,
+ view_id => ViewId
+ },
+ create_skip_list(TxDb, ?MAX_SKIP_LIST_LEVELS, ViewOpts),
+
+ lists:foreach(fun ({Key, Val}) ->
+ io:format("RESULTS KV ~p ~p ~n", [Key, Val]),
+ add_kv_to_skip_list(TxDb, ?MAX_SKIP_LIST_LEVELS, ViewOpts, Key, Val)
+ end, ReduceResult).
+
+
+create_skip_list(Db, MaxLevel, #{} = ViewOpts) ->
+ #{
+ db_prefix := DbPrefix,
+ sig := Sig,
+ view_id := ViewId
+ } = ViewOpts,
+
+ Levels = lists:seq(0, MaxLevel),
+ ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+
+ lists:foreach(fun(Level) ->
+ add_kv(TxDb, ReduceIdxPrefix, Level, 0, 0)
+ end, Levels)
+ end).
+
+%% This sucks but it's simple for now
+should_add_key_to_level(0, _, _) ->
+ true;
+
+should_add_key_to_level(?MAX_SKIP_LIST_LEVELS, _, _) ->
+ false;
+
+should_add_key_to_level(_, _, false) ->
+ false;
+
+should_add_key_to_level(_, _Key, _Prev) ->
+ crypto:rand_uniform(0, 2) == 0.
+
+%%should_add_key_to_level(Level, Key) ->
+%% erlang:phash2(Key) band ((1 bsl (Level * ?LEVEL_FAN_POW)) - 1) == 0.
+%% i.e. add when (keyHash & ((1 << (level * LEVEL_FAN_POW)) - 1)) == 0
+
+
+add_kv_to_skip_list(Db, MaxLevel, #{} = ViewOpts, Key, Val) ->
+ #{
+ db_prefix := DbPrefix,
+ sig := Sig,
+ view_id := ViewId
+ } = ViewOpts,
+
+ Levels = lists:seq(0, MaxLevel),
+ ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+ lists:foldl(fun(Level, PrevCoinFlip) ->
+ io:format("PROCESS ~p ~p ~p ~n", [Level, Key, Val]),
+ {PrevKey, PrevVal} = get_previous_key(TxDb, ReduceIdxPrefix, Level, Key),
+ io:format("PREV VALS ~p ~p ~n", [PrevKey, PrevVal]),
+ case should_add_key_to_level(Level, Key, PrevCoinFlip) of
+ true ->
+ io:format("Adding ~p ~p ~n", [Level, Key]),
+ add_kv(TxDb, ReduceIdxPrefix, Level, Key, Val),
+ true;
+ false ->
+ {PrevKey, NewVal} = rereduce(<<"_stats">>, {PrevKey, PrevVal}, {Key, Val}),
+ io:format("RE_REDUCE ~p ~p ~p ~p ~n", [Level, Key, PrevKey, NewVal]),
+ add_kv(TxDb, ReduceIdxPrefix, Level, PrevKey, NewVal),
+ false
+ end
+ end, true, Levels)
+ end).
+
+
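+% Keeps the previous key and the larger of the two values, e.g.
+% rereduce(<<"_stats">>, {a, 5}, {b, 3}) -> {a, 5}.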
+rereduce(<<"_stats">>, {PrevKey, PrevVal}, {_Key, Val}) ->
+ case PrevVal >= Val of
+ true -> {PrevKey, PrevVal};
+ false -> {PrevKey, Val}
+ end.
+
+
+reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId) ->
+ Key = {?DB_VIEWS, Sig, ?VIEW_REDUCE_SK_RANGE, ViewId},
+ erlfdb_tuple:pack(Key, DbPrefix).
+
+
+reduce_idx_key(ReduceIdxPrefix, SkipLevel, ReduceKey, RowType) ->
+ Key = {SkipLevel, ReduceKey, RowType},
+ erlfdb_tuple:pack(Key, ReduceIdxPrefix).
+
+
+add_kv(TxDb, ReduceIdxPrefix, Level, Key, Val) ->
+ #{
+ tx := Tx
+ } = TxDb,
+
+ EK = couch_views_encoding:encode(Key, key),
+ EVK = couch_views_encoding:encode(Key),
+ EV = couch_views_encoding:encode(Val),
+
+ KK = reduce_idx_key(ReduceIdxPrefix, Level, EK, ?VIEW_ROW_KEY),
+ VK = reduce_idx_key(ReduceIdxPrefix, Level, EK, ?VIEW_ROW_VALUE),
+ ok = erlfdb:set(Tx, KK, EVK),
+ ok = erlfdb:set(Tx, VK, EV).
+
+
+get_previous_key(TxDb, ReduceIdxPrefix, Level, Key) ->
+ #{
+ tx := Tx
+ } = TxDb,
+
+ % TODO: see if we need to add in conflict ranges for this for level=0
+ Opts = [{limit, 2}, {reverse, true}, {streaming_mode, want_all}],
+%% LevelPrefix = erlfdb_tuple:pack({Level}, ReduceIdxPrefix),
+
+ EK = couch_views_encoding:encode(Key, key),
+ EndKey0 = erlfdb_tuple:pack({Level, EK}, ReduceIdxPrefix),
+
+ {StartKey, EndKey1} = erlfdb_tuple:range({Level}, ReduceIdxPrefix),
+%% EndKey1 = erlfdb_key:first_greater_than(EndKey0),
+
+ Callback = fun row_cb/2,
+ Out = erlfdb:fold_range(Tx, StartKey, EndKey1, Callback, {val, ReduceIdxPrefix, {}}, Opts),
+ io:format("OUT PRV ~p ~p ~p ~n", [Level, Key, Out]),
+ Out.
+
+
+row_cb({FullEncodedKey, EV}, {val, ReduceIdxPrefix, Acc}) ->
+ io:format("ROW VAL ~p ~n", [erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix)]),
+ {_Level, EK, _VIEW_ROW_VALUE}
+ = erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
+ Val = couch_views_encoding:decode(EV),
+%% io:format("WW ~p ~p ~n", [couch_views_encoding:decode(EK), Val]),
+
+ {key, {EK, ReduceIdxPrefix, Val}};
+
+row_cb({FullEncodedKey, EVK}, {key, {EK, ReduceIdxPrefix, Val}}) ->
+ io:format("ROW KEY ~p ~n", [erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix)]),
+ {_Level, EK, ?VIEW_ROW_KEY}
+ = erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
+ Key = couch_views_encoding:decode(EVK),
+
+ {Key, Val}.
+
+
+
+
diff --git a/src/couch_views/include/couch_views.hrl b/src/couch_views/src/couch_views_reduce_fdb.erl
similarity index 55%
copy from src/couch_views/include/couch_views.hrl
copy to src/couch_views/src/couch_views_reduce_fdb.erl
index 2e443eb..bcaaa30 100644
--- a/src/couch_views/include/couch_views.hrl
+++ b/src/couch_views/src/couch_views_reduce_fdb.erl
@@ -10,17 +10,24 @@
% License for the specific language governing permissions and limitations under
% the License.
-% indexing
--define(VIEW_UPDATE_SEQ, 0).
--define(VIEW_ID_INFO, 1).
--define(VIEW_ID_RANGE, 2).
--define(VIEW_MAP_RANGE, 3).
--define(VIEW_ROW_COUNT, 0).
--define(VIEW_KV_SIZE, 1).
+-module(couch_views_reduce_fdb).
+
+
+-export([
+%% write_doc/4
+]).
+
+% _id keys = {?DB_VIEWS, Sig, ?VIEW_REDUCE_ID_RANGE, DocId, ViewId}
+% = [TotalKeys, TotalSize, UniqueKeys]
+
+%%write_doc(TxDb, Sig, ViewIds, Doc) ->
+%% #{
+%% id := DocId,
+%% reduce_results := ReduceResults
+%% } = Doc,
+%% lists:foreach(fun({ViewId, NewRows}) ->
+%% % update reduce index
+%% ok
+%% end, lists:zip(ViewIds, ReduceResults)).
--define(VIEW_ROW_KEY, 0).
--define(VIEW_ROW_VALUE, 1).
-% jobs api
--define(INDEX_JOB_TYPE, <<"views">>).
diff --git a/src/couch_views/test/exunit/couch_views_reduce_test.exs b/src/couch_views/test/exunit/couch_views_reduce_test.exs
new file mode 100644
index 0000000..3f7a173
--- /dev/null
+++ b/src/couch_views/test/exunit/couch_views_reduce_test.exs
@@ -0,0 +1,286 @@
+defmodule CouchViewsReduceTest do
+ use Couch.Test.ExUnit.Case
+
+ alias Couch.Test.Utils
+
+ alias Couch.Test.Setup
+
+ alias Couch.Test.Setup.Step
+
+ setup_all do
+ test_ctx = :test_util.start_couch([:fabric, :couch_views, :couch_jobs])
+
+ on_exit(fn ->
+ :test_util.stop_couch(test_ctx)
+ end)
+ end
+
+ setup do
+ db_name = Utils.random_name("db")
+
+ admin_ctx =
+ {:user_ctx,
+ Utils.erlang_record(:user_ctx, "couch/include/couch_db.hrl", roles: ["_admin"])}
+
+ {:ok, db} = :fabric2_db.create(db_name, [admin_ctx])
+
+ docs = create_docs()
+ ddoc = create_ddoc()
+
{:ok, _} = :fabric2_db.update_docs(db, [ddoc | docs])
+
+ on_exit(fn ->
+ :fabric2_db.delete(db_name, [admin_ctx])
+ end)
+
+ %{
+ :db_name => db_name,
+ :db => db,
+ :ddoc => ddoc
+ }
+ end
+
+ # test "group=true count reduce", context do
+ # args = %{
+ # :reduce => true,
+ # :group => true
+ # # :limit => 9
+ # }
+ #
+ # {:ok, res} = run_query(context, args, "baz")
+ # IO.inspect(res, label: "OUT")
+ #
+ # assert res == [
+ # {:row, [key: 1, value: 2]},
+ # {:row, [key: 2, value: 2]},
+ # {:row, [key: 3, value: 2]},
+ # {:row, [key: [1, 1], value: 1]},
+ # {:row, [key: [1, 1, 5], value: 1]},
+ # {:row, [key: [1, 2, 6], value: 1]},
+ # {:row, [key: [2, 1], value: 1]},
+ # {:row, [key: [2, 3, 6], value: 1]},
+ # {:row, [key: [3, 1], value: 1]},
+ # {:row, [key: [3, 1, 5], value: 1]},
+ # {:row, [key: [3, 4, 5], value: 1]}
+ # ]
+ # end
+
+ # test "group=1 count reduce", context do
+ # args = %{
+ # :reduce => true,
+ # :group_level => 1
+ # # :limit => 6
+ # }
+ #
+ # {:ok, res} = run_query(context, args, "baz")
+ # IO.inspect(res, label: "OUT")
+ #
+ # assert res == [
+ # {:row, [key: 1, value: 2]},
+ # {:row, [key: 2, value: 2]},
+ # {:row, [key: 3, value: 2]},
+ # {:row, [key: [1], value: 2]},
+ # {:row, [key: [2], value: 2]},
+ # {:row, [key: [3], value: 2]}
+ # ]
+ # end
+ #
+ # test "group=2 count reduce", context do
+ # args = %{
+ # :reduce => true,
+ # :group_level => 2,
+ # :limit => 9
+ # }
+ #
+ # {:ok, res} = run_query(context, args, "baz")
+ # IO.inspect(res, label: "OUT")
+ #
+ # assert res == [
+ # {:row, [key: 1, value: 2]},
+ # {:row, [key: 2, value: 2]},
+ # {:row, [key: 3, value: 2]},
+ # {:row, [key: [1, 1], value: 2]},
+ # {:row, [key: [1, 2], value: 1]},
+ # {:row, [key: [2, 1], value: 1]},
+ # {:row, [key: [2, 3], value: 1]},
+ # {:row, [key: [3, 1], value: 2]},
+ # {:row, [key: [3, 4], value: 1]}
+ # ]
+ # end
+ #
+ # test "group=2 count reduce with limit = 3", context do
+ # args = %{
+ # :reduce => true,
+ # :group_level => 2,
+ # :limit => 4
+ # }
+ #
+ # {:ok, res} = run_query(context, args, "baz")
+ # IO.inspect(res, label: "OUT")
+ #
+ # assert res == [
+ # {:row, [key: 1, value: 2]},
+ # {:row, [key: 2, value: 2]},
+ # {:row, [key: 3, value: 2]},
+ # {:row, [key: [1, 1], value: 1]}
+ # ]
+ # end
+ #
+ # # [
+ # # row: [key: [2019, 1, 2], value: 1],
+ # # row: [key: [2019, 1, 4], value: 1],
+ # # row: [key: [2019, 2, 1], value: 1],
+ # # row: [key: [2019, 2, 3], value: 1]
+ # # ]
+ #
+ # test "group=2 count reduce with startkey", context do
+ # args = %{
+ # # :reduce => true,
+ # # :group_level => 2,
+ # :start_key => [2019, 1, 4]
+ # # :limit => 4
+ # }
+ #
+ # {:ok, res} = run_query(context, args, "boom")
+ # IO.inspect(res, label: "OUT")
+ #
+ # assert res == [
+ # {:row, [key: [2019, 1], value: 1]},
+ # {:row, [key: [2019, 2], value: 2]}
+ # ]
+ # end
+
+ test "group_level=0 _max reduce", context do
+ args = %{
+ :reduce => true,
+ :group_level => 0
+ # :limit => 9
+ }
+
+ {:ok, res} = run_query(context, args, "max")
+ IO.inspect(res, label: "OUT")
+
+ assert res == [
+ {:row, [key: :null, value: 3]}
+ ]
+ end
+
+ defp run_query(context, args, view) do
+ db = context[:db]
+ ddoc = context[:ddoc]
+
+ :couch_views.query(db, ddoc, view, &__MODULE__.default_cb/2, [], args)
+ end
+
+ def default_cb(:complete, acc) do
+ {:ok, Enum.reverse(acc)}
+ end
+
+ def default_cb({:final, info}, []) do
+ {:ok, [info]}
+ end
+
+ def default_cb({:final, _}, acc) do
+ {:ok, acc}
+ end
+
+ def default_cb({:meta, _}, acc) do
+ {:ok, acc}
+ end
+
+ def default_cb(:ok, :ddoc_updated) do
+ {:ok, :ddoc_updated}
+ end
+
+ def default_cb(row, acc) do
+ {:ok, [row | acc]}
+ end
+
+ defp create_docs() do
+ for i <- 1..1 do
+ group =
+ if rem(i, 3) == 0 do
+ "first"
+ else
+ "second"
+ end
+
+ :couch_doc.from_json_obj(
+ {[
+ {"_id", "doc-id-#{i}"},
+ {"value", i},
+ {"some", "field"},
+ {"group", group}
+ ]}
+ )
+ end
+ end
+
+ defp create_ddoc() do
+ :couch_doc.from_json_obj(
+ {[
+ {"_id", "_design/bar"},
+ {"views",
+ {[
+# {"baz",
+# {[
+# {"map",
+# """
+# function(doc) {
+# emit(doc.value, doc.value);
+# emit(doc.value, doc.value);
+# emit([doc.value, 1], doc.value);
+# emit([doc.value, doc.value + 1, doc.group.length], doc.value);
+#
+# if (doc.value === 3) {
+# emit([1, 1, 5], 1);
+# emit([doc.value, 1, 5], 1);
+# }
+# }
+# """},
+# {"reduce", "_count"}
+# ]}},
+# {"boom",
+# {[
+# {"map",
+# """
+# function(doc) {
+# var month = 1;
+# if (doc.value % 2) {
+# month = 2;
+# }
+# emit([2019, month, doc.value], doc.value);
+# }
+# """},
+# {"reduce", "_count"}
+# ]}},
+ {"max",
+ {[
+ {"map",
+ """
+ function(doc) {
+ //emit(doc.value, doc.value);
+ //emit([doc.value, 1], doc.value);
+ //emit([doc.value, doc.value + 1, doc.group.length], doc.value);
+ emit(1, 1);
+ emit(2, 2);
+ emit(3, 3);
+ emit(4, 4);
+
+ emit([2019, 2, 2], 1);
+ emit([2019, 3, 3], 2);
+ emit([2019, 3, 3], 3);
+ emit([2019, 4, 3], 4);
+ emit([2019, 5, 3], 6);
+ if (doc.value === 3) {
+ //emit([doc.value, 1, 5], 1);
+ }
+ }
+ """},
+ {"reduce", "_stats"}
+ ]}}
+ ]}}
+ ]}
+ )
+ end
+end
diff --git a/src/couch_views/test/exunit/test_helper.exs b/src/couch_views/test/exunit/test_helper.exs
new file mode 100644
index 0000000..3140500
--- /dev/null
+++ b/src/couch_views/test/exunit/test_helper.exs
@@ -0,0 +1,2 @@
+ExUnit.configure(formatters: [JUnitFormatter, ExUnit.CLIFormatter])
+ExUnit.start()