Posted to commits@couchdb.apache.org by ga...@apache.org on 2019/10/31 14:42:59 UTC
[couchdb] branch prototype/builtin-reduce updated (c094548 -> 7ca5202)
This is an automated email from the ASF dual-hosted git repository.
garren pushed a change to branch prototype/builtin-reduce
in repository https://gitbox.apache.org/repos/asf/couchdb.git.
discard c094548 basic skip list setup
discard 744625c return group over exact for set group_level
discard 22fc257 stream large
discard 2cd1db7 group=true and group_levels working
discard 957e271 rough outline
add 9919496 Skip before_doc_update processing for local docs
add 5aed0fd Remove debug logging in fabric2_db_plugin
add dc1b4b8 Fix startkey_docid, endkey_docid and inclusive_end=false for _all_docs
add d5d5b5f Implement _all_docs/queries
add 247bbef Implement _design_docs and _local_docs
add d6b6155 Add a max db name length config option
add 198306c Handle update_seq for _local_docs
add 1c1d326 Handle _changes since=now param
add 2074412 Fetch docs in parallel for view indexing
add 5c3517a Fix _changes heartbeat option
add 91dfd3e Support `--extra_args` parameter in `dev/run`
add 5ffdb91 Merge pull request #2184 from cloudant/add-extra-arguments-to-beam
add 2eb1dee Implement setting and getting _revs_limit
add 9c6b5cb Make get_security and get_revs_limit calls consistent
add 2051bd6 Check members after db is opened
add 01a9228 Add revision stemming for interactive docs
add eed8fb1 Fix doc counts for replicated deletions
add fe9ffad Add more deleted docs replication cases to integration test
add 422416a Add couch_eval abstraction layer
add 41b7211 Initial creation of couch_js application
add 87379c2 Implement couch_js callbacks for couch_eval
add ecc9ae4 Add tests for couch_js application
add d7d32a0 Update couch_views to use couch_eval
add ff2cdb8 Fix mango index validation
add 577dd7f Fix timeout in couch_views
add 76bdf1b DRY out CouchDB FDB prefix fetching
add ae0dc96 Use a shorter name for create_or_open_couchdb_dir
add bfb986f Enable FDB transaction tracing
add f3d572c Take better advantage of metadata version key feature
add 3f322a5 Remove compiler warning
add ed1c3d7 Merge pull request #2274 from cloudant/fix-warning
add 797fe08 Remove old clause which is no longer used
add c3ef462 Merge pull request #2275 from cloudant/remove-ints-client-remains
add 5334997 Chunkify local docs
add 3ded0e5 Add a special error for an invalid legacy local doc revision
new 03cbc41 Initial work
new e399fb3 printing
new 533fdb6 progress with reading level 0
new bff44fb can do group_level query
new 7ca5202 level 0 _sum working
This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version. This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:
* -- * -- B -- O -- O -- O (c094548)
\
N -- N -- N refs/heads/prototype/builtin-reduce (7ca5202)
You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.
Any revisions marked "omit" are not gone; other references still
refer to them. Any revisions marked "discard" are gone forever.
The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
dev/run | 16 +-
rebar.config.script | 2 +
rel/overlay/etc/default.ini | 13 +
rel/reltool.config | 4 +
src/chttpd/src/chttpd.erl | 21 +-
src/chttpd/src/chttpd_changes.erl | 16 +-
src/chttpd/src/chttpd_db.erl | 239 +++++++-----
src/couch/src/couch_httpd.erl | 3 -
src/couch_eval/README.md | 5 +
src/{fabric => couch_eval}/rebar.config | 0
.../src/couch_eval.app.src} | 11 +-
src/couch_eval/src/couch_eval.erl | 97 +++++
src/couch_jobs/src/couch_jobs_fdb.erl | 5 +-
src/couch_js/README.md | 6 +
.../rexi.app.src => couch_js/src/couch_js.app.src} | 17 +-
src/couch_js/src/couch_js.erl | 51 +++
.../src/couch_js_app.erl} | 12 +-
.../src/couch_js_io_logger.erl} | 2 +-
.../src/couch_js_native_process.erl} | 8 +-
.../src/couch_js_os_process.erl} | 16 +-
.../src/couch_js_proc_manager.erl} | 20 +-
.../src/couch_js_query_servers.erl} | 10 +-
.../src/couch_js_sup.erl} | 21 +-
src/couch_js/test/couch_js_proc_manager_tests.erl | 373 +++++++++++++++++++
.../test/couch_js_query_servers_tests.erl} | 47 +--
src/couch_views/include/couch_views.hrl | 7 -
src/couch_views/src/couch_views.app.src | 3 +-
src/couch_views/src/couch_views_fdb.erl | 236 +-----------
src/couch_views/src/couch_views_fdb_reduce.erl | 33 --
src/couch_views/src/couch_views_indexer.erl | 143 ++++++--
src/couch_views/src/couch_views_jobs.erl | 2 +-
src/couch_views/src/couch_views_reader.erl | 39 +-
src/couch_views/src/couch_views_reduce.erl | 350 ++++++------------
src/couch_views/src/couch_views_reduce_fdb.erl | 404 +++++++++++++++++++++
src/couch_views/src/couch_views_util.erl | 145 +++++++-
src/couch_views/test/couch_views_indexer_test.erl | 1 +
src/couch_views/test/couch_views_map_test.erl | 7 +-
.../test/couch_views_trace_index_test.erl | 2 +-
.../test/exunit/couch_views_reduce_test.exs | 251 ++++++++-----
src/fabric/include/fabric2.hrl | 3 +
src/fabric/src/fabric2_db.erl | 204 +++++++++--
src/fabric/src/fabric2_db_plugin.erl | 11 +-
src/fabric/src/fabric2_fdb.erl | 240 +++++++++---
src/fabric/src/fabric2_txids.erl | 10 +-
src/fabric/test/fabric2_db_misc_tests.erl | 56 ++-
src/fabric/test/fabric2_db_security_tests.erl | 64 ++--
src/fabric/test/fabric2_doc_count_tests.erl | 26 ++
src/fabric/test/fabric2_doc_crud_tests.erl | 131 ++++++-
..._tests.erl => fabric2_local_doc_fold_tests.erl} | 85 +++--
src/fabric/test/fabric2_rev_stemming.erl | 204 +++++++++++
src/mango/src/mango_native_proc.erl | 7 +-
test/elixir/test/basics_test.exs | 195 ++++++++++
test/elixir/test/replication_test.exs | 62 +++-
53 files changed, 2897 insertions(+), 1039 deletions(-)
create mode 100644 src/couch_eval/README.md
copy src/{fabric => couch_eval}/rebar.config (100%)
copy src/{couch_pse_tests/src/couch_pse_tests.app.src => couch_eval/src/couch_eval.app.src} (80%)
create mode 100644 src/couch_eval/src/couch_eval.erl
create mode 100644 src/couch_js/README.md
copy src/{rexi/src/rexi.app.src => couch_js/src/couch_js.app.src} (78%)
create mode 100644 src/couch_js/src/couch_js.erl
copy src/{couch_event/src/couch_event_app.erl => couch_js/src/couch_js_app.erl} (87%)
copy src/{couch/src/couch_io_logger.erl => couch_js/src/couch_js_io_logger.erl} (98%)
copy src/{couch/src/couch_native_process.erl => couch_js/src/couch_js_native_process.erl} (99%)
copy src/{couch/src/couch_os_process.erl => couch_js/src/couch_js_os_process.erl} (95%)
copy src/{couch/src/couch_proc_manager.erl => couch_js/src/couch_js_proc_manager.erl} (97%)
copy src/{couch/src/couch_query_servers.erl => couch_js/src/couch_js_query_servers.erl} (98%)
copy src/{couch_views/src/couch_views_sup.erl => couch_js/src/couch_js_sup.erl} (69%)
create mode 100644 src/couch_js/test/couch_js_proc_manager_tests.erl
copy src/{couch/test/eunit/couch_query_servers_tests.erl => couch_js/test/couch_js_query_servers_tests.erl} (76%)
delete mode 100644 src/couch_views/src/couch_views_fdb_reduce.erl
create mode 100644 src/couch_views/src/couch_views_reduce_fdb.erl
copy src/fabric/test/{fabric2_doc_fold_tests.erl => fabric2_local_doc_fold_tests.erl} (73%)
create mode 100644 src/fabric/test/fabric2_rev_stemming.erl
[couchdb] 03/05: progress with reading level 0
Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
garren pushed a commit to branch prototype/builtin-reduce
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 533fdb660fe42726af5d5a948cbc386234d16341
Author: Garren Smith <ga...@gmail.com>
AuthorDate: Wed Oct 30 11:37:36 2019 +0200
progress with reading level 0
---
src/couch_views/src/couch_views_fdb.erl | 4 +-
src/couch_views/src/couch_views_indexer.erl | 8 +-
src/couch_views/src/couch_views_reader.erl | 30 --
src/couch_views/src/couch_views_reduce.erl | 306 +++++++--------------
src/couch_views/src/couch_views_reduce_fdb.erl | 274 +++++++++++++++++-
.../test/exunit/couch_views_reduce_test.exs | 94 +++++--
6 files changed, 428 insertions(+), 288 deletions(-)
diff --git a/src/couch_views/src/couch_views_fdb.erl b/src/couch_views/src/couch_views_fdb.erl
index 479a707..8999d76 100644
--- a/src/couch_views/src/couch_views_fdb.erl
+++ b/src/couch_views/src/couch_views_fdb.erl
@@ -133,8 +133,6 @@ fold_map_idx(TxDb, Sig, ViewId, Options, Callback, Acc0) ->
Acc1.
-
-
write_doc(TxDb, Sig, _ViewIds, #{deleted := true} = Doc) ->
#{
id := DocId
@@ -181,7 +179,7 @@ write_doc(TxDb, Sig, ViewIds, Doc) ->
[]
end,
update_map_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows),
- couch_views_reduce:update_reduce_idx(TxDb, Sig, ViewId, DocId,
+ couch_views_reduce_fdb:update_reduce_idx(TxDb, Sig, ViewId, DocId,
ExistingKeys, ReduceResult)
end, lists:zip3(ViewIds, Results, ReduceResults)).
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index d4a78b8..3c60743 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -221,11 +221,17 @@ write_docs(TxDb, Mrst, Docs, State) ->
} = Mrst,
#{
- last_seq := LastSeq
+ last_seq := LastSeq,
+ view_seq := ViewSeq
} = State,
ViewIds = [View#mrview.id_num || View <- Views],
+ %% First build of the view
+ if ViewSeq /= <<>> -> ok; true ->
+ couch_views_reduce:setup_reduce_indexes(TxDb, Sig, ViewIds)
+ end,
+
lists:foreach(fun(Doc) ->
couch_views_fdb:write_doc(TxDb, Sig, ViewIds, Doc)
end, Docs),
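A note on the hunk above: view_seq is threaded through the indexer state
so that couch_views_reduce:setup_reduce_indexes/3 runs only on the very
first build, when the stored view sequence is still the empty binary. A
minimal sketch of that guard idiom (names are illustrative, not the real
module):

    %% Run Setup exactly once, on the first index build; an empty
    %% binary view_seq means the index has never been committed.
    maybe_first_build_setup(#{view_seq := ViewSeq}, Setup)
            when is_function(Setup, 0) ->
        case ViewSeq of
            <<>> -> Setup();  % first build: seed the skip-list levels
            _ -> ok           % incremental update: already seeded
        end.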
diff --git a/src/couch_views/src/couch_views_reader.erl b/src/couch_views/src/couch_views_reader.erl
index d08515c..394b3cf 100644
--- a/src/couch_views/src/couch_views_reader.erl
+++ b/src/couch_views/src/couch_views_reader.erl
@@ -236,36 +236,6 @@ handle_row(DocId, Key, Value, Acc) ->
UserAcc1 = maybe_stop(UserCallback({row, Row}, UserAcc0)),
Acc#{acc := UserAcc1}.
-handle_reduce_row(_Key, _Value, #{skip := Skip} = Acc) when Skip > 0 ->
- Acc#{skip := Skip - 1};
-
-handle_reduce_row(Key, Value, Acc) ->
- io:format("ACC ~p ~n", [Acc]),
- #{
- callback := UserCallback,
- acc := UserAcc0,
- row_count := RowCount,
- limit := Limit
- } = Acc,
-
- Row = [
- {key, Key},
- {value, Value}
- ],
-
- RowCountNext = RowCount + 1,
-
- UserAcc1 = maybe_stop(UserCallback({row, Row}, UserAcc0)),
- Acc1 = Acc#{acc := UserAcc1, row_count := RowCountNext},
-
- case RowCountNext == Limit of
- true ->
- UserAcc2 = maybe_stop(UserCallback(complete, UserAcc1)),
- maybe_stop({stop, UserAcc2});
- false ->
- Acc1
- end.
-
get_view_id(Lang, Args, ViewName, Views) ->
case couch_mrview_util:extract_view(Lang, Args, ViewName, Views) of
diff --git a/src/couch_views/src/couch_views_reduce.erl b/src/couch_views/src/couch_views_reduce.erl
index 4cb7416..ebd2f47 100644
--- a/src/couch_views/src/couch_views_reduce.erl
+++ b/src/couch_views/src/couch_views_reduce.erl
@@ -15,8 +15,8 @@
-export([
run_reduce/2,
- update_reduce_idx/6,
- read_reduce/6
+ read_reduce/6,
+ setup_reduce_indexes/3
]).
@@ -30,99 +30,120 @@
-define(MAX_SKIP_LIST_LEVELS, 6).
-log_levels(Db, Sig, ViewId) ->
+read_reduce(Db, Sig, ViewId, UserCallback, UserAcc0, Args) ->
#{
db_prefix := DbPrefix
} = Db,
- Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
+%% Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
- Opts = [{streaming_mode, want_all}],
+ #mrargs{
+ limit = Limit
+ } = Args,
- fabric2_fdb:transactional(Db, fun(#{tx := Tx} = TxDb) ->
- lists:foreach(fun (Level) ->
- {StartKey, EndKey} = erlfdb_tuple:range({Level},
- ReduceIdxPrefix),
+ Opts = args_to_fdb_opts(Args, ReduceIdxPrefix),
+
+ try
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+ %% Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
Acc0 = #{
sig => Sig,
view_id => ViewId,
+ user_acc => UserAcc0,
+ args => Args,
+ callback => UserCallback,
reduce_idx_prefix => ReduceIdxPrefix,
- next => key,
- key => undefined,
- rows => []
+ limit => Limit,
+ row_count => 0
},
- Fun = fun fold_fwd_cb/2,
- Acc = erlfdb:fold_range(Tx, StartKey, EndKey, Fun, Acc0, Opts),
+ Fun = fun handle_row/3,
+ Acc1 = couch_views_reduce_fdb:fold_level0(TxDb, Sig, ViewId, Opts, Fun, Acc0),
#{
- rows := Rows
- } = Acc,
- io:format("~n ~n LEVEL ~p rows ~p ~n", [Level, Rows]),
- {ok, Rows}
- end, Levels),
- {ok, []}
- end).
-
+ user_acc := UserAcc1
+ } = Acc1,
+ {ok, maybe_stop(UserCallback(complete, UserAcc1))}
+ end)
+ catch throw:{done, Out} ->
+ {ok, Out}
+ end.
-read_reduce(Db, Sig, ViewId, UserCallback, UserAcc0, Args) ->
- #{
- db_prefix := DbPrefix
- } = Db,
- Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
- ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
- fabric2_fdb:transactional(Db, fun(#{tx := Tx} = TxDb) ->
- log_levels(TxDb, Sig, ViewId),
-%% Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
-
-
- Acc0 = #{
- sig => Sig,
- view_id => ViewId,
- user_acc => UserAcc0,
- args => Args,
- callback => UserCallback,
- reduce_idx_prefix => ReduceIdxPrefix,
- rows => []
- },
-
-
-%% Opts = [{limit, 2}, {streaming_mode, want_all}],
-%% EK = couch_views_encoding:encode(0, key),
-%% {StartKey, EndKey} = erlfdb_tuple:range({?MAX_SKIP_LIST_LEVELS, EK},
-%% ReduceIdxPrefix),
-%%
-%% Fun = fun fold_fwd_cb/2,
-%% Acc = erlfdb:fold_range(Tx, StartKey, EndKey, Fun, Acc0, Opts),
- #{
- rows := Rows
- } = Acc0,
- {ok, Rows}
- end).
-
-args_to_fdb_opts(#mrargs{} = Args) ->
+args_to_fdb_opts(#mrargs{} = Args, ReduceIdxPrefix) ->
#mrargs{
- limit = Limit,
- start_key = StartKey,
- end_key = EndKey
+%% limit = Limit,
+%% start_key = StartKey,
+%% end_key = EndKey,
+ group = Group,
+ group_level = GroupLevel
} = Args,
- ok.
+ {UStartKey0, EndKey0} = erlfdb_tuple:range({0},
+ ReduceIdxPrefix),
+
+ StartKey0 = erlfdb_tuple:pack({0, couch_views_encoding:encode(0, key)}, ReduceIdxPrefix),
+
+%% StartKey1 = case StartKey of
+%% undefined -> erlfdb_key:first_greater_than(StartKey0);
+%% StartKey -> create_key(StartKey, 0, Red)
+%% end,
+
+ StartKey1 = erlfdb_key:first_greater_than(StartKey0),
-fold_fwd_cb({FullEncodedKey, EV}, Acc) ->
+ [{streaming_mode, want_all}, {startkey, StartKey1}, {endkey, EndKey0}].
+
+
+encode_key(Key, Level) ->
+ {Level, couch_views_encoding:encode(Key, key)}.
+
+
+handle_row(Key, Value, Acc) ->
#{
- reduce_idx_prefix := ReduceIdxPrefix,
- rows := Rows
+ callback := UserCallback,
+ user_acc := UserAcc0,
+ row_count := RowCount,
+ limit := Limit
} = Acc,
- {_Level, _EK}
- = erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
- {EK, EV1} = erlfdb_tuple:unpack(EV),
- Key = couch_views_encoding:decode(EK),
- Val = couch_views_encoding:decode(EV1),
+ Row = [
+ {key, Key},
+ {value, Value}
+ ],
- Acc#{key := Val, rows := Rows ++ [{Key, Val}]}.
+ RowCountNext = RowCount + 1,
+
+ UserAcc1 = maybe_stop(UserCallback({row, Row}, UserAcc0)),
+ Acc1 = Acc#{user_acc := UserAcc1, row_count := RowCountNext},
+
+ case RowCountNext == Limit of
+ true ->
+ UserAcc2 = maybe_stop(UserCallback(complete, UserAcc1)),
+ maybe_stop({stop, UserAcc2});
+ false ->
+ Acc1
+ end.
+
+
+maybe_stop({ok, Acc}) -> Acc;
+maybe_stop({stop, Acc}) -> throw({done, Acc}).
+
+setup_reduce_indexes(Db, Sig, ViewIds) ->
+ #{
+ db_prefix := DbPrefix
+ } = Db,
+
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+ lists:foreach(fun (ViewId) ->
+ ViewOpts = #{
+ db_prefix => DbPrefix,
+ sig => Sig,
+ view_id => ViewId
+ },
+ couch_views_reduce_fdb:create_skip_list(TxDb,
+ ?MAX_SKIP_LIST_LEVELS, ViewOpts)
+ end, ViewIds)
+ end).
run_reduce(#mrst{views = Views } = Mrst, MappedResults) ->
@@ -192,149 +213,6 @@ is_builtin(<<"_", _/binary>>) ->
is_builtin(_) ->
false.
-
-update_reduce_idx(TxDb, Sig, ViewId, _DocId, _ExistingKeys, ReduceResult) ->
- #{
- db_prefix := DbPrefix
- } = TxDb,
-
- ViewOpts = #{
- db_prefix => DbPrefix,
- sig => Sig,
- view_id => ViewId
- },
- create_skip_list(TxDb, ?MAX_SKIP_LIST_LEVELS, ViewOpts),
-
- lists:foreach(fun ({Key, Val}) ->
- io:format("RESULTS KV ~p ~p ~n", [Key, Val])
-%% add_kv_to_skip_list(TxDb, ?MAX_SKIP_LIST_LEVELS, ViewOpts, Key, Val)
- end, ReduceResult).
-
-
-create_skip_list(Db, MaxLevel, #{} = ViewOpts) ->
- #{
- db_prefix := DbPrefix,
- sig := Sig,
- view_id := ViewId
- } = ViewOpts,
-
- Levels = lists:seq(0, MaxLevel),
- ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
-
- fabric2_fdb:transactional(Db, fun(TxDb) ->
-
- lists:foreach(fun(Level) ->
- add_kv(TxDb, ReduceIdxPrefix, Level, 0, 0)
- end, Levels)
- end).
-
-
-should_add_key_to_level(Level, Key) ->
- (erlang:phash2(Key) band ((1 bsl (Level * ?LEVEL_FAN_POW)) -1)) == 0.
-%% keyHash & ((1 << (level * LEVEL_FAN_POW)) - 1)) != 0
-
-
-add_kv_to_skip_list(Db, MaxLevel, #{} = ViewOpts, Key, Val) ->
- #{
- db_prefix := DbPrefix,
- sig := Sig,
- view_id := ViewId
- } = ViewOpts,
-
- Levels = lists:seq(0, MaxLevel),
- ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
-
- fabric2_fdb:transactional(Db, fun(TxDb) ->
- lists:foldl(fun(Level) ->
- io:format("PROCESS ~p ~p ~p ~n", [Level, Key, Val]),
- {PrevKey, PrevVal} = get_previous_key(TxDb, ReduceIdxPrefix, Level, Key),
- io:format("PREV VALS ~p ~p ~n", [PrevKey, PrevVal]),
- case should_add_key_to_level(Level, Key) of
- true ->
- io:format("Adding ~p ~p ~n", [Level, Key]),
- add_kv(Db, ReduceIdxPrefix, Level, Key, Val);
- false ->
- {PrevKey, NewVal} = rereduce(<<"_stats">>, {PrevKey, PrevVal}, {Key, Val}),
- io:format("RE_REDUCE ~p ~p ~p ~p ~n", [Level, Key, PrevKey, NewVal]),
- add_kv(Db, ReduceIdxPrefix, Level, PrevKey, NewVal)
- end
- end, true, Levels)
- end).
-
-
-rereduce(<<"_stats">>, {PrevKey, PrevVal}, {_Key, Val}) ->
- case PrevVal >= Val of
- true -> {PrevKey, PrevVal};
- false -> {PrevKey, Val}
- end.
-
-
reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId) ->
Key = {?DB_VIEWS, Sig, ?VIEW_REDUCE_RANGE, ViewId},
erlfdb_tuple:pack(Key, DbPrefix).
-
-
-create_key(ReduceIdxPrefix, SkipLevel, Key) ->
- EK = couch_views_encoding:encode(Key, key),
- LevelKey = {SkipLevel, EK},
- erlfdb_tuple:pack(LevelKey, ReduceIdxPrefix).
-
-
-create_value(Key, Val) ->
- EK = couch_views_encoding:encode(Key),
- EV = couch_views_encoding:encode(Val),
- erlfdb_tuple:pack({EK, EV}).
-
-
-add_kv(TxDb, ReduceIdxPrefix, Level, Key, Val) ->
- #{
- tx := Tx
- } = TxDb,
-
- LevelKey = create_key(ReduceIdxPrefix, Level, Key),
- EV = create_value(Key, Val),
-
- ok = erlfdb:set(Tx, LevelKey, EV).
-
-
-get_previous_key(TxDb, ReduceIdxPrefix, Level, Key) ->
- #{
- tx := Tx
- } = TxDb,
-
- % TODO: see if we need to add in conflict ranges for this for level=0
- Opts = [{limit, 2}, {reverse, true}, {streaming_mode, want_all}],
-%% LevelPrefix = erlfdb_tuple:pack({Level}, ReduceIdxPrefix),
-
- EK = couch_views_encoding:encode(Key, key),
- EndKey0 = erlfdb_tuple:pack({Level, EK}, ReduceIdxPrefix),
-
- {StartKey, EndKey1} = erlfdb_tuple:range({Level}, ReduceIdxPrefix),
-%% EndKey1 = erlfdb_key:first_greater_than(EndKey0),
-
- Callback = fun row_cb/2,
- Out = erlfdb:fold_range(Tx, StartKey, EndKey1, Callback, {val, ReduceIdxPrefix, {}}, Opts),
- io:format("OUT PRV ~p ~p ~p ~n", [Level, Key, Out]),
- Out.
-
-
-row_cb({FullEncodedKey, EV}, {val, ReduceIdxPrefix, Acc}) ->
- io:format("ROW VAL ~p ~n", [erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix)]),
- {_Level, EK, _VIEW_ROW_VALUE}
- = erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
- Val = couch_views_encoding:decode(EV),
-%% io:format("WW ~p ~p ~n", [couch_views_encoding:decode(EK), Val]),
-
- {key, {EK, ReduceIdxPrefix, Val}};
-
-row_cb({FullEncodedKey, EVK}, {key, {EK, ReduceIdxPrefix, Val}}) ->
- io:format("ROW KEY ~p ~n", [erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix)]),
- {_Level, EK, ?VIEW_ROW_KEY}
- = erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
- Key = couch_views_encoding:decode(EVK),
-
- {Key, Val}.
-
-
-
-
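The reworked read_reduce/6 above streams level-0 rows through
couch_views_reduce_fdb:fold_level0/6 and stops early via maybe_stop/1:
when the user callback returns {stop, Acc}, or handle_row/3 hits the row
limit, a {done, Out} throw unwinds out of the FDB transaction and is
caught at the top of read_reduce/6. A self-contained sketch of that
early-exit idiom over a plain list (the function name is illustrative):

    %% Fold Items until Fun returns {stop, Acc}, mirroring the
    %% maybe_stop/throw({done, ...}) pattern in read_reduce/6 above.
    fold_until(Fun, Acc0, Items) ->
        try
            lists:foldl(fun(Item, Acc) ->
                case Fun(Item, Acc) of
                    {ok, Acc1} -> Acc1;
                    {stop, Acc1} -> throw({done, Acc1})
                end
            end, Acc0, Items)
        catch throw:{done, Out} ->
            Out
        end.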
diff --git a/src/couch_views/src/couch_views_reduce_fdb.erl b/src/couch_views/src/couch_views_reduce_fdb.erl
index bcaaa30..9683265 100644
--- a/src/couch_views/src/couch_views_reduce_fdb.erl
+++ b/src/couch_views/src/couch_views_reduce_fdb.erl
@@ -15,19 +15,273 @@
-export([
-%% write_doc/4
+ fold_level0/6,
+ create_skip_list/3,
+ update_reduce_idx/6
]).
-% _id keys = {?DB_VIEWS, Sig, ?VIEW_REDUCE_ID_RANGE, DocId, ViewId} = [TotalKeys, TotalSize, UniqueKeys]
-%%write_doc(TxDb, Sig, ViewIds, Doc) ->
+-include("couch_views.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+-include_lib("fabric/include/fabric2.hrl").
+
+-define(MAX_SKIP_LIST_LEVELS, 6).
+-define(LEVEL_FAN_POW, 1).
+
+log_levels(Db, Sig, ViewId) ->
+ #{
+ db_prefix := DbPrefix
+ } = Db,
+
+ Levels = lists:seq(0, 6),
+ ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+ Opts = [{streaming_mode, want_all}],
+
+ fabric2_fdb:transactional(Db, fun(#{tx := Tx} = TxDb) ->
+ lists:foreach(fun (Level) ->
+ {StartKey, EndKey} = erlfdb_tuple:range({Level},
+ ReduceIdxPrefix),
+
+ Acc0 = #{
+ sig => Sig,
+ view_id => ViewId,
+ reduce_idx_prefix => ReduceIdxPrefix,
+ user_acc => [],
+ callback => fun handle_log_levels/3
+ },
+
+ Fun = fun fold_fwd_cb/2,
+ Acc = erlfdb:fold_range(Tx, StartKey, EndKey, Fun, Acc0, Opts),
+ #{
+ user_acc := Rows
+ } = Acc,
+ io:format("~n LEVEL ~p rows ~p ~n", [Level, Rows])
+ end, Levels)
+ end).
+
+handle_log_levels(Key, Value, Acc) ->
+ Acc ++ [{Key, Value}].
+
+%%fold(Db, Sig, ViewId, Options, Callback, Acc0) ->
%% #{
-%% id := DocId,
-%% reduce_results := ReduceResults
-%% } = Doc,
-%% lists:foreach(fun({ViewId, NewRows}) ->
-%% % update reduce index
-%% ok
-%% end, lists:zip(ViewIds, ReduceResults)).
+%% db_prefix := DbPrefix
+%% } = Db,
+%%
+%%%% Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
+%% ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+%% #mrargs{
+%% limit = Limit
+%% } = Args,
+%%
+%% fabric2_fdb:transactional(Db, fun(TxDb) ->
+%%
+%% Acc0 = #{
+%% sig => Sig,
+%% view_id => ViewId,
+%% user_acc => UserAcc0,
+%% args => Args,
+%% callback => UserCallback,
+%% reduce_idx_prefix => ReduceIdxPrefix,
+%% limit => Limit,
+%% row_count => 0
+%%
+%% },
+%%
+%% Acc1 = read_level0_only(TxDb, Acc0, Callback),
+%% #{
+%% user_acc := UserAcc1
+%% } = Acc1,
+%% {ok, UserAcc1}
+%% end).
+
+fold_level0(Db, Sig, ViewId, Opts, UserCallback, UserAcc0) ->
+ #{
+ db_prefix := DbPrefix
+ } = Db,
+
+ Level = 0,
+ ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+ Acc = #{
+ sig => Sig,
+ view_id => ViewId,
+ user_acc => UserAcc0,
+ %% args := Args,
+ callback => UserCallback,
+ reduce_idx_prefix => ReduceIdxPrefix
+ },
+
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+ log_levels(TxDb, Sig, ViewId),
+ #{
+ tx := Tx
+ } = TxDb,
+
+
+%% {StartKey, EndKey} = erlfdb_tuple:range({0},
+%% ReduceIdxPrefix),
+ {startkey, StartKey} = lists:keyfind(startkey, 1, Opts),
+ {endkey, EndKey} = lists:keyfind(endkey, 1, Opts),
+
+%% ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+ Fun = fun fold_fwd_cb/2,
+ Acc1 = erlfdb:fold_range(Tx, StartKey, EndKey, Fun, Acc, Opts),
+ #{
+ user_acc := UserAcc1
+ } = Acc1,
+ UserAcc1
+ end).
+
+
+fold_fwd_cb({FullEncodedKey, EV}, Acc) ->
+ #{
+ reduce_idx_prefix := ReduceIdxPrefix,
+ callback := Callback,
+ user_acc := UserAcc
+ } = Acc,
+
+ {_Level, _EK}
+ = erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
+ {EK, EV1} = erlfdb_tuple:unpack(EV),
+ Key = couch_views_encoding:decode(EK),
+ Val = couch_views_encoding:decode(EV1),
+
+ UserAcc1 = Callback(Key, Val, UserAcc),
+ Acc#{user_acc := UserAcc1}.
+
+
+reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId) ->
+ Key = {?DB_VIEWS, Sig, ?VIEW_REDUCE_RANGE, ViewId},
+ erlfdb_tuple:pack(Key, DbPrefix).
+
+
+%% Inserting
+update_reduce_idx(TxDb, Sig, ViewId, _DocId, _ExistingKeys, ReduceResult) ->
+ #{
+ db_prefix := DbPrefix
+ } = TxDb,
+
+ ViewOpts = #{
+ db_prefix => DbPrefix,
+ sig => Sig,
+ view_id => ViewId
+ },
+
+ lists:foreach(fun ({Key, Val}) ->
+ io:format("RESULTS KV ~p ~p ~n", [Key, Val]),
+ add_kv_to_skip_list(TxDb, ?MAX_SKIP_LIST_LEVELS, ViewOpts, Key, Val)
+ end, ReduceResult).
+
+
+create_skip_list(Db, MaxLevel, #{} = ViewOpts) ->
+ #{
+ db_prefix := DbPrefix,
+ sig := Sig,
+ view_id := ViewId
+ } = ViewOpts,
+
+ Levels = lists:seq(0, MaxLevel),
+ ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+
+ lists:foreach(fun(Level) ->
+ add_kv(TxDb, ReduceIdxPrefix, Level, 0, 0)
+ end, Levels)
+ end).
+
+
+add_kv_to_skip_list(Db, MaxLevel, #{} = ViewOpts, Key, Val) ->
+ #{
+ db_prefix := DbPrefix,
+ sig := Sig,
+ view_id := ViewId
+ } = ViewOpts,
+
+ Levels = lists:seq(0, MaxLevel),
+ ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+ KeyHash = hash_key(Key),
+
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+ lists:foreach(fun(Level) ->
+ {PrevKey, PrevVal} = get_previous_key(TxDb, ReduceIdxPrefix, Level, Key),
+ io:format("Process ~p ~p ~p PREV VALS ~p ~p ~n", [Level, Key, Val, PrevKey, PrevVal]),
+ case should_add_key_to_level(Level, KeyHash) of
+ true ->
+ io:format("Adding ~p ~p ~n", [Level, Key]),
+ add_kv(Db, ReduceIdxPrefix, Level, Key, Val);
+ false ->
+ {PrevKey, NewVal} = rereduce(<<"_stats">>, {PrevKey, PrevVal}, {Key, Val}),
+ io:format("RE_REDUCE ~p ~p ~p ~p ~n", [Level, Key, PrevKey, NewVal]),
+ add_kv(Db, ReduceIdxPrefix, Level, PrevKey, NewVal)
+ end
+ end, Levels)
+ end).
+
+
+get_previous_key(TxDb, ReduceIdxPrefix, Level, Key) ->
+ #{
+ tx := Tx
+ } = TxDb,
+
+ % TODO: see if we need to add in conflict ranges for this for level=0
+ Opts = [{limit, 1}, {streaming_mode, want_all}],
+
+ EK = couch_views_encoding:encode(Key, key),
+ StartKey = erlfdb_tuple:pack({Level, EK}, ReduceIdxPrefix),
+ StartKeySel = erlfdb_key:last_less_or_equal(StartKey),
+ EndKeySel = erlfdb_key:first_greater_or_equal(StartKey),
+
+ Future = erlfdb:get_range(Tx, StartKeySel, EndKeySel, Opts),
+ [{_FullEncodedKey, PackedValue}] = erlfdb:wait(Future),
+ get_key_value(PackedValue).
+
+
+hash_key(Key) ->
+ erlang:phash2(Key).
+
+
+should_add_key_to_level(Level, KeyHash) ->
+ (KeyHash band ((1 bsl (Level * ?LEVEL_FAN_POW)) -1)) == 0.
+%% keyHash & ((1 << (level * LEVEL_FAN_POW)) - 1)) != 0
+
+
+create_key(ReduceIdxPrefix, SkipLevel, Key) ->
+ EK = couch_views_encoding:encode(Key, key),
+ LevelKey = {SkipLevel, EK},
+ erlfdb_tuple:pack(LevelKey, ReduceIdxPrefix).
+
+
+create_value(Key, Val) ->
+ EK = couch_views_encoding:encode(Key),
+ EV = couch_views_encoding:encode(Val),
+ erlfdb_tuple:pack({EK, EV}).
+
+
+get_key_value(PackedValue) ->
+ {EncodedKey, EncodedValue}
+ = erlfdb_tuple:unpack(PackedValue),
+ Key = couch_views_encoding:decode(EncodedKey),
+ Value = couch_views_encoding:decode(EncodedValue),
+ {Key, Value}.
+
+
+add_kv(TxDb, ReduceIdxPrefix, Level, Key, Val) ->
+ #{
+ tx := Tx
+ } = TxDb,
+
+ LevelKey = create_key(ReduceIdxPrefix, Level, Key),
+ EV = create_value(Key, Val),
+
+ ok = erlfdb:set(Tx, LevelKey, EV).
+
+
+rereduce(<<"_stats">>, {PrevKey, PrevVal}, {_Key, Val}) ->
+ case PrevVal >= Val of
+ true -> {PrevKey, PrevVal};
+ false -> {PrevKey, Val}
+ end.
+
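Two details worth noting in the rewritten couch_views_reduce_fdb above.
First, should_add_key_to_level/2 is now deterministic: a key is stored at
a level exactly when the low Level * ?LEVEL_FAN_POW bits of
erlang:phash2(Key) are zero, so with a fan power of 1 roughly half of all
keys reach level 1, a quarter reach level 2, and so on, and the same key
always lands on the same levels across updates, with no stored state
needed to recompute its placement. A small worked sketch of that test:

    -define(LEVEL_FAN_POW, 1).

    %% All levels a key occupies. Level 0 always qualifies because its
    %% mask is 0, matching the base level that stores every exact row.
    levels_for(Key, MaxLevel) ->
        Hash = erlang:phash2(Key),
        [L || L <- lists:seq(0, MaxLevel),
              Hash band ((1 bsl (L * ?LEVEL_FAN_POW)) - 1) =:= 0].

Second, get_previous_key/4 now fetches the level neighbour with a single
last_less_or_equal key selector and a limit of 1 instead of a reverse
range fold, which is the natural FDB way to ask for the greatest key at
this level that sorts at or before the one being inserted.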
diff --git a/src/couch_views/test/exunit/couch_views_reduce_test.exs b/src/couch_views/test/exunit/couch_views_reduce_test.exs
index a526658..c1b35e2 100644
--- a/src/couch_views/test/exunit/couch_views_reduce_test.exs
+++ b/src/couch_views/test/exunit/couch_views_reduce_test.exs
@@ -40,29 +40,37 @@ defmodule CouchViewsReduceTest do
}
end
- test "group=true count reduce", context do
- args = %{
- :reduce => true,
- :group => true
- # :limit => 9
- }
+# test "group=true count reduce with limit", context do
+# args = %{
+# :reduce => true,
+# :group => true,
+# :limit => 3
+# }
+#
+# {:ok, res} = run_query(context, args, "dates")
+# IO.inspect(res, label: "OUT")
+#
+# assert res == [
+# {:row, [key: [2017, 3, 1], value: 1]},
+# {:row, [key: [2017, 4, 1], value: 1]},
+# {:row, [key: [2017, 4, 15], value: 1]}
+# ]
+# end
+
+ test "group_level=1 count reduce", context do
+ args = %{
+ :reduce => true,
+ :group => true,
+ }
- {:ok, res} = run_query(context, args, "baz")
- IO.inspect(res, label: "OUT")
+ {:ok, res} = run_query(context, args, "dates")
+ IO.inspect(res, label: "OUT")
- assert res == [
- {:row, [key: 1, value: 2]},
- {:row, [key: 2, value: 2]},
- {:row, [key: 3, value: 2]},
- {:row, [key: [1, 1], value: 1]},
- {:row, [key: [1, 1, 5], value: 1]},
- {:row, [key: [1, 2, 6], value: 1]},
- {:row, [key: [2, 1], value: 1]},
- {:row, [key: [2, 3, 6], value: 1]},
- {:row, [key: [3, 1], value: 1]},
- {:row, [key: [3, 1, 5], value: 1]},
- {:row, [key: [3, 4, 5], value: 1]}
- ]
+ assert res == [
+ {:row, [key: [2017], value: 1]},
+ {:row, [key: [2018], value: 1]},
+ {:row, [key: [2019], value: 1]}
+ ]
end
# test "group=1 count reduce", context do
@@ -173,6 +181,7 @@ defmodule CouchViewsReduceTest do
end
def default_cb(:complete, acc) do
+ IO.inspect(acc, label: "complete")
{:ok, Enum.reverse(acc)}
end
@@ -197,7 +206,22 @@ defmodule CouchViewsReduceTest do
end
defp create_docs() do
- for i <- 1..1 do
+ dates = [
+ [2017, 3, 1],
+ [2017, 4, 1],
+ # out of order check
+ [2019, 3, 1],
+ [2017, 4, 15],
+ [2018, 4, 1],
+ [2017, 5, 1],
+ [2018, 3, 1],
+ # duplicate check
+ [2018, 4, 1],
+ [2018, 5, 1],
+ [2019, 4, 1]
+ ]
+
+ for i <- 1..4 do
group =
if rem(i, 3) == 0 do
"first"
@@ -205,14 +229,14 @@ defmodule CouchViewsReduceTest do
"second"
end
- :couch_doc.from_json_obj(
- {[
- {"_id", "doc-id-#{i}"},
- {"value", i},
- {"some", "field"},
- {"group", group}
- ]}
- )
+ :couch_doc.from_json_obj({[
+ {"_id", "doc-id-#{i}"},
+ {"value", i},
+ {"some", "field"},
+ {"group", group},
+ {"date", Enum.at(dates, i - 1)}
+ # {"timestamp", Enum.at(timestamps, i - 1)}
+ ]})
end
end
@@ -221,6 +245,16 @@ defmodule CouchViewsReduceTest do
{"_id", "_design/bar"},
{"views",
{[
+ {"dates",
+ {[
+ {"map",
+ """
+ function(doc) {
+ emit(doc.date, doc.value);
+ }
+ """},
+ {"reduce", "_count"}
+ ]}},
{"baz",
{[
{"map",
[couchdb] 01/05: Initial work
Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
garren pushed a commit to branch prototype/builtin-reduce
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 03cbc41c15a3e239e4c5f4e820d19593cc2b8dc4
Author: Garren Smith <ga...@gmail.com>
AuthorDate: Wed Sep 11 17:01:08 2019 +0200
Initial work
---
src/couch_views/include/couch_views.hrl | 1 +
src/couch_views/src/couch_views.erl | 14 +-
src/couch_views/src/couch_views_fdb.erl | 116 ++++++-
src/couch_views/src/couch_views_indexer.erl | 5 +
src/couch_views/src/couch_views_reader.erl | 155 ++++++++-
src/couch_views/src/couch_views_reduce.erl | 364 +++++++++++++++++++++
.../couch_views_reduce_fdb.erl} | 29 +-
.../test/exunit/couch_views_reduce_test.exs | 286 ++++++++++++++++
src/couch_views/test/exunit/test_helper.exs | 2 +
9 files changed, 950 insertions(+), 22 deletions(-)
diff --git a/src/couch_views/include/couch_views.hrl b/src/couch_views/include/couch_views.hrl
index 2e443eb..e97d777 100644
--- a/src/couch_views/include/couch_views.hrl
+++ b/src/couch_views/include/couch_views.hrl
@@ -15,6 +15,7 @@
-define(VIEW_ID_INFO, 1).
-define(VIEW_ID_RANGE, 2).
-define(VIEW_MAP_RANGE, 3).
+-define(VIEW_REDUCE_RANGE, 4).
-define(VIEW_ROW_COUNT, 0).
-define(VIEW_KV_SIZE, 1).
diff --git a/src/couch_views/src/couch_views.erl b/src/couch_views/src/couch_views.erl
index 7c7588c..b7fe4c4 100644
--- a/src/couch_views/src/couch_views.erl
+++ b/src/couch_views/src/couch_views.erl
@@ -38,16 +38,20 @@ query(Db, DDoc, ViewName, Callback, Acc0, Args0) ->
Args1 = to_mrargs(Args0),
Args2 = couch_mrview_util:set_view_type(Args1, ViewName, Views),
Args3 = couch_mrview_util:validate_args(Args2),
+
ok = check_range(Args3),
- case is_reduce_view(Args3) of
- true -> throw({not_implemented});
- false -> ok
- end,
ok = maybe_update_view(Db, Mrst, Args3),
try
- couch_views_reader:read(Db, Mrst, ViewName, Callback, Acc0, Args3)
+ case is_reduce_view(Args3) of
+ true ->
+ couch_views_reader:read_reduce(Db, Mrst, ViewName,
+ Callback, Acc0, Args3);
+ false ->
+ couch_views_reader:read(Db, Mrst, ViewName,
+ Callback, Acc0, Args3)
+ end
after
UpdateAfter = Args3#mrargs.update == lazy,
if UpdateAfter == false -> ok; true ->
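With this change a reduce query flows through the same
couch_views:query/6 entry point as a map query instead of hitting
throw({not_implemented}). A hedged usage sketch, with the callback and
argument shapes inferred from the Elixir tests added later in this
commit (Db and DDoc are assumed to be fabric2_db handles):

    %% Query a reduce view named "baz". The callback sees {meta, _},
    %% {row, Row} and complete events and returns {ok, Acc} each time
    %% (or {stop, Acc} to halt the fold early).
    query_reduce(Db, DDoc) ->
        Args = #{reduce => true, group => true},
        Callback = fun
            ({row, Row}, Acc) -> {ok, [Row | Acc]};
            (complete, Acc) -> {ok, lists:reverse(Acc)};
            (_, Acc) -> {ok, Acc}
        end,
        couch_views:query(Db, DDoc, <<"baz">>, Callback, [], Args).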
diff --git a/src/couch_views/src/couch_views_fdb.erl b/src/couch_views/src/couch_views_fdb.erl
index 60ce300..07241dd 100644
--- a/src/couch_views/src/couch_views_fdb.erl
+++ b/src/couch_views/src/couch_views_fdb.erl
@@ -20,6 +20,7 @@
get_kv_size/3,
fold_map_idx/6,
+ fold_reduce_idx/6,
write_doc/4
]).
@@ -42,6 +43,15 @@
% View Build Sequence Access
% (<db>, ?DB_VIEWS, Sig, ?VIEW_UPDATE_SEQ) = Sequence
+% Id Range
+% {<db>, ?DB_VIEWS, Sig, ?VIEW_ID_RANGE, DocId, ViewId}
+% = [TotalKeys, TotalSize, UniqueKeys]
+
+% Map Range
+%{<db>, ?DB_VIEWS, Sig, ?VIEW_MAP_RANGE, ViewId, {{Key, DocId}, DupeId, Type}}
+% = Value | UnEncodedKey
+% Type = ?VIEW_KEY | ?VIEW_ROW
+
get_update_seq(TxDb, #mrst{sig = Sig}) ->
#{
@@ -124,6 +134,8 @@ fold_map_idx(TxDb, Sig, ViewId, Options, Callback, Acc0) ->
Acc1.
+
+
write_doc(TxDb, Sig, _ViewIds, #{deleted := true} = Doc) ->
#{
id := DocId
@@ -134,6 +146,7 @@ write_doc(TxDb, Sig, _ViewIds, #{deleted := true} = Doc) ->
clear_id_idx(TxDb, Sig, DocId),
lists:foreach(fun({ViewId, TotalKeys, TotalSize, UniqueKeys}) ->
clear_map_idx(TxDb, Sig, ViewId, DocId, UniqueKeys),
+ %clear_reduce_idx
update_row_count(TxDb, Sig, ViewId, -TotalKeys),
update_kv_size(TxDb, Sig, ViewId, -TotalSize)
end, ExistingViewKeys);
@@ -141,14 +154,17 @@ write_doc(TxDb, Sig, _ViewIds, #{deleted := true} = Doc) ->
write_doc(TxDb, Sig, ViewIds, Doc) ->
#{
id := DocId,
- results := Results
+ results := Results,
+ reduce_results := ReduceResults
} = Doc,
ExistingViewKeys = get_view_keys(TxDb, Sig, DocId),
clear_id_idx(TxDb, Sig, DocId),
- lists:foreach(fun({ViewId, NewRows}) ->
+ %% TODO: handle when there is no reduce
+ io:format("REDUCE RESULTS ~p ~n", [ReduceResults]),
+ lists:foreach(fun({ViewId, NewRows, ReduceResult}) ->
update_id_idx(TxDb, Sig, ViewId, DocId, NewRows),
ExistingKeys = case lists:keyfind(ViewId, 1, ExistingViewKeys) of
@@ -165,8 +181,11 @@ write_doc(TxDb, Sig, ViewIds, Doc) ->
update_kv_size(TxDb, Sig, ViewId, SizeChange),
[]
end,
- update_map_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows)
- end, lists:zip(ViewIds, Results)).
+ update_map_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows),
+ couch_views_reduce:update_reduce_idx(TxDb, Sig, ViewId, DocId,
+ ExistingKeys, ReduceResult),
+ update_reduce_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, ReduceResult)
+ end, lists:zip3(ViewIds, Results, ReduceResults)).
% For each row in a map view there are two rows stored in
@@ -338,6 +357,53 @@ update_map_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows) ->
end, KVsToAdd).
+update_reduce_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows) ->
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = TxDb,
+
+%% Unique = lists:usort([K || {K, _V} <- NewRows]),
+
+%% KeysToRem = ExistingKeys -- Unique,
+%% lists:foreach(fun(RemKey) ->
+%% {Start, End} = reduce_idx_range(DbPrefix, Sig, ViewId, RemKey, DocId),
+%% ok = erlfdb:clear_range(Tx, Start, End)
+%% end, KeysToRem),
+%%
+ {ExactKVsToAdd, GroupKVsToAdd} = process_reduce_rows(NewRows),
+ ReduceIdxPrefix = reduce_idx_prefix(DbPrefix, Sig, ViewId),
+ add_reduce_kvs(Tx, ReduceIdxPrefix, ExactKVsToAdd, ?VIEW_REDUCE_EXACT),
+ add_reduce_kvs(Tx, ReduceIdxPrefix, GroupKVsToAdd, ?VIEW_REDUCE_GROUP).
+
+
+add_reduce_kvs(Tx, ReduceIdxPrefix, KVsToAdd, ReduceType) ->
+ lists:foreach(fun({Key1, Key2, Val, GroupLevel}) ->
+ KK = reduce_idx_key(ReduceIdxPrefix, Key1, GroupLevel,
+ ReduceType, ?VIEW_ROW_KEY),
+ VK = reduce_idx_key(ReduceIdxPrefix, Key1, GroupLevel,
+ ReduceType, ?VIEW_ROW_VALUE),
+ ok = erlfdb:set(Tx, KK, Key2),
+ ok = erlfdb:add(Tx, VK, Val)
+ end, KVsToAdd).
+
+
+reduce_idx_prefix(DbPrefix, Sig, ViewId) ->
+ Key = {?DB_VIEWS, Sig, ?VIEW_REDUCE_RANGE, ViewId},
+ erlfdb_tuple:pack(Key, DbPrefix).
+
+
+reduce_idx_key(ReduceIdxPrefix, ReduceKey, GroupLevel, ReduceType, RowType) ->
+ Key = {ReduceKey, GroupLevel, ReduceType, RowType},
+ erlfdb_tuple:pack(Key, ReduceIdxPrefix).
+
+
+%%reduce_idx_range(DbPrefix, Sig, ViewId, GroupKey, DocId) ->
+%% Encoded = couch_views_encoding:encode(MapKey, key),
+%% Key = {?DB_VIEWS, Sig, ?VIEW_MAP_RANGE, ViewId, {Encoded, DocId}},
+%% erlfdb_tuple:range(Key, DbPrefix).
+
+
get_view_keys(TxDb, Sig, DocId) ->
#{
tx := Tx,
@@ -394,7 +460,6 @@ id_idx_range(DbPrefix, Sig, DocId) ->
Key = {?DB_VIEWS, Sig, ?VIEW_ID_RANGE, DocId},
erlfdb_tuple:range(Key, DbPrefix).
-
map_idx_prefix(DbPrefix, Sig, ViewId) ->
Key = {?DB_VIEWS, Sig, ?VIEW_MAP_RANGE, ViewId},
erlfdb_tuple:pack(Key, DbPrefix).
@@ -432,6 +497,47 @@ process_rows(Rows) ->
end, [], Grouped).
+process_reduce_rows(Rows) ->
+ ReduceExact = encode_reduce_rows(Rows),
+ ReduceGroups = lists:foldl(fun({Key, Val}, Groupings) ->
+ Out = create_grouping(Key, Val, [], Groupings),
+ Out
+ end, #{}, Rows),
+ ReduceGroups1 = encode_reduce_rows(maps:to_list(ReduceGroups)),
+ {ReduceExact, ReduceGroups1}.
+
+
+encode_reduce_rows(Rows) ->
+ lists:map(fun({K, V}) ->
+ EK1 = couch_views_encoding:encode(K, key),
+ EK2 = couch_views_encoding:encode(K, value),
+ {EK1, EK2, V, group_level(K)}
+ end, Rows).
+
+
+group_level(Key) when is_list(Key) ->
+ length(Key);
+
+group_level(_Key) ->
+ 1.
+
+
+create_grouping([], _Val, _, Groupings) ->
+ Groupings;
+
+create_grouping([Head | Rest], Val, Key, Groupings) ->
+ Key1 = Key ++ [Head],
+ Groupings1 = maps:update_with(Key1, fun(OldVal) ->
+ OldVal + Val
+ end, Val, Groupings),
+ create_grouping(Rest, Val, Key1, Groupings1);
+
+create_grouping(Key, Val, _, Groupings) ->
+ maps:update_with(Key, fun(OldVal) ->
+ OldVal + Val
+ end, Val, Groupings).
+
+
calculate_row_size(Rows) ->
lists:foldl(fun({K, V}, Acc) ->
Acc + erlang:external_size(K) + erlang:external_size(V)
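process_reduce_rows/1 above produces two kinds of rows per document:
exact rows, one per distinct emitted key, and group rows, one per prefix
of each array key, with create_grouping/4 accumulating a running sum for
every prefix. A row {[2019, 1, 2], 1} therefore contributes to the
groups [2019], [2019, 1] and [2019, 1, 2]. A standalone sketch of that
prefix walk (names are illustrative):

    %% Sum Val into every prefix of a list key; non-list keys group
    %% under themselves, mirroring create_grouping/4 above.
    prefix_groups(Rows) ->
        lists:foldl(fun({Key, Val}, Acc) ->
            group(Key, Val, [], Acc)
        end, #{}, Rows).

    group([], _Val, _Prefix, Acc) ->
        Acc;
    group([H | T], Val, Prefix, Acc) ->
        Prefix1 = Prefix ++ [H],
        Acc1 = maps:update_with(Prefix1, fun(Old) -> Old + Val end, Val, Acc),
        group(T, Val, Prefix1, Acc1);
    group(Key, Val, _Prefix, Acc) ->
        maps:update_with(Key, fun(Old) -> Old + Val end, Val, Acc).

For example, prefix_groups([{[2019, 1, 2], 1}, {[2019, 1, 4], 1}]) yields
#{[2019] => 2, [2019, 1] => 2, [2019, 1, 2] => 1, [2019, 1, 4] => 1}. In
the index itself the group values are then maintained with erlfdb:add/3,
an atomic increment, so concurrent writers do not conflict on them.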
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index 55ce063..2f38ac1 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -101,6 +101,7 @@ update(#{} = Db, Mrst0, State0) ->
DocAcc1 = fetch_docs(TxDb, DocAcc),
{Mrst1, MappedDocs} = map_docs(Mrst0, DocAcc1),
+ Results = run_reduce(Mrst1, MappedDocs),
write_docs(TxDb, Mrst1, MappedDocs, State2),
case Count < Limit of
@@ -209,6 +210,10 @@ map_docs(Mrst, Docs) ->
{Mrst1, MappedDocs}.
+run_reduce(Mrst, MappedResults) ->
+ couch_views_reduce:run_reduce(Mrst, MappedResults).
+
+
write_docs(TxDb, Mrst, Docs, State) ->
#mrst{
views = Views,
diff --git a/src/couch_views/src/couch_views_reader.erl b/src/couch_views/src/couch_views_reader.erl
index 27671fb..d08515c 100644
--- a/src/couch_views/src/couch_views_reader.erl
+++ b/src/couch_views/src/couch_views_reader.erl
@@ -13,6 +13,7 @@
-module(couch_views_reader).
-export([
+ read_reduce/6,
read/6
]).
@@ -23,6 +24,128 @@
-include_lib("fabric/include/fabric2.hrl").
+read_reduce(Db, Mrst, ViewName, UserCallback, UserAcc0, Args) ->
+ #mrst{
+ language = Lang,
+ sig = Sig,
+ views = Views
+ } = Mrst,
+
+ ViewId = get_view_id(Lang, Args, ViewName, Views),
+ couch_views_reduce:read_reduce(Db, Sig, ViewId, UserCallback,
+ UserAcc0, Args).
+%% Fun = fun handle_reduce_row/3,
+%%
+%% try
+%% fabric2_fdb:transactional(Db, fun(TxDb) ->
+%% Meta = get_meta(TxDb, Mrst, ViewId, Args),
+%% UserAcc1 = maybe_stop(UserCallback(Meta, UserAcc0)),
+%%
+%% #mrargs{
+%% limit = Limit
+%% } = Args,
+%%
+%% Acc0 = #{
+%% db => TxDb,
+%% skip => Args#mrargs.skip,
+%% mrargs => undefined,
+%% callback => UserCallback,
+%% acc => UserAcc1,
+%% row_count => 0,
+%% limit => Limit
+%% },
+%%
+%% Acc1 = lists:foldl(fun(KeyArgs, KeyAcc0) ->
+%% Opts = reduce_mrargs_to_fdb_options(KeyArgs),
+%% KeyAcc1 = KeyAcc0#{
+%% mrargs := KeyArgs
+%% },
+%% couch_views_fdb:fold_reduce_idx(
+%% TxDb,
+%% Sig,
+%% ViewId,
+%% Opts,
+%% Fun,
+%% KeyAcc1
+%% )
+%% end, Acc0, expand_keys_args(Args)),
+%%
+%% #{
+%% acc := UserAcc2
+%% } = Acc1,
+%% {ok, maybe_stop(UserCallback(complete, UserAcc2))}
+%% end)
+%% catch throw:{done, Out} ->
+%% {ok, Out}
+%% end.
+
+
+reduce_mrargs_to_fdb_options(Args) ->
+ #mrargs{
+%% start_key = StartKey0,
+%% start_key_docid = StartKeyDocId,
+%% end_key = EndKey0,
+%% end_key_docid = EndKeyDocId,
+ direction = Direction,
+ limit = Limit,
+ skip = Skip,
+ group_level = GroupLevel,
+ group = Group
+%% inclusive_end = InclusiveEnd
+ } = Args,
+
+ GroupExact = Group == true andalso GroupLevel == 0,
+
+ GroupLevelEnd = case GroupExact of
+ true -> [];
+ false -> [{end_key, {<<255>>, GroupLevel + 1}}]
+ end,
+
+%% StartKey1 = if StartKey0 == undefined -> undefined; true ->
+%% couch_views_encoding:encode(StartKey0, key)
+%% end,
+%%
+%% StartKeyOpts = case {StartKey1, StartKeyDocId} of
+%% {undefined, _} ->
+%% [];
+%% {StartKey1, StartKeyDocId} ->
+%% [{start_key, {StartKey1, StartKeyDocId}}]
+%% end,
+%%
+%% EndKey1 = if EndKey0 == undefined -> undefined; true ->
+%% couch_views_encoding:encode(EndKey0, key)
+%% end,
+%%
+%% EndKeyOpts = case {EndKey1, EndKeyDocId, Direction} of
+%% {undefined, _, _} ->
+%% [];
+%% {EndKey1, <<>>, rev} when not InclusiveEnd ->
+%% % When we iterate in reverse with
+%% % inclusive_end=false we have to set the
+%% % EndKeyDocId to <<255>> so that we don't
+%% % include matching rows.
+%% [{end_key_gt, {EndKey1, <<255>>}}];
+%% {EndKey1, <<255>>, _} when not InclusiveEnd ->
+%% % When inclusive_end=false we need to
+%% % elide the default end_key_docid so as
+%% % to not sort past the docids with the
+%% % given end key.
+%% [{end_key_gt, {EndKey1}}];
+%% {EndKey1, EndKeyDocId, _} when not InclusiveEnd ->
+%% [{end_key_gt, {EndKey1, EndKeyDocId}}];
+%% {EndKey1, EndKeyDocId, _} when InclusiveEnd ->
+%% [{end_key, {EndKey1, EndKeyDocId}}]
+%% end,
+
+ [
+ {dir, Direction},
+%% {limit, Limit * 2 + Skip * 2},
+ {streaming_mode, large}
+%% {streaming_mode, want_all}
+ ] ++ GroupLevelEnd.
+%% ] ++ StartKeyOpts ++ EndKeyOpts.
+
+
read(Db, Mrst, ViewName, UserCallback, UserAcc0, Args) ->
#mrst{
language = Lang,
@@ -113,11 +236,41 @@ handle_row(DocId, Key, Value, Acc) ->
UserAcc1 = maybe_stop(UserCallback({row, Row}, UserAcc0)),
Acc#{acc := UserAcc1}.
+handle_reduce_row(_Key, _Value, #{skip := Skip} = Acc) when Skip > 0 ->
+ Acc#{skip := Skip - 1};
+
+handle_reduce_row(Key, Value, Acc) ->
+ io:format("ACC ~p ~n", [Acc]),
+ #{
+ callback := UserCallback,
+ acc := UserAcc0,
+ row_count := RowCount,
+ limit := Limit
+ } = Acc,
+
+ Row = [
+ {key, Key},
+ {value, Value}
+ ],
+
+ RowCountNext = RowCount + 1,
+
+ UserAcc1 = maybe_stop(UserCallback({row, Row}, UserAcc0)),
+ Acc1 = Acc#{acc := UserAcc1, row_count := RowCountNext},
+
+ case RowCountNext == Limit of
+ true ->
+ UserAcc2 = maybe_stop(UserCallback(complete, UserAcc1)),
+ maybe_stop({stop, UserAcc2});
+ false ->
+ Acc1
+ end.
+
get_view_id(Lang, Args, ViewName, Views) ->
case couch_mrview_util:extract_view(Lang, Args, ViewName, Views) of
{map, View, _Args} -> View#mrview.id_num;
- {red, {_Idx, _Lang, View}} -> View#mrview.id_num
+ {red, {_Idx, _Lang, View}, _Args} -> View#mrview.id_num
end.
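reduce_mrargs_to_fdb_options/1 above treats group=true with
group_level == 0 as exact grouping and otherwise caps the range with an
end key of {<<255>>, GroupLevel + 1} so the fold stays within the
requested grouping depth. As a reference for what those options are
meant to implement, a sketch of standard CouchDB group-key semantics
(exact grouping keeps the whole key; a numeric level truncates array
keys to that many leading elements):

    %% Reduce rows are grouped under group_key(Key, GroupLevel).
    group_key(Key, exact) -> Key;
    group_key(Key, GroupLevel) when is_list(Key), is_integer(GroupLevel) ->
        lists:sublist(Key, GroupLevel);
    group_key(Key, _GroupLevel) -> Key.

So group_key([2019, 1, 2], 1) is [2019], while exact grouping keeps
[2019, 1, 2] intact.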
diff --git a/src/couch_views/src/couch_views_reduce.erl b/src/couch_views/src/couch_views_reduce.erl
new file mode 100644
index 0000000..1502f38
--- /dev/null
+++ b/src/couch_views/src/couch_views_reduce.erl
@@ -0,0 +1,364 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_views_reduce).
+
+
+-export([
+ run_reduce/2,
+ update_reduce_idx/6,
+ read_reduce/6
+]).
+
+
+-include("couch_views.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+-include_lib("fabric/include/fabric2.hrl").
+
+
+-define(LEVEL_FAN_POW, 4).
+-define(MAX_SKIP_LIST_LEVELS, 6).
+
+
+log_levels(Db, Sig, ViewId) ->
+ #{
+ db_prefix := DbPrefix
+ } = Db,
+
+ Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
+ ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+ Opts = [{streaming_mode, want_all}],
+
+ fabric2_fdb:transactional(Db, fun(#{tx := Tx} = TxDb) ->
+ lists:foreach(fun (Level) ->
+ {StartKey, EndKey} = erlfdb_tuple:range({Level},
+ ReduceIdxPrefix),
+
+ Acc0 = #{
+ sig => Sig,
+ view_id => ViewId,
+ reduce_idx_prefix => ReduceIdxPrefix,
+ next => key,
+ key => undefined,
+ rows => []
+ },
+
+ Fun = fun fold_fwd_cb/2,
+ Acc = erlfdb:fold_range(Tx, StartKey, EndKey, Fun, Acc0, Opts),
+ #{
+ rows := Rows
+ } = Acc,
+ io:format("~n ~n LEVEL ~p rows ~p ~n", [Level, Rows]),
+ {ok, Rows}
+ end, Levels),
+ {ok, []}
+ end).
+
+
+read_reduce(Db, Sig, ViewId, UserCallback, UserAcc0, Args) ->
+ #{
+ db_prefix := DbPrefix
+ } = Db,
+
+ Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
+ ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+ fabric2_fdb:transactional(Db, fun(#{tx := Tx} = TxDb) ->
+ log_levels(TxDb, Sig, ViewId),
+%% Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
+
+
+ Acc0 = #{
+ sig => Sig,
+ view_id => ViewId,
+ user_acc => UserAcc0,
+ args => Args,
+ callback => UserCallback,
+ reduce_idx_prefix => ReduceIdxPrefix,
+ next => key,
+ rows => []
+ },
+
+
+%% Opts = [{limit, 2}, {streaming_mode, want_all}],
+%% EK = couch_views_encoding:encode(0, key),
+%% {StartKey, EndKey} = erlfdb_tuple:range({?MAX_SKIP_LIST_LEVELS, EK},
+%% ReduceIdxPrefix),
+%%
+%% Fun = fun fold_fwd_cb/2,
+%% Acc = erlfdb:fold_range(Tx, StartKey, EndKey, Fun, Acc0, Opts),
+ #{
+ rows := Rows
+ } = Acc0,
+ {ok, Rows}
+ end).
+
+args_to_fdb_opts(#mrargs{} = Args) ->
+ #mrargs{
+ limit = Limit,
+ start_key = StartKey,
+ end_key = EndKey
+ } = Args,
+ ok.
+
+
+fold_fwd_cb({FullEncodedKey, EV}, #{next := key} = Acc) ->
+ #{
+ reduce_idx_prefix := ReduceIdxPrefix
+ } = Acc,
+
+ {Level, EK, ?VIEW_ROW_KEY}
+ = erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
+
+%% Key = couch_views_encoding:decode(EV),
+ Val = couch_views_encoding:decode(EV),
+ Acc#{next := value, key := Val};
+
+fold_fwd_cb({FullEncodedKey, EV}, #{next := value} = Acc) ->
+ #{
+ reduce_idx_prefix := ReduceIdxPrefix,
+ rows := Rows,
+ key := Key
+ } = Acc,
+
+ {Level, EK, ?VIEW_ROW_VALUE}
+ = erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
+
+%% Key = couch_views_encoding:decode(EV),
+ Val = couch_views_encoding:decode(EV),
+ Acc#{next := key, key := undefined, rows := Rows ++ [{Key, Val}]}.
+
+
+run_reduce(#mrst{views = Views } = Mrst, MappedResults) ->
+ ReduceFuns = lists:map(fun(View) ->
+ #mrview{
+ id_num = Id,
+ reduce_funs = ViewReduceFuns
+ } = View,
+
+ [{_, Fun}] = ViewReduceFuns,
+ Fun
+ end, Views),
+
+ lists:map(fun (MappedResult) ->
+ #{
+ results := Results
+ } = MappedResult,
+
+ ReduceResults = lists:map(fun ({ReduceFun, Result}) ->
+ reduce(ReduceFun, Result)
+ end, lists:zip(ReduceFuns, Results)),
+
+ MappedResult#{
+ reduce_results => ReduceResults
+ }
+ end, MappedResults).
+
+
+reduce(<<"_count">>, Results) ->
+ ReduceResults = lists:foldl(fun ({Key, _}, Acc) ->
+ case maps:is_key(Key, Acc) of
+ true ->
+ #{Key := Val} = Acc,
+ Acc#{Key := Val + 1};
+ false ->
+ Acc#{Key => 1}
+ end
+ end, #{}, Results),
+ maps:to_list(ReduceResults);
+
+% this isn't a real supported reduce function in CouchDB
+% But I want a basic reduce function that when we need to update the index
+% we would need to re-read multiple rows instead of being able to do an
+% atomic update
+reduce(<<"_stats">>, Results) ->
+ ReduceResults = lists:foldl(fun ({Key, Val}, Acc) ->
+ io:format("MAX ~p ~p ~n", [Key, Val]),
+ case maps:is_key(Key, Acc) of
+ true ->
+ #{Key := Max} = Acc,
+ case Max >= Val of
+ true ->
+ Acc;
+ false ->
+ Acc#{Key := Val}
+ end;
+ false ->
+ Acc#{Key => Val}
+ end
+ end, #{}, Results),
+ maps:to_list(ReduceResults).
+
+
+is_builtin(<<"_", _/binary>>) ->
+ true;
+
+is_builtin(_) ->
+ false.
+
+
+update_reduce_idx(TxDb, Sig, ViewId, _DocId, _ExistingKeys, ReduceResult) ->
+ #{
+ db_prefix := DbPrefix
+ } = TxDb,
+
+ ViewOpts = #{
+ db_prefix => DbPrefix,
+ sig => Sig,
+ view_id => ViewId
+ },
+ create_skip_list(TxDb, ?MAX_SKIP_LIST_LEVELS, ViewOpts),
+
+ lists:foreach(fun ({Key, Val}) ->
+ io:format("RESULTS KV ~p ~p ~n", [Key, Val]),
+ add_kv_to_skip_list(TxDb, ?MAX_SKIP_LIST_LEVELS, ViewOpts, Key, Val)
+ end, ReduceResult).
+
+
+create_skip_list(Db, MaxLevel, #{} = ViewOpts) ->
+ #{
+ db_prefix := DbPrefix,
+ sig := Sig,
+ view_id := ViewId
+ } = ViewOpts,
+
+ Levels = lists:seq(0, MaxLevel),
+ ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+
+ lists:foreach(fun(Level) ->
+ add_kv(TxDb, ReduceIdxPrefix, Level, 0, 0)
+ end, Levels)
+ end).
+
+%% This sucks but its simple for now
+should_add_key_to_level(0, _, _) ->
+ true;
+
+should_add_key_to_level(?MAX_SKIP_LIST_LEVELS, _, _) ->
+ false;
+
+should_add_key_to_level(_, _, false) ->
+ false;
+
+should_add_key_to_level(_, _Key, _Prev) ->
+ crypto:rand_uniform(0, 2) == 0.
+
+%%should_add_key_to_level(Level, Key) ->
+%% erlang:phash2(Key) band ((1 bsl (Level * ?LEVEL_FAN_POW)) -1) == 0.
+%% keyHash & ((1 << (level * LEVEL_FAN_POW)) - 1)) != 0
+
+
+add_kv_to_skip_list(Db, MaxLevel, #{} = ViewOpts, Key, Val) ->
+ #{
+ db_prefix := DbPrefix,
+ sig := Sig,
+ view_id := ViewId
+ } = ViewOpts,
+
+ Levels = lists:seq(0, MaxLevel),
+ ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+ lists:foldl(fun(Level, PrevCoinFlip) ->
+ io:format("PROCESS ~p ~p ~p ~n", [Level, Key, Val]),
+ {PrevKey, PrevVal} = get_previous_key(TxDb, ReduceIdxPrefix, Level, Key),
+ io:format("PREV VALS ~p ~p ~n", [PrevKey, PrevVal]),
+ case should_add_key_to_level(Level, Key, PrevCoinFlip) of
+ true ->
+ io:format("Adding ~p ~p ~n", [Level, Key]),
+ add_kv(Db, ReduceIdxPrefix, Level, Key, Val),
+ true;
+ false ->
+ {PrevKey, NewVal} = rereduce(<<"_stats">>, {PrevKey, PrevVal}, {Key, Val}),
+ io:format("RE_REDUCE ~p ~p ~p ~p ~n", [Level, Key, PrevKey, NewVal]),
+ add_kv(Db, ReduceIdxPrefix, Level, PrevKey, NewVal),
+ false
+ end
+ end, true, Levels)
+ end).
+
+
+rereduce(<<"_stats">>, {PrevKey, PrevVal}, {_Key, Val}) ->
+ case PrevVal >= Val of
+ true -> {PrevKey, PrevVal};
+ false -> {PrevKey, Val}
+ end.
+
+
+reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId) ->
+ Key = {?DB_VIEWS, Sig, ?VIEW_REDUCE_SK_RANGE, ViewId},
+ erlfdb_tuple:pack(Key, DbPrefix).
+
+
+reduce_idx_key(ReduceIdxPrefix, SkipLevel, ReduceKey, RowType) ->
+ Key = {SkipLevel, ReduceKey, RowType},
+ erlfdb_tuple:pack(Key, ReduceIdxPrefix).
+
+
+add_kv(TxDb, ReduceIdxPrefix, Level, Key, Val) ->
+ #{
+ tx := Tx
+ } = TxDb,
+
+ EK = couch_views_encoding:encode(Key, key),
+ EVK = couch_views_encoding:encode(Key),
+ EV = couch_views_encoding:encode(Val),
+
+ KK = reduce_idx_key(ReduceIdxPrefix, Level, EK, ?VIEW_ROW_KEY),
+ VK = reduce_idx_key(ReduceIdxPrefix, Level, EK, ?VIEW_ROW_VALUE),
+ ok = erlfdb:set(Tx, KK, EVK),
+ ok = erlfdb:set(Tx, VK, EV).
+
+
+get_previous_key(TxDb, ReduceIdxPrefix, Level, Key) ->
+ #{
+ tx := Tx
+ } = TxDb,
+
+ % TODO: see if we need to add in conflict ranges for this for level=0
+ Opts = [{limit, 2}, {reverse, true}, {streaming_mode, want_all}],
+%% LevelPrefix = erlfdb_tuple:pack({Level}, ReduceIdxPrefix),
+
+ EK = couch_views_encoding:encode(Key, key),
+ EndKey0 = erlfdb_tuple:pack({Level, EK}, ReduceIdxPrefix),
+
+ {StartKey, EndKey1} = erlfdb_tuple:range({Level}, ReduceIdxPrefix),
+%% EndKey1 = erlfdb_key:first_greater_than(EndKey0),
+
+ Callback = fun row_cb/2,
+ Out = erlfdb:fold_range(Tx, StartKey, EndKey1, Callback, {val, ReduceIdxPrefix, {}}, Opts),
+ io:format("OUT PRV ~p ~p ~p ~n", [Level, Key, Out]),
+ Out.
+
+
+row_cb({FullEncodedKey, EV}, {val, ReduceIdxPrefix, Acc}) ->
+ io:format("ROW VAL ~p ~n", [erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix)]),
+ {_Level, EK, _VIEW_ROW_VALUE}
+ = erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
+ Val = couch_views_encoding:decode(EV),
+%% io:format("WW ~p ~p ~n", [couch_views_encoding:decode(EK), Val]),
+
+ {key, {EK, ReduceIdxPrefix, Val}};
+
+row_cb({FullEncodedKey, EVK}, {key, {EK, ReduceIdxPrefix, Val}}) ->
+ io:format("ROW KEY ~p ~n", [erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix)]),
+ {_Level, EK, ?VIEW_ROW_KEY}
+ = erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
+ Key = couch_views_encoding:decode(EVK),
+
+ {Key, Val}.
+
+
+
+
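In this first cut should_add_key_to_level/3 is probabilistic, as its
comment admits: level 0 always stores the key, the top level never does,
and once one coin flip fails every higher level is skipped, which is the
classic skip-list promotion rule (the 03/05 commit later replaces it
with the deterministic hash test). A sketch of that rule, using
rand:uniform/1 in place of the deprecated crypto:rand_uniform/2; the
function name is illustrative:

    %% Number of levels a key occupies: level 0 is free, then one extra
    %% level per successful coin flip, capped below MaxLevel. The result
    %% is geometrically distributed.
    promoted_levels(MaxLevel) ->
        promoted_levels(1, MaxLevel).

    promoted_levels(Level, MaxLevel) when Level >= MaxLevel ->
        Level;
    promoted_levels(Level, MaxLevel) ->
        case rand:uniform(2) of
            1 -> promoted_levels(Level + 1, MaxLevel); % heads: climb
            2 -> Level                                 % tails: stop
        end.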
diff --git a/src/couch_views/include/couch_views.hrl b/src/couch_views/src/couch_views_reduce_fdb.erl
similarity index 55%
copy from src/couch_views/include/couch_views.hrl
copy to src/couch_views/src/couch_views_reduce_fdb.erl
index 2e443eb..bcaaa30 100644
--- a/src/couch_views/include/couch_views.hrl
+++ b/src/couch_views/src/couch_views_reduce_fdb.erl
@@ -10,17 +10,24 @@
% License for the specific language governing permissions and limitations under
% the License.
-% indexing
--define(VIEW_UPDATE_SEQ, 0).
--define(VIEW_ID_INFO, 1).
--define(VIEW_ID_RANGE, 2).
--define(VIEW_MAP_RANGE, 3).
--define(VIEW_ROW_COUNT, 0).
--define(VIEW_KV_SIZE, 1).
+-module(couch_views_reduce_fdb).
+
+
+-export([
+%% write_doc/4
+]).
+
+% _id keys = {?DB_VIEWS, Sig, ?VIEW_REDUCE_ID_RANGE, DocId, ViewId} = [TotalKeys, TotalSize, UniqueKeys]
+
+%%write_doc(TxDb, Sig, ViewIds, Doc) ->
+%% #{
+%% id := DocId,
+%% reduce_results := ReduceResults
+%% } = Doc,
+%% lists:foreach(fun({ViewId, NewRows}) ->
+%% % update reduce index
+%% ok
+%% end, lists:zip(ViewIds, ReduceResults)).
--define(VIEW_ROW_KEY, 0).
--define(VIEW_ROW_VALUE, 1).
-% jobs api
--define(INDEX_JOB_TYPE, <<"views">>).
diff --git a/src/couch_views/test/exunit/couch_views_reduce_test.exs b/src/couch_views/test/exunit/couch_views_reduce_test.exs
new file mode 100644
index 0000000..3f7a173
--- /dev/null
+++ b/src/couch_views/test/exunit/couch_views_reduce_test.exs
@@ -0,0 +1,286 @@
+defmodule CouchViewsReduceTest do
+ use Couch.Test.ExUnit.Case
+
+ alias Couch.Test.Utils
+
+ alias Couch.Test.Setup
+
+ alias Couch.Test.Setup.Step
+
+ setup_all do
+ test_ctx = :test_util.start_couch([:fabric, :couch_views, :couch_jobs])
+
+ on_exit(fn ->
+ :test_util.stop_couch(test_ctx)
+ end)
+ end
+
+ setup do
+ db_name = Utils.random_name("db")
+
+ admin_ctx =
+ {:user_ctx,
+ Utils.erlang_record(:user_ctx, "couch/include/couch_db.hrl", roles: ["_admin"])}
+
+ {:ok, db} = :fabric2_db.create(db_name, [admin_ctx])
+
+ docs = create_docs()
+ ddoc = create_ddoc()
+
+ {ok, _} = :fabric2_db.update_docs(db, [ddoc | docs])
+
+ on_exit(fn ->
+ :fabric2_db.delete(db_name, [admin_ctx])
+ end)
+
+ %{
+ :db_name => db_name,
+ :db => db,
+ :ddoc => ddoc
+ }
+ end
+
+ # test "group=true count reduce", context do
+ # args = %{
+ # :reduce => true,
+ # :group => true
+ # # :limit => 9
+ # }
+ #
+ # {:ok, res} = run_query(context, args, "baz")
+ # IO.inspect(res, label: "OUT")
+ #
+ # assert res == [
+ # {:row, [key: 1, value: 2]},
+ # {:row, [key: 2, value: 2]},
+ # {:row, [key: 3, value: 2]},
+ # {:row, [key: [1, 1], value: 1]},
+ # {:row, [key: [1, 1, 5], value: 1]},
+ # {:row, [key: [1, 2, 6], value: 1]},
+ # {:row, [key: [2, 1], value: 1]},
+ # {:row, [key: [2, 3, 6], value: 1]},
+ # {:row, [key: [3, 1], value: 1]},
+ # {:row, [key: [3, 1, 5], value: 1]},
+ # {:row, [key: [3, 4, 5], value: 1]}
+ # ]
+ # end
+
+ # test "group=1 count reduce", context do
+ # args = %{
+ # :reduce => true,
+ # :group_level => 1
+ # # :limit => 6
+ # }
+ #
+ # {:ok, res} = run_query(context, args, "baz")
+ # IO.inspect(res, label: "OUT")
+ #
+ # assert res == [
+ # {:row, [key: 1, value: 2]},
+ # {:row, [key: 2, value: 2]},
+ # {:row, [key: 3, value: 2]},
+ # {:row, [key: [1], value: 2]},
+ # {:row, [key: [2], value: 2]},
+ # {:row, [key: [3], value: 2]}
+ # ]
+ # end
+ #
+ # test "group=2 count reduce", context do
+ # args = %{
+ # :reduce => true,
+ # :group_level => 2,
+ # :limit => 9
+ # }
+ #
+ # {:ok, res} = run_query(context, args, "baz")
+ # IO.inspect(res, label: "OUT")
+ #
+ # assert res == [
+ # {:row, [key: 1, value: 2]},
+ # {:row, [key: 2, value: 2]},
+ # {:row, [key: 3, value: 2]},
+ # {:row, [key: [1, 1], value: 2]},
+ # {:row, [key: [1, 2], value: 1]},
+ # {:row, [key: [2, 1], value: 1]},
+ # {:row, [key: [2, 3], value: 1]},
+ # {:row, [key: [3, 1], value: 2]},
+ # {:row, [key: [3, 4], value: 1]}
+ # ]
+ # end
+ #
+ # test "group=2 count reduce with limit = 3", context do
+ # args = %{
+ # :reduce => true,
+ # :group_level => 2,
+ # :limit => 4
+ # }
+ #
+ # {:ok, res} = run_query(context, args, "baz")
+ # IO.inspect(res, label: "OUT")
+ #
+ # assert res == [
+ # {:row, [key: 1, value: 2]},
+ # {:row, [key: 2, value: 2]},
+ # {:row, [key: 3, value: 2]},
+ # {:row, [key: [1, 1], value: 1]}
+ # ]
+ # end
+ #
+ # # [
+ # # row: [key: [2019, 1, 2], value: 1],
+ # # row: [key: [2019, 1, 4], value: 1],
+ # # row: [key: [2019, 2, 1], value: 1],
+ # # row: [key: [2019, 2, 3], value: 1]
+ # # ]
+ #
+ # test "group=2 count reduce with startkey", context do
+ # args = %{
+ # # :reduce => true,
+ # # :group_level => 2,
+ # :start_key => [2019, 1, 4]
+ # # :limit => 4
+ # }
+ #
+ # {:ok, res} = run_query(context, args, "boom")
+ # IO.inspect(res, label: "OUT")
+ #
+ # assert res == [
+ # {:row, [key: [2019, 1], value: 1]},
+ # {:row, [key: [2019, 2], value: 2]}
+ # ]
+ # end
+
+ test "group_level=0 _max reduce", context do
+ args = %{
+ :reduce => true,
+ :group_level => 0
+ # :limit => 9
+ }
+
+ {:ok, res} = run_query(context, args, "max")
+ IO.inspect(res, label: "OUT")
+
+ assert res == [
+ {:row, [key: :null, value: 3]}
+ ]
+ end
+
+ defp run_query(context, args, view) do
+ db = context[:db]
+ ddoc = context[:ddoc]
+
+ :couch_views.query(db, ddoc, view, &__MODULE__.default_cb/2, [], args)
+ end
+
+ def default_cb(:complete, acc) do
+ {:ok, Enum.reverse(acc)}
+ end
+
+ def default_cb({:final, info}, []) do
+ {:ok, [info]}
+ end
+
+ def default_cb({:final, _}, acc) do
+ {:ok, acc}
+ end
+
+ def default_cb({:meta, _}, acc) do
+ {:ok, acc}
+ end
+
+ def default_cb(:ok, :ddoc_updated) do
+ {:ok, :ddoc_updated}
+ end
+
+ def default_cb(row, acc) do
+ {:ok, [row | acc]}
+ end
+
+ defp create_docs() do
+ for i <- 1..1 do
+ group =
+ if rem(i, 3) == 0 do
+ "first"
+ else
+ "second"
+ end
+
+ :couch_doc.from_json_obj(
+ {[
+ {"_id", "doc-id-#{i}"},
+ {"value", i},
+ {"some", "field"},
+ {"group", group}
+ ]}
+ )
+ end
+ end
+
+ defp create_ddoc() do
+ :couch_doc.from_json_obj(
+ {[
+ {"_id", "_design/bar"},
+ {"views",
+ {[
+# {"baz",
+# {[
+# {"map",
+# """
+# function(doc) {
+# emit(doc.value, doc.value);
+# emit(doc.value, doc.value);
+# emit([doc.value, 1], doc.value);
+# emit([doc.value, doc.value + 1, doc.group.length], doc.value);
+#
+# if (doc.value === 3) {
+# emit([1, 1, 5], 1);
+# emit([doc.value, 1, 5], 1);
+# }
+# }
+# """},
+# {"reduce", "_count"}
+# ]}},
+# {"boom",
+# {[
+# {"map",
+# """
+# function(doc) {
+# var month = 1;
+# if (doc.value % 2) {
+# month = 2;
+# }
+# emit([2019, month, doc.value], doc.value);
+# }
+# """},
+# {"reduce", "_count"}
+# ]}},
+ {"max",
+ {[
+ {"map",
+ """
+ function(doc) {
+ //emit(doc.value, doc.value);
+ //emit([doc.value, 1], doc.value);
+ //emit([doc.value, doc.value + 1, doc.group.length], doc.value);
+ emit(1, 1);
+ emit(2, 2);
+ emit(3, 3);
+ emit(4, 4);
+
+ emit([2019, 2, 2], 1);
+ emit([2019, 3, 3], 2);
+ emit([2019, 3, 3], 3);
+ emit([2019, 4, 3], 4);
+ emit([2019, 5, 3], 6);
+ if (doc.value === 3) {
+ //emit([doc.value, 1, 5], 1);
+ }
+ }
+ """},
+ {"reduce", "_stats"}
+ ]}}
+ ]}}
+ ]}
+ )
+ end
+end
diff --git a/src/couch_views/test/exunit/test_helper.exs b/src/couch_views/test/exunit/test_helper.exs
new file mode 100644
index 0000000..3140500
--- /dev/null
+++ b/src/couch_views/test/exunit/test_helper.exs
@@ -0,0 +1,2 @@
+ExUnit.configure(formatters: [JUnitFormatter, ExUnit.CLIFormatter])
+ExUnit.start()
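To exercise the new suite locally, the repository's ExUnit make target should be enough (the exact target name is an assumption here, not something these commits add):

make exunit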
[couchdb] 04/05: can do group_level query
commit bff44fb9f7cfc92e21012c705b8cd1a9938fac88
Author: Garren Smith <ga...@gmail.com>
AuthorDate: Thu Oct 31 14:09:36 2019 +0200
can do group_level query
---
src/couch_views/src/couch_views_reader.erl | 9 +-
src/couch_views/src/couch_views_reduce.erl | 15 +-
src/couch_views/src/couch_views_reduce_fdb.erl | 175 +++++++++++++++------
.../test/exunit/couch_views_reduce_test.exs | 47 +++---
4 files changed, 168 insertions(+), 78 deletions(-)
diff --git a/src/couch_views/src/couch_views_reader.erl b/src/couch_views/src/couch_views_reader.erl
index 394b3cf..e750a94 100644
--- a/src/couch_views/src/couch_views_reader.erl
+++ b/src/couch_views/src/couch_views_reader.erl
@@ -32,7 +32,8 @@ read_reduce(Db, Mrst, ViewName, UserCallback, UserAcc0, Args) ->
} = Mrst,
ViewId = get_view_id(Lang, Args, ViewName, Views),
- couch_views_reduce:read_reduce(Db, Sig, ViewId, UserCallback,
+ Reducer = get_view_reducer(Lang, Args, ViewName, Views),
+ couch_views_reduce:read_reduce(Db, Sig, ViewId, Reducer, UserCallback,
UserAcc0, Args).
%% Fun = fun handle_reduce_row/3,
%%
@@ -243,6 +244,12 @@ get_view_id(Lang, Args, ViewName, Views) ->
{red, {_Idx, _Lang, View}, _Args} -> View#mrview.id_num
end.
+get_view_reducer(Lang, Args, ViewName, Views) ->
+ case couch_mrview_util:extract_view(Lang, Args, ViewName, Views) of
+ {map, _View, _Args} -> throw(no_reduce);
+ View -> couch_mrview_util:extract_view_reduce(View)
+ end.
+
expand_keys_args(#mrargs{keys = undefined} = Args) ->
[Args];
diff --git a/src/couch_views/src/couch_views_reduce.erl b/src/couch_views/src/couch_views_reduce.erl
index ebd2f47..b7eb18e 100644
--- a/src/couch_views/src/couch_views_reduce.erl
+++ b/src/couch_views/src/couch_views_reduce.erl
@@ -15,7 +15,7 @@
-export([
run_reduce/2,
- read_reduce/6,
+ read_reduce/7,
setup_reduce_indexes/3
]).
@@ -30,7 +30,7 @@
-define(MAX_SKIP_LIST_LEVELS, 6).
-read_reduce(Db, Sig, ViewId, UserCallback, UserAcc0, Args) ->
+read_reduce(Db, Sig, ViewId, Reducer, UserCallback, UserAcc0, Args) ->
#{
db_prefix := DbPrefix
} = Db,
@@ -38,9 +38,16 @@ read_reduce(Db, Sig, ViewId, UserCallback, UserAcc0, Args) ->
%% Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
#mrargs{
- limit = Limit
+ limit = Limit,
+ group = Group,
+ group_level = GroupLevel
} = Args,
+ GroupLevel1 = case Group of
+ true -> group_true;
+ _ -> GroupLevel
+ end,
+
Opts = args_to_fdb_opts(Args, ReduceIdxPrefix),
try
@@ -59,7 +66,7 @@ read_reduce(Db, Sig, ViewId, UserCallback, UserAcc0, Args) ->
},
Fun = fun handle_row/3,
- Acc1 = couch_views_reduce_fdb:fold_level0(TxDb, Sig, ViewId, Opts, Fun, Acc0),
+ Acc1 = couch_views_reduce_fdb:fold_level0(TxDb, Sig, ViewId, Reducer, GroupLevel1, Opts, Fun, Acc0),
#{
user_acc := UserAcc1
} = Acc1,
diff --git a/src/couch_views/src/couch_views_reduce_fdb.erl b/src/couch_views/src/couch_views_reduce_fdb.erl
index 9683265..5759c42 100644
--- a/src/couch_views/src/couch_views_reduce_fdb.erl
+++ b/src/couch_views/src/couch_views_reduce_fdb.erl
@@ -15,7 +15,7 @@
-export([
- fold_level0/6,
+ fold_level0/8,
create_skip_list/3,
update_reduce_idx/6
]).
@@ -26,7 +26,7 @@
-include_lib("couch_mrview/include/couch_mrview.hrl").
-include_lib("fabric/include/fabric2.hrl").
--define(MAX_SKIP_LIST_LEVELS, 6).
+-define(MAX_SKIP_LIST_LEVELS, 1).
-define(LEVEL_FAN_POW, 1).
log_levels(Db, Sig, ViewId) ->
@@ -34,34 +34,40 @@ log_levels(Db, Sig, ViewId) ->
db_prefix := DbPrefix
} = Db,
- Levels = lists:seq(0, 6),
+ Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
Opts = [{streaming_mode, want_all}],
fabric2_fdb:transactional(Db, fun(#{tx := Tx} = TxDb) ->
- lists:foreach(fun (Level) ->
+ lists:foldl(fun (Level, Level0Total) ->
{StartKey, EndKey} = erlfdb_tuple:range({Level},
ReduceIdxPrefix),
- Acc0 = #{
- sig => Sig,
- view_id => ViewId,
- reduce_idx_prefix => ReduceIdxPrefix,
- user_acc => [],
- callback => fun handle_log_levels/3
- },
-
- Fun = fun fold_fwd_cb/2,
- Acc = erlfdb:fold_range(Tx, StartKey, EndKey, Fun, Acc0, Opts),
- #{
- user_acc := Rows
- } = Acc,
- io:format("~n LEVEL ~p rows ~p ~n", [Level, Rows])
- end, Levels)
+ Future = erlfdb:get_range(Tx, StartKey, EndKey, Opts),
+ Rows = lists:map(fun ({_Key, EV}) ->
+ unpack_key_value(EV)
+ end, erlfdb:wait(Future)),
+
+ io:format("~n LEVEL ~p rows ~p ~n", [Level, Rows]),
+ case Level == 0 of
+ true ->
+ sum_rows(Rows);
+ false ->
+ Total = sum_rows(Rows),
+ if Total == Level0Total -> Level0Total; true ->
+ io:format("~n ~n LEVEL ~p NOT EQUAL ~p ~p ~n", [Level, Level0Total, Total]),
+%% throw(level_total_error)
+ Level0Total
+ end
+ end
+
+ end, 0, Levels)
end).
-handle_log_levels(Key, Value, Acc) ->
- Acc ++ [{Key, Value}].
+sum_rows(Rows) ->
+ lists:foldl(fun ({_, Val}, Sum) ->
+ Val + Sum
+ end, 0, Rows).
+
%%fold(Db, Sig, ViewId, Options, Callback, Acc0) ->
%% #{
@@ -95,7 +101,7 @@ handle_log_levels(Key, Value, Acc) ->
%% {ok, UserAcc1}
%% end).
-fold_level0(Db, Sig, ViewId, Opts, UserCallback, UserAcc0) ->
+fold_level0(Db, Sig, ViewId, Reducer, GroupLevel, Opts, UserCallback, UserAcc0) ->
#{
db_prefix := DbPrefix
} = Db,
@@ -108,7 +114,10 @@ fold_level0(Db, Sig, ViewId, Opts, UserCallback, UserAcc0) ->
user_acc => UserAcc0,
%% args := Args,
callback => UserCallback,
- reduce_idx_prefix => ReduceIdxPrefix
+ reduce_idx_prefix => ReduceIdxPrefix,
+ reducer => Reducer,
+ group_level => GroupLevel,
+ rows => []
},
fabric2_fdb:transactional(Db, fun(TxDb) ->
@@ -118,36 +127,94 @@ fold_level0(Db, Sig, ViewId, Opts, UserCallback, UserAcc0) ->
} = TxDb,
-%% {StartKey, EndKey} = erlfdb_tuple:range({0},
-%% ReduceIdxPrefix),
{startkey, StartKey} = lists:keyfind(startkey, 1, Opts),
{endkey, EndKey} = lists:keyfind(endkey, 1, Opts),
-%% ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
Fun = fun fold_fwd_cb/2,
Acc1 = erlfdb:fold_range(Tx, StartKey, EndKey, Fun, Acc, Opts),
#{
- user_acc := UserAcc1
+ user_acc := UserAcc1,
+ rows := Rows
} = Acc1,
- UserAcc1
+
+ rereduce_and_reply(Reducer, Rows, GroupLevel,
+ UserCallback, UserAcc1)
end).
-fold_fwd_cb({FullEncodedKey, EV}, Acc) ->
+fold_fwd_cb({_FullEncodedKey, EV}, Acc) ->
#{
- reduce_idx_prefix := ReduceIdxPrefix,
callback := Callback,
- user_acc := UserAcc
+ user_acc := UserAcc,
+ group_level := GroupLevel,
+ rows := Rows,
+ reducer := Reducer
} = Acc,
- {_Level, _EK}
- = erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
- {EK, EV1} = erlfdb_tuple:unpack(EV),
- Key = couch_views_encoding:decode(EK),
- Val = couch_views_encoding:decode(EV1),
+ {Key, Val} = unpack_key_value(EV),
+
+ LastKey = if Rows == [] -> false; true ->
+ {LastKey0, _} = lists:last(Rows),
+ LastKey0
+ end,
+
+ case group_level_equal(Key, LastKey, GroupLevel) of
+ true ->
+ Acc#{
+ rows := Rows ++ [{Key, Val}]
+ };
+ false ->
+ UserAcc1 = rereduce_and_reply(Reducer, Rows, GroupLevel,
+ Callback, UserAcc),
+ Acc#{
+ user_acc := UserAcc1,
+ rows := [{Key, Val}]
+ }
+ end.
+
+rereduce_and_reply(_Reducer, [], _GroupLevel, _Callback, Acc) ->
+ Acc;
+
+rereduce_and_reply(Reducer, Rows, GroupLevel, Callback, Acc) ->
+ {ReducedKey, ReducedVal} = rereduce(Reducer, Rows, GroupLevel),
+ Callback(ReducedKey, ReducedVal, Acc).
+
+
+rereduce(_Reducer, [], _GroupLevel) ->
+ no_kvs;
- UserAcc1 = Callback(Key, Val, UserAcc),
- Acc#{user_acc := UserAcc1}.
+rereduce(_Reducer, Rows, GroupLevel) when length(Rows) == 1 ->
+ {Key, Val} = hd(Rows),
+ GroupKey = group_level_key(Key, GroupLevel),
+ {GroupKey, Val};
+
+rereduce(<<"_count">>, Rows, GroupLevel) ->
+ Val = length(Rows),
+ {Key, _} = hd(Rows),
+ GroupKey = group_level_key(Key, GroupLevel),
+ {GroupKey, Val}.
+
+
+group_level_equal(_One, _Two, 0) ->
+ true;
+
+group_level_equal(_One, _Two, group_true) ->
+ false;
+
+group_level_equal(One, Two, GroupLevel) ->
+ GroupOne = group_level_key(One, GroupLevel),
+ GroupTwo = group_level_key(Two, GroupLevel),
+ GroupOne == GroupTwo.
+
+
+group_level_key(_Key, 0) ->
+ null;
+
+group_level_key(Key, GroupLevel) when is_list(Key) ->
+ lists:sublist(Key, GroupLevel);
+
+group_level_key(Key, _GroupLevel) ->
+ Key.
reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId) ->
@@ -155,6 +222,13 @@ reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId) ->
erlfdb_tuple:pack(Key, DbPrefix).
+unpack_key_value(EncodedValue) ->
+ {EK, EV1} = erlfdb_tuple:unpack(EncodedValue),
+ Key = couch_views_encoding:decode(EK),
+ Val = couch_views_encoding:decode(EV1),
+ {Key, Val}.
+
+
%% Inserting
update_reduce_idx(TxDb, Sig, ViewId, _DocId, _ExistingKeys, ReduceResult) ->
#{
@@ -205,15 +279,15 @@ add_kv_to_skip_list(Db, MaxLevel, #{} = ViewOpts, Key, Val) ->
fabric2_fdb:transactional(Db, fun(TxDb) ->
lists:foreach(fun(Level) ->
{PrevKey, PrevVal} = get_previous_key(TxDb, ReduceIdxPrefix, Level, Key),
- io:format("Process ~p ~p ~p PREV VALS ~p ~p ~n", [Level, Key, Val, PrevKey, PrevVal]),
+ io:format("Level ~p K/V ~p ~p PREV KV ~p ~p ~n", [Level, Key, Val, PrevKey, PrevVal]),
case should_add_key_to_level(Level, KeyHash) of
true ->
- io:format("Adding ~p ~p ~n", [Level, Key]),
+ io:format("Adding at ~p ~p ~n", [Level, Key]),
add_kv(Db, ReduceIdxPrefix, Level, Key, Val);
false ->
- {PrevKey, NewVal} = rereduce(<<"_stats">>, {PrevKey, PrevVal}, {Key, Val}),
- io:format("RE_REDUCE ~p ~p ~p ~p ~n", [Level, Key, PrevKey, NewVal]),
- add_kv(Db, ReduceIdxPrefix, Level, PrevKey, NewVal)
+%% {PrevKey, NewVal} = rereduce(<<"_stats">>, {PrevKey, PrevVal}, {Key, Val}),
+%% io:format("RE_REDUCE ~p ~p ~p ~p ~n", [Level, Key, PrevKey, NewVal]),
+ add_kv(Db, ReduceIdxPrefix, Level, PrevKey, PrevVal)
end
end, Levels)
end).
@@ -229,7 +303,7 @@ get_previous_key(TxDb, ReduceIdxPrefix, Level, Key) ->
EK = couch_views_encoding:encode(Key, key),
StartKey = erlfdb_tuple:pack({Level, EK}, ReduceIdxPrefix),
- StartKeySel = erlfdb_key:last_less_or_equal(StartKey),
+ StartKeySel = erlfdb_key:last_less_than(StartKey),
EndKeySel = erlfdb_key:first_greater_or_equal(StartKey),
Future = erlfdb:get_range(Tx, StartKeySel, EndKeySel, Opts),
@@ -241,6 +315,9 @@ hash_key(Key) ->
erlang:phash2(Key).
+should_add_key_to_level(0, _KeyHash) ->
+ true;
+
should_add_key_to_level(Level, KeyHash) ->
(KeyHash band ((1 bsl (Level * ?LEVEL_FAN_POW)) -1)) == 0.
%% keyHash & ((1 << (level * LEVEL_FAN_POW)) - 1)) != 0
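For intuition about the fan-out: with ?LEVEL_FAN_POW = 1, should_add_key_to_level/2 keeps a key at a level only when the low Level bits of its hash are zero, so each level holds roughly half the keys of the level below it. A minimal illustrative check of the same mask test (not part of the commit):

FanPow = 1,
InLevel = fun(Level, KeyHash) ->
    % identical mask test to should_add_key_to_level/2 above
    (KeyHash band ((1 bsl (Level * FanPow)) - 1)) == 0
end,
true = InLevel(0, 12),  % mask is 0, so level 0 keeps every key
true = InLevel(2, 12),  % 12 = 2#1100, low two bits are zero
false = InLevel(3, 12). % low three bits are 2#100, not zero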
@@ -277,11 +354,11 @@ add_kv(TxDb, ReduceIdxPrefix, Level, Key, Val) ->
ok = erlfdb:set(Tx, LevelKey, EV).
-rereduce(<<"_stats">>, {PrevKey, PrevVal}, {_Key, Val}) ->
- case PrevVal >= Val of
- true -> {PrevKey, PrevVal};
- false -> {PrevKey, Val}
- end.
+%%rereduce(<<"_stats">>, {PrevKey, PrevVal}, {_Key, Val}) ->
+%% case PrevVal >= Val of
+%% true -> {PrevKey, PrevVal};
+%% false -> {PrevKey, Val}
+%% end.
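Taken together, group_level_key/2 and group_level_equal/3 bucket rows by key prefix. A few illustrative evaluations, assuming the clauses exactly as written above:

[2019] = group_level_key([2019, 3, 1], 1),
null = group_level_key([2019, 3, 1], 0),
true = group_level_equal([2019, 3, 1], [2019, 4, 2], 1), % same [2019] prefix
false = group_level_equal([2019, 3, 1], [2018, 3, 1], 1).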
diff --git a/src/couch_views/test/exunit/couch_views_reduce_test.exs b/src/couch_views/test/exunit/couch_views_reduce_test.exs
index c1b35e2..488f3ee 100644
--- a/src/couch_views/test/exunit/couch_views_reduce_test.exs
+++ b/src/couch_views/test/exunit/couch_views_reduce_test.exs
@@ -60,16 +60,16 @@ defmodule CouchViewsReduceTest do
test "group_level=1 count reduce", context do
args = %{
:reduce => true,
- :group => true,
+ :group_level => 1
}
{:ok, res} = run_query(context, args, "dates")
IO.inspect(res, label: "OUT")
assert res == [
- {:row, [key: [2017], value: 1]},
- {:row, [key: [2018], value: 1]},
- {:row, [key: [2019], value: 1]}
+ {:row, [key: [2017], value: 4]},
+ {:row, [key: [2018], value: 3]},
+ {:row, [key: [2019], value: 2]}
]
end
@@ -221,7 +221,7 @@ defmodule CouchViewsReduceTest do
[2019, 4, 1]
]
- for i <- 1..4 do
+ for i <- 1..10 do
group =
if rem(i, 3) == 0 do
"first"
@@ -235,7 +235,6 @@ defmodule CouchViewsReduceTest do
{"some", "field"},
{"group", group},
{"date", Enum.at(dates, i - 1)}
- # {"timestamp", Enum.at(timestamps, i - 1)}
]})
end
end
@@ -254,25 +253,25 @@ defmodule CouchViewsReduceTest do
}
"""},
{"reduce", "_count"}
- ]}},
- {"baz",
- {[
- {"map",
- """
- function(doc) {
- emit(doc.value, doc.value);
- emit(doc.value, doc.value);
- emit([doc.value, 1], doc.value);
- emit([doc.value, doc.value + 1, doc.group.length], doc.value);
-
- if (doc.value === 3) {
- emit([1, 1, 5], 1);
- emit([doc.value, 1, 5], 1);
- }
- }
- """},
- {"reduce", "_count"}
]}}
+# {"baz",
+# {[
+# {"map",
+# """
+# function(doc) {
+# emit(doc.value, doc.value);
+# emit(doc.value, doc.value);
+# emit([doc.value, 1], doc.value);
+# emit([doc.value, doc.value + 1, doc.group.length], doc.value);
+#
+# if (doc.value === 3) {
+# emit([1, 1, 5], 1);
+# emit([doc.value, 1, 5], 1);
+# }
+# }
+# """},
+# {"reduce", "_count"}
+# ]}}
# {"boom",
# {[
# {"map",
[couchdb] 02/05: printing
commit e399fb336e3f2a828b7887d36c3acad9e4b2408f
Author: Garren Smith <ga...@gmail.com>
AuthorDate: Mon Oct 28 14:34:00 2019 +0200
printing
---
src/couch_views/src/couch_views_fdb.erl | 92 +---------
src/couch_views/src/couch_views_indexer.erl | 4 +-
src/couch_views/src/couch_views_reduce.erl | 84 +++------
.../test/exunit/couch_views_reduce_test.exs | 202 ++++++++++-----------
4 files changed, 133 insertions(+), 249 deletions(-)
diff --git a/src/couch_views/src/couch_views_fdb.erl b/src/couch_views/src/couch_views_fdb.erl
index 07241dd..479a707 100644
--- a/src/couch_views/src/couch_views_fdb.erl
+++ b/src/couch_views/src/couch_views_fdb.erl
@@ -20,7 +20,6 @@
get_kv_size/3,
fold_map_idx/6,
- fold_reduce_idx/6,
write_doc/4
]).
@@ -183,8 +182,7 @@ write_doc(TxDb, Sig, ViewIds, Doc) ->
end,
update_map_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows),
couch_views_reduce:update_reduce_idx(TxDb, Sig, ViewId, DocId,
- ExistingKeys, ReduceResult),
- update_reduce_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, ReduceResult)
+ ExistingKeys, ReduceResult)
end, lists:zip3(ViewIds, Results, ReduceResults)).
@@ -357,53 +355,6 @@ update_map_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows) ->
end, KVsToAdd).
-update_reduce_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows) ->
- #{
- tx := Tx,
- db_prefix := DbPrefix
- } = TxDb,
-
-%% Unique = lists:usort([K || {K, _V} <- NewRows]),
-
-%% KeysToRem = ExistingKeys -- Unique,
-%% lists:foreach(fun(RemKey) ->
-%% {Start, End} = reduce_idx_range(DbPrefix, Sig, ViewId, RemKey, DocId),
-%% ok = erlfdb:clear_range(Tx, Start, End)
-%% end, KeysToRem),
-%%
- {ExactKVsToAdd, GroupKVsToAdd} = process_reduce_rows(NewRows),
- ReduceIdxPrefix = reduce_idx_prefix(DbPrefix, Sig, ViewId),
- add_reduce_kvs(Tx, ReduceIdxPrefix, ExactKVsToAdd, ?VIEW_REDUCE_EXACT),
- add_reduce_kvs(Tx, ReduceIdxPrefix, GroupKVsToAdd, ?VIEW_REDUCE_GROUP).
-
-
-add_reduce_kvs(Tx, ReduceIdxPrefix, KVsToAdd, ReduceType) ->
- lists:foreach(fun({Key1, Key2, Val, GroupLevel}) ->
- KK = reduce_idx_key(ReduceIdxPrefix, Key1, GroupLevel,
- ReduceType, ?VIEW_ROW_KEY),
- VK = reduce_idx_key(ReduceIdxPrefix, Key1, GroupLevel,
- ReduceType, ?VIEW_ROW_VALUE),
- ok = erlfdb:set(Tx, KK, Key2),
- ok = erlfdb:add(Tx, VK, Val)
- end, KVsToAdd).
-
-
-reduce_idx_prefix(DbPrefix, Sig, ViewId) ->
- Key = {?DB_VIEWS, Sig, ?VIEW_REDUCE_RANGE, ViewId},
- erlfdb_tuple:pack(Key, DbPrefix).
-
-
-reduce_idx_key(ReduceIdxPrefix, ReduceKey, GroupLevel, ReduceType, RowType) ->
- Key = {ReduceKey, GroupLevel, ReduceType, RowType},
- erlfdb_tuple:pack(Key, ReduceIdxPrefix).
-
-
-%%reduce_idx_range(DbPrefix, Sig, ViewId, GroupKey, DocId) ->
-%% Encoded = couch_views_encoding:encode(MapKey, key),
-%% Key = {?DB_VIEWS, Sig, ?VIEW_MAP_RANGE, ViewId, {Encoded, DocId}},
-%% erlfdb_tuple:range(Key, DbPrefix).
-
-
get_view_keys(TxDb, Sig, DocId) ->
#{
tx := Tx,
@@ -497,47 +448,6 @@ process_rows(Rows) ->
end, [], Grouped).
-process_reduce_rows(Rows) ->
- ReduceExact = encode_reduce_rows(Rows),
- ReduceGroups = lists:foldl(fun({Key, Val}, Groupings) ->
- Out = create_grouping(Key, Val, [], Groupings),
- Out
- end, #{}, Rows),
- ReduceGroups1 = encode_reduce_rows(maps:to_list(ReduceGroups)),
- {ReduceExact, ReduceGroups1}.
-
-
-encode_reduce_rows(Rows) ->
- lists:map(fun({K, V}) ->
- EK1 = couch_views_encoding:encode(K, key),
- EK2 = couch_views_encoding:encode(K, value),
- {EK1, EK2, V, group_level(K)}
- end, Rows).
-
-
-group_level(Key) when is_list(Key) ->
- length(Key);
-
-group_level(_Key) ->
- 1.
-
-
-create_grouping([], _Val, _, Groupings) ->
- Groupings;
-
-create_grouping([Head | Rest], Val, Key, Groupings) ->
- Key1 = Key ++ [Head],
- Groupings1 = maps:update_with(Key1, fun(OldVal) ->
- OldVal + Val
- end, Val, Groupings),
- create_grouping(Rest, Val, Key1, Groupings1);
-
-create_grouping(Key, Val, _, Groupings) ->
- maps:update_with(Key, fun(OldVal) ->
- OldVal + Val
- end, Val, Groupings).
-
-
calculate_row_size(Rows) ->
lists:foldl(fun({K, V}, Acc) ->
Acc + erlang:external_size(K) + erlang:external_size(V)
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index 2f38ac1..d4a78b8 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -101,8 +101,8 @@ update(#{} = Db, Mrst0, State0) ->
DocAcc1 = fetch_docs(TxDb, DocAcc),
{Mrst1, MappedDocs} = map_docs(Mrst0, DocAcc1),
- Results = run_reduce(Mrst1, MappedDocs),
- write_docs(TxDb, Mrst1, MappedDocs, State2),
+ MappedReducedDocs = run_reduce(Mrst1, MappedDocs),
+ write_docs(TxDb, Mrst1, MappedReducedDocs, State2),
case Count < Limit of
true ->
diff --git a/src/couch_views/src/couch_views_reduce.erl b/src/couch_views/src/couch_views_reduce.erl
index 1502f38..4cb7416 100644
--- a/src/couch_views/src/couch_views_reduce.erl
+++ b/src/couch_views/src/couch_views_reduce.erl
@@ -26,7 +26,7 @@
-include_lib("fabric/include/fabric2.hrl").
--define(LEVEL_FAN_POW, 4).
+-define(LEVEL_FAN_POW, 1).
-define(MAX_SKIP_LIST_LEVELS, 6).
@@ -84,7 +84,6 @@ read_reduce(Db, Sig, ViewId, UserCallback, UserAcc0, Args) ->
args => Args,
callback => UserCallback,
reduce_idx_prefix => ReduceIdxPrefix,
- next => key,
rows => []
},
@@ -111,31 +110,19 @@ args_to_fdb_opts(#mrargs{} = Args) ->
ok.
-fold_fwd_cb({FullEncodedKey, EV}, #{next := key} = Acc) ->
- #{
- reduce_idx_prefix := ReduceIdxPrefix
- } = Acc,
-
- {Level, EK, ?VIEW_ROW_KEY}
- = erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
-
-%% Key = couch_views_encoding:decode(EV),
- Val = couch_views_encoding:decode(EV),
- Acc#{next := value, key := Val};
-
-fold_fwd_cb({FullEncodedKey, EV}, #{next := value} = Acc) ->
+fold_fwd_cb({FullEncodedKey, EV}, Acc) ->
#{
reduce_idx_prefix := ReduceIdxPrefix,
- rows := Rows,
- key := Key
+ rows := Rows
} = Acc,
- {Level, EK, ?VIEW_ROW_VALUE}
+ {_Level, _EK}
= erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
+ {EK, EV1} = erlfdb_tuple:unpack(EV),
+ Key = couch_views_encoding:decode(EK),
+ Val = couch_views_encoding:decode(EV1),
-%% Key = couch_views_encoding:decode(EV),
- Val = couch_views_encoding:decode(EV),
- Acc#{next := key, key := undefined, rows := Rows ++ [{Key, Val}]}.
+ Acc#{key := Val, rows := Rows ++ [{Key, Val}]}.
run_reduce(#mrst{views = Views } = Mrst, MappedResults) ->
@@ -219,8 +206,8 @@ update_reduce_idx(TxDb, Sig, ViewId, _DocId, _ExistingKeys, ReduceResult) ->
create_skip_list(TxDb, ?MAX_SKIP_LIST_LEVELS, ViewOpts),
lists:foreach(fun ({Key, Val}) ->
- io:format("RESULTS KV ~p ~p ~n", [Key, Val]),
- add_kv_to_skip_list(TxDb, ?MAX_SKIP_LIST_LEVELS, ViewOpts, Key, Val)
+ io:format("RESULTS KV ~p ~p ~n", [Key, Val])
+%% add_kv_to_skip_list(TxDb, ?MAX_SKIP_LIST_LEVELS, ViewOpts, Key, Val)
end, ReduceResult).
@@ -241,21 +228,9 @@ create_skip_list(Db, MaxLevel, #{} = ViewOpts) ->
end, Levels)
end).
-%% This sucks but its simple for now
-should_add_key_to_level(0, _, _) ->
- true;
-
-should_add_key_to_level(?MAX_SKIP_LIST_LEVELS, _, _) ->
- false;
-
-should_add_key_to_level(_, _, false) ->
- false;
-
-should_add_key_to_level(_, _Key, _Prev) ->
- crypto:rand_uniform(0, 2) == 0.
-%%should_add_key_to_level(Level, Key) ->
-%% erlang:phash2(Key) band ((1 bsl (Level * ?LEVEL_FAN_POW)) -1) == 0.
+should_add_key_to_level(Level, Key) ->
+ (erlang:phash2(Key) band ((1 bsl (Level * ?LEVEL_FAN_POW)) -1)) == 0.
%% keyHash & ((1 << (level * LEVEL_FAN_POW)) - 1)) != 0
@@ -270,20 +245,18 @@ add_kv_to_skip_list(Db, MaxLevel, #{} = ViewOpts, Key, Val) ->
ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
fabric2_fdb:transactional(Db, fun(TxDb) ->
- lists:foldl(fun(Level, PrevCoinFlip) ->
+ lists:foldl(fun(Level, _Acc) ->
io:format("PROCESS ~p ~p ~p ~n", [Level, Key, Val]),
{PrevKey, PrevVal} = get_previous_key(TxDb, ReduceIdxPrefix, Level, Key),
io:format("PREV VALS ~p ~p ~n", [PrevKey, PrevVal]),
- case should_add_key_to_level(Level, Key, PrevCoinFlip) of
+ case should_add_key_to_level(Level, Key) of
true ->
io:format("Adding ~p ~p ~n", [Level, Key]),
- add_kv(Db, ReduceIdxPrefix, Level, Key, Val),
- true;
+ add_kv(Db, ReduceIdxPrefix, Level, Key, Val);
false ->
{PrevKey, NewVal} = rereduce(<<"_stats">>, {PrevKey, PrevVal}, {Key, Val}),
io:format("RE_REDUCE ~p ~p ~p ~p ~n", [Level, Key, PrevKey, NewVal]),
- add_kv(Db, ReduceIdxPrefix, Level, PrevKey, NewVal),
- false
+ add_kv(Db, ReduceIdxPrefix, Level, PrevKey, NewVal)
end
end, true, Levels)
end).
@@ -297,13 +270,20 @@ rereduce(<<"_stats">>, {PrevKey, PrevVal}, {_Key, Val}) ->
reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId) ->
- Key = {?DB_VIEWS, Sig, ?VIEW_REDUCE_SK_RANGE, ViewId},
+ Key = {?DB_VIEWS, Sig, ?VIEW_REDUCE_RANGE, ViewId},
erlfdb_tuple:pack(Key, DbPrefix).
-reduce_idx_key(ReduceIdxPrefix, SkipLevel, ReduceKey, RowType) ->
- Key = {SkipLevel, ReduceKey, RowType},
- erlfdb_tuple:pack(Key, ReduceIdxPrefix).
+create_key(ReduceIdxPrefix, SkipLevel, Key) ->
+ EK = couch_views_encoding:encode(Key, key),
+ LevelKey = {SkipLevel, EK},
+ erlfdb_tuple:pack(LevelKey, ReduceIdxPrefix).
+
+
+create_value(Key, Val) ->
+ EK = couch_views_encoding:encode(Key),
+ EV = couch_views_encoding:encode(Val),
+ erlfdb_tuple:pack({EK, EV}).
add_kv(TxDb, ReduceIdxPrefix, Level, Key, Val) ->
@@ -311,14 +291,10 @@ add_kv(TxDb, ReduceIdxPrefix, Level, Key, Val) ->
tx := Tx
} = TxDb,
- EK = couch_views_encoding:encode(Key, key),
- EVK = couch_views_encoding:encode(Key),
- EV = couch_views_encoding:encode(Val),
+ LevelKey = create_key(ReduceIdxPrefix, Level, Key),
+ EV = create_value(Key, Val),
- KK = reduce_idx_key(ReduceIdxPrefix, Level, EK, ?VIEW_ROW_KEY),
- VK = reduce_idx_key(ReduceIdxPrefix, Level, EK, ?VIEW_ROW_VALUE),
- ok = erlfdb:set(Tx, KK, EVK),
- ok = erlfdb:set(Tx, VK, EV).
+ ok = erlfdb:set(Tx, LevelKey, EV).
get_previous_key(TxDb, ReduceIdxPrefix, Level, Key) ->
diff --git a/src/couch_views/test/exunit/couch_views_reduce_test.exs b/src/couch_views/test/exunit/couch_views_reduce_test.exs
index 3f7a173..a526658 100644
--- a/src/couch_views/test/exunit/couch_views_reduce_test.exs
+++ b/src/couch_views/test/exunit/couch_views_reduce_test.exs
@@ -8,7 +8,7 @@ defmodule CouchViewsReduceTest do
alias Couch.Test.Setup.Step
setup_all do
- test_ctx = :test_util.start_couch([:fabric, :couch_views, :couch_jobs])
+ test_ctx = :test_util.start_couch([:fabric, :couch_js, :couch_views, :couch_jobs])
on_exit(fn ->
:test_util.stop_couch(test_ctx)
@@ -40,30 +40,30 @@ defmodule CouchViewsReduceTest do
}
end
- # test "group=true count reduce", context do
- # args = %{
- # :reduce => true,
- # :group => true
- # # :limit => 9
- # }
- #
- # {:ok, res} = run_query(context, args, "baz")
- # IO.inspect(res, label: "OUT")
- #
- # assert res == [
- # {:row, [key: 1, value: 2]},
- # {:row, [key: 2, value: 2]},
- # {:row, [key: 3, value: 2]},
- # {:row, [key: [1, 1], value: 1]},
- # {:row, [key: [1, 1, 5], value: 1]},
- # {:row, [key: [1, 2, 6], value: 1]},
- # {:row, [key: [2, 1], value: 1]},
- # {:row, [key: [2, 3, 6], value: 1]},
- # {:row, [key: [3, 1], value: 1]},
- # {:row, [key: [3, 1, 5], value: 1]},
- # {:row, [key: [3, 4, 5], value: 1]}
- # ]
- # end
+ test "group=true count reduce", context do
+ args = %{
+ :reduce => true,
+ :group => true
+ # :limit => 9
+ }
+
+ {:ok, res} = run_query(context, args, "baz")
+ IO.inspect(res, label: "OUT")
+
+ assert res == [
+ {:row, [key: 1, value: 2]},
+ {:row, [key: 2, value: 2]},
+ {:row, [key: 3, value: 2]},
+ {:row, [key: [1, 1], value: 1]},
+ {:row, [key: [1, 1, 5], value: 1]},
+ {:row, [key: [1, 2, 6], value: 1]},
+ {:row, [key: [2, 1], value: 1]},
+ {:row, [key: [2, 3, 6], value: 1]},
+ {:row, [key: [3, 1], value: 1]},
+ {:row, [key: [3, 1, 5], value: 1]},
+ {:row, [key: [3, 4, 5], value: 1]}
+ ]
+ end
# test "group=1 count reduce", context do
# args = %{
@@ -150,20 +150,20 @@ defmodule CouchViewsReduceTest do
# ]
# end
- test "group_level=0 _max reduce", context do
- args = %{
- :reduce => true,
- :group_level => 0
- # :limit => 9
- }
-
- {:ok, res} = run_query(context, args, "max")
- IO.inspect(res, label: "OUT")
-
- assert res == [
- {:row, [key: :null, value: 3]}
- ]
- end
+ # test "group_level=0 _sum reduce", context do
+ # args = %{
+ # :reduce => true,
+ # :group_level => 0
+ # # :limit => 9
+ # }
+ #
+ # {:ok, res} = run_query(context, args, "max")
+ # IO.inspect(res, label: "OUT")
+ #
+ # assert res == [
+ # {:row, [key: :null, value: 3]}
+ # ]
+ # end
defp run_query(context, args, view) do
db = context[:db]
@@ -217,70 +217,68 @@ defmodule CouchViewsReduceTest do
end
defp create_ddoc() do
- :couch_doc.from_json_obj(
- {[
- {"_id", "_design/bar"},
- {"views",
- {[
-# {"baz",
-# {[
-# {"map",
-# """
-# function(doc) {
-# emit(doc.value, doc.value);
-# emit(doc.value, doc.value);
-# emit([doc.value, 1], doc.value);
-# emit([doc.value, doc.value + 1, doc.group.length], doc.value);
-#
-# if (doc.value === 3) {
-# emit([1, 1, 5], 1);
-# emit([doc.value, 1, 5], 1);
-# }
-# }
-# """},
-# {"reduce", "_count"}
-# ]}},
-# {"boom",
-# {[
-# {"map",
-# """
-# function(doc) {
-# var month = 1;
-# if (doc.value % 2) {
-# month = 2;
-# }
-# emit([2019, month, doc.value], doc.value);
-# }
-# """},
-# {"reduce", "_count"}
-# ]}},
- {"max",
- {[
- {"map",
- """
- function(doc) {
- //emit(doc.value, doc.value);
- //emit([doc.value, 1], doc.value);
- //emit([doc.value, doc.value + 1, doc.group.length], doc.value);
- emit(1, 1);
- emit(2, 2);
- emit(3, 3);
- emit(4, 4);
+ :couch_doc.from_json_obj({[
+ {"_id", "_design/bar"},
+ {"views",
+ {[
+ {"baz",
+ {[
+ {"map",
+ """
+ function(doc) {
+ emit(doc.value, doc.value);
+ emit(doc.value, doc.value);
+ emit([doc.value, 1], doc.value);
+ emit([doc.value, doc.value + 1, doc.group.length], doc.value);
- emit([2019, 2, 2], 1);
- emit([2019, 3, 3], 2);
- emit([2019, 3, 3], 3);
- emit([2019, 4, 3], 4);
- emit([2019, 5, 3], 6);
- if (doc.value === 3) {
- //emit([doc.value, 1, 5], 1);
- }
+ if (doc.value === 3) {
+ emit([1, 1, 5], 1);
+ emit([doc.value, 1, 5], 1);
}
- """},
- {"reduce", "_stats"}
- ]}}
- ]}}
- ]}
- )
+ }
+ """},
+ {"reduce", "_count"}
+ ]}}
+ # {"boom",
+ # {[
+ # {"map",
+ # """
+ # function(doc) {
+ # var month = 1;
+ # if (doc.value % 2) {
+ # month = 2;
+ # }
+ # emit([2019, month, doc.value], doc.value);
+ # }
+ # """},
+ # {"reduce", "_count"}
+ # ]}},
+ # {"max",
+ # {[
+ # {"map",
+ # """
+ # function(doc) {
+ # //emit(doc.value, doc.value);
+ # //emit([doc.value, 1], doc.value);
+ # //emit([doc.value, doc.value + 1, doc.group.length], doc.value);
+ # emit(1, 1);
+ # emit(2, 2);
+ # emit(3, 3);
+ # emit(4, 4);
+ #
+ # emit([2019, 2, 2], 1);
+ # emit([2019, 3, 3], 2);
+ # emit([2019, 3, 3], 3);
+ # emit([2019, 4, 3], 4);
+ # emit([2019, 5, 3], 6);
+ # if (doc.value === 3) {
+ # //emit([doc.value, 1, 5], 1);
+ # }
+ # }
+ # """},
+ # {"reduce", "_stats"}
+ # ]}}
+ ]}}
+ ]})
end
end
[couchdb] 05/05: level 0 _sum working
commit 7ca52023ae5e2c697ae5258277d0aceaea2c00b1
Author: Garren Smith <ga...@gmail.com>
AuthorDate: Thu Oct 31 16:42:36 2019 +0200
level 0 _sum working
---
src/couch_views/src/couch_views_fdb.erl | 8 +-
src/couch_views/src/couch_views_indexer.erl | 7 +-
src/couch_views/src/couch_views_reduce.erl | 12 ++
src/couch_views/src/couch_views_reduce_fdb.erl | 54 +++++++-
.../test/exunit/couch_views_reduce_test.exs | 150 +++++++++++++--------
5 files changed, 160 insertions(+), 71 deletions(-)
diff --git a/src/couch_views/src/couch_views_fdb.erl b/src/couch_views/src/couch_views_fdb.erl
index 8999d76..6c81457 100644
--- a/src/couch_views/src/couch_views_fdb.erl
+++ b/src/couch_views/src/couch_views_fdb.erl
@@ -148,7 +148,7 @@ write_doc(TxDb, Sig, _ViewIds, #{deleted := true} = Doc) ->
update_kv_size(TxDb, Sig, ViewId, -TotalSize)
end, ExistingViewKeys);
-write_doc(TxDb, Sig, ViewIds, Doc) ->
+write_doc(TxDb, Sig, ViewsIdFun, Doc) ->
#{
id := DocId,
results := Results,
@@ -161,7 +161,7 @@ write_doc(TxDb, Sig, ViewIds, Doc) ->
%% TODO: handle when there is no reduce
io:format("REDUCE RESULTS ~p ~n", [ReduceResults]),
- lists:foreach(fun({ViewId, NewRows, ReduceResult}) ->
+ lists:foreach(fun({{ViewId, Reducer}, NewRows, ReduceResult}) ->
update_id_idx(TxDb, Sig, ViewId, DocId, NewRows),
ExistingKeys = case lists:keyfind(ViewId, 1, ExistingViewKeys) of
@@ -179,9 +179,9 @@ write_doc(TxDb, Sig, ViewIds, Doc) ->
[]
end,
update_map_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows),
- couch_views_reduce_fdb:update_reduce_idx(TxDb, Sig, ViewId, DocId,
+ couch_views_reduce_fdb:update_reduce_idx(TxDb, Sig, ViewId, Reducer, DocId,
ExistingKeys, ReduceResult)
- end, lists:zip3(ViewIds, Results, ReduceResults)).
+ end, lists:zip3(ViewsIdFun, Results, ReduceResults)).
% For each row in a map view there are two rows stored in
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index 3c60743..f01a58b 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -226,6 +226,11 @@ write_docs(TxDb, Mrst, Docs, State) ->
} = State,
ViewIds = [View#mrview.id_num || View <- Views],
+ ViewsIdFuns = lists:foldl(fun (View, Acc) ->
+ Id = View#mrview.id_num,
+ [{_Name, ReduceFun}] = View#mrview.reduce_funs,
+ Acc ++ [{Id, ReduceFun}]
+ end, [], Views),
%% First build of the view
if ViewSeq /= <<>> -> ok; true ->
@@ -233,7 +238,7 @@ write_docs(TxDb, Mrst, Docs, State) ->
end,
lists:foreach(fun(Doc) ->
- couch_views_fdb:write_doc(TxDb, Sig, ViewIds, Doc)
+ couch_views_fdb:write_doc(TxDb, Sig, ViewsIdFuns, Doc)
end, Docs),
couch_views_fdb:set_update_seq(TxDb, Sig, LastSeq).
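Note that the [{_Name, ReduceFun}] match above assumes every view defines exactly one reduce function and crashes otherwise; the "TODO: handle when there is no reduce" in couch_views_fdb points the same way. A sketch of a more tolerant build, assuming downstream code can treat undefined as "no reducer" (that marker is hypothetical):

ViewsIdFuns = lists:map(fun(View) ->
    Id = View#mrview.id_num,
    case View#mrview.reduce_funs of
        % take the first reduce fun; multiple funs per view not handled here
        [{_Name, ReduceFun} | _] -> {Id, ReduceFun};
        % hypothetical marker for map-only views
        [] -> {Id, undefined}
    end
end, Views),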
diff --git a/src/couch_views/src/couch_views_reduce.erl b/src/couch_views/src/couch_views_reduce.erl
index b7eb18e..04c5cb8 100644
--- a/src/couch_views/src/couch_views_reduce.erl
+++ b/src/couch_views/src/couch_views_reduce.erl
@@ -191,6 +191,18 @@ reduce(<<"_count">>, Results) ->
end, #{}, Results),
maps:to_list(ReduceResults);
+reduce(<<"_sum">>, Results) ->
+ ReduceResults = lists:foldl(fun ({Key, Val}, Acc) ->
+ case maps:is_key(Key, Acc) of
+ true ->
+ #{Key := Sum} = Acc,
+ Acc#{Key := Val + Sum};
+ false ->
+ Acc#{Key => Val}
+ end
+ end, #{}, Results),
+ maps:to_list(ReduceResults);
+
% this isn't a real supported reduce function in CouchDB
% But I want a basic reduce function that when we need to update the index
% we would need to re-read multiple rows instead of being able to do an
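The _sum clause above folds the mapped rows into one running total per distinct key. Illustratively (the ordering of maps:to_list/1 is unspecified, hence the sort):

Rows = [{a, 1}, {b, 2}, {a, 3}],
[{a, 4}, {b, 2}] = lists:sort(reduce(<<"_sum">>, Rows)).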
diff --git a/src/couch_views/src/couch_views_reduce_fdb.erl b/src/couch_views/src/couch_views_reduce_fdb.erl
index 5759c42..7a7e120 100644
--- a/src/couch_views/src/couch_views_reduce_fdb.erl
+++ b/src/couch_views/src/couch_views_reduce_fdb.erl
@@ -17,7 +17,7 @@
-export([
fold_level0/8,
create_skip_list/3,
- update_reduce_idx/6
+ update_reduce_idx/7
]).
@@ -188,6 +188,14 @@ rereduce(_Reducer, Rows, GroupLevel) when length(Rows) == 1 ->
GroupKey = group_level_key(Key, GroupLevel),
{GroupKey, Val};
+rereduce(<<"_sum">>, Rows, GroupLevel) ->
+ Sum = lists:foldl(fun ({_, Val}, Acc) ->
+ Val + Acc
+ end, 0, Rows),
+ {Key, _} = hd(Rows),
+ GroupKey = group_level_key(Key, GroupLevel),
+ {GroupKey, Sum};
+
rereduce(<<"_count">>, Rows, GroupLevel) ->
Val = length(Rows),
{Key, _} = hd(Rows),
@@ -210,6 +218,9 @@ group_level_equal(One, Two, GroupLevel) ->
group_level_key(_Key, 0) ->
null;
+group_level_key(Key, group_true) ->
+ Key;
+
group_level_key(Key, GroupLevel) when is_list(Key) ->
lists:sublist(Key, GroupLevel);
@@ -230,7 +241,7 @@ unpack_key_value(EncodedValue) ->
%% Inserting
-update_reduce_idx(TxDb, Sig, ViewId, _DocId, _ExistingKeys, ReduceResult) ->
+update_reduce_idx(TxDb, Sig, ViewId, Reducer, _DocId, _ExistingKeys, ReduceResult) ->
#{
db_prefix := DbPrefix
} = TxDb,
@@ -238,7 +249,8 @@ update_reduce_idx(TxDb, Sig, ViewId, _DocId, _ExistingKeys, ReduceResult) ->
ViewOpts = #{
db_prefix => DbPrefix,
sig => Sig,
- view_id => ViewId
+ view_id => ViewId,
+ reducer => Reducer
},
lists:foreach(fun ({Key, Val}) ->
@@ -269,29 +281,57 @@ add_kv_to_skip_list(Db, MaxLevel, #{} = ViewOpts, Key, Val) ->
#{
db_prefix := DbPrefix,
sig := Sig,
- view_id := ViewId
+ view_id := ViewId,
+ reducer := Reducer
} = ViewOpts,
- Levels = lists:seq(0, MaxLevel),
+ Levels = lists:seq(1, MaxLevel),
ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
KeyHash = hash_key(Key),
fabric2_fdb:transactional(Db, fun(TxDb) ->
+ Val1 = case get_value(TxDb, ReduceIdxPrefix, 0, Key) of
+ not_found ->
+ Val;
+ ExistingVal ->
+ {_, ReducedVal} = rereduce(Reducer, [{Key, ExistingVal}, {Key, Val}], group_true),
+ ReducedVal
+ end,
+ io:format("VAL1 ~p ~n", [Val1]),
+ add_kv(TxDb, ReduceIdxPrefix, 0, Key, Val1),
+
lists:foreach(fun(Level) ->
{PrevKey, PrevVal} = get_previous_key(TxDb, ReduceIdxPrefix, Level, Key),
io:format("Level ~p K/V ~p ~p PREV KV ~p ~p ~n", [Level, Key, Val, PrevKey, PrevVal]),
case should_add_key_to_level(Level, KeyHash) of
true ->
io:format("Adding at ~p ~p ~n", [Level, Key]),
- add_kv(Db, ReduceIdxPrefix, Level, Key, Val);
+ add_kv(TxDb, ReduceIdxPrefix, Level, Key, Val);
false ->
%% {PrevKey, NewVal} = rereduce(<<"_stats">>, {PrevKey, PrevVal}, {Key, Val}),
%% io:format("RE_REDUCE ~p ~p ~p ~p ~n", [Level, Key, PrevKey, NewVal]),
- add_kv(Db, ReduceIdxPrefix, Level, PrevKey, PrevVal)
+ add_kv(TxDb, ReduceIdxPrefix, Level, PrevKey, PrevVal)
end
end, Levels)
end).
+get_value(TxDb, ReduceIdxPrefix, Level, Key) ->
+ #{
+ tx := Tx
+ } = TxDb,
+ EK = create_key(ReduceIdxPrefix, Level, Key),
+ io:format("FF ~p ~n", [Key]),
+ Out = case erlfdb:wait(erlfdb:get(Tx, EK)) of
+ not_found ->
+ not_found;
+ PackedValue ->
+ io:format("HERE ~p ~n", [PackedValue]),
+ {_, Value} = get_key_value(PackedValue),
+ Value
+ end,
+ io:format("GETTING ~p ~p ~n", [Key, Out]),
+ Out.
+
get_previous_key(TxDb, ReduceIdxPrefix, Level, Key) ->
#{
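To make the level-0 path concrete, a worked trace under Reducer = <<"_sum">> (the values are invented for illustration):

%% Existing level-0 entry for Key = [2019, 3, 1] holds 4; incoming Val is 6.
%% get_value/4 finds the 4, so the transaction computes
%%   rereduce(<<"_sum">>, [{Key, 4}, {Key, 6}], group_true) -> {[2019, 3, 1], 10}
%% and add_kv/5 writes 10 back at level 0 before the higher levels are walked.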
diff --git a/src/couch_views/test/exunit/couch_views_reduce_test.exs b/src/couch_views/test/exunit/couch_views_reduce_test.exs
index 488f3ee..0812f1f 100644
--- a/src/couch_views/test/exunit/couch_views_reduce_test.exs
+++ b/src/couch_views/test/exunit/couch_views_reduce_test.exs
@@ -40,37 +40,53 @@ defmodule CouchViewsReduceTest do
}
end
-# test "group=true count reduce with limit", context do
+ # test "group=true count reduce with limit", context do
+ # args = %{
+ # :reduce => true,
+ # :group => true,
+ # :limit => 3
+ # }
+ #
+ # {:ok, res} = run_query(context, args, "dates")
+ # IO.inspect(res, label: "OUT")
+ #
+ # assert res == [
+ # {:row, [key: [2017, 3, 1], value: 1]},
+ # {:row, [key: [2017, 4, 1], value: 1]},
+ # {:row, [key: [2017, 4, 15], value: 1]}
+ # ]
+ # end
+
+# test "group_level=1 count reduce", context do
# args = %{
# :reduce => true,
-# :group => true,
-# :limit => 3
+# :group_level => 1
# }
#
-# {:ok, res} = run_query(context, args, "dates")
+# {:ok, res} = run_query(context, args, "dates_count")
# IO.inspect(res, label: "OUT")
#
# assert res == [
-# {:row, [key: [2017, 3, 1], value: 1]},
-# {:row, [key: [2017, 4, 1], value: 1]},
-# {:row, [key: [2017, 4, 15], value: 1]}
+# {:row, [key: [2017], value: 4]},
+# {:row, [key: [2018], value: 3]},
+# {:row, [key: [2019], value: 2]}
# ]
# end
- test "group_level=1 count reduce", context do
- args = %{
- :reduce => true,
- :group_level => 1
- }
+ test "group_level=1 reduce reduce", context do
+ args = %{
+ :reduce => true,
+ :group_level => 1
+ }
- {:ok, res} = run_query(context, args, "dates")
- IO.inspect(res, label: "OUT")
+ {:ok, res} = run_query(context, args, "dates_sum")
+ IO.inspect(res, label: "OUT")
- assert res == [
- {:row, [key: [2017], value: 4]},
- {:row, [key: [2018], value: 3]},
- {:row, [key: [2019], value: 2]}
- ]
+ assert res == [
+ {:row, [key: [2017], value: 31]},
+ {:row, [key: [2018], value: 20]},
+ {:row, [key: [2019], value: 17]}
+ ]
end
# test "group=1 count reduce", context do
@@ -207,21 +223,22 @@ defmodule CouchViewsReduceTest do
defp create_docs() do
dates = [
- [2017, 3, 1],
- [2017, 4, 1],
+ {[2017, 3, 1], 9},
+ {[2017, 4, 1], 7},
# out of order check
- [2019, 3, 1],
- [2017, 4, 15],
- [2018, 4, 1],
- [2017, 5, 1],
- [2018, 3, 1],
+ {[2019, 3, 1], 4},
+ {[2017, 4, 15], 6},
+ {[2018, 4, 1], 3},
+ {[2017, 5, 1], 9},
+ {[2018, 3, 1], 6},
# duplicate check
- [2018, 4, 1],
- [2018, 5, 1],
- [2019, 4, 1]
+ {[2018, 4, 1], 4},
+ {[2018, 5, 1], 7},
+ {[2019, 4, 1], 6},
+ {[2019, 5, 1], 7}
]
- for i <- 1..10 do
+ for i <- 1..11 do
group =
if rem(i, 3) == 0 do
"first"
@@ -229,13 +246,18 @@ defmodule CouchViewsReduceTest do
"second"
end
- :couch_doc.from_json_obj({[
- {"_id", "doc-id-#{i}"},
- {"value", i},
- {"some", "field"},
- {"group", group},
- {"date", Enum.at(dates, i - 1)}
- ]})
+ {date_key, date_val} = Enum.at(dates, i - 1)
+
+ :couch_doc.from_json_obj(
+ {[
+ {"_id", "doc-id-#{i}"},
+ {"value", i},
+ {"some", "field"},
+ {"group", group},
+ {"date", date_key},
+ {"date_val", date_val}
+ ]}
+ )
end
end
@@ -244,34 +266,44 @@ defmodule CouchViewsReduceTest do
{"_id", "_design/bar"},
{"views",
{[
- {"dates",
+ # {"dates_count",
+ # {[
+ # {"map",
+ # """
+ # function(doc) {
+ # emit(doc.date, doc.value);
+ # }
+ # """},
+ # {"reduce", "_count"}
+ # ]}}
+ {"dates_sum",
{[
{"map",
"""
function(doc) {
- emit(doc.date, doc.value);
- }
+ emit(doc.date, doc.date_val);
+ }
"""},
- {"reduce", "_count"}
+ {"reduce", "_sum"}
]}}
-# {"baz",
-# {[
-# {"map",
-# """
-# function(doc) {
-# emit(doc.value, doc.value);
-# emit(doc.value, doc.value);
-# emit([doc.value, 1], doc.value);
-# emit([doc.value, doc.value + 1, doc.group.length], doc.value);
-#
-# if (doc.value === 3) {
-# emit([1, 1, 5], 1);
-# emit([doc.value, 1, 5], 1);
-# }
-# }
-# """},
-# {"reduce", "_count"}
-# ]}}
+ # {"baz",
+ # {[
+ # {"map",
+ # """
+ # function(doc) {
+ # emit(doc.value, doc.value);
+ # emit(doc.value, doc.value);
+ # emit([doc.value, 1], doc.value);
+ # emit([doc.value, doc.value + 1, doc.group.length], doc.value);
+ #
+ # if (doc.value === 3) {
+ # emit([1, 1, 5], 1);
+ # emit([doc.value, 1, 5], 1);
+ # }
+ # }
+ # """},
+ # {"reduce", "_count"}
+ # ]}}
# {"boom",
# {[
# {"map",