You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ga...@apache.org on 2019/09/12 15:34:05 UTC
[couchdb] branch prototype/builtin-reduce updated: group=true and group_levels working
This is an automated email from the ASF dual-hosted git repository.
garren pushed a commit to branch prototype/builtin-reduce
in repository https://gitbox.apache.org/repos/asf/couchdb.git
The following commit(s) were added to refs/heads/prototype/builtin-reduce by this push:
new 2cd1db7 group=true and group_levels working
2cd1db7 is described below
commit 2cd1db767a88f02de4595f64780c4d37f8f98456
Author: Garren Smith <ga...@gmail.com>
AuthorDate: Thu Sep 12 17:33:52 2019 +0200
group=true and group_levels working
---
src/couch_views/src/couch_views_fdb.erl | 88 ++++++++++++++++++----
src/couch_views/src/couch_views_reader.erl | 39 ++++++++--
.../test/exunit/couch_views_reduce_test.exs | 83 ++++++++++++++------
3 files changed, 166 insertions(+), 44 deletions(-)
diff --git a/src/couch_views/src/couch_views_fdb.erl b/src/couch_views/src/couch_views_fdb.erl
index d13adc1..7bfcac0 100644
--- a/src/couch_views/src/couch_views_fdb.erl
+++ b/src/couch_views/src/couch_views_fdb.erl
@@ -53,7 +53,7 @@
% Type = ?VIEW_KEY | ?VIEW_ROW
% Reduce Range
-%{<db> ?DB_VIEWS, Sig, ?VIEW_REDUCE_RANGE, ViewId, GroupLevel, Key,
+%{<db> ?DB_VIEWS, Sig, ?VIEW_REDUCE_RANGE, ViewId, Key, GroupLevel,
% ReduceType, RowType} = Value | UnEncodedKey
% ReduceType = VIEW_REDUCE_EXACT | ?VIEW_REDUCE_GROUP
% RowType = ?VIEW_KEY | ?VIEW_ROW
@@ -145,15 +145,31 @@ fold_reduce_idx(TxDb, Sig, ViewId, Options, Callback, Acc0) ->
db_prefix := DbPrefix
} = TxDb,
+ #{
+ mrargs := MrArgs
+ } = Acc0,
+
+ #mrargs{
+ group_level = GroupLevel,
+ group = Group
+ } = MrArgs,
+
ReduceIdxPrefix = reduce_idx_prefix(DbPrefix, Sig, ViewId),
+ Group1 = case {GroupLevel, Group} of
+ {0, true} -> exact;
+ _ -> grouping
+ end,
+
FoldAcc = #{
prefix => ReduceIdxPrefix,
sort_key => undefined,
docid => undefined,
callback => Callback,
acc => Acc0,
- group_level => undefined,
+ group_level => GroupLevel,
+ group => Group1,
+ prev_exact_key => undefined,
reduce_type => undefined,
next => key,
@@ -173,13 +189,12 @@ reduce_fold_fwd({RowKey, EncodedOriginalKey}, #{next := key} = Acc) ->
prefix := Prefix
} = Acc,
- {GroupLevel, SortKey, ReduceType, ?VIEW_ROW_KEY} =
+ {SortKey, _RowGroupLevel, ReduceType, ?VIEW_ROW_KEY} =
erlfdb_tuple:unpack(RowKey, Prefix),
Acc#{
next := value,
key := couch_views_encoding:decode(EncodedOriginalKey),
sort_key := SortKey,
- group_level := GroupLevel,
reduce_type := ReduceType
};
@@ -189,32 +204,74 @@ reduce_fold_fwd({RowKey, EncodedValue}, #{next := value} = Acc) ->
key := Key,
sort_key := SortKey,
group_level := GroupLevel,
+ group := Group,
reduce_type := ReduceType,
callback := UserCallback,
- acc := UserAcc0
+ acc := UserAcc0,
+ prev_exact_key := PrevExactKey
} = Acc,
+
% We're asserting here that this row is paired
% correctly with the previous row by relying on
% a badmatch if any of these values don't match.
- {GroupLevel, SortKey, ReduceType, ?VIEW_ROW_VALUE} =
+ {SortKey, RowGroupLevel, ReduceType, ?VIEW_ROW_VALUE} =
erlfdb_tuple:unpack(RowKey, Prefix),
% TODO: Handle more than uint
Value = ?bin2uint(EncodedValue),
- io:format("FWD VAL ~p ~p ~p ~n", [Key, GroupLevel, Value]),
- UserAcc1 = UserCallback(Key, Value, UserAcc0),
+ io:format("FWD VAL ~p ~p ~p ~p ~n", [Key, RowGroupLevel, Value, ReduceType]),
+ io:format("GROUP SETTINGS ~p ~p ~n", [Group, GroupLevel]),
+ UserAcc1 = case should_return_row(PrevExactKey, Key, Group, GroupLevel, RowGroupLevel, ReduceType) of
+ true ->
+ UserCallback(Key, Value, UserAcc0);
+ false ->
+ UserAcc0
+ end,
+
+ PrevExactKey1 = maybe_update_prev_exact_key(PrevExactKey, Key, ReduceType),
Acc#{
next := key,
key := undefined,
sort_key := undefined,
- group_level := undefined,
reduce_type := undefined,
- acc := UserAcc1
+ acc := UserAcc1,
+ prev_exact_key := PrevExactKey1
}.
+should_return_row(_PrevExactKey, _CurrentKey, exact, _GroupLevel,
+ _RowGroupLevel, ?VIEW_REDUCE_EXACT) ->
+ true;
+
+should_return_row(_PrevExactKey, _CurrentKey, exact, _GroupLevel,
+ _RowGroupLevel, ?VIEW_REDUCE_GROUP) ->
+ false;
+
+should_return_row(_PrevExactKey, _CurrentKey, _Group, GroupLevel,
+ RowGroupLevel, ?VIEW_REDUCE_EXACT) when RowGroupLevel =< GroupLevel ->
+ true;
+
+should_return_row(PrevExactKey, PrevExactKey, _Group, GroupLevel, GroupLevel,
+ ?VIEW_REDUCE_GROUP) ->
+ false;
+
+should_return_row(_PrevExactKey, _CurrentKey, _Group, GroupLevel, GroupLevel,
+ ?VIEW_REDUCE_GROUP) ->
+ true;
+
+should_return_row(_, _, _, _, _, _) ->
+ false.
+
+
+maybe_update_prev_exact_key(PrevExactKey, _NewKey, ?VIEW_REDUCE_GROUP) ->
+ PrevExactKey;
+
+maybe_update_prev_exact_key(_PrevExactKey, NewKey, ?VIEW_REDUCE_EXACT) ->
+ NewKey.
+
+
write_doc(TxDb, Sig, _ViewIds, #{deleted := true} = Doc) ->
#{
id := DocId
@@ -449,15 +506,15 @@ update_reduce_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows) ->
%%
{ExactKVsToAdd, GroupKVsToAdd} = process_reduce_rows(NewRows),
ReduceIdxPrefix = reduce_idx_prefix(DbPrefix, Sig, ViewId),
-%% add_reduce_kvs(Tx, ReduceIdxPrefix, ExactKVsToAdd, ?VIEW_REDUCE_EXACT),
+ add_reduce_kvs(Tx, ReduceIdxPrefix, ExactKVsToAdd, ?VIEW_REDUCE_EXACT),
add_reduce_kvs(Tx, ReduceIdxPrefix, GroupKVsToAdd, ?VIEW_REDUCE_GROUP).
add_reduce_kvs(Tx, ReduceIdxPrefix, KVsToAdd, ReduceType) ->
lists:foreach(fun({Key1, Key2, Val, GroupLevel}) ->
- KK = reduce_idx_key(ReduceIdxPrefix, GroupLevel, Key1,
+ KK = reduce_idx_key(ReduceIdxPrefix, Key1, GroupLevel,
ReduceType, ?VIEW_ROW_KEY),
- VK = reduce_idx_key(ReduceIdxPrefix, GroupLevel, Key1,
+ VK = reduce_idx_key(ReduceIdxPrefix, Key1, GroupLevel,
ReduceType, ?VIEW_ROW_VALUE),
ok = erlfdb:set(Tx, KK, Key2),
ok = erlfdb:add(Tx, VK, Val)
@@ -469,9 +526,8 @@ reduce_idx_prefix(DbPrefix, Sig, ViewId) ->
erlfdb_tuple:pack(Key, DbPrefix).
-reduce_idx_key(ReduceIdxPrefix, GroupLevel, ReduceKey, ReduceType, RowType) ->
- io:format("GROUP LEVEL ~p ~n", [GroupLevel]),
- Key = {GroupLevel, ReduceKey, ReduceType, RowType},
+reduce_idx_key(ReduceIdxPrefix, ReduceKey, GroupLevel, ReduceType, RowType) ->
+ Key = {ReduceKey, GroupLevel, ReduceType, RowType},
erlfdb_tuple:pack(Key, ReduceIdxPrefix).
diff --git a/src/couch_views/src/couch_views_reader.erl b/src/couch_views/src/couch_views_reader.erl
index 7f1d530..59c648f 100644
--- a/src/couch_views/src/couch_views_reader.erl
+++ b/src/couch_views/src/couch_views_reader.erl
@@ -39,12 +39,18 @@ read_reduce(Db, Mrst, ViewName, UserCallback, UserAcc0, Args) ->
Meta = get_meta(TxDb, Mrst, ViewId, Args),
UserAcc1 = maybe_stop(UserCallback(Meta, UserAcc0)),
+ #mrargs{
+ limit = Limit
+ } = Args,
+
Acc0 = #{
db => TxDb,
skip => Args#mrargs.skip,
mrargs => undefined,
callback => UserCallback,
- acc => UserAcc1
+ acc => UserAcc1,
+ row_count => 0,
+ limit => Limit
},
Acc1 = lists:foldl(fun(KeyArgs, KeyAcc0) ->
@@ -81,12 +87,17 @@ reduce_mrargs_to_fdb_options(Args) ->
direction = Direction,
limit = Limit,
skip = Skip,
- group_level = GroupLevel
+ group_level = GroupLevel,
+ group = Group
%% inclusive_end = InclusiveEnd
} = Args,
-%% GroupLevelEnd = [{end_key, couch_views_encoding:encode(GroupLevel + 1, key)}],
- GroupLevelEnd = [{end_key, GroupLevel + 1}],
+ GroupExact = Group == true andalso GroupLevel == 0,
+
+ GroupLevelEnd = case GroupExact of
+ true -> [];
+ false -> [{end_key, {<<255>>, GroupLevel + 1}}]
+ end,
%% StartKey1 = if StartKey0 == undefined -> undefined; true ->
%% couch_views_encoding:encode(StartKey0, key)
@@ -126,7 +137,8 @@ reduce_mrargs_to_fdb_options(Args) ->
[
{dir, Direction},
- {limit, Limit * 2 + Skip * 2},
+%% {limit, Limit * 2 + Skip * 2},
+%% {streaming_mode, stream_large}
{streaming_mode, want_all}
] ++ GroupLevelEnd.
%% ] ++ StartKeyOpts ++ EndKeyOpts.
@@ -226,9 +238,12 @@ handle_reduce_row(_Key, _Value, #{skip := Skip} = Acc) when Skip > 0 ->
Acc#{skip := Skip - 1};
handle_reduce_row(Key, Value, Acc) ->
+ io:format("ACC ~p ~n", [Acc]),
#{
callback := UserCallback,
- acc := UserAcc0
+ acc := UserAcc0,
+ row_count := RowCount,
+ limit := Limit
} = Acc,
Row = [
@@ -236,8 +251,18 @@ handle_reduce_row(Key, Value, Acc) ->
{value, Value}
],
+ RowCountNext = RowCount + 1,
+
UserAcc1 = maybe_stop(UserCallback({row, Row}, UserAcc0)),
- Acc#{acc := UserAcc1}.
+ Acc1 = Acc#{acc := UserAcc1, row_count := RowCountNext},
+
+ case RowCountNext == Limit of
+ true ->
+ UserAcc2 = maybe_stop(UserCallback(complete, UserAcc1)),
+ maybe_stop({stop, UserAcc2});
+ false ->
+ Acc1
+ end.
get_view_id(Lang, Args, ViewName, Views) ->
diff --git a/src/couch_views/test/exunit/couch_views_reduce_test.exs b/src/couch_views/test/exunit/couch_views_reduce_test.exs
index 5f4242b..48887d0 100644
--- a/src/couch_views/test/exunit/couch_views_reduce_test.exs
+++ b/src/couch_views/test/exunit/couch_views_reduce_test.exs
@@ -40,28 +40,28 @@ defmodule CouchViewsReduceTest do
}
end
-# test "group=true count reduce", context do
-# args = %{
-# :reduce => true,
-# :group => true,
+ test "group=true count reduce", context do
+ args = %{
+ :reduce => true,
+ :group => true
# :limit => 9
-# }
-#
-# {:ok, res} = run_query(context, args)
-# IO.inspect(res, label: "OUT")
-#
-# assert res == [
-# {:row, [key: 1, value: 2]},
-# {:row, [key: 2, value: 2]},
-# {:row, [key: 3, value: 2]},
-# {:row, [key: [1, 1], value: 1]},
-# {:row, [key: [1, 2, 6], value: 1]},
-# {:row, [key: [2, 1], value: 1]},
-# {:row, [key: [2, 3, 6], value: 1]},
-# {:row, [key: [3, 1], value: 1]},
-# {:row, [key: [3, 4, 5], value: 1]}
-# ]
-# end
+ }
+
+ {:ok, res} = run_query(context, args)
+ IO.inspect(res, label: "OUT")
+
+ assert res == [
+ {:row, [key: 1, value: 2]},
+ {:row, [key: 2, value: 2]},
+ {:row, [key: 3, value: 2]},
+ {:row, [key: [1, 1], value: 1]},
+ {:row, [key: [1, 2, 6], value: 1]},
+ {:row, [key: [2, 1], value: 1]},
+ {:row, [key: [2, 3, 6], value: 1]},
+ {:row, [key: [3, 1], value: 1]},
+ {:row, [key: [3, 4, 5], value: 1]}
+ ]
+ end
test "group=1 count reduce", context do
args = %{
@@ -83,6 +83,47 @@ defmodule CouchViewsReduceTest do
]
end
+ test "group=2 count reduce", context do
+ args = %{
+ :reduce => true,
+ :group_level => 2
+# :limit => 9
+ }
+
+ {:ok, res} = run_query(context, args)
+ IO.inspect(res, label: "OUT")
+
+ assert res == [
+ {:row, [key: 1, value: 2]},
+ {:row, [key: 2, value: 2]},
+ {:row, [key: 3, value: 2]},
+ {:row, [key: [1, 1], value: 1]},
+ {:row, [key: [1, 2], value: 1]},
+ {:row, [key: [2, 1], value: 1]},
+ {:row, [key: [2, 3], value: 1]},
+ {:row, [key: [3, 1], value: 1]},
+ {:row, [key: [3, 4], value: 1]}
+ ]
+ end
+
+ test "group=2 count reduce with limit = 4", context do
+ args = %{
+ :reduce => true,
+ :group_level => 2,
+ :limit => 4
+ }
+
+ {:ok, res} = run_query(context, args)
+ IO.inspect(res, label: "OUT")
+
+ assert res == [
+ {:row, [key: 1, value: 2]},
+ {:row, [key: 2, value: 2]},
+ {:row, [key: 3, value: 2]},
+ {:row, [key: [1, 1], value: 1]}
+ ]
+ end
+
defp run_query(context, args) do
db = context[:db]
ddoc = context[:ddoc]