You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain-text copy.
Posted to commits@couchdb.apache.org by ga...@apache.org on 2019/10/31 14:43:03 UTC
[couchdb] 04/05: can do group_level query
This is an automated email from the ASF dual-hosted git repository.
garren pushed a commit to branch prototype/builtin-reduce
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit bff44fb9f7cfc92e21012c705b8cd1a9938fac88
Author: Garren Smith <ga...@gmail.com>
AuthorDate: Thu Oct 31 14:09:36 2019 +0200
can do group_level query
---
src/couch_views/src/couch_views_reader.erl | 9 +-
src/couch_views/src/couch_views_reduce.erl | 15 +-
src/couch_views/src/couch_views_reduce_fdb.erl | 175 +++++++++++++++------
.../test/exunit/couch_views_reduce_test.exs | 47 +++---
4 files changed, 168 insertions(+), 78 deletions(-)
diff --git a/src/couch_views/src/couch_views_reader.erl b/src/couch_views/src/couch_views_reader.erl
index 394b3cf..e750a94 100644
--- a/src/couch_views/src/couch_views_reader.erl
+++ b/src/couch_views/src/couch_views_reader.erl
@@ -32,7 +32,8 @@ read_reduce(Db, Mrst, ViewName, UserCallback, UserAcc0, Args) ->
} = Mrst,
ViewId = get_view_id(Lang, Args, ViewName, Views),
- couch_views_reduce:read_reduce(Db, Sig, ViewId, UserCallback,
+ Reducer = get_view_reducer(Lang, Args, ViewName, Views),
+ couch_views_reduce:read_reduce(Db, Sig, ViewId, Reducer, UserCallback,
UserAcc0, Args).
%% Fun = fun handle_reduce_row/3,
%%
@@ -243,6 +244,12 @@ get_view_id(Lang, Args, ViewName, Views) ->
{red, {_Idx, _Lang, View}, _Args} -> View#mrview.id_num
end.
+get_view_reducer(Lang, Args, ViewName, Views) ->
+ case couch_mrview_util:extract_view(Lang, Args, ViewName, Views) of
+ {map, View, _Args} -> throw(no_reduce);
+ View -> couch_mrview_util:extract_view_reduce(View)
+ end.
+
expand_keys_args(#mrargs{keys = undefined} = Args) ->
[Args];
diff --git a/src/couch_views/src/couch_views_reduce.erl b/src/couch_views/src/couch_views_reduce.erl
index ebd2f47..b7eb18e 100644
--- a/src/couch_views/src/couch_views_reduce.erl
+++ b/src/couch_views/src/couch_views_reduce.erl
@@ -15,7 +15,7 @@
-export([
run_reduce/2,
- read_reduce/6,
+ read_reduce/7,
setup_reduce_indexes/3
]).
@@ -30,7 +30,7 @@
-define(MAX_SKIP_LIST_LEVELS, 6).
-read_reduce(Db, Sig, ViewId, UserCallback, UserAcc0, Args) ->
+read_reduce(Db, Sig, ViewId, Reducer, UserCallback, UserAcc0, Args) ->
#{
db_prefix := DbPrefix
} = Db,
@@ -38,9 +38,16 @@ read_reduce(Db, Sig, ViewId, UserCallback, UserAcc0, Args) ->
%% Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
#mrargs{
- limit = Limit
+ limit = Limit,
+ group = Group,
+ group_level = GroupLevel
} = Args,
+ GroupLevel1 = case Group of
+ true -> group_true;
+ _ -> GroupLevel
+ end,
+
Opts = args_to_fdb_opts(Args, ReduceIdxPrefix),
try
@@ -59,7 +66,7 @@ read_reduce(Db, Sig, ViewId, UserCallback, UserAcc0, Args) ->
},
Fun = fun handle_row/3,
- Acc1 = couch_views_reduce_fdb:fold_level0(TxDb, Sig, ViewId, Opts, Fun, Acc0),
+ Acc1 = couch_views_reduce_fdb:fold_level0(TxDb, Sig, ViewId, Reducer, GroupLevel, Opts, Fun, Acc0),
#{
user_acc := UserAcc1
} = Acc1,
diff --git a/src/couch_views/src/couch_views_reduce_fdb.erl b/src/couch_views/src/couch_views_reduce_fdb.erl
index 9683265..5759c42 100644
--- a/src/couch_views/src/couch_views_reduce_fdb.erl
+++ b/src/couch_views/src/couch_views_reduce_fdb.erl
@@ -15,7 +15,7 @@
-export([
- fold_level0/6,
+ fold_level0/8,
create_skip_list/3,
update_reduce_idx/6
]).
@@ -26,7 +26,7 @@
-include_lib("couch_mrview/include/couch_mrview.hrl").
-include_lib("fabric/include/fabric2.hrl").
--define(MAX_SKIP_LIST_LEVELS, 6).
+-define(MAX_SKIP_LIST_LEVELS, 1).
-define(LEVEL_FAN_POW, 1).
log_levels(Db, Sig, ViewId) ->
@@ -34,34 +34,40 @@ log_levels(Db, Sig, ViewId) ->
db_prefix := DbPrefix
} = Db,
- Levels = lists:seq(0, 6),
+ Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
Opts = [{streaming_mode, want_all}],
fabric2_fdb:transactional(Db, fun(#{tx := Tx} = TxDb) ->
- lists:foreach(fun (Level) ->
+ lists:foldl(fun (Level, Level0Total) ->
{StartKey, EndKey} = erlfdb_tuple:range({Level},
ReduceIdxPrefix),
- Acc0 = #{
- sig => Sig,
- view_id => ViewId,
- reduce_idx_prefix => ReduceIdxPrefix,
- user_acc => [],
- callback => fun handle_log_levels/3
- },
-
- Fun = fun fold_fwd_cb/2,
- Acc = erlfdb:fold_range(Tx, StartKey, EndKey, Fun, Acc0, Opts),
- #{
- user_acc := Rows
- } = Acc,
- io:format("~n LEVEL ~p rows ~p ~n", [Level, Rows])
- end, Levels)
+ Future = erlfdb:get_range(Tx, StartKey, EndKey, Opts),
+ Rows = lists:map(fun ({_Key, EV}) ->
+ unpack_key_value(EV)
+ end, erlfdb:wait(Future)),
+
+ io:format("~n LEVEL ~p rows ~p ~n", [Level, Rows]),
+ case Level == 0 of
+ true ->
+ sum_rows(Rows);
+ false ->
+ Total = sum_rows(Rows),
+ if Total == Level0Total -> Level0Total; true ->
+ io:format("~n ~n LEVEL ~p NOT EQUAL ~p ~p ~n", [Level, Level0Total, Total])
+%% throw(level_total_error)
+ end
+ end
+
+ end, 0, Levels)
end).
-handle_log_levels(Key, Value, Acc) ->
- Acc ++ [{Key, Value}].
+sum_rows(Rows) ->
+ lists:foldl(fun ({_, Val}, Sum) ->
+ Val + Sum
+ end, 0, Rows).
+
%%fold(Db, Sig, ViewId, Options, Callback, Acc0) ->
%% #{
@@ -95,7 +101,7 @@ handle_log_levels(Key, Value, Acc) ->
%% {ok, UserAcc1}
%% end).
-fold_level0(Db, Sig, ViewId, Opts, UserCallback, UserAcc0) ->
+fold_level0(Db, Sig, ViewId, Reducer, GroupLevel, Opts, UserCallback, UserAcc0) ->
#{
db_prefix := DbPrefix
} = Db,
@@ -108,7 +114,10 @@ fold_level0(Db, Sig, ViewId, Opts, UserCallback, UserAcc0) ->
user_acc => UserAcc0,
%% args := Args,
callback => UserCallback,
- reduce_idx_prefix => ReduceIdxPrefix
+ reduce_idx_prefix => ReduceIdxPrefix,
+ reducer => Reducer,
+ group_level => GroupLevel,
+ rows => []
},
fabric2_fdb:transactional(Db, fun(TxDb) ->
@@ -118,36 +127,94 @@ fold_level0(Db, Sig, ViewId, Opts, UserCallback, UserAcc0) ->
} = TxDb,
-%% {StartKey, EndKey} = erlfdb_tuple:range({0},
-%% ReduceIdxPrefix),
{startkey, StartKey} = lists:keyfind(startkey, 1, Opts),
{endkey, EndKey} = lists:keyfind(endkey, 1, Opts),
-%% ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
Fun = fun fold_fwd_cb/2,
Acc1 = erlfdb:fold_range(Tx, StartKey, EndKey, Fun, Acc, Opts),
#{
- user_acc := UserAcc1
+ user_acc := UserAcc1,
+ rows := Rows
} = Acc1,
- UserAcc1
+
+ rereduce_and_reply(Reducer, Rows, GroupLevel,
+ UserCallback, UserAcc1)
end).
-fold_fwd_cb({FullEncodedKey, EV}, Acc) ->
+fold_fwd_cb({_FullEncodedKey, EV}, Acc) ->
#{
- reduce_idx_prefix := ReduceIdxPrefix,
callback := Callback,
- user_acc := UserAcc
+ user_acc := UserAcc,
+ group_level := GroupLevel,
+ rows := Rows,
+ reducer := Reducer
} = Acc,
- {_Level, _EK}
- = erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
- {EK, EV1} = erlfdb_tuple:unpack(EV),
- Key = couch_views_encoding:decode(EK),
- Val = couch_views_encoding:decode(EV1),
+ {Key, Val} = unpack_key_value(EV),
+
+ LastKey = if Rows == [] -> false; true ->
+ {LastKey0, _} = lists:last(Rows),
+ LastKey0
+ end,
+
+ case group_level_equal(Key, LastKey, GroupLevel) of
+ true ->
+ Acc#{
+ rows := Rows ++ [{Key, Val}]
+ };
+ false ->
+ UserAcc1 = rereduce_and_reply(Reducer, Rows, GroupLevel,
+ Callback, UserAcc),
+ Acc#{
+ user_acc := UserAcc1,
+ rows := [{Key, Val}]
+ }
+ end.
+
+rereduce_and_reply(_Reducer, [], _GroupLevel, _Callback, Acc) ->
+ Acc;
+
+rereduce_and_reply(Reducer, Rows, GroupLevel, Callback, Acc) ->
+ {ReducedKey, ReducedVal} = rereduce(Reducer, Rows, GroupLevel),
+ Callback(ReducedKey, ReducedVal, Acc).
+
+
+rereduce(_Reducer, [], _GroupLevel) ->
+ no_kvs;
- UserAcc1 = Callback(Key, Val, UserAcc),
- Acc#{user_acc := UserAcc1}.
+rereduce(_Reducer, Rows, GroupLevel) when length(Rows) == 1 ->
+ {Key, Val} = hd(Rows),
+ GroupKey = group_level_key(Key, GroupLevel),
+ {GroupKey, Val};
+
+rereduce(<<"_count">>, Rows, GroupLevel) ->
+ Val = length(Rows),
+ {Key, _} = hd(Rows),
+ GroupKey = group_level_key(Key, GroupLevel),
+ {GroupKey, Val}.
+
+
+group_level_equal(_One, _Two, 0) ->
+ true;
+
+group_level_equal(_One, _Two, group_true) ->
+ false;
+
+group_level_equal(One, Two, GroupLevel) ->
+ GroupOne = group_level_key(One, GroupLevel),
+ GroupTwo = group_level_key(Two, GroupLevel),
+ GroupOne == GroupTwo.
+
+
+group_level_key(_Key, 0) ->
+ null;
+
+group_level_key(Key, GroupLevel) when is_list(Key) ->
+ lists:sublist(Key, GroupLevel);
+
+group_level_key(Key, _GroupLevel) ->
+ Key.
reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId) ->
@@ -155,6 +222,13 @@ reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId) ->
erlfdb_tuple:pack(Key, DbPrefix).
+unpack_key_value(EncodedValue) ->
+ {EK, EV1} = erlfdb_tuple:unpack(EncodedValue),
+ Key = couch_views_encoding:decode(EK),
+ Val = couch_views_encoding:decode(EV1),
+ {Key, Val}.
+
+
%% Inserting
update_reduce_idx(TxDb, Sig, ViewId, _DocId, _ExistingKeys, ReduceResult) ->
#{
@@ -205,15 +279,15 @@ add_kv_to_skip_list(Db, MaxLevel, #{} = ViewOpts, Key, Val) ->
fabric2_fdb:transactional(Db, fun(TxDb) ->
lists:foreach(fun(Level) ->
{PrevKey, PrevVal} = get_previous_key(TxDb, ReduceIdxPrefix, Level, Key),
- io:format("Process ~p ~p ~p PREV VALS ~p ~p ~n", [Level, Key, Val, PrevKey, PrevVal]),
+ io:format("Level ~p K/V ~p ~p PREV KV ~p ~p ~n", [Level, Key, Val, PrevKey, PrevVal]),
case should_add_key_to_level(Level, KeyHash) of
true ->
- io:format("Adding ~p ~p ~n", [Level, Key]),
+ io:format("Adding at ~p ~p ~n", [Level, Key]),
add_kv(Db, ReduceIdxPrefix, Level, Key, Val);
false ->
- {PrevKey, NewVal} = rereduce(<<"_stats">>, {PrevKey, PrevVal}, {Key, Val}),
- io:format("RE_REDUCE ~p ~p ~p ~p ~n", [Level, Key, PrevKey, NewVal]),
- add_kv(Db, ReduceIdxPrefix, Level, PrevKey, NewVal)
+%% {PrevKey, NewVal} = rereduce(<<"_stats">>, {PrevKey, PrevVal}, {Key, Val}),
+%% io:format("RE_REDUCE ~p ~p ~p ~p ~n", [Level, Key, PrevKey, NewVal]),
+ add_kv(Db, ReduceIdxPrefix, Level, PrevKey, PrevVal)
end
end, Levels)
end).
@@ -229,7 +303,7 @@ get_previous_key(TxDb, ReduceIdxPrefix, Level, Key) ->
EK = couch_views_encoding:encode(Key, key),
StartKey = erlfdb_tuple:pack({Level, EK}, ReduceIdxPrefix),
- StartKeySel = erlfdb_key:last_less_or_equal(StartKey),
+ StartKeySel = erlfdb_key:last_less_than(StartKey),
EndKeySel = erlfdb_key:first_greater_or_equal(StartKey),
Future = erlfdb:get_range(Tx, StartKeySel, EndKeySel, Opts),
@@ -241,6 +315,9 @@ hash_key(Key) ->
erlang:phash2(Key).
+should_add_key_to_level(0, _KeyHash) ->
+ true;
+
should_add_key_to_level(Level, KeyHash) ->
(KeyHash band ((1 bsl (Level * ?LEVEL_FAN_POW)) -1)) == 0.
%% keyHash & ((1 << (level * LEVEL_FAN_POW)) - 1)) != 0
@@ -277,11 +354,11 @@ add_kv(TxDb, ReduceIdxPrefix, Level, Key, Val) ->
ok = erlfdb:set(Tx, LevelKey, EV).
-rereduce(<<"_stats">>, {PrevKey, PrevVal}, {_Key, Val}) ->
- case PrevVal >= Val of
- true -> {PrevKey, PrevVal};
- false -> {PrevKey, Val}
- end.
+%%rereduce(<<"_stats">>, {PrevKey, PrevVal}, {_Key, Val}) ->
+%% case PrevVal >= Val of
+%% true -> {PrevKey, PrevVal};
+%% false -> {PrevKey, Val}
+%% end.
diff --git a/src/couch_views/test/exunit/couch_views_reduce_test.exs b/src/couch_views/test/exunit/couch_views_reduce_test.exs
index c1b35e2..488f3ee 100644
--- a/src/couch_views/test/exunit/couch_views_reduce_test.exs
+++ b/src/couch_views/test/exunit/couch_views_reduce_test.exs
@@ -60,16 +60,16 @@ defmodule CouchViewsReduceTest do
test "group_level=1 count reduce", context do
args = %{
:reduce => true,
- :group => true,
+ :group_level => 1,
}
{:ok, res} = run_query(context, args, "dates")
IO.inspect(res, label: "OUT")
assert res == [
- {:row, [key: [2017], value: 1]},
- {:row, [key: [2018], value: 1]},
- {:row, [key: [2019], value: 1]}
+ {:row, [key: [2017], value: 4]},
+ {:row, [key: [2018], value: 3]},
+ {:row, [key: [2019], value: 2]}
]
end
@@ -221,7 +221,7 @@ defmodule CouchViewsReduceTest do
[2019, 4, 1]
]
- for i <- 1..4 do
+ for i <- 1..10 do
group =
if rem(i, 3) == 0 do
"first"
@@ -235,7 +235,6 @@ defmodule CouchViewsReduceTest do
{"some", "field"},
{"group", group},
{"date", Enum.at(dates, i - 1)}
- # {"timestamp", Enum.at(timestamps, i - 1)}
]})
end
end
@@ -254,25 +253,25 @@ defmodule CouchViewsReduceTest do
}
"""},
{"reduce", "_count"}
- ]}},
- {"baz",
- {[
- {"map",
- """
- function(doc) {
- emit(doc.value, doc.value);
- emit(doc.value, doc.value);
- emit([doc.value, 1], doc.value);
- emit([doc.value, doc.value + 1, doc.group.length], doc.value);
-
- if (doc.value === 3) {
- emit([1, 1, 5], 1);
- emit([doc.value, 1, 5], 1);
- }
- }
- """},
- {"reduce", "_count"}
]}}
+# {"baz",
+# {[
+# {"map",
+# """
+# function(doc) {
+# emit(doc.value, doc.value);
+# emit(doc.value, doc.value);
+# emit([doc.value, 1], doc.value);
+# emit([doc.value, doc.value + 1, doc.group.length], doc.value);
+#
+# if (doc.value === 3) {
+# emit([1, 1, 5], 1);
+# emit([doc.value, 1, 5], 1);
+# }
+# }
+# """},
+# {"reduce", "_count"}
+# ]}}
# {"boom",
# {[
# {"map",