You are viewing a plain-text version of this content. The canonical link to it was a hyperlink ("here") and is not preserved in this plain-text copy.
Posted to commits@couchdb.apache.org by ga...@apache.org on 2019/12/05 17:03:45 UTC
[couchdb] 07/08: basic skiplist query working
This is an automated email from the ASF dual-hosted git repository.
garren pushed a commit to branch prototype/builtin-reduce
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit f01b653e1f8fc0f8be8837e5217ad0f5b9e8fdad
Author: Garren Smith <ga...@gmail.com>
AuthorDate: Thu Dec 5 15:42:52 2019 +0200
basic skiplist query working
---
src/couch_views/src/couch_views_indexer.erl | 2 +-
src/couch_views/src/couch_views_reduce.erl | 117 +++----
src/couch_views/src/couch_views_reduce_fdb.erl | 355 ++++++++++++++++-----
src/couch_views/src/couch_views_reducer.erl | 119 +++++++
.../test/exunit/couch_views_reduce_test.exs | 49 ++-
5 files changed, 447 insertions(+), 195 deletions(-)
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index 51fae9b..096d838 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -311,7 +311,7 @@ write_docs(TxDb, Mrst, Docs, State) ->
%% First build of the view
if ViewSeq /= <<>> -> ok; true ->
- couch_views_reduce:setup_reduce_indexes(TxDb, Sig, ViewIds)
+ couch_views_reduce_fdb:create_reduce_indexes(TxDb, Sig, ViewIds)
end,
lists:foreach(fun(Doc) ->
diff --git a/src/couch_views/src/couch_views_reduce.erl b/src/couch_views/src/couch_views_reduce.erl
index a2e3a93..0e837e3 100644
--- a/src/couch_views/src/couch_views_reduce.erl
+++ b/src/couch_views/src/couch_views_reduce.erl
@@ -15,8 +15,7 @@
-export([
run_reduce/2,
- read_reduce/7,
- setup_reduce_indexes/3
+ read_reduce/7
]).
@@ -36,7 +35,7 @@ read_reduce(Db, Sig, ViewId, Reducer, UserCallback, UserAcc0, Args) ->
} = Db,
%% Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
- ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+ ReduceIdxPrefix = couch_views_reduce_fdb:reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
#mrargs{
limit = Limit,
group = Group,
@@ -48,7 +47,7 @@ read_reduce(Db, Sig, ViewId, Reducer, UserCallback, UserAcc0, Args) ->
_ -> GroupLevel
end,
- Opts = args_to_fdb_opts(Args, ReduceIdxPrefix),
+%% Opts = args_to_fdb_opts(Args, ReduceIdxPrefix),
try
fabric2_fdb:transactional(Db, fun(TxDb) ->
@@ -66,7 +65,10 @@ read_reduce(Db, Sig, ViewId, Reducer, UserCallback, UserAcc0, Args) ->
},
Fun = fun handle_row/3,
- Acc1 = couch_views_reduce_fdb:fold_level0(TxDb, Sig, ViewId, Reducer, GroupLevel1, Opts, Fun, Acc0),
+%% Acc1 = couch_views_reduce_fdb:fold_level0(TxDb, Sig, ViewId, Reducer, GroupLevel1, Opts, Fun, Acc0),
+
+ SkipListOpts = args_to_skiplist_opts(Args),
+ Acc1 = couch_views_reduce_fdb:fold_skip_list(TxDb, Sig, ViewId, Reducer, GroupLevel1, SkipListOpts, Fun, Acc0),
#{
user_acc := UserAcc1
} = Acc1,
@@ -76,6 +78,30 @@ read_reduce(Db, Sig, ViewId, Reducer, UserCallback, UserAcc0, Args) ->
{ok, Out}
end.
+args_to_skiplist_opts(#mrargs{} = Args) ->
+ #mrargs{
+ start_key = StartKey,
+ end_key = EndKey
+ } = Args,
+
+ StartKey1 = case StartKey of
+ undefined ->
+ [0];
+ StartKey ->
+ StartKey
+ end,
+
+ EndKey1 = case EndKey of
+ undefined ->
+ throw(no_end_key_not_working_yet_error);
+ EndKey ->
+ EndKey
+ end,
+ #{
+ startkey => StartKey1,
+ endkey => EndKey1
+ }.
+
args_to_fdb_opts(#mrargs{} = Args, ReduceIdxPrefix) ->
#mrargs{
@@ -85,10 +111,10 @@ args_to_fdb_opts(#mrargs{} = Args, ReduceIdxPrefix) ->
StartKey1 = case StartKey of
undefined ->
- StartKey0 = encode_key(0, 0, ReduceIdxPrefix),
+ StartKey0 = couch_views_reduce_fdb:create_key(0, 0, ReduceIdxPrefix),
erlfdb_key:first_greater_than(StartKey0);
StartKey ->
- StartKey0 = encode_key(StartKey, 0, ReduceIdxPrefix),
+ StartKey0 = couch_views_reduce_fdb:create_key(StartKey, 0, ReduceIdxPrefix),
erlfdb_key:first_greater_or_equal(StartKey0)
end,
@@ -99,17 +125,12 @@ args_to_fdb_opts(#mrargs{} = Args, ReduceIdxPrefix) ->
EndKey0;
EndKey ->
io:format("ENDKEY ~n"),
- EndKey0 = encode_key(EndKey, 0, ReduceIdxPrefix),
+ EndKey0 = couch_views_reduce_fdb:create_key(EndKey, 0, ReduceIdxPrefix),
erlfdb_key:first_greater_than(EndKey0)
end,
[{streaming_mode, want_all}, {startkey, StartKey1}, {endkey, EndKey1}].
-encode_key(Key, Level, ReduceIdxPrefix) ->
- EK = {Level, couch_views_encoding:encode(Key, key)},
- erlfdb_tuple:pack(EK, ReduceIdxPrefix).
-
-
handle_row(Key, Value, Acc) ->
#{
callback := UserCallback,
@@ -118,6 +139,7 @@ handle_row(Key, Value, Acc) ->
limit := Limit
} = Acc,
+ io:format("WOO ROW ~p ~p ~n", [Key, Value]),
Row = [
{key, Key},
{value, Value}
@@ -140,23 +162,6 @@ handle_row(Key, Value, Acc) ->
maybe_stop({ok, Acc}) -> Acc;
maybe_stop({stop, Acc}) -> throw({done, Acc}).
-setup_reduce_indexes(Db, Sig, ViewIds) ->
- #{
- db_prefix := DbPrefix
- } = Db,
-
- fabric2_fdb:transactional(Db, fun(TxDb) ->
- lists:foreach(fun (ViewId) ->
- ViewOpts = #{
- db_prefix => DbPrefix,
- sig => Sig,
- view_id => ViewId
- },
- couch_views_reduce_fdb:create_skip_list(TxDb,
- ?MAX_SKIP_LIST_LEVELS, ViewOpts)
- end, ViewIds)
- end).
-
run_reduce(#mrst{views = Views } = Mrst, MappedResults) ->
ReduceFuns = lists:map(fun(View) ->
@@ -175,7 +180,7 @@ run_reduce(#mrst{views = Views } = Mrst, MappedResults) ->
} = MappedResult,
ReduceResults = lists:map(fun ({ReduceFun, Result}) ->
- reduce(ReduceFun, Result)
+ couch_views_reducer:reduce(ReduceFun, Result)
end, lists:zip(ReduceFuns, Results)),
MappedResult#{
@@ -184,59 +189,9 @@ run_reduce(#mrst{views = Views } = Mrst, MappedResults) ->
end, MappedResults).
-reduce(<<"_count">>, Results) ->
- ReduceResults = lists:foldl(fun ({Key, _}, Acc) ->
- case maps:is_key(Key, Acc) of
- true ->
- #{Key := Val} = Acc,
- Acc#{Key := Val + 1};
- false ->
- Acc#{Key => 1}
- end
- end, #{}, Results),
- maps:to_list(ReduceResults);
-
-reduce(<<"_sum">>, Results) ->
- ReduceResults = lists:foldl(fun ({Key, Val}, Acc) ->
- case maps:is_key(Key, Acc) of
- true ->
- #{Key := Sum} = Acc,
- Acc#{Key := Val + Sum};
- false ->
- Acc#{Key => Val}
- end
- end, #{}, Results),
- maps:to_list(ReduceResults);
-
-% this isn't a real supported reduce function in CouchDB
-% But I want a basic reduce function that when we need to update the index
-% we would need to re-read multiple rows instead of being able to do an
-% atomic update
-reduce(<<"_stats">>, Results) ->
- ReduceResults = lists:foldl(fun ({Key, Val}, Acc) ->
- io:format("MAX ~p ~p ~n", [Key, Val]),
- case maps:is_key(Key, Acc) of
- true ->
- #{Key := Max} = Acc,
- case Max >= Val of
- true ->
- Acc;
- false ->
- Acc#{Key := Val}
- end;
- false ->
- Acc#{Key => Val}
- end
- end, #{}, Results),
- maps:to_list(ReduceResults).
-
-
is_builtin(<<"_", _/binary>>) ->
true;
is_builtin(_) ->
false.
-reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId) ->
- Key = {?DB_VIEWS, Sig, ?VIEW_REDUCE_RANGE, ViewId},
- erlfdb_tuple:pack(Key, DbPrefix).
diff --git a/src/couch_views/src/couch_views_reduce_fdb.erl b/src/couch_views/src/couch_views_reduce_fdb.erl
index 77514ed..72e05b8 100644
--- a/src/couch_views/src/couch_views_reduce_fdb.erl
+++ b/src/couch_views/src/couch_views_reduce_fdb.erl
@@ -15,9 +15,13 @@
-export([
+ fold_skip_list/8,
fold_level0/8,
+ create_reduce_indexes/3,
create_skip_list/3,
- update_reduce_idx/7
+ update_reduce_idx/7,
+ reduce_skip_list_idx_prefix/3,
+ create_key/3
]).
@@ -26,9 +30,11 @@
-include_lib("couch_mrview/include/couch_mrview.hrl").
-include_lib("fabric/include/fabric2.hrl").
--define(MAX_SKIP_LIST_LEVELS, 1).
+
+-define(MAX_SKIP_LIST_LEVELS, 6).
-define(LEVEL_FAN_POW, 1).
+
log_levels(Db, Sig, ViewId) ->
#{
db_prefix := DbPrefix
@@ -55,8 +61,8 @@ log_levels(Db, Sig, ViewId) ->
false ->
Total = sum_rows(Rows),
if Total == Level0Total -> Level0Total; true ->
- io:format("~n ~n LEVEL ~p NOT EQUAL ~p ~p ~n", [Level, Level0Total, Total])
-%% throw(level_total_error)
+ io:format("~n ~n LEVEL ~p NOT EQUAL ~p ~p ~n", [Level, Level0Total, Total]),
+ throw(level_total_error)
end
end
@@ -69,18 +75,234 @@ sum_rows(Rows) ->
end, 0, Rows).
+fold_skip_list(Db, Sig, ViewId, Reducer, GroupLevel, Opts, UserCallback, UserAcc0) ->
+ #{
+ db_prefix := DbPrefix
+ } = Db,
+%% timer:exit_after(40, boom),
+
+ ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+ Acc = #{
+ sig => Sig,
+ view_id => ViewId,
+ user_acc => UserAcc0,
+ callback => UserCallback,
+ reduce_idx_prefix => ReduceIdxPrefix,
+ reducer => Reducer,
+ group_level => GroupLevel,
+ rows => []
+ },
+ #{
+ startkey := StartKey,
+ endkey := EndKey
+ } = Opts,
+
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+ log_levels(TxDb, Sig, ViewId),
+ Acc1 = traverse_skip_list(TxDb, 0, StartKey, EndKey, Acc),
+ #{
+ user_acc := UserAcc1,
+ rows := Rows1
+ } = Acc1,
+ rereduce_and_reply(Reducer, Rows1, GroupLevel, UserCallback, UserAcc1)
+ end).
+
+traverse_skip_list(_TxDb, Level, _CurrentKey, _EndKey, _Acc) when Level < 0 ->
+ throw(skip_list_gone_to_low);
+
+traverse_skip_list(TxDb, _Level, CurrentKey, EndKey, Acc) ->
+ #{
+ user_acc := UserAcc,
+ callback := UserCallback,
+ reduce_idx_prefix := ReduceIdxPrefix,
+ reducer := Reducer,
+ group_level := GroupLevel,
+ rows := Rows
+ } = Acc,
+
+ {RangeLevel, RangeStart, RangeEnd} = get_next_range_and_level(TxDb,
+ ReduceIdxPrefix, GroupLevel, CurrentKey, EndKey),
+ io:format("TRAVERSE Level ~p RangeStart ~p RangeEnd ~p ~n", [RangeLevel, RangeStart, RangeEnd]),
+ Results = get_range_inclusive(TxDb, RangeStart, RangeEnd, RangeLevel, ReduceIdxPrefix),
+ io:format("RESULTS ~p ~n", [Results]),
+
+ NextStart = if Results == [] -> null; true ->
+ {LastKey, _} = lists:last(Results),
+ LastKey
+ end,
+ KeyAfterStart = get_key_after(TxDb, NextStart, EndKey, RangeLevel, ReduceIdxPrefix),
+ io:format("NEXTStart ~p KeyAfter ~p RangeLevel ~p ~n", [NextStart, KeyAfterStart, RangeLevel]),
+
+ {NextStart1, Acc1} = case couch_views_reducer:group_level_equal(NextStart, KeyAfterStart, GroupLevel) of
+ true ->
+ AccNext = Acc#{rows := Rows ++ Results},
+ {NextStart, AccNext};
+ false when RangeLevel == 0 ->
+ AllResults = Rows ++ Results,
+ UserAcc1 = rereduce_and_reply(Reducer, AllResults, GroupLevel,
+ UserCallback, UserAcc),
+ AccNext = Acc#{
+ user_acc := UserAcc1,
+ rows := []
+ },
+ {KeyAfterStart, AccNext};
+ % Need to traverse at level 0 to make sure we have all keys for
+ % the current group_level keys
+ false ->
+ UsableResults = lists:sublist(Results, length(Results) - 1),
+ io:format("USABLE ~p LEVEL ~p ~n", [UsableResults, RangeLevel]),
+ AccNext = Acc#{rows := Rows ++ UsableResults},
+ {NextStart, AccNext}
+ end,
+
+ case RangeEnd == EndKey orelse NextStart1 == null of
+ true when RangeLevel == 0 ->
+ io:format("FINISED ~n"),
+ Acc1;
+ _ ->
+ traverse_skip_list(TxDb, 0, NextStart1, EndKey, Acc1)
+ end.
+
+
+
+get_next_range_and_level(TxDb, ReduceIdxPrefix, GroupLevel, StartKey, EndKey) ->
+ GroupEndKey = get_group_level_endkey(TxDb, GroupLevel, 0, StartKey, ReduceIdxPrefix),
+ % Do not exceed the set endkey
+ GroupEndKey1 = if GroupEndKey < EndKey -> GroupEndKey; true -> EndKey end,
+
+ LevelRanges = [{0, StartKey, GroupEndKey1}],
+ io:format("Get Range StartKey ~p GroupEndKey ~p EndKey ~p ~n", [StartKey, GroupEndKey1, EndKey]),
+ LevelRanges1 = scan_for_level_ranges(TxDb, 0, GroupLevel, StartKey, GroupEndKey1, ReduceIdxPrefix, LevelRanges),
+ lists:last(LevelRanges1).
+
+
+% at end of this specific grouplevel, so have to do final scan at level 0
+scan_for_level_ranges(_TxDb, _Level, _GroupLevel, StartKey, StartKey, _ReduceIdxPrefix, _Acc) ->
+ [{0, StartKey, StartKey}];
+
+scan_for_level_ranges(_TxDb, ?MAX_SKIP_LIST_LEVELS, _GroupLevel, StartKey, StartKey, _ReduceIdxPrefix, Acc) ->
+ Acc;
+
+scan_for_level_ranges(TxDb, Level, GroupLevel, StartKey, EndKey, ReduceIdxPrefix, Acc) ->
+ NextLevel = Level + 1,
+ NearestKey = get_key_or_nearest(TxDb, NextLevel, StartKey, EndKey, ReduceIdxPrefix),
+ io:format("SCAN startkey ~p nearest ~p ~n", [StartKey, NearestKey]),
+ case StartKey =:= NearestKey of
+ true ->
+ GroupLevelEndKey = get_group_level_endkey(TxDb, GroupLevel,
+ NextLevel, StartKey, ReduceIdxPrefix),
+
+ ToFar = GroupLevelEndKey > EndKey,
+ EndOfLevel = GroupLevelEndKey == NearestKey,
+
+ case ToFar orelse EndOfLevel of
+ true ->
+ Acc;
+ false ->
+ Acc1 = Acc ++ [{NextLevel, StartKey, GroupLevelEndKey}],
+ scan_for_level_ranges(TxDb, NextLevel, GroupLevel, StartKey,
+ EndKey, ReduceIdxPrefix, Acc1)
+ end;
+ false ->
+ case couch_views_reducer:group_level_equal(StartKey, NearestKey,
+ GroupLevel) of
+ true ->
+ [{Level, StartKey, NearestKey}];
+ false ->
+ Acc
+ end
+ end.
+
+
+get_key_or_nearest(TxDb, Level, StartKey, EndKey, ReduceIdxPrefix) ->
+ #{
+ tx := Tx
+ } = TxDb,
+
+ StartKey1 = create_key(ReduceIdxPrefix, Level, StartKey),
+ StartKey2 = erlfdb_key:first_greater_or_equal(StartKey1),
+
+ EndKey1 = create_endkey(ReduceIdxPrefix, Level, EndKey),
+ EndKey2 = erlfdb_key:first_greater_than(EndKey1),
+
+ Future = erlfdb:get_range(Tx, StartKey2, EndKey2, [{limit, 1}]),
+ wait_and_get_key(Future).
+
+
+get_group_level_endkey(TxDb, GroupLevel, Level, StartKey, ReduceIdxPrefix) ->
+ #{
+ tx := Tx
+ } = TxDb,
+ GroupLevelKey = couch_views_reducer:group_level_key(StartKey, GroupLevel),
+ StartKey1 = create_key(ReduceIdxPrefix, Level, GroupLevelKey),
+ StartKey2 = erlfdb_key:first_greater_than(StartKey1),
+ EndKey = create_endkey(ReduceIdxPrefix, Level, GroupLevelKey),
+ EndKey1 = erlfdb_key:first_greater_or_equal(EndKey),
+ Future = erlfdb:get_range(Tx, StartKey2, EndKey1, [{reverse, true}, {limit, 1}]),
+ wait_and_get_key(Future).
+
+
+get_key_after(TxDb, StartKey, EndKey, Level, ReduceIdxPrefix) ->
+ #{
+ tx := Tx
+ } = TxDb,
+
+ StartKey1 = create_key(ReduceIdxPrefix, Level, StartKey),
+ StartKey2 = erlfdb_key:first_greater_than(StartKey1),
+
+ EndKey1 = create_endkey(ReduceIdxPrefix, Level, EndKey),
+ EndKey2 = erlfdb_key:first_greater_than(EndKey1),
+
+ Future = erlfdb:get_range(Tx, StartKey2, EndKey2, [{limit, 1}]),
+ wait_and_get_key(Future).
+
+
+wait_and_get_key(Future) ->
+ case erlfdb:wait(Future) of
+ [] ->
+ null;
+ [{_FullEncodedKey, PackedValue}] ->
+ {Key, _} = get_key_value(PackedValue),
+ Key
+ end.
+
+
+get_range_inclusive(TxDb, StartKey, EndKey, Level, ReduceIdxPrefix) ->
+ #{
+ tx := Tx
+ } = TxDb,
+ StartKey1 = create_key(ReduceIdxPrefix, Level, StartKey),
+ StartKey2 = erlfdb_key:first_greater_or_equal(StartKey1),
+
+ EndKey1 = create_key(ReduceIdxPrefix, Level, EndKey),
+ EndKey2 = erlfdb_key:first_greater_than(EndKey1),
+
+ Fun = fun ({_FullEncodedKey, PackedValue}, Acc0) ->
+ KV = get_key_value(PackedValue),
+ Acc0 ++ [KV]
+ end,
+
+ erlfdb:fold_range(Tx, StartKey2, EndKey2, Fun, [], []).
+
+
+% TODO: This needs a better name
+create_endkey(ReduceIdxPrefix, Level, Key) ->
+ Key1 = if Key /= null -> Key; true -> [] end,
+ EK = couch_views_encoding:encode(Key1 ++ [16#FF], key),
+ LevelKey = {Level, EK},
+ erlfdb_tuple:pack(LevelKey, ReduceIdxPrefix).
+
+
fold_level0(Db, Sig, ViewId, Reducer, GroupLevel, Opts, UserCallback, UserAcc0) ->
#{
db_prefix := DbPrefix
} = Db,
- Level = 0,
ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
Acc = #{
sig => Sig,
view_id => ViewId,
user_acc => UserAcc0,
- %% args := Args,
callback => UserCallback,
reduce_idx_prefix => ReduceIdxPrefix,
reducer => Reducer,
@@ -88,16 +310,15 @@ fold_level0(Db, Sig, ViewId, Reducer, GroupLevel, Opts, UserCallback, UserAcc0)
rows => []
},
+ {startkey, StartKey} = lists:keyfind(startkey, 1, Opts),
+ {endkey, EndKey} = lists:keyfind(endkey, 1, Opts),
+
fabric2_fdb:transactional(Db, fun(TxDb) ->
log_levels(TxDb, Sig, ViewId),
#{
tx := Tx
} = TxDb,
-
- {startkey, StartKey} = lists:keyfind(startkey, 1, Opts),
- {endkey, EndKey} = lists:keyfind(endkey, 1, Opts),
-
Fun = fun fold_fwd_cb/2,
Acc1 = erlfdb:fold_range(Tx, StartKey, EndKey, Fun, Acc, Opts),
#{
@@ -126,10 +347,10 @@ fold_fwd_cb({_FullEncodedKey, EV}, Acc) ->
LastKey0
end,
- GroupLevelKey = group_level_key(Key, GroupLevel),
+ GroupLevelKey = couch_views_reducer:group_level_key(Key, GroupLevel),
GroupKV = [{GroupLevelKey, Val}],
- case group_level_equal(Key, LastKey, GroupLevel) of
+ case couch_views_reducer:group_level_equal(Key, LastKey, GroupLevel) of
true ->
Acc#{
rows := Rows ++ GroupKV
@@ -147,58 +368,10 @@ rereduce_and_reply(_Reducer, [], _GroupLevel, _Callback, Acc) ->
Acc;
rereduce_and_reply(Reducer, Rows, GroupLevel, Callback, Acc) ->
- {ReducedKey, ReducedVal} = rereduce(Reducer, Rows, GroupLevel),
+ {ReducedKey, ReducedVal} = couch_views_reducer:rereduce(Reducer, Rows, GroupLevel),
Callback(ReducedKey, ReducedVal, Acc).
-rereduce(_Reducer, [], _GroupLevel) ->
- no_kvs;
-
-rereduce(_Reducer, Rows, GroupLevel) when length(Rows) == 1 ->
- {Key, Val} = hd(Rows),
- GroupKey = group_level_key(Key, GroupLevel),
- {GroupKey, Val};
-
-rereduce(<<"_sum">>, Rows, GroupLevel) ->
- Sum = lists:foldl(fun ({_, Val}, Acc) ->
- Val + Acc
- end, 0, Rows),
- {Key, _} = hd(Rows),
- GroupKey = group_level_key(Key, GroupLevel),
- {GroupKey, Sum};
-
-rereduce(<<"_count">>, Rows, GroupLevel) ->
- Val = length(Rows),
- {Key, _} = hd(Rows),
- GroupKey = group_level_key(Key, GroupLevel),
- {GroupKey, Val}.
-
-
-group_level_equal(_One, _Two, 0) ->
- true;
-
-group_level_equal(_One, _Two, group_true) ->
- false;
-
-group_level_equal(One, Two, GroupLevel) ->
- GroupOne = group_level_key(One, GroupLevel),
- GroupTwo = group_level_key(Two, GroupLevel),
- GroupOne == GroupTwo.
-
-
-group_level_key(_Key, 0) ->
- null;
-
-group_level_key(Key, group_true) ->
- Key;
-
-group_level_key(Key, GroupLevel) when is_list(Key) ->
- lists:sublist(Key, GroupLevel);
-
-group_level_key(Key, _GroupLevel) ->
- Key.
-
-
reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId) ->
Key = {?DB_VIEWS, Sig, ?VIEW_REDUCE_RANGE, ViewId},
erlfdb_tuple:pack(Key, DbPrefix).
@@ -211,23 +384,21 @@ unpack_key_value(EncodedValue) ->
{Key, Val}.
-%% Inserting
-update_reduce_idx(TxDb, Sig, ViewId, Reducer, _DocId, _ExistingKeys, ReduceResult) ->
+create_reduce_indexes(Db, Sig, ViewIds) ->
#{
db_prefix := DbPrefix
- } = TxDb,
-
- ViewOpts = #{
- db_prefix => DbPrefix,
- sig => Sig,
- view_id => ViewId,
- reducer => Reducer
- },
+ } = Db,
- lists:foreach(fun ({Key, Val}) ->
- io:format("RESULTS KV ~p ~p ~n", [Key, Val]),
- add_kv_to_skip_list(TxDb, ?MAX_SKIP_LIST_LEVELS, ViewOpts, Key, Val)
- end, ReduceResult).
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+ lists:foreach(fun (ViewId) ->
+ ViewOpts = #{
+ db_prefix => DbPrefix,
+ sig => Sig,
+ view_id => ViewId
+ },
+ create_skip_list(TxDb, ?MAX_SKIP_LIST_LEVELS, ViewOpts)
+ end, ViewIds)
+ end).
create_skip_list(Db, MaxLevel, #{} = ViewOpts) ->
@@ -248,16 +419,33 @@ create_skip_list(Db, MaxLevel, #{} = ViewOpts) ->
end).
-add_kv_to_skip_list(Db, MaxLevel, #{} = ViewOpts, Key, Val) ->
+%% Inserting
+update_reduce_idx(TxDb, Sig, ViewId, Reducer, _DocId, _ExistingKeys, ReduceResult) ->
+ #{
+ db_prefix := DbPrefix
+ } = TxDb,
+
+ ViewOpts = #{
+ db_prefix => DbPrefix,
+ sig => Sig,
+ view_id => ViewId,
+ reducer => Reducer
+ },
+
+ ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+
+ lists:foreach(fun ({Key, Val}) ->
+ io:format("RESULTS KV ~p ~p ~n", [Key, Val]),
+ add_kv_to_skip_list(TxDb, ReduceIdxPrefix, ?MAX_SKIP_LIST_LEVELS, ViewOpts, Key, Val)
+ end, ReduceResult).
+
+
+add_kv_to_skip_list(Db, ReduceIdxPrefix, MaxLevel, #{} = ViewOpts, Key, Val) ->
#{
- db_prefix := DbPrefix,
- sig := Sig,
- view_id := ViewId,
reducer := Reducer
} = ViewOpts,
Levels = lists:seq(1, MaxLevel),
- ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
KeyHash = hash_key(Key),
fabric2_fdb:transactional(Db, fun(TxDb) ->
@@ -265,7 +453,7 @@ add_kv_to_skip_list(Db, MaxLevel, #{} = ViewOpts, Key, Val) ->
not_found ->
Val;
ExistingVal ->
- {_, ReducedVal} = rereduce(Reducer, [{Key, ExistingVal}, {Key, Val}], group_true),
+ {_, ReducedVal} = couch_views_reducer:rereduce(Reducer, [{Key, ExistingVal}, {Key, Val}], group_true),
ReducedVal
end,
io:format("VAL1 ~p ~n", [Val1]),
@@ -279,9 +467,10 @@ add_kv_to_skip_list(Db, MaxLevel, #{} = ViewOpts, Key, Val) ->
io:format("Adding at ~p ~p ~n", [Level, Key]),
add_kv(TxDb, ReduceIdxPrefix, Level, Key, Val);
false ->
+ {_, NewVal} = couch_views_reducer:rereduce(Reducer, [{PrevKey, PrevVal}, {Key, Val}], 0),
%% {PrevKey, NewVal} = rereduce(<<"_stats">>, {PrevKey, PrevVal}, {Key, Val}),
-%% io:format("RE_REDUCE ~p ~p ~p ~p ~n", [Level, Key, PrevKey, NewVal]),
- add_kv(TxDb, ReduceIdxPrefix, Level, PrevKey, PrevVal)
+ io:format("RE_REDUCE ~p ~p ~p ~p ~n", [Level, Key, PrevKey, NewVal]),
+ add_kv(TxDb, ReduceIdxPrefix, Level, PrevKey, NewVal)
end
end, Levels)
end).
diff --git a/src/couch_views/src/couch_views_reducer.erl b/src/couch_views/src/couch_views_reducer.erl
new file mode 100644
index 0000000..a7ac783
--- /dev/null
+++ b/src/couch_views/src/couch_views_reducer.erl
@@ -0,0 +1,119 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+
+-module(couch_views_reducer).
+
+
+-export([
+ reduce/2,
+ rereduce/3,
+ group_level_equal/3,
+ group_level_key/2
+]).
+
+
+reduce(<<"_count">>, Results) ->
+ ReduceResults = lists:foldl(fun ({Key, _}, Acc) ->
+ case maps:is_key(Key, Acc) of
+ true ->
+ #{Key := Val} = Acc,
+ Acc#{Key := Val + 1};
+ false ->
+ Acc#{Key => 1}
+ end
+ end, #{}, Results),
+ maps:to_list(ReduceResults);
+
+reduce(<<"_sum">>, Results) ->
+ ReduceResults = lists:foldl(fun ({Key, Val}, Acc) ->
+ case maps:is_key(Key, Acc) of
+ true ->
+ #{Key := Sum} = Acc,
+ Acc#{Key := Val + Sum};
+ false ->
+ Acc#{Key => Val}
+ end
+ end, #{}, Results),
+ maps:to_list(ReduceResults);
+
+
+% this isn't a real supported reduce function in CouchDB
+% But I want a basic reduce function that when we need to update the index
+% we would need to re-read multiple rows instead of being able to do an
+% atomic update
+reduce(<<"_stats">>, Results) ->
+ ReduceResults = lists:foldl(fun ({Key, Val}, Acc) ->
+ io:format("MAX ~p ~p ~n", [Key, Val]),
+ case maps:is_key(Key, Acc) of
+ true ->
+ #{Key := Max} = Acc,
+ case Max >= Val of
+ true ->
+ Acc;
+ false ->
+ Acc#{Key := Val}
+ end;
+ false ->
+ Acc#{Key => Val}
+ end
+ end, #{}, Results),
+ maps:to_list(ReduceResults).
+
+
+rereduce(_Reducer, [], _GroupLevel) ->
+ no_kvs;
+
+rereduce(_Reducer, Rows, GroupLevel) when length(Rows) == 1 ->
+ {Key, Val} = hd(Rows),
+ GroupKey = group_level_key(Key, GroupLevel),
+ {GroupKey, Val};
+
+rereduce(<<"_sum">>, Rows, GroupLevel) ->
+ Sum = lists:foldl(fun ({_, Val}, Acc) ->
+ Val + Acc
+ end, 0, Rows),
+ {Key, _} = hd(Rows),
+ GroupKey = group_level_key(Key, GroupLevel),
+ {GroupKey, Sum};
+
+rereduce(<<"_count">>, Rows, GroupLevel) ->
+ Val = length(Rows),
+ {Key, _} = hd(Rows),
+ GroupKey = group_level_key(Key, GroupLevel),
+ {GroupKey, Val}.
+
+
+group_level_equal(_One, _Two, 0) ->
+ true;
+
+group_level_equal(_One, _Two, group_true) ->
+ false;
+
+group_level_equal(One, Two, GroupLevel) ->
+ GroupOne = group_level_key(One, GroupLevel),
+ GroupTwo = group_level_key(Two, GroupLevel),
+ GroupOne == GroupTwo.
+
+
+group_level_key(_Key, 0) ->
+ null;
+
+group_level_key(Key, group_true) ->
+ Key;
+
+group_level_key(Key, GroupLevel) when is_list(Key) ->
+ lists:sublist(Key, GroupLevel);
+
+group_level_key(Key, _GroupLevel) ->
+ Key.
+
diff --git a/src/couch_views/test/exunit/couch_views_reduce_test.exs b/src/couch_views/test/exunit/couch_views_reduce_test.exs
index d6dcc60..e2b9d3b 100644
--- a/src/couch_views/test/exunit/couch_views_reduce_test.exs
+++ b/src/couch_views/test/exunit/couch_views_reduce_test.exs
@@ -46,7 +46,6 @@ defmodule CouchViewsReduceTest do
# }
#
# {:ok, res} = run_query(context, args, "dates")
- # IO.inspect(res, label: "OUT")
#
# assert res == [
# {:row, [key: [2017, 3, 1], value: 1]},
@@ -62,7 +61,6 @@ defmodule CouchViewsReduceTest do
# }
#
# {:ok, res} = run_query(context, args, "dates_count")
- # IO.inspect(res, label: "OUT")
#
# assert res == [
# {:row, [key: [2017], value: 4]},
@@ -74,11 +72,11 @@ defmodule CouchViewsReduceTest do
test "group_level=1 reduce", context do
args = %{
reduce: true,
- group_level: 1
+ group_level: 1,
+ end_key: [2019,5,1]
}
{:ok, res} = run_query(context, args, "dates_sum")
- IO.inspect(res, label: "OUT")
assert res == [
{:row, [key: [2017], value: 31]},
@@ -97,7 +95,6 @@ defmodule CouchViewsReduceTest do
}
{:ok, res} = run_query(context, args, "dates_sum")
- IO.inspect(res, label: "OUT")
assert res == [
{:row, [key: [2017], value: 22]},
@@ -114,7 +111,6 @@ defmodule CouchViewsReduceTest do
}
{:ok, res} = run_query(context, args, "dates_sum")
- IO.inspect(res, label: "OUT")
assert res == [
{:row, [key: [2017], value: 22]},
@@ -132,7 +128,6 @@ defmodule CouchViewsReduceTest do
}
{:ok, res} = run_query(context, args, "dates_sum")
- IO.inspect(res, label: "OUT")
assert res == [
{:row, [key: [2017], value: 22]},
@@ -141,23 +136,22 @@ defmodule CouchViewsReduceTest do
]
end
- test "group=true reduce with startkey/endkey", context do
- args = %{
- reduce: true,
- group: true,
- start_key: [2018, 5, 1],
- end_key: [2019, 04, 1],
- }
-
- {:ok, res} = run_query(context, args, "dates_sum")
- IO.inspect(res, label: "OUT")
-
- assert res == [
- {:row, [key: [2018, 5, 1], value: 7]},
- {:row, [key: [2019, 3, 1], value: 4]},
- {:row, [key: [2019, 4, 1], value: 6]}
- ]
- end
+# test "group=true reduce with startkey/endkey", context do
+# args = %{
+# reduce: true,
+# group: true,
+# start_key: [2018, 5, 1],
+# end_key: [2019, 04, 1],
+# }
+#
+# {:ok, res} = run_query(context, args, "dates_sum")
+#
+# assert res == [
+# {:row, [key: [2018, 5, 1], value: 7]},
+# {:row, [key: [2019, 3, 1], value: 4]},
+# {:row, [key: [2019, 4, 1], value: 6]}
+# ]
+# end
# test "group=1 count reduce", context do
# args = %{
@@ -167,7 +161,6 @@ defmodule CouchViewsReduceTest do
# }
#
# {:ok, res} = run_query(context, args, "baz")
- # IO.inspect(res, label: "OUT")
#
# assert res == [
# {:row, [key: 1, value: 2]},
@@ -187,7 +180,6 @@ defmodule CouchViewsReduceTest do
# }
#
# {:ok, res} = run_query(context, args, "baz")
- # IO.inspect(res, label: "OUT")
#
# assert res == [
# {:row, [key: 1, value: 2]},
@@ -210,7 +202,6 @@ defmodule CouchViewsReduceTest do
# }
#
# {:ok, res} = run_query(context, args, "baz")
- # IO.inspect(res, label: "OUT")
#
# assert res == [
# {:row, [key: 1, value: 2]},
@@ -236,7 +227,6 @@ defmodule CouchViewsReduceTest do
# }
#
# {:ok, res} = run_query(context, args, "boom")
- # IO.inspect(res, label: "OUT")
#
# assert res == [
# {:row, [key: [2019, 1], value: 1]},
@@ -252,7 +242,6 @@ defmodule CouchViewsReduceTest do
# }
#
# {:ok, res} = run_query(context, args, "max")
- # IO.inspect(res, label: "OUT")
#
# assert res == [
# {:row, [key: :null, value: 3]}
@@ -267,7 +256,7 @@ defmodule CouchViewsReduceTest do
end
def default_cb(:complete, acc) do
- IO.inspect(acc, label: "complete")
+ IO.inspect(Enum.reverse(acc), label: "complete")
{:ok, Enum.reverse(acc)}
end