You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ga...@apache.org on 2019/10/31 14:43:02 UTC
[couchdb] 03/05: progress with reading level 0
This is an automated email from the ASF dual-hosted git repository.
garren pushed a commit to branch prototype/builtin-reduce
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 533fdb660fe42726af5d5a948cbc386234d16341
Author: Garren Smith <ga...@gmail.com>
AuthorDate: Wed Oct 30 11:37:36 2019 +0200
progress with reading level 0
---
src/couch_views/src/couch_views_fdb.erl | 4 +-
src/couch_views/src/couch_views_indexer.erl | 8 +-
src/couch_views/src/couch_views_reader.erl | 30 --
src/couch_views/src/couch_views_reduce.erl | 306 +++++++--------------
src/couch_views/src/couch_views_reduce_fdb.erl | 274 +++++++++++++++++-
.../test/exunit/couch_views_reduce_test.exs | 94 +++++--
6 files changed, 428 insertions(+), 288 deletions(-)
diff --git a/src/couch_views/src/couch_views_fdb.erl b/src/couch_views/src/couch_views_fdb.erl
index 479a707..8999d76 100644
--- a/src/couch_views/src/couch_views_fdb.erl
+++ b/src/couch_views/src/couch_views_fdb.erl
@@ -133,8 +133,6 @@ fold_map_idx(TxDb, Sig, ViewId, Options, Callback, Acc0) ->
Acc1.
-
-
write_doc(TxDb, Sig, _ViewIds, #{deleted := true} = Doc) ->
#{
id := DocId
@@ -181,7 +179,7 @@ write_doc(TxDb, Sig, ViewIds, Doc) ->
[]
end,
update_map_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows),
- couch_views_reduce:update_reduce_idx(TxDb, Sig, ViewId, DocId,
+ couch_views_reduce_fdb:update_reduce_idx(TxDb, Sig, ViewId, DocId,
ExistingKeys, ReduceResult)
end, lists:zip3(ViewIds, Results, ReduceResults)).
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index d4a78b8..3c60743 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -221,11 +221,17 @@ write_docs(TxDb, Mrst, Docs, State) ->
} = Mrst,
#{
- last_seq := LastSeq
+ last_seq := LastSeq,
+ view_seq := ViewSeq
} = State,
ViewIds = [View#mrview.id_num || View <- Views],
+ %% First build of the view
+ if ViewSeq /= <<>> -> ok; true ->
+ couch_views_reduce:setup_reduce_indexes(TxDb, Sig, ViewIds)
+ end,
+
lists:foreach(fun(Doc) ->
couch_views_fdb:write_doc(TxDb, Sig, ViewIds, Doc)
end, Docs),
diff --git a/src/couch_views/src/couch_views_reader.erl b/src/couch_views/src/couch_views_reader.erl
index d08515c..394b3cf 100644
--- a/src/couch_views/src/couch_views_reader.erl
+++ b/src/couch_views/src/couch_views_reader.erl
@@ -236,36 +236,6 @@ handle_row(DocId, Key, Value, Acc) ->
UserAcc1 = maybe_stop(UserCallback({row, Row}, UserAcc0)),
Acc#{acc := UserAcc1}.
-handle_reduce_row(_Key, _Value, #{skip := Skip} = Acc) when Skip > 0 ->
- Acc#{skip := Skip - 1};
-
-handle_reduce_row(Key, Value, Acc) ->
- io:format("ACC ~p ~n", [Acc]),
- #{
- callback := UserCallback,
- acc := UserAcc0,
- row_count := RowCount,
- limit := Limit
- } = Acc,
-
- Row = [
- {key, Key},
- {value, Value}
- ],
-
- RowCountNext = RowCount + 1,
-
- UserAcc1 = maybe_stop(UserCallback({row, Row}, UserAcc0)),
- Acc1 = Acc#{acc := UserAcc1, row_count := RowCountNext},
-
- case RowCountNext == Limit of
- true ->
- UserAcc2 = maybe_stop(UserCallback(complete, UserAcc1)),
- maybe_stop({stop, UserAcc2});
- false ->
- Acc1
- end.
-
get_view_id(Lang, Args, ViewName, Views) ->
case couch_mrview_util:extract_view(Lang, Args, ViewName, Views) of
diff --git a/src/couch_views/src/couch_views_reduce.erl b/src/couch_views/src/couch_views_reduce.erl
index 4cb7416..ebd2f47 100644
--- a/src/couch_views/src/couch_views_reduce.erl
+++ b/src/couch_views/src/couch_views_reduce.erl
@@ -15,8 +15,8 @@
-export([
run_reduce/2,
- update_reduce_idx/6,
- read_reduce/6
+ read_reduce/6,
+ setup_reduce_indexes/3
]).
@@ -30,99 +30,120 @@
-define(MAX_SKIP_LIST_LEVELS, 6).
-log_levels(Db, Sig, ViewId) ->
+read_reduce(Db, Sig, ViewId, UserCallback, UserAcc0, Args) ->
#{
db_prefix := DbPrefix
} = Db,
- Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
+%% Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
- Opts = [{streaming_mode, want_all}],
+ #mrargs{
+ limit = Limit
+ } = Args,
- fabric2_fdb:transactional(Db, fun(#{tx := Tx} = TxDb) ->
- lists:foreach(fun (Level) ->
- {StartKey, EndKey} = erlfdb_tuple:range({Level},
- ReduceIdxPrefix),
+ Opts = args_to_fdb_opts(Args, ReduceIdxPrefix),
+
+ try
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+ %% Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
Acc0 = #{
sig => Sig,
view_id => ViewId,
+ user_acc => UserAcc0,
+ args => Args,
+ callback => UserCallback,
reduce_idx_prefix => ReduceIdxPrefix,
- next => key,
- key => undefined,
- rows => []
+ limit => Limit,
+ row_count => 0
},
- Fun = fun fold_fwd_cb/2,
- Acc = erlfdb:fold_range(Tx, StartKey, EndKey, Fun, Acc0, Opts),
+ Fun = fun handle_row/3,
+ Acc1 = couch_views_reduce_fdb:fold_level0(TxDb, Sig, ViewId, Opts, Fun, Acc0),
#{
- rows := Rows
- } = Acc,
- io:format("~n ~n LEVEL ~p rows ~p ~n", [Level, Rows]),
- {ok, Rows}
- end, Levels),
- {ok, []}
- end).
-
+ user_acc := UserAcc1
+ } = Acc1,
+ {ok, maybe_stop(UserCallback(complete, UserAcc1))}
+ end)
+ catch throw:{done, Out} ->
+ {ok, Out}
+ end.
-read_reduce(Db, Sig, ViewId, UserCallback, UserAcc0, Args) ->
- #{
- db_prefix := DbPrefix
- } = Db,
- Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
- ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
- fabric2_fdb:transactional(Db, fun(#{tx := Tx} = TxDb) ->
- log_levels(TxDb, Sig, ViewId),
-%% Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
-
-
- Acc0 = #{
- sig => Sig,
- view_id => ViewId,
- user_acc => UserAcc0,
- args => Args,
- callback => UserCallback,
- reduce_idx_prefix => ReduceIdxPrefix,
- rows => []
- },
-
-
-%% Opts = [{limit, 2}, {streaming_mode, want_all}],
-%% EK = couch_views_encoding:encode(0, key),
-%% {StartKey, EndKey} = erlfdb_tuple:range({?MAX_SKIP_LIST_LEVELS, EK},
-%% ReduceIdxPrefix),
-%%
-%% Fun = fun fold_fwd_cb/2,
-%% Acc = erlfdb:fold_range(Tx, StartKey, EndKey, Fun, Acc0, Opts),
- #{
- rows := Rows
- } = Acc0,
- {ok, Rows}
- end).
-
-args_to_fdb_opts(#mrargs{} = Args) ->
+args_to_fdb_opts(#mrargs{} = Args, ReduceIdxPrefix) ->
#mrargs{
- limit = Limit,
- start_key = StartKey,
- end_key = EndKey
+%% limit = Limit,
+%% start_key = StartKey,
+%% end_key = EndKey,
+ group = Group,
+ group_level = GroupLevel
} = Args,
- ok.
+ {UStartKey0, EndKey0} = erlfdb_tuple:range({0},
+ ReduceIdxPrefix),
+
+ StartKey0 = erlfdb_tuple:pack({0, couch_views_encoding:encode(0, key)}, ReduceIdxPrefix),
+
+%% StartKey1 = case StartKey of
+%% undefined -> erlfdb_key:first_greater_than(StartKey0);
+%% StartKey -> create_key(StartKey, 0, Red)
+%% end,
+
+ StartKey1 = erlfdb_key:first_greater_than(StartKey0),
-fold_fwd_cb({FullEncodedKey, EV}, Acc) ->
+ [{streaming_mode, want_all}, {startkey, StartKey1}, {endkey, EndKey0}].
+
+
+%% Pair a skip-list level with the view-encoded form of Key, returning
+%% the tuple {Level, EncodedKey}.
+encode_key(Key, Level) ->
+    EncodedKey = couch_views_encoding:encode(Key, key),
+    {Level, EncodedKey}.
+
+
+handle_row(Key, Value, Acc) ->
#{
- reduce_idx_prefix := ReduceIdxPrefix,
- rows := Rows
+ callback := UserCallback,
+ user_acc := UserAcc0,
+ row_count := RowCount,
+ limit := Limit
} = Acc,
- {_Level, _EK}
- = erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
- {EK, EV1} = erlfdb_tuple:unpack(EV),
- Key = couch_views_encoding:decode(EK),
- Val = couch_views_encoding:decode(EV1),
+ Row = [
+ {key, Key},
+ {value, Value}
+ ],
- Acc#{key := Val, rows := Rows ++ [{Key, Val}]}.
+ RowCountNext = RowCount + 1,
+
+ UserAcc1 = maybe_stop(UserCallback({row, Row}, UserAcc0)),
+ Acc1 = Acc#{user_acc := UserAcc1, row_count := RowCountNext},
+
+ case RowCountNext == Limit of
+ true ->
+ UserAcc2 = maybe_stop(UserCallback(complete, UserAcc1)),
+ maybe_stop({stop, UserAcc2});
+ false ->
+ Acc1
+ end.
+
+
+%% Unwrap the result of a user callback.
+%%   {stop, Acc} - abort the fold by throwing {done, Acc}
+%%   {ok, Acc}   - keep going and hand back Acc
+maybe_stop({stop, FinalAcc}) ->
+    throw({done, FinalAcc});
+maybe_stop({ok, NextAcc}) ->
+    NextAcc.
+
+%% Call couch_views_reduce_fdb:create_skip_list/3 with
+%% ?MAX_SKIP_LIST_LEVELS for every view id in ViewIds, all inside a single
+%% fabric2_fdb:transactional/2 call.
+setup_reduce_indexes(Db, Sig, ViewIds) ->
+    #{db_prefix := Prefix} = Db,
+    % Options shared by every view; only view_id differs per view.
+    BaseOpts = #{db_prefix => Prefix, sig => Sig},
+
+    fabric2_fdb:transactional(Db, fun(TxDb) ->
+        lists:foreach(fun(ViewId) ->
+            couch_views_reduce_fdb:create_skip_list(TxDb,
+                ?MAX_SKIP_LIST_LEVELS, BaseOpts#{view_id => ViewId})
+        end, ViewIds)
+    end).
run_reduce(#mrst{views = Views } = Mrst, MappedResults) ->
@@ -192,149 +213,6 @@ is_builtin(<<"_", _/binary>>) ->
is_builtin(_) ->
false.
-
-update_reduce_idx(TxDb, Sig, ViewId, _DocId, _ExistingKeys, ReduceResult) ->
- #{
- db_prefix := DbPrefix
- } = TxDb,
-
- ViewOpts = #{
- db_prefix => DbPrefix,
- sig => Sig,
- view_id => ViewId
- },
- create_skip_list(TxDb, ?MAX_SKIP_LIST_LEVELS, ViewOpts),
-
- lists:foreach(fun ({Key, Val}) ->
- io:format("RESULTS KV ~p ~p ~n", [Key, Val])
-%% add_kv_to_skip_list(TxDb, ?MAX_SKIP_LIST_LEVELS, ViewOpts, Key, Val)
- end, ReduceResult).
-
-
-create_skip_list(Db, MaxLevel, #{} = ViewOpts) ->
- #{
- db_prefix := DbPrefix,
- sig := Sig,
- view_id := ViewId
- } = ViewOpts,
-
- Levels = lists:seq(0, MaxLevel),
- ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
-
- fabric2_fdb:transactional(Db, fun(TxDb) ->
-
- lists:foreach(fun(Level) ->
- add_kv(TxDb, ReduceIdxPrefix, Level, 0, 0)
- end, Levels)
- end).
-
-
-should_add_key_to_level(Level, Key) ->
- (erlang:phash2(Key) band ((1 bsl (Level * ?LEVEL_FAN_POW)) -1)) == 0.
-%% keyHash & ((1 << (level * LEVEL_FAN_POW)) - 1)) != 0
-
-
-add_kv_to_skip_list(Db, MaxLevel, #{} = ViewOpts, Key, Val) ->
- #{
- db_prefix := DbPrefix,
- sig := Sig,
- view_id := ViewId
- } = ViewOpts,
-
- Levels = lists:seq(0, MaxLevel),
- ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
-
- fabric2_fdb:transactional(Db, fun(TxDb) ->
- lists:foldl(fun(Level) ->
- io:format("PROCESS ~p ~p ~p ~n", [Level, Key, Val]),
- {PrevKey, PrevVal} = get_previous_key(TxDb, ReduceIdxPrefix, Level, Key),
- io:format("PREV VALS ~p ~p ~n", [PrevKey, PrevVal]),
- case should_add_key_to_level(Level, Key) of
- true ->
- io:format("Adding ~p ~p ~n", [Level, Key]),
- add_kv(Db, ReduceIdxPrefix, Level, Key, Val);
- false ->
- {PrevKey, NewVal} = rereduce(<<"_stats">>, {PrevKey, PrevVal}, {Key, Val}),
- io:format("RE_REDUCE ~p ~p ~p ~p ~n", [Level, Key, PrevKey, NewVal]),
- add_kv(Db, ReduceIdxPrefix, Level, PrevKey, NewVal)
- end
- end, true, Levels)
- end).
-
-
-rereduce(<<"_stats">>, {PrevKey, PrevVal}, {_Key, Val}) ->
- case PrevVal >= Val of
- true -> {PrevKey, PrevVal};
- false -> {PrevKey, Val}
- end.
-
-
reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId) ->
Key = {?DB_VIEWS, Sig, ?VIEW_REDUCE_RANGE, ViewId},
erlfdb_tuple:pack(Key, DbPrefix).
-
-
-create_key(ReduceIdxPrefix, SkipLevel, Key) ->
- EK = couch_views_encoding:encode(Key, key),
- LevelKey = {SkipLevel, EK},
- erlfdb_tuple:pack(LevelKey, ReduceIdxPrefix).
-
-
-create_value(Key, Val) ->
- EK = couch_views_encoding:encode(Key),
- EV = couch_views_encoding:encode(Val),
- erlfdb_tuple:pack({EK, EV}).
-
-
-add_kv(TxDb, ReduceIdxPrefix, Level, Key, Val) ->
- #{
- tx := Tx
- } = TxDb,
-
- LevelKey = create_key(ReduceIdxPrefix, Level, Key),
- EV = create_value(Key, Val),
-
- ok = erlfdb:set(Tx, LevelKey, EV).
-
-
-get_previous_key(TxDb, ReduceIdxPrefix, Level, Key) ->
- #{
- tx := Tx
- } = TxDb,
-
- % TODO: see if we need to add in conflict ranges for this for level=0
- Opts = [{limit, 2}, {reverse, true}, {streaming_mode, want_all}],
-%% LevelPrefix = erlfdb_tuple:pack({Level}, ReduceIdxPrefix),
-
- EK = couch_views_encoding:encode(Key, key),
- EndKey0 = erlfdb_tuple:pack({Level, EK}, ReduceIdxPrefix),
-
- {StartKey, EndKey1} = erlfdb_tuple:range({Level}, ReduceIdxPrefix),
-%% EndKey1 = erlfdb_key:first_greater_than(EndKey0),
-
- Callback = fun row_cb/2,
- Out = erlfdb:fold_range(Tx, StartKey, EndKey1, Callback, {val, ReduceIdxPrefix, {}}, Opts),
- io:format("OUT PRV ~p ~p ~p ~n", [Level, Key, Out]),
- Out.
-
-
-row_cb({FullEncodedKey, EV}, {val, ReduceIdxPrefix, Acc}) ->
- io:format("ROW VAL ~p ~n", [erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix)]),
- {_Level, EK, _VIEW_ROW_VALUE}
- = erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
- Val = couch_views_encoding:decode(EV),
-%% io:format("WW ~p ~p ~n", [couch_views_encoding:decode(EK), Val]),
-
- {key, {EK, ReduceIdxPrefix, Val}};
-
-row_cb({FullEncodedKey, EVK}, {key, {EK, ReduceIdxPrefix, Val}}) ->
- io:format("ROW KEY ~p ~n", [erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix)]),
- {_Level, EK, ?VIEW_ROW_KEY}
- = erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
- Key = couch_views_encoding:decode(EVK),
-
- {Key, Val}.
-
-
-
-
diff --git a/src/couch_views/src/couch_views_reduce_fdb.erl b/src/couch_views/src/couch_views_reduce_fdb.erl
index bcaaa30..9683265 100644
--- a/src/couch_views/src/couch_views_reduce_fdb.erl
+++ b/src/couch_views/src/couch_views_reduce_fdb.erl
@@ -15,19 +15,273 @@
-export([
-%% write_doc/4
+ fold_level0/6,
+ create_skip_list/3,
+ update_reduce_idx/6
]).
-% _id keys = {?DB_VIEWS, Sig, ?VIEW_REDUCE_ID_RANGE, DocId, ViewId} = [TotalKeys, TotalSize, UniqueKeys]
-%%write_doc(TxDb, Sig, ViewIds, Doc) ->
+-include("couch_views.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+-include_lib("fabric/include/fabric2.hrl").
+
+-define(MAX_SKIP_LIST_LEVELS, 6).
+-define(LEVEL_FAN_POW, 1).
+
+%% Debug helper: print every entry stored at each level of a view's reduce
+%% skip list.
+%%
+%% For each level from 0 to ?MAX_SKIP_LIST_LEVELS the level's key range is
+%% folded with fold_fwd_cb/2, which collects {Key, Value} pairs through
+%% handle_log_levels/3, and the collected rows are written with io:format/2.
+log_levels(Db, Sig, ViewId) ->
+    #{
+        db_prefix := DbPrefix
+    } = Db,
+
+    % Use the shared macro rather than a second hard-coded level count so
+    % the dump cannot drift from the number of levels actually written.
+    Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
+    ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+    Opts = [{streaming_mode, want_all}],
+
+    % Only the raw transaction handle is needed inside the fun.
+    fabric2_fdb:transactional(Db, fun(#{tx := Tx}) ->
+        lists:foreach(fun (Level) ->
+            {StartKey, EndKey} = erlfdb_tuple:range({Level},
+                ReduceIdxPrefix),
+
+            Acc0 = #{
+                sig => Sig,
+                view_id => ViewId,
+                reduce_idx_prefix => ReduceIdxPrefix,
+                user_acc => [],
+                callback => fun handle_log_levels/3
+            },
+
+            Fun = fun fold_fwd_cb/2,
+            Acc = erlfdb:fold_range(Tx, StartKey, EndKey, Fun, Acc0, Opts),
+            #{
+                user_acc := Rows
+            } = Acc,
+            io:format("~n LEVEL ~p rows ~p ~n", [Level, Rows])
+        end, Levels)
+    end).
+
+%% Row callback used by log_levels/3: adds the {Key, Value} pair to the end
+%% of the accumulated list so rows stay in the order they were read.
+handle_log_levels(Key, Value, Acc) ->
+    Row = {Key, Value},
+    lists:append(Acc, [Row]).
+
+%%fold(Db, Sig, ViewId, Options, Callback, Acc0) ->
%% #{
-%% id := DocId,
-%% reduce_results := ReduceResults
-%% } = Doc,
-%% lists:foreach(fun({ViewId, NewRows}) ->
-%% % update reduce index
-%% ok
-%% end, lists:zip(ViewIds, ReduceResults)).
+%% db_prefix := DbPrefix
+%% } = Db,
+%%
+%%%% Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
+%% ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+%% #mrargs{
+%% limit = Limit
+%% } = Args,
+%%
+%% fabric2_fdb:transactional(Db, fun(TxDb) ->
+%%
+%% Acc0 = #{
+%% sig => Sig,
+%% view_id => ViewId,
+%% user_acc => UserAcc0,
+%% args => Args,
+%% callback => UserCallback,
+%% reduce_idx_prefix => ReduceIdxPrefix,
+%% limit => Limit,
+%% row_count => 0
+%%
+%% },
+%%
+%% Acc1 = read_level0_only(TxDb, Acc0, Callback),
+%% #{
+%% user_acc := UserAcc1
+%% } = Acc1,
+%% {ok, UserAcc1}
+%% end).
+
+%% Fold over entries of a view's reduce skip list between two keys.
+%%
+%% Opts must contain {startkey, K} and {endkey, K} entries that bound the
+%% read (a missing entry fails the match below); the complete Opts list is
+%% also passed on to erlfdb:fold_range/6.
+%% UserCallback is invoked through fold_fwd_cb/2 as
+%% UserCallback(Key, Value, UserAcc) for every row read and must return the
+%% next accumulator. The transaction fun evaluates to the final user
+%% accumulator.
+fold_level0(Db, Sig, ViewId, Opts, UserCallback, UserAcc0) ->
+    #{
+        db_prefix := DbPrefix
+    } = Db,
+
+    ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+    Acc = #{
+        sig => Sig,
+        view_id => ViewId,
+        user_acc => UserAcc0,
+        callback => UserCallback,
+        reduce_idx_prefix => ReduceIdxPrefix
+    },
+
+    fabric2_fdb:transactional(Db, fun(TxDb) ->
+        % NOTE(review): debug dump of every level on each read; it prints to
+        % stdout and re-reads the whole index - remove once no longer needed.
+        log_levels(TxDb, Sig, ViewId),
+        #{
+            tx := Tx
+        } = TxDb,
+
+        {startkey, StartKey} = lists:keyfind(startkey, 1, Opts),
+        {endkey, EndKey} = lists:keyfind(endkey, 1, Opts),
+
+        Fun = fun fold_fwd_cb/2,
+        Acc1 = erlfdb:fold_range(Tx, StartKey, EndKey, Fun, Acc, Opts),
+        #{
+            user_acc := UserAcc1
+        } = Acc1,
+        UserAcc1
+    end).
+
+
+%% erlfdb:fold_range/6 callback for reduce index rows.
+%%
+%% Unpacks the stored value into its encoded key and value, decodes both
+%% and passes them to the callback held in Acc together with the current
+%% user accumulator. Returns Acc with user_acc replaced by the callback's
+%% result.
+fold_fwd_cb({PackedKey, PackedValue}, Acc) ->
+    #{
+        reduce_idx_prefix := IdxPrefix,
+        callback := RowFun,
+        user_acc := UserAcc0
+    } = Acc,
+
+    % Only asserts that the key unpacks to a two-element tuple.
+    {_, _} = erlfdb_tuple:unpack(PackedKey, IdxPrefix),
+
+    {EncodedKey, EncodedVal} = erlfdb_tuple:unpack(PackedValue),
+    Key = couch_views_encoding:decode(EncodedKey),
+    Val = couch_views_encoding:decode(EncodedVal),
+
+    Acc#{user_acc := RowFun(Key, Val, UserAcc0)}.
+
+
+%% Build the packed key prefix under which the reduce skip list of view
+%% ViewId (in the index identified by Sig) is stored, relative to DbPrefix.
+reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId) ->
+    erlfdb_tuple:pack(
+        {?DB_VIEWS, Sig, ?VIEW_REDUCE_RANGE, ViewId},
+        DbPrefix
+    ).
+
+
+%% Inserting
+%% Add every {Key, Val} pair in ReduceResult to the view's skip list via
+%% add_kv_to_skip_list/5, logging each pair first. The DocId and
+%% ExistingKeys arguments are currently ignored.
+update_reduce_idx(TxDb, Sig, ViewId, _DocId, _ExistingKeys, ReduceResult) ->
+    #{db_prefix := Prefix} = TxDb,
+    SkipListOpts = #{db_prefix => Prefix, sig => Sig, view_id => ViewId},
+
+    InsertRow = fun({Key, Val}) ->
+        io:format("RESULTS KV ~p ~p ~n", [Key, Val]),
+        add_kv_to_skip_list(TxDb, ?MAX_SKIP_LIST_LEVELS, SkipListOpts,
+            Key, Val)
+    end,
+    lists:foreach(InsertRow, ReduceResult).
+
+
+%% Write an initial entry with key 0 and value 0 at every level from 0 to
+%% MaxLevel of the skip list described by ViewOpts (db_prefix, sig and
+%% view_id are required).
+create_skip_list(Db, MaxLevel, #{} = ViewOpts) ->
+    #{db_prefix := Prefix, sig := Sig, view_id := ViewId} = ViewOpts,
+
+    AllLevels = lists:seq(0, MaxLevel),
+    IdxPrefix = reduce_skip_list_idx_prefix(Prefix, Sig, ViewId),
+
+    fabric2_fdb:transactional(Db, fun(TxDb) ->
+        _ = [add_kv(TxDb, IdxPrefix, Level, 0, 0) || Level <- AllLevels],
+        ok
+    end).
+
+
+%% Insert one reduced {Key, Val} pair into the skip list described by
+%% ViewOpts (db_prefix, sig and view_id are required).
+%%
+%% For every level from 0 to MaxLevel:
+%%   - when should_add_key_to_level/2 accepts the key's hash, the pair is
+%%     written as its own entry at that level;
+%%   - otherwise it is merged, via rereduce/3, into the entry returned by
+%%     get_previous_key/4 and that entry is rewritten.
+add_kv_to_skip_list(Db, MaxLevel, #{} = ViewOpts, Key, Val) ->
+    #{
+        db_prefix := DbPrefix,
+        sig := Sig,
+        view_id := ViewId
+    } = ViewOpts,
+
+    Levels = lists:seq(0, MaxLevel),
+    ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+    KeyHash = hash_key(Key),
+
+    fabric2_fdb:transactional(Db, fun(TxDb) ->
+        lists:foreach(fun(Level) ->
+            {PrevKey, PrevVal} = get_previous_key(TxDb, ReduceIdxPrefix, Level, Key),
+            io:format("Process ~p ~p ~p PREV VALS ~p ~p ~n", [Level, Key, Val, PrevKey, PrevVal]),
+            case should_add_key_to_level(Level, KeyHash) of
+                true ->
+                    io:format("Adding ~p ~p ~n", [Level, Key]),
+                    % Write through the handle given to this fun, not the
+                    % outer Db, so the write uses the same transaction as
+                    % the read above.
+                    add_kv(TxDb, ReduceIdxPrefix, Level, Key, Val);
+                false ->
+                    % PrevKey is already bound, so this also asserts that
+                    % rereduce/3 kept the previous entry's key.
+                    {PrevKey, NewVal} = rereduce(<<"_stats">>, {PrevKey, PrevVal}, {Key, Val}),
+                    io:format("RE_REDUCE ~p ~p ~p ~p ~n", [Level, Key, PrevKey, NewVal]),
+                    add_kv(TxDb, ReduceIdxPrefix, Level, PrevKey, NewVal)
+            end
+        end, Levels)
+    end).
+
+
+%% Fetch the skip-list entry stored for Key at Level or, when there is no
+%% such entry, the closest entry that sorts before it. Returns the decoded
+%% {Key, Value} pair held in that entry's value (see get_key_value/1).
+%%
+%% Fails with a badmatch when the read does not return exactly one row.
+%% NOTE(review): the begin selector is not limited to this level's prefix,
+%% so this presumably relies on each level having been seeded by
+%% create_skip_list/3 - confirm.
+get_previous_key(TxDb, ReduceIdxPrefix, Level, Key) ->
+    #{
+        tx := Tx
+    } = TxDb,
+
+    % TODO: see if we need to add in conflict ranges for this for level=0
+    Opts = [{limit, 1}, {streaming_mode, want_all}],
+
+    EK = couch_views_encoding:encode(Key, key),
+    StartKey = erlfdb_tuple:pack({Level, EK}, ReduceIdxPrefix),
+    StartKeySel = erlfdb_key:last_less_or_equal(StartKey),
+    % The end of a range read is exclusive. Ending at
+    % first_greater_or_equal(StartKey) made the range empty whenever
+    % StartKey itself was already stored (both selectors resolved to the
+    % same key), so end just past StartKey instead.
+    EndKeySel = erlfdb_key:first_greater_than(StartKey),
+
+    Future = erlfdb:get_range(Tx, StartKeySel, EndKeySel, Opts),
+    [{_FullEncodedKey, PackedValue}] = erlfdb:wait(Future),
+    get_key_value(PackedValue).
+
+
+%% Hash a view key with erlang:phash2/1; the result feeds
+%% should_add_key_to_level/2.
+hash_key(Key) ->
+    Hash = erlang:phash2(Key),
+    Hash.
+
+
+%% Decide whether a key gets its own entry at Level: true when the lowest
+%% (Level * ?LEVEL_FAN_POW) bits of KeyHash are all zero. At level 0 the
+%% mask is 0, so every key qualifies.
+should_add_key_to_level(Level, KeyHash) ->
+    Mask = (1 bsl (Level * ?LEVEL_FAN_POW)) - 1,
+    KeyHash band Mask == 0.
+%% keyHash & ((1 << (level * LEVEL_FAN_POW)) - 1)) != 0
+
+
+%% Build the packed storage key for Key at SkipLevel: the tuple
+%% {SkipLevel, EncodedKey} packed under ReduceIdxPrefix.
+create_key(ReduceIdxPrefix, SkipLevel, Key) ->
+    Encoded = couch_views_encoding:encode(Key, key),
+    erlfdb_tuple:pack({SkipLevel, Encoded}, ReduceIdxPrefix).
+
+
+%% Build the packed storage value for an entry: the encoded key and the
+%% encoded value packed together as a two-element tuple. get_key_value/1
+%% is the inverse.
+create_value(Key, Val) ->
+    Pair = {
+        couch_views_encoding:encode(Key),
+        couch_views_encoding:encode(Val)
+    },
+    erlfdb_tuple:pack(Pair).
+
+
+%% Unpack a stored entry value into its decoded {Key, Value} pair; the
+%% inverse of create_value/2.
+get_key_value(PackedValue) ->
+    {EncKey, EncVal} = erlfdb_tuple:unpack(PackedValue),
+    {
+        couch_views_encoding:decode(EncKey),
+        couch_views_encoding:decode(EncVal)
+    }.
+
+
+%% Store the entry for Key/Val at Level using the transaction held in
+%% TxDb. The storage key comes from create_key/3 and the stored value from
+%% create_value/2; the result of erlfdb:set/3 is asserted to be ok.
+add_kv(TxDb, ReduceIdxPrefix, Level, Key, Val) ->
+    #{tx := Tx} = TxDb,
+
+    PackedKey = create_key(ReduceIdxPrefix, Level, Key),
+    PackedVal = create_value(Key, Val),
+
+    ok = erlfdb:set(Tx, PackedKey, PackedVal).
+
+
+%% Merge a new value into the previous entry: keeps the previous key and
+%% pairs it with the larger of the two values (the previous value wins when
+%% they compare equal). Only the <<"_stats">> reducer name is accepted.
+rereduce(<<"_stats">>, {PrevKey, PrevVal}, {_Key, Val}) ->
+    {PrevKey, erlang:max(PrevVal, Val)}.
+
diff --git a/src/couch_views/test/exunit/couch_views_reduce_test.exs b/src/couch_views/test/exunit/couch_views_reduce_test.exs
index a526658..c1b35e2 100644
--- a/src/couch_views/test/exunit/couch_views_reduce_test.exs
+++ b/src/couch_views/test/exunit/couch_views_reduce_test.exs
@@ -40,29 +40,37 @@ defmodule CouchViewsReduceTest do
}
end
- test "group=true count reduce", context do
- args = %{
- :reduce => true,
- :group => true
- # :limit => 9
- }
+# test "group=true count reduce with limit", context do
+# args = %{
+# :reduce => true,
+# :group => true,
+# :limit => 3
+# }
+#
+# {:ok, res} = run_query(context, args, "dates")
+# IO.inspect(res, label: "OUT")
+#
+# assert res == [
+# {:row, [key: [2017, 3, 1], value: 1]},
+# {:row, [key: [2017, 4, 1], value: 1]},
+# {:row, [key: [2017, 4, 15], value: 1]}
+# ]
+# end
+
+ test "group_level=1 count reduce", context do
+ args = %{
+ :reduce => true,
+ :group => true,
+ }
- {:ok, res} = run_query(context, args, "baz")
- IO.inspect(res, label: "OUT")
+ {:ok, res} = run_query(context, args, "dates")
+ IO.inspect(res, label: "OUT")
- assert res == [
- {:row, [key: 1, value: 2]},
- {:row, [key: 2, value: 2]},
- {:row, [key: 3, value: 2]},
- {:row, [key: [1, 1], value: 1]},
- {:row, [key: [1, 1, 5], value: 1]},
- {:row, [key: [1, 2, 6], value: 1]},
- {:row, [key: [2, 1], value: 1]},
- {:row, [key: [2, 3, 6], value: 1]},
- {:row, [key: [3, 1], value: 1]},
- {:row, [key: [3, 1, 5], value: 1]},
- {:row, [key: [3, 4, 5], value: 1]}
- ]
+ assert res == [
+ {:row, [key: [2017], value: 1]},
+ {:row, [key: [2018], value: 1]},
+ {:row, [key: [2019], value: 1]}
+ ]
end
# test "group=1 count reduce", context do
@@ -173,6 +181,7 @@ defmodule CouchViewsReduceTest do
end
def default_cb(:complete, acc) do
+ IO.inspect(acc, label: "complete")
{:ok, Enum.reverse(acc)}
end
@@ -197,7 +206,22 @@ defmodule CouchViewsReduceTest do
end
defp create_docs() do
- for i <- 1..1 do
+ dates = [
+ [2017, 3, 1],
+ [2017, 4, 1],
+ # out of order check
+ [2019, 3, 1],
+ [2017, 4, 15],
+ [2018, 4, 1],
+ [2017, 5, 1],
+ [2018, 3, 1],
+ # duplicate check
+ [2018, 4, 1],
+ [2018, 5, 1],
+ [2019, 4, 1]
+ ]
+
+ for i <- 1..4 do
group =
if rem(i, 3) == 0 do
"first"
@@ -205,14 +229,14 @@ defmodule CouchViewsReduceTest do
"second"
end
- :couch_doc.from_json_obj(
- {[
- {"_id", "doc-id-#{i}"},
- {"value", i},
- {"some", "field"},
- {"group", group}
- ]}
- )
+ :couch_doc.from_json_obj({[
+ {"_id", "doc-id-#{i}"},
+ {"value", i},
+ {"some", "field"},
+ {"group", group},
+ {"date", Enum.at(dates, i - 1)}
+ # {"timestamp", Enum.at(timestamps, i - 1)}
+ ]})
end
end
@@ -221,6 +245,16 @@ defmodule CouchViewsReduceTest do
{"_id", "_design/bar"},
{"views",
{[
+ {"dates",
+ {[
+ {"map",
+ """
+ function(doc) {
+ emit(doc.date, doc.value);
+ }
+ """},
+ {"reduce", "_count"}
+ ]}},
{"baz",
{[
{"map",