You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ga...@apache.org on 2019/12/05 17:03:38 UTC
[couchdb] branch prototype/builtin-reduce updated (7ca5202 ->
da6fa09)
This is an automated email from the ASF dual-hosted git repository.
garren pushed a change to branch prototype/builtin-reduce
in repository https://gitbox.apache.org/repos/asf/couchdb.git.
discard 7ca5202 level 0 _sum working
discard bff44fb can do group_level query
discard 533fdb6 progress with reading level 0
discard e399fb3 printing
discard 03cbc41 Initial work
add 987efb3 add test to prove we can view swap
add 8d5c107 Use "\xFF/metadataVersion" key for checking metadata
add 8bb0718 Abandon a view job if the db or ddoc is deleted
add 583d7fe Pass contexts to fabric2_db functions
add 8d28d85 Merge pull request #2279 from cloudant/refactor-user-ctx-handling
add 3db0ba7 Ensure we can create partitioned design docs with FDB
add aaae564 Check security properties in the main transaction
add b71cbe2 Before starting a db transaction, refresh the db handle from the cache
add 44f660f Update fabric2_fdb's set_config to take un-encoding values
add b58dc30 Assert Db handle field existence in `load_config/1` in fabric2_fdb
add 706acca Check membership when calling get_security/1 in fabric2_db
add 2d3737c Support regexp based blacklist in config
add c9b8e25 Implement fabric2_server:fdb_cluster/0
add e93d1b4 Add ctrace application
add 98bc5ea Trace http endpoints
add 4680884 Trace fdb transactions
add 5e47f50 Implement node types
add be22ef9 Add operation names for all HTTP endpoints
add 3c2b92c Change end-point /_up to check fdb connectivity
add 0cea6a4 Optimize view read latency when the view is ready
add 2247c80 Retry for failed indexes builds
new 5736d57 Initial work
new 88ed4eb printing
new 0076f15 progress with reading level 0
new 0ddb800 can do group_level query
new 6f6bedc level 0 _sum working
new 457cb7e startkey/endkey with level0
new f01b653 basic skiplist query working
new da6fa09 basic weird reduce view support
This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version. This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:
* -- * -- B -- O -- O -- O (7ca5202)
\
N -- N -- N refs/heads/prototype/builtin-reduce (da6fa09)
You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.
Any revisions marked "omit" are not gone; other references still
refer to them. Any revisions marked "discard" are gone forever.
The 8 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
.gitignore | 5 +
rebar.config.script | 5 +
rel/overlay/etc/default.ini | 50 ++-
src/chttpd/src/chttpd.app.src | 1 +
src/chttpd/src/chttpd.erl | 118 +++++-
src/chttpd/src/chttpd_app.erl | 4 +-
src/chttpd/src/chttpd_auth_request.erl | 5 +-
src/chttpd/src/chttpd_db.erl | 103 ++---
src/chttpd/src/chttpd_handlers.erl | 23 +-
src/chttpd/src/chttpd_httpd_handlers.erl | 442 ++++++++++++++++++++-
src/chttpd/src/chttpd_misc.erl | 11 +-
src/chttpd/src/chttpd_sup.erl | 16 +-
src/couch/src/couch_util.erl | 37 +-
src/couch_jobs/src/couch_jobs.hrl | 5 +-
src/couch_mrview/src/couch_mrview.erl | 14 +-
src/couch_views/src/couch_views.erl | 60 +--
src/couch_views/src/couch_views_app.erl | 4 +-
src/couch_views/src/couch_views_indexer.erl | 99 ++++-
src/couch_views/src/couch_views_jobs.erl | 3 +-
src/couch_views/src/couch_views_reduce.erl | 148 +++----
src/couch_views/src/couch_views_reduce_fdb.erl | 403 +++++++++++++------
src/couch_views/src/couch_views_reducer.erl | 119 ++++++
src/couch_views/src/couch_views_sup.erl | 32 +-
.../test/exunit/couch_views_reduce_test.exs | 236 +++++------
src/ctrace/README.md | 291 ++++++++++++++
src/{fabric => ctrace}/rebar.config | 0
.../couch_js.app.src => ctrace/src/ctrace.app.src} | 16 +-
src/ctrace/src/ctrace.erl | 361 +++++++++++++++++
.../eunit.config => src/ctrace/src/ctrace.hrl | 8 +-
.../couch_js_app.erl => ctrace/src/ctrace_app.erl} | 11 +-
src/ctrace/src/ctrace_config.erl | 133 +++++++
src/ctrace/src/ctrace_dsl.erl | 106 +++++
.../couch_js_sup.erl => ctrace/src/ctrace_sup.erl} | 26 +-
src/ctrace/test/ctrace_config_test.erl | 153 +++++++
src/ctrace/test/ctrace_dsl_test.erl | 123 ++++++
src/ctrace/test/ctrace_test.erl | 412 +++++++++++++++++++
src/fabric/include/fabric2.hrl | 13 +-
src/fabric/src/fabric.app.src | 1 +
src/fabric/src/fabric2_db.erl | 68 ++--
src/fabric/src/fabric2_fdb.erl | 159 +++++---
src/fabric/src/fabric2_node_types.erl | 52 +++
src/fabric/src/fabric2_server.erl | 38 +-
src/fabric/test/fabric2_db_security_tests.erl | 141 +++----
src/fabric/test/fabric2_dir_prefix_tests.erl | 2 +-
src/fabric/test/fabric2_doc_crud_tests.erl | 18 +
src/fabric/test/fabric2_node_types_tests.erl | 73 ++++
.../src/global_changes_httpd_handlers.erl | 8 +-
src/mango/src/mango_httpd_handlers.erl | 31 +-
src/mem3/src/mem3_httpd_handlers.erl | 38 +-
src/setup/src/setup_httpd_handlers.erl | 12 +-
test/elixir/test/basics_test.exs | 6 +
test/elixir/test/map_test.exs | 99 +++++
52 files changed, 3619 insertions(+), 723 deletions(-)
create mode 100644 src/couch_views/src/couch_views_reducer.erl
create mode 100644 src/ctrace/README.md
copy src/{fabric => ctrace}/rebar.config (100%)
copy src/{couch_js/src/couch_js.app.src => ctrace/src/ctrace.app.src} (78%)
create mode 100644 src/ctrace/src/ctrace.erl
copy rel/files/eunit.config => src/ctrace/src/ctrace.hrl (80%)
copy src/{couch_js/src/couch_js_app.erl => ctrace/src/ctrace_app.erl} (90%)
create mode 100644 src/ctrace/src/ctrace_config.erl
create mode 100644 src/ctrace/src/ctrace_dsl.erl
copy src/{couch_js/src/couch_js_sup.erl => ctrace/src/ctrace_sup.erl} (71%)
create mode 100644 src/ctrace/test/ctrace_config_test.erl
create mode 100644 src/ctrace/test/ctrace_dsl_test.erl
create mode 100644 src/ctrace/test/ctrace_test.erl
create mode 100644 src/fabric/src/fabric2_node_types.erl
create mode 100644 src/fabric/test/fabric2_node_types_tests.erl
[couchdb] 08/08: basic weird reduce view support
Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
garren pushed a commit to branch prototype/builtin-reduce
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit da6fa0925354d8ca2d95c0bfbcdd3660eb2f1244
Author: Garren Smith <ga...@gmail.com>
AuthorDate: Thu Dec 5 19:02:21 2019 +0200
basic weird reduce view support
---
src/couch_views/src/couch_views_reduce.erl | 20 +-
src/couch_views/src/couch_views_reduce_fdb.erl | 3 +-
src/couch_views/src/couch_views_reducer.erl | 10 +-
.../test/exunit/couch_views_reduce_test.exs | 237 +++++++++------------
4 files changed, 112 insertions(+), 158 deletions(-)
diff --git a/src/couch_views/src/couch_views_reduce.erl b/src/couch_views/src/couch_views_reduce.erl
index 0e837e3..4eae5b9 100644
--- a/src/couch_views/src/couch_views_reduce.erl
+++ b/src/couch_views/src/couch_views_reduce.erl
@@ -25,8 +25,8 @@
-include_lib("fabric/include/fabric2.hrl").
--define(LEVEL_FAN_POW, 1).
--define(MAX_SKIP_LIST_LEVELS, 6).
+%%-define(LEVEL_FAN_POW, 1).
+%%-define(MAX_SKIP_LIST_LEVELS, 6).
read_reduce(Db, Sig, ViewId, Reducer, UserCallback, UserAcc0, Args) ->
@@ -34,7 +34,6 @@ read_reduce(Db, Sig, ViewId, Reducer, UserCallback, UserAcc0, Args) ->
db_prefix := DbPrefix
} = Db,
-%% Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
ReduceIdxPrefix = couch_views_reduce_fdb:reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
#mrargs{
limit = Limit,
@@ -47,11 +46,10 @@ read_reduce(Db, Sig, ViewId, Reducer, UserCallback, UserAcc0, Args) ->
_ -> GroupLevel
end,
-%% Opts = args_to_fdb_opts(Args, ReduceIdxPrefix),
+ Opts = args_to_fdb_opts(Args, ReduceIdxPrefix),
try
fabric2_fdb:transactional(Db, fun(TxDb) ->
- %% Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
Acc0 = #{
sig => Sig,
@@ -65,10 +63,10 @@ read_reduce(Db, Sig, ViewId, Reducer, UserCallback, UserAcc0, Args) ->
},
Fun = fun handle_row/3,
-%% Acc1 = couch_views_reduce_fdb:fold_level0(TxDb, Sig, ViewId, Reducer, GroupLevel1, Opts, Fun, Acc0),
+ Acc1 = couch_views_reduce_fdb:fold_level0(TxDb, Sig, ViewId, Reducer, GroupLevel1, Opts, Fun, Acc0),
- SkipListOpts = args_to_skiplist_opts(Args),
- Acc1 = couch_views_reduce_fdb:fold_skip_list(TxDb, Sig, ViewId, Reducer, GroupLevel1, SkipListOpts, Fun, Acc0),
+%% SkipListOpts = args_to_skiplist_opts(Args),
+%% Acc1 = couch_views_reduce_fdb:fold_skip_list(TxDb, Sig, ViewId, Reducer, GroupLevel1, SkipListOpts, Fun, Acc0),
#{
user_acc := UserAcc1
} = Acc1,
@@ -111,10 +109,10 @@ args_to_fdb_opts(#mrargs{} = Args, ReduceIdxPrefix) ->
StartKey1 = case StartKey of
undefined ->
- StartKey0 = couch_views_reduce_fdb:create_key(0, 0, ReduceIdxPrefix),
+ StartKey0 = couch_views_reduce_fdb:create_key(ReduceIdxPrefix, 0, 0),
erlfdb_key:first_greater_than(StartKey0);
StartKey ->
- StartKey0 = couch_views_reduce_fdb:create_key(StartKey, 0, ReduceIdxPrefix),
+ StartKey0 = couch_views_reduce_fdb:create_key(ReduceIdxPrefix, 0, StartKey),
erlfdb_key:first_greater_or_equal(StartKey0)
end,
@@ -125,7 +123,7 @@ args_to_fdb_opts(#mrargs{} = Args, ReduceIdxPrefix) ->
EndKey0;
EndKey ->
io:format("ENDKEY ~n"),
- EndKey0 = couch_views_reduce_fdb:create_key(EndKey, 0, ReduceIdxPrefix),
+ EndKey0 = couch_views_reduce_fdb:create_key(ReduceIdxPrefix, 0, EndKey),
erlfdb_key:first_greater_than(EndKey0)
end,
[{streaming_mode, want_all}, {startkey, StartKey1}, {endkey, EndKey1}].
diff --git a/src/couch_views/src/couch_views_reduce_fdb.erl b/src/couch_views/src/couch_views_reduce_fdb.erl
index 72e05b8..be2ed4f 100644
--- a/src/couch_views/src/couch_views_reduce_fdb.erl
+++ b/src/couch_views/src/couch_views_reduce_fdb.erl
@@ -350,7 +350,8 @@ fold_fwd_cb({_FullEncodedKey, EV}, Acc) ->
GroupLevelKey = couch_views_reducer:group_level_key(Key, GroupLevel),
GroupKV = [{GroupLevelKey, Val}],
- case couch_views_reducer:group_level_equal(Key, LastKey, GroupLevel) of
+%% case couch_views_reducer:group_level_equal(Key, LastKey, GroupLevel) of
+ case GroupLevelKey == LastKey of
true ->
Acc#{
rows := Rows ++ GroupKV
diff --git a/src/couch_views/src/couch_views_reducer.erl b/src/couch_views/src/couch_views_reducer.erl
index a7ac783..7a24661 100644
--- a/src/couch_views/src/couch_views_reducer.erl
+++ b/src/couch_views/src/couch_views_reducer.erl
@@ -46,7 +46,6 @@ reduce(<<"_sum">>, Results) ->
end, #{}, Results),
maps:to_list(ReduceResults);
-
% this isn't a real supported reduce function in CouchDB
% But I want a basic reduce function that when we need to update the index
% we would need to re-read multiple rows instead of being able to do an
@@ -87,10 +86,11 @@ rereduce(<<"_sum">>, Rows, GroupLevel) ->
{GroupKey, Sum};
rereduce(<<"_count">>, Rows, GroupLevel) ->
- Val = length(Rows),
- {Key, _} = hd(Rows),
- GroupKey = group_level_key(Key, GroupLevel),
- {GroupKey, Val}.
+ rereduce(<<"_sum">>, Rows, GroupLevel).
+%% Val = length(Rows),
+%% {Key, _} = hd(Rows),
+%% GroupKey = group_level_key(Key, GroupLevel),
+%% {GroupKey, Val}.
group_level_equal(_One, _Two, 0) ->
diff --git a/src/couch_views/test/exunit/couch_views_reduce_test.exs b/src/couch_views/test/exunit/couch_views_reduce_test.exs
index e2b9d3b..49334fd 100644
--- a/src/couch_views/test/exunit/couch_views_reduce_test.exs
+++ b/src/couch_views/test/exunit/couch_views_reduce_test.exs
@@ -73,7 +73,7 @@ defmodule CouchViewsReduceTest do
args = %{
reduce: true,
group_level: 1,
- end_key: [2019,5,1]
+ end_key: [2019, 5, 1]
}
{:ok, res} = run_query(context, args, "dates_sum")
@@ -86,91 +86,92 @@ defmodule CouchViewsReduceTest do
end
test "group_level=1 reduce with startkey/endkey", context do
- args = %{
- reduce: true,
- group_level: 1,
- start_key: [2017, 4, 1],
- end_key: [2018, 3, 1],
-
- }
+ args = %{
+ reduce: true,
+ group_level: 1,
+ start_key: [2017, 4, 1],
+ end_key: [2018, 3, 1]
+ }
- {:ok, res} = run_query(context, args, "dates_sum")
+ {:ok, res} = run_query(context, args, "dates_sum")
- assert res == [
- {:row, [key: [2017], value: 22]},
- {:row, [key: [2018], value: 6]}
- ]
+ assert res == [
+ {:row, [key: [2017], value: 22]},
+ {:row, [key: [2018], value: 6]}
+ ]
end
test "group_level=1 reduce with startkey/endkey take 2", context do
- args = %{
- reduce: true,
- group_level: 1,
- start_key: [2017, 4, 1],
- end_key: [2019, 3, 2],
- }
-
- {:ok, res} = run_query(context, args, "dates_sum")
-
- assert res == [
- {:row, [key: [2017], value: 22]},
- {:row, [key: [2018], value: 20]},
- {:row, [key: [2019], value: 4]}
- ]
+ args = %{
+ reduce: true,
+ group_level: 1,
+ start_key: [2017, 4, 1],
+ end_key: [2019, 3, 2]
+ }
+
+ {:ok, res} = run_query(context, args, "dates_sum")
+
+ assert res == [
+ {:row, [key: [2017], value: 22]},
+ {:row, [key: [2018], value: 20]},
+ {:row, [key: [2019], value: 4]}
+ ]
end
test "group_level=1 reduce with startkey/endkey take 3", context do
- args = %{
- reduce: true,
- group_level: 1,
- start_key: [2017, 4, 1],
- end_key: [2019, 05, 1],
- }
-
- {:ok, res} = run_query(context, args, "dates_sum")
-
- assert res == [
- {:row, [key: [2017], value: 22]},
- {:row, [key: [2018], value: 20]},
- {:row, [key: [2019], value: 17]}
- ]
+ args = %{
+ reduce: true,
+ group_level: 1,
+ start_key: [2017, 4, 1],
+ end_key: [2019, 05, 1]
+ }
+
+ {:ok, res} = run_query(context, args, "dates_sum")
+
+ assert res == [
+ {:row, [key: [2017], value: 22]},
+ {:row, [key: [2018], value: 20]},
+ {:row, [key: [2019], value: 17]}
+ ]
end
-# test "group=true reduce with startkey/endkey", context do
-# args = %{
-# reduce: true,
-# group: true,
-# start_key: [2018, 5, 1],
-# end_key: [2019, 04, 1],
-# }
-#
-# {:ok, res} = run_query(context, args, "dates_sum")
-#
-# assert res == [
-# {:row, [key: [2018, 5, 1], value: 7]},
-# {:row, [key: [2019, 3, 1], value: 4]},
-# {:row, [key: [2019, 4, 1], value: 6]}
-# ]
-# end
-
- # test "group=1 count reduce", context do
- # args = %{
- # :reduce => true,
- # :group_level => 1
- # # :limit => 6
- # }
+ # test "group=true reduce with startkey/endkey", context do
+ # args = %{
+ # reduce: true,
+ # group: true,
+ # start_key: [2018, 5, 1],
+ # end_key: [2019, 04, 1],
+ # }
#
- # {:ok, res} = run_query(context, args, "baz")
+ # {:ok, res} = run_query(context, args, "dates_sum")
#
- # assert res == [
- # {:row, [key: 1, value: 2]},
- # {:row, [key: 2, value: 2]},
- # {:row, [key: 3, value: 2]},
- # {:row, [key: [1], value: 2]},
- # {:row, [key: [2], value: 2]},
- # {:row, [key: [3], value: 2]}
- # ]
+ # assert res == [
+ # {:row, [key: [2018, 5, 1], value: 7]},
+ # {:row, [key: [2019, 3, 1], value: 4]},
+ # {:row, [key: [2019, 4, 1], value: 6]}
+ # ]
# end
+
+ test "group_level=1 _count reduce", context do
+ args = %{
+ :reduce => true,
+ :group_level => 1,
+ :limit => 6
+ }
+
+ {:ok, res} = run_query(context, args, "baz")
+
+ IO.inspect res, label: "OUT", charlists: :as_lists
+ assert res == [
+ {:row, [key: 1, value: 2]},
+ {:row, [key: 2, value: 2]},
+ {:row, [key: 3, value: 2]},
+ {:row, [key: [1], value: 2]},
+ {:row, [key: [2], value: 2]},
+ {:row, [key: [3], value: 2]}
+ ]
+ end
+
#
# test "group=2 count reduce", context do
# args = %{
@@ -256,7 +257,7 @@ defmodule CouchViewsReduceTest do
end
def default_cb(:complete, acc) do
- IO.inspect(Enum.reverse(acc), label: "complete")
+ IO.inspect(Enum.reverse(acc), label: "complete", charlists: :as_lists)
{:ok, Enum.reverse(acc)}
end
@@ -325,16 +326,6 @@ defmodule CouchViewsReduceTest do
{"_id", "_design/bar"},
{"views",
{[
- # {"dates_count",
- # {[
- # {"map",
- # """
- # function(doc) {
- # emit(doc.date, doc.value);
- # }
- # """},
- # {"reduce", "_count"}
- # ]}}
{"dates_sum",
{[
{"map",
@@ -344,64 +335,28 @@ defmodule CouchViewsReduceTest do
}
"""},
{"reduce", "_sum"}
+ ]}},
+ {"baz",
+ {[
+ {"map",
+ """
+ function(doc) {
+ if (doc.value > 3) {
+ return;
+ }
+ emit(doc.value, doc.value);
+ emit(doc.value, doc.value);
+ emit([doc.value, 1], doc.value);
+ emit([doc.value, doc.value + 1, doc.group.length], doc.value);
+
+ if (doc.value === 3) {
+ // emit([1, 1, 5], 1);
+ // emit([doc.value, 1, 5], 1);
+ }
+ }
+ """},
+ {"reduce", "_count"}
]}}
- # {"baz",
- # {[
- # {"map",
- # """
- # function(doc) {
- # emit(doc.value, doc.value);
- # emit(doc.value, doc.value);
- # emit([doc.value, 1], doc.value);
- # emit([doc.value, doc.value + 1, doc.group.length], doc.value);
- #
- # if (doc.value === 3) {
- # emit([1, 1, 5], 1);
- # emit([doc.value, 1, 5], 1);
- # }
- # }
- # """},
- # {"reduce", "_count"}
- # ]}}
- # {"boom",
- # {[
- # {"map",
- # """
- # function(doc) {
- # var month = 1;
- # if (doc.value % 2) {
- # month = 2;
- # }
- # emit([2019, month, doc.value], doc.value);
- # }
- # """},
- # {"reduce", "_count"}
- # ]}},
- # {"max",
- # {[
- # {"map",
- # """
- # function(doc) {
- # //emit(doc.value, doc.value);
- # //emit([doc.value, 1], doc.value);
- # //emit([doc.value, doc.value + 1, doc.group.length], doc.value);
- # emit(1, 1);
- # emit(2, 2);
- # emit(3, 3);
- # emit(4, 4);
- #
- # emit([2019, 2, 2], 1);
- # emit([2019, 3, 3], 2);
- # emit([2019, 3, 3], 3);
- # emit([2019, 4, 3], 4);
- # emit([2019, 5, 3], 6);
- # if (doc.value === 3) {
- # //emit([doc.value, 1, 5], 1);
- # }
- # }
- # """},
- # {"reduce", "_stats"}
- # ]}}
]}}
]})
end
[couchdb] 01/08: Initial work
Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
garren pushed a commit to branch prototype/builtin-reduce
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 5736d5752c3d7e2b6eef2cdf8480f4bf03462482
Author: Garren Smith <ga...@gmail.com>
AuthorDate: Wed Sep 11 17:01:08 2019 +0200
Initial work
---
src/couch_views/include/couch_views.hrl | 1 +
src/couch_views/src/couch_views.erl | 14 +-
.../src/{couch_views.erl => couch_views.erl.orig} | 21 +-
src/couch_views/src/couch_views_fdb.erl | 116 ++++++-
src/couch_views/src/couch_views_indexer.erl | 5 +
src/couch_views/src/couch_views_reader.erl | 155 ++++++++-
src/couch_views/src/couch_views_reduce.erl | 364 +++++++++++++++++++++
.../couch_views_reduce_fdb.erl} | 29 +-
.../test/exunit/couch_views_reduce_test.exs | 286 ++++++++++++++++
src/couch_views/test/exunit/test_helper.exs | 2 +
10 files changed, 967 insertions(+), 26 deletions(-)
diff --git a/src/couch_views/include/couch_views.hrl b/src/couch_views/include/couch_views.hrl
index 2e443eb..e97d777 100644
--- a/src/couch_views/include/couch_views.hrl
+++ b/src/couch_views/include/couch_views.hrl
@@ -15,6 +15,7 @@
-define(VIEW_ID_INFO, 1).
-define(VIEW_ID_RANGE, 2).
-define(VIEW_MAP_RANGE, 3).
+-define(VIEW_REDUCE_RANGE, 4).
-define(VIEW_ROW_COUNT, 0).
-define(VIEW_KV_SIZE, 1).
diff --git a/src/couch_views/src/couch_views.erl b/src/couch_views/src/couch_views.erl
index 322415b..5b2c76f 100644
--- a/src/couch_views/src/couch_views.erl
+++ b/src/couch_views/src/couch_views.erl
@@ -38,16 +38,20 @@ query(Db, DDoc, ViewName, Callback, Acc0, Args0) ->
Args1 = to_mrargs(Args0),
Args2 = couch_mrview_util:set_view_type(Args1, ViewName, Views),
Args3 = couch_mrview_util:validate_args(Args2),
+
ok = check_range(Args3),
- case is_reduce_view(Args3) of
- true -> throw({not_implemented});
- false -> ok
- end,
try
fabric2_fdb:transactional(Db, fun(TxDb) ->
ok = maybe_update_view(TxDb, Mrst, Args3),
- read_view(TxDb, Mrst, ViewName, Callback, Acc0, Args3)
+ case is_reduce_view(Args3) of
+ true ->
+ couch_views_reader:read_reduce(Db, Mrst, ViewName,
+ Callback, Acc0, Args3);
+ false ->
+ couch_views_reader:read(Db, Mrst, ViewName,
+ Callback, Acc0, Args3)
+ end
end)
catch throw:{build_view, WaitSeq} ->
couch_views_jobs:build_view(Db, Mrst, WaitSeq),
diff --git a/src/couch_views/src/couch_views.erl b/src/couch_views/src/couch_views.erl.orig
similarity index 88%
copy from src/couch_views/src/couch_views.erl
copy to src/couch_views/src/couch_views.erl.orig
index 322415b..1830076 100644
--- a/src/couch_views/src/couch_views.erl
+++ b/src/couch_views/src/couch_views.erl.orig
@@ -38,13 +38,11 @@ query(Db, DDoc, ViewName, Callback, Acc0, Args0) ->
Args1 = to_mrargs(Args0),
Args2 = couch_mrview_util:set_view_type(Args1, ViewName, Views),
Args3 = couch_mrview_util:validate_args(Args2),
+
ok = check_range(Args3),
- case is_reduce_view(Args3) of
- true -> throw({not_implemented});
- false -> ok
- end,
try
+<<<<<<< HEAD
fabric2_fdb:transactional(Db, fun(TxDb) ->
ok = maybe_update_view(TxDb, Mrst, Args3),
read_view(TxDb, Mrst, ViewName, Callback, Acc0, Args3)
@@ -52,6 +50,21 @@ query(Db, DDoc, ViewName, Callback, Acc0, Args0) ->
catch throw:{build_view, WaitSeq} ->
couch_views_jobs:build_view(Db, Mrst, WaitSeq),
read_view(Db, Mrst, ViewName, Callback, Acc0, Args3)
+=======
+ case is_reduce_view(Args3) of
+ true ->
+ couch_views_reader:read_reduce(Db, Mrst, ViewName,
+ Callback, Acc0, Args3);
+ false ->
+ couch_views_reader:read(Db, Mrst, ViewName,
+ Callback, Acc0, Args3)
+ end
+ after
+ UpdateAfter = Args3#mrargs.update == lazy,
+ if UpdateAfter == false -> ok; true ->
+ couch_views_jobs:build_view_async(Db, Mrst)
+ end
+>>>>>>> Initial work
end.
diff --git a/src/couch_views/src/couch_views_fdb.erl b/src/couch_views/src/couch_views_fdb.erl
index 60ce300..07241dd 100644
--- a/src/couch_views/src/couch_views_fdb.erl
+++ b/src/couch_views/src/couch_views_fdb.erl
@@ -20,6 +20,7 @@
get_kv_size/3,
fold_map_idx/6,
+ fold_reduce_idx/6,
write_doc/4
]).
@@ -42,6 +43,15 @@
% View Build Sequence Access
% (<db>, ?DB_VIEWS, Sig, ?VIEW_UPDATE_SEQ) = Sequence
+% Id Range
+% {<db>, ?DB_VIEWS, Sig, ?VIEW_ID_RANGE, DocId, ViewId}
+% = [TotalKeys, TotalSize, UniqueKeys]
+
+% Map Range
+%{<db>, ?DB_VIEWS, Sig, ?VIEW_MAP_RANGE, ViewId, {{Key, DocId}, DupeId, Type}}
+% = Value | UnEncodedKey
+% Type = ?VIEW_KEY | ?VIEW_ROW
+
get_update_seq(TxDb, #mrst{sig = Sig}) ->
#{
@@ -124,6 +134,8 @@ fold_map_idx(TxDb, Sig, ViewId, Options, Callback, Acc0) ->
Acc1.
+
+
write_doc(TxDb, Sig, _ViewIds, #{deleted := true} = Doc) ->
#{
id := DocId
@@ -134,6 +146,7 @@ write_doc(TxDb, Sig, _ViewIds, #{deleted := true} = Doc) ->
clear_id_idx(TxDb, Sig, DocId),
lists:foreach(fun({ViewId, TotalKeys, TotalSize, UniqueKeys}) ->
clear_map_idx(TxDb, Sig, ViewId, DocId, UniqueKeys),
+ %clear_reduce_idx
update_row_count(TxDb, Sig, ViewId, -TotalKeys),
update_kv_size(TxDb, Sig, ViewId, -TotalSize)
end, ExistingViewKeys);
@@ -141,14 +154,17 @@ write_doc(TxDb, Sig, _ViewIds, #{deleted := true} = Doc) ->
write_doc(TxDb, Sig, ViewIds, Doc) ->
#{
id := DocId,
- results := Results
+ results := Results,
+ reduce_results := ReduceResults
} = Doc,
ExistingViewKeys = get_view_keys(TxDb, Sig, DocId),
clear_id_idx(TxDb, Sig, DocId),
- lists:foreach(fun({ViewId, NewRows}) ->
+ %% TODO: handle when there is no reduce
+ io:format("REDUCE RESULTS ~p ~n", [ReduceResults]),
+ lists:foreach(fun({ViewId, NewRows, ReduceResult}) ->
update_id_idx(TxDb, Sig, ViewId, DocId, NewRows),
ExistingKeys = case lists:keyfind(ViewId, 1, ExistingViewKeys) of
@@ -165,8 +181,11 @@ write_doc(TxDb, Sig, ViewIds, Doc) ->
update_kv_size(TxDb, Sig, ViewId, SizeChange),
[]
end,
- update_map_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows)
- end, lists:zip(ViewIds, Results)).
+ update_map_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows),
+ couch_views_reduce:update_reduce_idx(TxDb, Sig, ViewId, DocId,
+ ExistingKeys, ReduceResult),
+ update_reduce_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, ReduceResult)
+ end, lists:zip3(ViewIds, Results, ReduceResults)).
% For each row in a map view there are two rows stored in
@@ -338,6 +357,53 @@ update_map_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows) ->
end, KVsToAdd).
+update_reduce_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows) ->
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = TxDb,
+
+%% Unique = lists:usort([K || {K, _V} <- NewRows]),
+
+%% KeysToRem = ExistingKeys -- Unique,
+%% lists:foreach(fun(RemKey) ->
+%% {Start, End} = reduce_idx_range(DbPrefix, Sig, ViewId, RemKey, DocId),
+%% ok = erlfdb:clear_range(Tx, Start, End)
+%% end, KeysToRem),
+%%
+ {ExactKVsToAdd, GroupKVsToAdd} = process_reduce_rows(NewRows),
+ ReduceIdxPrefix = reduce_idx_prefix(DbPrefix, Sig, ViewId),
+ add_reduce_kvs(Tx, ReduceIdxPrefix, ExactKVsToAdd, ?VIEW_REDUCE_EXACT),
+ add_reduce_kvs(Tx, ReduceIdxPrefix, GroupKVsToAdd, ?VIEW_REDUCE_GROUP).
+
+
+add_reduce_kvs(Tx, ReduceIdxPrefix, KVsToAdd, ReduceType) ->
+ lists:foreach(fun({Key1, Key2, Val, GroupLevel}) ->
+ KK = reduce_idx_key(ReduceIdxPrefix, Key1, GroupLevel,
+ ReduceType, ?VIEW_ROW_KEY),
+ VK = reduce_idx_key(ReduceIdxPrefix, Key1, GroupLevel,
+ ReduceType, ?VIEW_ROW_VALUE),
+ ok = erlfdb:set(Tx, KK, Key2),
+ ok = erlfdb:add(Tx, VK, Val)
+ end, KVsToAdd).
+
+
+reduce_idx_prefix(DbPrefix, Sig, ViewId) ->
+ Key = {?DB_VIEWS, Sig, ?VIEW_REDUCE_RANGE, ViewId},
+ erlfdb_tuple:pack(Key, DbPrefix).
+
+
+reduce_idx_key(ReduceIdxPrefix, ReduceKey, GroupLevel, ReduceType, RowType) ->
+ Key = {ReduceKey, GroupLevel, ReduceType, RowType},
+ erlfdb_tuple:pack(Key, ReduceIdxPrefix).
+
+
+%%reduce_idx_range(DbPrefix, Sig, ViewId, GroupKey, DocId) ->
+%% Encoded = couch_views_encoding:encode(MapKey, key),
+%% Key = {?DB_VIEWS, Sig, ?VIEW_MAP_RANGE, ViewId, {Encoded, DocId}},
+%% erlfdb_tuple:range(Key, DbPrefix).
+
+
get_view_keys(TxDb, Sig, DocId) ->
#{
tx := Tx,
@@ -394,7 +460,6 @@ id_idx_range(DbPrefix, Sig, DocId) ->
Key = {?DB_VIEWS, Sig, ?VIEW_ID_RANGE, DocId},
erlfdb_tuple:range(Key, DbPrefix).
-
map_idx_prefix(DbPrefix, Sig, ViewId) ->
Key = {?DB_VIEWS, Sig, ?VIEW_MAP_RANGE, ViewId},
erlfdb_tuple:pack(Key, DbPrefix).
@@ -432,6 +497,47 @@ process_rows(Rows) ->
end, [], Grouped).
+process_reduce_rows(Rows) ->
+ ReduceExact = encode_reduce_rows(Rows),
+ ReduceGroups = lists:foldl(fun({Key, Val}, Groupings) ->
+ Out = create_grouping(Key, Val, [], Groupings),
+ Out
+ end, #{}, Rows),
+ ReduceGroups1 = encode_reduce_rows(maps:to_list(ReduceGroups)),
+ {ReduceExact, ReduceGroups1}.
+
+
+encode_reduce_rows(Rows) ->
+ lists:map(fun({K, V}) ->
+ EK1 = couch_views_encoding:encode(K, key),
+ EK2 = couch_views_encoding:encode(K, value),
+ {EK1, EK2, V, group_level(K)}
+ end, Rows).
+
+
+group_level(Key) when is_list(Key) ->
+ length(Key);
+
+group_level(_Key) ->
+ 1.
+
+
+create_grouping([], _Val, _, Groupings) ->
+ Groupings;
+
+create_grouping([Head | Rest], Val, Key, Groupings) ->
+ Key1 = Key ++ [Head],
+ Groupings1 = maps:update_with(Key1, fun(OldVal) ->
+ OldVal + Val
+ end, Val, Groupings),
+ create_grouping(Rest, Val, Key1, Groupings1);
+
+create_grouping(Key, Val, _, Groupings) ->
+ maps:update_with(Key, fun(OldVal) ->
+ OldVal + Val
+ end, Val, Groupings).
+
+
calculate_row_size(Rows) ->
lists:foldl(fun({K, V}, Acc) ->
Acc + erlang:external_size(K) + erlang:external_size(V)
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index 75e4b36..d440839 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -178,6 +178,7 @@ update(#{} = Db, Mrst0, State0) ->
DocAcc1 = fetch_docs(TxDb, DocAcc),
{Mrst1, MappedDocs} = map_docs(Mrst0, DocAcc1),
+ Results = run_reduce(Mrst1, MappedDocs),
write_docs(TxDb, Mrst1, MappedDocs, State2),
case Count < Limit of
@@ -286,6 +287,10 @@ map_docs(Mrst, Docs) ->
{Mrst1, MappedDocs}.
+run_reduce(Mrst, MappedResults) ->
+ couch_views_reduce:run_reduce(Mrst, MappedResults).
+
+
write_docs(TxDb, Mrst, Docs, State) ->
#mrst{
views = Views,
diff --git a/src/couch_views/src/couch_views_reader.erl b/src/couch_views/src/couch_views_reader.erl
index 27671fb..d08515c 100644
--- a/src/couch_views/src/couch_views_reader.erl
+++ b/src/couch_views/src/couch_views_reader.erl
@@ -13,6 +13,7 @@
-module(couch_views_reader).
-export([
+ read_reduce/6,
read/6
]).
@@ -23,6 +24,128 @@
-include_lib("fabric/include/fabric2.hrl").
+read_reduce(Db, Mrst, ViewName, UserCallback, UserAcc0, Args) ->
+ #mrst{
+ language = Lang,
+ sig = Sig,
+ views = Views
+ } = Mrst,
+
+ ViewId = get_view_id(Lang, Args, ViewName, Views),
+ couch_views_reduce:read_reduce(Db, Sig, ViewId, UserCallback,
+ UserAcc0, Args).
+%% Fun = fun handle_reduce_row/3,
+%%
+%% try
+%% fabric2_fdb:transactional(Db, fun(TxDb) ->
+%% Meta = get_meta(TxDb, Mrst, ViewId, Args),
+%% UserAcc1 = maybe_stop(UserCallback(Meta, UserAcc0)),
+%%
+%% #mrargs{
+%% limit = Limit
+%% } = Args,
+%%
+%% Acc0 = #{
+%% db => TxDb,
+%% skip => Args#mrargs.skip,
+%% mrargs => undefined,
+%% callback => UserCallback,
+%% acc => UserAcc1,
+%% row_count => 0,
+%% limit => Limit
+%% },
+%%
+%% Acc1 = lists:foldl(fun(KeyArgs, KeyAcc0) ->
+%% Opts = reduce_mrargs_to_fdb_options(KeyArgs),
+%% KeyAcc1 = KeyAcc0#{
+%% mrargs := KeyArgs
+%% },
+%% couch_views_fdb:fold_reduce_idx(
+%% TxDb,
+%% Sig,
+%% ViewId,
+%% Opts,
+%% Fun,
+%% KeyAcc1
+%% )
+%% end, Acc0, expand_keys_args(Args)),
+%%
+%% #{
+%% acc := UserAcc2
+%% } = Acc1,
+%% {ok, maybe_stop(UserCallback(complete, UserAcc2))}
+%% end)
+%% catch throw:{done, Out} ->
+%% {ok, Out}
+%% end.
+
+
+reduce_mrargs_to_fdb_options(Args) ->
+ #mrargs{
+%% start_key = StartKey0,
+%% start_key_docid = StartKeyDocId,
+%% end_key = EndKey0,
+%% end_key_docid = EndKeyDocId,
+ direction = Direction,
+ limit = Limit,
+ skip = Skip,
+ group_level = GroupLevel,
+ group = Group
+%% inclusive_end = InclusiveEnd
+ } = Args,
+
+ GroupExact = Group == true andalso GroupLevel == 0,
+
+ GroupLevelEnd = case GroupExact of
+ true -> [];
+ false -> [{end_key, {<<255>>, GroupLevel + 1}}]
+ end,
+
+%% StartKey1 = if StartKey0 == undefined -> undefined; true ->
+%% couch_views_encoding:encode(StartKey0, key)
+%% end,
+%%
+%% StartKeyOpts = case {StartKey1, StartKeyDocId} of
+%% {undefined, _} ->
+%% [];
+%% {StartKey1, StartKeyDocId} ->
+%% [{start_key, {StartKey1, StartKeyDocId}}]
+%% end,
+%%
+%% EndKey1 = if EndKey0 == undefined -> undefined; true ->
+%% couch_views_encoding:encode(EndKey0, key)
+%% end,
+%%
+%% EndKeyOpts = case {EndKey1, EndKeyDocId, Direction} of
+%% {undefined, _, _} ->
+%% [];
+%% {EndKey1, <<>>, rev} when not InclusiveEnd ->
+%% % When we iterate in reverse with
+%% % inclusive_end=false we have to set the
+%% % EndKeyDocId to <<255>> so that we don't
+%% % include matching rows.
+%% [{end_key_gt, {EndKey1, <<255>>}}];
+%% {EndKey1, <<255>>, _} when not InclusiveEnd ->
+%% % When inclusive_end=false we need to
+%% % elide the default end_key_docid so as
+%% % to not sort past the docids with the
+%% % given end key.
+%% [{end_key_gt, {EndKey1}}];
+%% {EndKey1, EndKeyDocId, _} when not InclusiveEnd ->
+%% [{end_key_gt, {EndKey1, EndKeyDocId}}];
+%% {EndKey1, EndKeyDocId, _} when InclusiveEnd ->
+%% [{end_key, {EndKey1, EndKeyDocId}}]
+%% end,
+
+ [
+ {dir, Direction},
+%% {limit, Limit * 2 + Skip * 2},
+ {streaming_mode, large}
+%% {streaming_mode, want_all}
+ ] ++ GroupLevelEnd.
+%% ] ++ StartKeyOpts ++ EndKeyOpts.
+
+
read(Db, Mrst, ViewName, UserCallback, UserAcc0, Args) ->
#mrst{
language = Lang,
@@ -113,11 +236,41 @@ handle_row(DocId, Key, Value, Acc) ->
UserAcc1 = maybe_stop(UserCallback({row, Row}, UserAcc0)),
Acc#{acc := UserAcc1}.
+handle_reduce_row(_Key, _Value, #{skip := Skip} = Acc) when Skip > 0 ->
+ Acc#{skip := Skip - 1};
+
+handle_reduce_row(Key, Value, Acc) ->
+ io:format("ACC ~p ~n", [Acc]),
+ #{
+ callback := UserCallback,
+ acc := UserAcc0,
+ row_count := RowCount,
+ limit := Limit
+ } = Acc,
+
+ Row = [
+ {key, Key},
+ {value, Value}
+ ],
+
+ RowCountNext = RowCount + 1,
+
+ UserAcc1 = maybe_stop(UserCallback({row, Row}, UserAcc0)),
+ Acc1 = Acc#{acc := UserAcc1, row_count := RowCountNext},
+
+ case RowCountNext == Limit of
+ true ->
+ UserAcc2 = maybe_stop(UserCallback(complete, UserAcc1)),
+ maybe_stop({stop, UserAcc2});
+ false ->
+ Acc1
+ end.
+
get_view_id(Lang, Args, ViewName, Views) ->
case couch_mrview_util:extract_view(Lang, Args, ViewName, Views) of
{map, View, _Args} -> View#mrview.id_num;
- {red, {_Idx, _Lang, View}} -> View#mrview.id_num
+ {red, {_Idx, _Lang, View}, _Args} -> View#mrview.id_num
end.
diff --git a/src/couch_views/src/couch_views_reduce.erl b/src/couch_views/src/couch_views_reduce.erl
new file mode 100644
index 0000000..1502f38
--- /dev/null
+++ b/src/couch_views/src/couch_views_reduce.erl
@@ -0,0 +1,364 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_views_reduce).
+
+
+-export([
+ run_reduce/2,
+ update_reduce_idx/6,
+ read_reduce/6
+]).
+
+
+-include("couch_views.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+-include_lib("fabric/include/fabric2.hrl").
+
+
+-define(LEVEL_FAN_POW, 4).
+-define(MAX_SKIP_LIST_LEVELS, 6).
+
+
+log_levels(Db, Sig, ViewId) ->
+ #{
+ db_prefix := DbPrefix
+ } = Db,
+
+ Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
+ ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+ Opts = [{streaming_mode, want_all}],
+
+ fabric2_fdb:transactional(Db, fun(#{tx := Tx} = TxDb) ->
+ lists:foreach(fun (Level) ->
+ {StartKey, EndKey} = erlfdb_tuple:range({Level},
+ ReduceIdxPrefix),
+
+ Acc0 = #{
+ sig => Sig,
+ view_id => ViewId,
+ reduce_idx_prefix => ReduceIdxPrefix,
+ next => key,
+ key => undefined,
+ rows => []
+ },
+
+ Fun = fun fold_fwd_cb/2,
+ Acc = erlfdb:fold_range(Tx, StartKey, EndKey, Fun, Acc0, Opts),
+ #{
+ rows := Rows
+ } = Acc,
+ io:format("~n ~n LEVEL ~p rows ~p ~n", [Level, Rows]),
+ {ok, Rows}
+ end, Levels),
+ {ok, []}
+ end).
+
+
+read_reduce(Db, Sig, ViewId, UserCallback, UserAcc0, Args) ->
+ #{
+ db_prefix := DbPrefix
+ } = Db,
+
+ Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
+ ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+ fabric2_fdb:transactional(Db, fun(#{tx := Tx} = TxDb) ->
+ log_levels(TxDb, Sig, ViewId),
+%% Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
+
+
+ Acc0 = #{
+ sig => Sig,
+ view_id => ViewId,
+ user_acc => UserAcc0,
+ args => Args,
+ callback => UserCallback,
+ reduce_idx_prefix => ReduceIdxPrefix,
+ next => key,
+ rows => []
+ },
+
+
+%% Opts = [{limit, 2}, {streaming_mode, want_all}],
+%% EK = couch_views_encoding:encode(0, key),
+%% {StartKey, EndKey} = erlfdb_tuple:range({?MAX_SKIP_LIST_LEVELS, EK},
+%% ReduceIdxPrefix),
+%%
+%% Fun = fun fold_fwd_cb/2,
+%% Acc = erlfdb:fold_range(Tx, StartKey, EndKey, Fun, Acc0, Opts),
+ #{
+ rows := Rows
+ } = Acc0,
+ {ok, Rows}
+ end).
+
+args_to_fdb_opts(#mrargs{} = Args) ->
+ #mrargs{
+ limit = Limit,
+ start_key = StartKey,
+ end_key = EndKey
+ } = Args,
+ ok.
+
+
+fold_fwd_cb({FullEncodedKey, EV}, #{next := key} = Acc) ->
+ #{
+ reduce_idx_prefix := ReduceIdxPrefix
+ } = Acc,
+
+ {Level, EK, ?VIEW_ROW_KEY}
+ = erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
+
+%% Key = couch_views_encoding:decode(EV),
+ Val = couch_views_encoding:decode(EV),
+ Acc#{next := value, key := Val};
+
+fold_fwd_cb({FullEncodedKey, EV}, #{next := value} = Acc) ->
+ #{
+ reduce_idx_prefix := ReduceIdxPrefix,
+ rows := Rows,
+ key := Key
+ } = Acc,
+
+ {Level, EK, ?VIEW_ROW_VALUE}
+ = erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
+
+%% Key = couch_views_encoding:decode(EV),
+ Val = couch_views_encoding:decode(EV),
+ Acc#{next := key, key := undefined, rows := Rows ++ [{Key, Val}]}.
+
+
+run_reduce(#mrst{views = Views } = Mrst, MappedResults) ->
+ ReduceFuns = lists:map(fun(View) ->
+ #mrview{
+ id_num = Id,
+ reduce_funs = ViewReduceFuns
+ } = View,
+
+ [{_, Fun}] = ViewReduceFuns,
+ Fun
+ end, Views),
+
+ lists:map(fun (MappedResult) ->
+ #{
+ results := Results
+ } = MappedResult,
+
+ ReduceResults = lists:map(fun ({ReduceFun, Result}) ->
+ reduce(ReduceFun, Result)
+ end, lists:zip(ReduceFuns, Results)),
+
+ MappedResult#{
+ reduce_results => ReduceResults
+ }
+ end, MappedResults).
+
+
+reduce(<<"_count">>, Results) ->
+ ReduceResults = lists:foldl(fun ({Key, _}, Acc) ->
+ case maps:is_key(Key, Acc) of
+ true ->
+ #{Key := Val} = Acc,
+ Acc#{Key := Val + 1};
+ false ->
+ Acc#{Key => 1}
+ end
+ end, #{}, Results),
+ maps:to_list(ReduceResults);
+
+% this isn't a real supported reduce function in CouchDB
+% But I want a basic reduce function that when we need to update the index
+% we would need to re-read multiple rows instead of being able to do an
+% atomic update
+reduce(<<"_stats">>, Results) ->
+ ReduceResults = lists:foldl(fun ({Key, Val}, Acc) ->
+ io:format("MAX ~p ~p ~n", [Key, Val]),
+ case maps:is_key(Key, Acc) of
+ true ->
+ #{Key := Max} = Acc,
+ case Max >= Val of
+ true ->
+ Acc;
+ false ->
+ Acc#{Key := Val}
+ end;
+ false ->
+ Acc#{Key => Val}
+ end
+ end, #{}, Results),
+ maps:to_list(ReduceResults).
+
+
+is_builtin(<<"_", _/binary>>) ->
+ true;
+
+is_builtin(_) ->
+ false.
+
+
+update_reduce_idx(TxDb, Sig, ViewId, _DocId, _ExistingKeys, ReduceResult) ->
+ #{
+ db_prefix := DbPrefix
+ } = TxDb,
+
+ ViewOpts = #{
+ db_prefix => DbPrefix,
+ sig => Sig,
+ view_id => ViewId
+ },
+ create_skip_list(TxDb, ?MAX_SKIP_LIST_LEVELS, ViewOpts),
+
+ lists:foreach(fun ({Key, Val}) ->
+ io:format("RESULTS KV ~p ~p ~n", [Key, Val]),
+ add_kv_to_skip_list(TxDb, ?MAX_SKIP_LIST_LEVELS, ViewOpts, Key, Val)
+ end, ReduceResult).
+
+
+create_skip_list(Db, MaxLevel, #{} = ViewOpts) ->
+ #{
+ db_prefix := DbPrefix,
+ sig := Sig,
+ view_id := ViewId
+ } = ViewOpts,
+
+ Levels = lists:seq(0, MaxLevel),
+ ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+
+ lists:foreach(fun(Level) ->
+ add_kv(TxDb, ReduceIdxPrefix, Level, 0, 0)
+ end, Levels)
+ end).
+
+%% This sucks but its simple for now
+should_add_key_to_level(0, _, _) ->
+ true;
+
+should_add_key_to_level(?MAX_SKIP_LIST_LEVELS, _, _) ->
+ false;
+
+should_add_key_to_level(_, _, false) ->
+ false;
+
+should_add_key_to_level(_, _Key, _Prev) ->
+ crypto:rand_uniform(0, 2) == 0.
+
+%%should_add_key_to_level(Level, Key) ->
+%% erlang:phash2(Key) band ((1 bsl (Level * ?LEVEL_FAN_POW)) -1) == 0.
+%% keyHash & ((1 << (level * LEVEL_FAN_POW)) - 1)) != 0
+
+
+add_kv_to_skip_list(Db, MaxLevel, #{} = ViewOpts, Key, Val) ->
+ #{
+ db_prefix := DbPrefix,
+ sig := Sig,
+ view_id := ViewId
+ } = ViewOpts,
+
+ Levels = lists:seq(0, MaxLevel),
+ ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+ lists:foldl(fun(Level, PrevCoinFlip) ->
+ io:format("PROCESS ~p ~p ~p ~n", [Level, Key, Val]),
+ {PrevKey, PrevVal} = get_previous_key(TxDb, ReduceIdxPrefix, Level, Key),
+ io:format("PREV VALS ~p ~p ~n", [PrevKey, PrevVal]),
+ case should_add_key_to_level(Level, Key, PrevCoinFlip) of
+ true ->
+ io:format("Adding ~p ~p ~n", [Level, Key]),
+ add_kv(Db, ReduceIdxPrefix, Level, Key, Val),
+ true;
+ false ->
+ {PrevKey, NewVal} = rereduce(<<"_stats">>, {PrevKey, PrevVal}, {Key, Val}),
+ io:format("RE_REDUCE ~p ~p ~p ~p ~n", [Level, Key, PrevKey, NewVal]),
+ add_kv(Db, ReduceIdxPrefix, Level, PrevKey, NewVal),
+ false
+ end
+ end, true, Levels)
+ end).
+
+
+rereduce(<<"_stats">>, {PrevKey, PrevVal}, {_Key, Val}) ->
+ case PrevVal >= Val of
+ true -> {PrevKey, PrevVal};
+ false -> {PrevKey, Val}
+ end.
+
+
+reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId) ->
+ Key = {?DB_VIEWS, Sig, ?VIEW_REDUCE_SK_RANGE, ViewId},
+ erlfdb_tuple:pack(Key, DbPrefix).
+
+
+reduce_idx_key(ReduceIdxPrefix, SkipLevel, ReduceKey, RowType) ->
+ Key = {SkipLevel, ReduceKey, RowType},
+ erlfdb_tuple:pack(Key, ReduceIdxPrefix).
+
+
+add_kv(TxDb, ReduceIdxPrefix, Level, Key, Val) ->
+ #{
+ tx := Tx
+ } = TxDb,
+
+ EK = couch_views_encoding:encode(Key, key),
+ EVK = couch_views_encoding:encode(Key),
+ EV = couch_views_encoding:encode(Val),
+
+ KK = reduce_idx_key(ReduceIdxPrefix, Level, EK, ?VIEW_ROW_KEY),
+ VK = reduce_idx_key(ReduceIdxPrefix, Level, EK, ?VIEW_ROW_VALUE),
+ ok = erlfdb:set(Tx, KK, EVK),
+ ok = erlfdb:set(Tx, VK, EV).
+
+
+get_previous_key(TxDb, ReduceIdxPrefix, Level, Key) ->
+ #{
+ tx := Tx
+ } = TxDb,
+
+ % TODO: see if we need to add in conflict ranges for this for level=0
+ Opts = [{limit, 2}, {reverse, true}, {streaming_mode, want_all}],
+%% LevelPrefix = erlfdb_tuple:pack({Level}, ReduceIdxPrefix),
+
+ EK = couch_views_encoding:encode(Key, key),
+ EndKey0 = erlfdb_tuple:pack({Level, EK}, ReduceIdxPrefix),
+
+ {StartKey, EndKey1} = erlfdb_tuple:range({Level}, ReduceIdxPrefix),
+%% EndKey1 = erlfdb_key:first_greater_than(EndKey0),
+
+ Callback = fun row_cb/2,
+ Out = erlfdb:fold_range(Tx, StartKey, EndKey1, Callback, {val, ReduceIdxPrefix, {}}, Opts),
+ io:format("OUT PRV ~p ~p ~p ~n", [Level, Key, Out]),
+ Out.
+
+
+row_cb({FullEncodedKey, EV}, {val, ReduceIdxPrefix, Acc}) ->
+ io:format("ROW VAL ~p ~n", [erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix)]),
+ {_Level, EK, _VIEW_ROW_VALUE}
+ = erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
+ Val = couch_views_encoding:decode(EV),
+%% io:format("WW ~p ~p ~n", [couch_views_encoding:decode(EK), Val]),
+
+ {key, {EK, ReduceIdxPrefix, Val}};
+
+row_cb({FullEncodedKey, EVK}, {key, {EK, ReduceIdxPrefix, Val}}) ->
+ io:format("ROW KEY ~p ~n", [erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix)]),
+ {_Level, EK, ?VIEW_ROW_KEY}
+ = erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
+ Key = couch_views_encoding:decode(EVK),
+
+ {Key, Val}.
+
+
+
+
diff --git a/src/couch_views/include/couch_views.hrl b/src/couch_views/src/couch_views_reduce_fdb.erl
similarity index 55%
copy from src/couch_views/include/couch_views.hrl
copy to src/couch_views/src/couch_views_reduce_fdb.erl
index 2e443eb..bcaaa30 100644
--- a/src/couch_views/include/couch_views.hrl
+++ b/src/couch_views/src/couch_views_reduce_fdb.erl
@@ -10,17 +10,24 @@
% License for the specific language governing permissions and limitations under
% the License.
-% indexing
--define(VIEW_UPDATE_SEQ, 0).
--define(VIEW_ID_INFO, 1).
--define(VIEW_ID_RANGE, 2).
--define(VIEW_MAP_RANGE, 3).
--define(VIEW_ROW_COUNT, 0).
--define(VIEW_KV_SIZE, 1).
+-module(couch_views_reduce_fdb).
+
+
+-export([
+%% write_doc/4
+]).
+
+% _id keys = {?DB_VIEWS, Sig, ?VIEW_REDUCE_ID_RANGE, DocId, ViewId} = [TotalKeys, TotalSize, UniqueKeys]
+
+%%write_doc(TxDb, Sig, ViewIds, Doc) ->
+%% #{
+%% id := DocId,
+%% reduce_results := ReduceResults
+%% } = Doc,
+%% lists:foreach(fun({ViewId, NewRows}) ->
+%% % update reduce index
+%% ok
+%% end, lists:zip(ViewIds, ReduceResults)).
--define(VIEW_ROW_KEY, 0).
--define(VIEW_ROW_VALUE, 1).
-% jobs api
--define(INDEX_JOB_TYPE, <<"views">>).
diff --git a/src/couch_views/test/exunit/couch_views_reduce_test.exs b/src/couch_views/test/exunit/couch_views_reduce_test.exs
new file mode 100644
index 0000000..3f7a173
--- /dev/null
+++ b/src/couch_views/test/exunit/couch_views_reduce_test.exs
@@ -0,0 +1,286 @@
+defmodule CouchViewsReduceTest do
+ use Couch.Test.ExUnit.Case
+
+ alias Couch.Test.Utils
+
+ alias Couch.Test.Setup
+
+ alias Couch.Test.Setup.Step
+
+ setup_all do
+ test_ctx = :test_util.start_couch([:fabric, :couch_views, :couch_jobs])
+
+ on_exit(fn ->
+ :test_util.stop_couch(test_ctx)
+ end)
+ end
+
+ setup do
+ db_name = Utils.random_name("db")
+
+ admin_ctx =
+ {:user_ctx,
+ Utils.erlang_record(:user_ctx, "couch/include/couch_db.hrl", roles: ["_admin"])}
+
+ {:ok, db} = :fabric2_db.create(db_name, [admin_ctx])
+
+ docs = create_docs()
+ ddoc = create_ddoc()
+
+ {ok, _} = :fabric2_db.update_docs(db, [ddoc | docs])
+
+ on_exit(fn ->
+ :fabric2_db.delete(db_name, [admin_ctx])
+ end)
+
+ %{
+ :db_name => db_name,
+ :db => db,
+ :ddoc => ddoc
+ }
+ end
+
+ # test "group=true count reduce", context do
+ # args = %{
+ # :reduce => true,
+ # :group => true
+ # # :limit => 9
+ # }
+ #
+ # {:ok, res} = run_query(context, args, "baz")
+ # IO.inspect(res, label: "OUT")
+ #
+ # assert res == [
+ # {:row, [key: 1, value: 2]},
+ # {:row, [key: 2, value: 2]},
+ # {:row, [key: 3, value: 2]},
+ # {:row, [key: [1, 1], value: 1]},
+ # {:row, [key: [1, 1, 5], value: 1]},
+ # {:row, [key: [1, 2, 6], value: 1]},
+ # {:row, [key: [2, 1], value: 1]},
+ # {:row, [key: [2, 3, 6], value: 1]},
+ # {:row, [key: [3, 1], value: 1]},
+ # {:row, [key: [3, 1, 5], value: 1]},
+ # {:row, [key: [3, 4, 5], value: 1]}
+ # ]
+ # end
+
+ # test "group=1 count reduce", context do
+ # args = %{
+ # :reduce => true,
+ # :group_level => 1
+ # # :limit => 6
+ # }
+ #
+ # {:ok, res} = run_query(context, args, "baz")
+ # IO.inspect(res, label: "OUT")
+ #
+ # assert res == [
+ # {:row, [key: 1, value: 2]},
+ # {:row, [key: 2, value: 2]},
+ # {:row, [key: 3, value: 2]},
+ # {:row, [key: [1], value: 2]},
+ # {:row, [key: [2], value: 2]},
+ # {:row, [key: [3], value: 2]}
+ # ]
+ # end
+ #
+ # test "group=2 count reduce", context do
+ # args = %{
+ # :reduce => true,
+ # :group_level => 2,
+ # :limit => 9
+ # }
+ #
+ # {:ok, res} = run_query(context, args, "baz")
+ # IO.inspect(res, label: "OUT")
+ #
+ # assert res == [
+ # {:row, [key: 1, value: 2]},
+ # {:row, [key: 2, value: 2]},
+ # {:row, [key: 3, value: 2]},
+ # {:row, [key: [1, 1], value: 2]},
+ # {:row, [key: [1, 2], value: 1]},
+ # {:row, [key: [2, 1], value: 1]},
+ # {:row, [key: [2, 3], value: 1]},
+ # {:row, [key: [3, 1], value: 2]},
+ # {:row, [key: [3, 4], value: 1]}
+ # ]
+ # end
+ #
+ # test "group=2 count reduce with limit = 3", context do
+ # args = %{
+ # :reduce => true,
+ # :group_level => 2,
+ # :limit => 4
+ # }
+ #
+ # {:ok, res} = run_query(context, args, "baz")
+ # IO.inspect(res, label: "OUT")
+ #
+ # assert res == [
+ # {:row, [key: 1, value: 2]},
+ # {:row, [key: 2, value: 2]},
+ # {:row, [key: 3, value: 2]},
+ # {:row, [key: [1, 1], value: 1]}
+ # ]
+ # end
+ #
+ # # [
+ # # row: [key: [2019, 1, 2], value: 1],
+ # # row: [key: [2019, 1, 4], value: 1],
+ # # row: [key: [2019, 2, 1], value: 1],
+ # # row: [key: [2019, 2, 3], value: 1]
+ # # ]
+ #
+ # test "group=2 count reduce with startkey", context do
+ # args = %{
+ # # :reduce => true,
+ # # :group_level => 2,
+ # :start_key => [2019, 1, 4]
+ # # :limit => 4
+ # }
+ #
+ # {:ok, res} = run_query(context, args, "boom")
+ # IO.inspect(res, label: "OUT")
+ #
+ # assert res == [
+ # {:row, [key: [2019, 1], value: 1]},
+ # {:row, [key: [2019, 2], value: 2]}
+ # ]
+ # end
+
+ test "group_level=0 _max reduce", context do
+ args = %{
+ :reduce => true,
+ :group_level => 0
+ # :limit => 9
+ }
+
+ {:ok, res} = run_query(context, args, "max")
+ IO.inspect(res, label: "OUT")
+
+ assert res == [
+ {:row, [key: :null, value: 3]}
+ ]
+ end
+
+ defp run_query(context, args, view) do
+ db = context[:db]
+ ddoc = context[:ddoc]
+
+ :couch_views.query(db, ddoc, view, &__MODULE__.default_cb/2, [], args)
+ end
+
+ def default_cb(:complete, acc) do
+ {:ok, Enum.reverse(acc)}
+ end
+
+ def default_cb({:final, info}, []) do
+ {:ok, [info]}
+ end
+
+ def default_cb({:final, _}, acc) do
+ {:ok, acc}
+ end
+
+ def default_cb({:meta, _}, acc) do
+ {:ok, acc}
+ end
+
+ def default_cb(:ok, :ddoc_updated) do
+ {:ok, :ddoc_updated}
+ end
+
+ def default_cb(row, acc) do
+ {:ok, [row | acc]}
+ end
+
+ defp create_docs() do
+ for i <- 1..1 do
+ group =
+ if rem(i, 3) == 0 do
+ "first"
+ else
+ "second"
+ end
+
+ :couch_doc.from_json_obj(
+ {[
+ {"_id", "doc-id-#{i}"},
+ {"value", i},
+ {"some", "field"},
+ {"group", group}
+ ]}
+ )
+ end
+ end
+
+ defp create_ddoc() do
+ :couch_doc.from_json_obj(
+ {[
+ {"_id", "_design/bar"},
+ {"views",
+ {[
+# {"baz",
+# {[
+# {"map",
+# """
+# function(doc) {
+# emit(doc.value, doc.value);
+# emit(doc.value, doc.value);
+# emit([doc.value, 1], doc.value);
+# emit([doc.value, doc.value + 1, doc.group.length], doc.value);
+#
+# if (doc.value === 3) {
+# emit([1, 1, 5], 1);
+# emit([doc.value, 1, 5], 1);
+# }
+# }
+# """},
+# {"reduce", "_count"}
+# ]}},
+# {"boom",
+# {[
+# {"map",
+# """
+# function(doc) {
+# var month = 1;
+# if (doc.value % 2) {
+# month = 2;
+# }
+# emit([2019, month, doc.value], doc.value);
+# }
+# """},
+# {"reduce", "_count"}
+# ]}},
+ {"max",
+ {[
+ {"map",
+ """
+ function(doc) {
+ //emit(doc.value, doc.value);
+ //emit([doc.value, 1], doc.value);
+ //emit([doc.value, doc.value + 1, doc.group.length], doc.value);
+ emit(1, 1);
+ emit(2, 2);
+ emit(3, 3);
+ emit(4, 4);
+
+ emit([2019, 2, 2], 1);
+ emit([2019, 3, 3], 2);
+ emit([2019, 3, 3], 3);
+ emit([2019, 4, 3], 4);
+ emit([2019, 5, 3], 6);
+ if (doc.value === 3) {
+ //emit([doc.value, 1, 5], 1);
+ }
+ }
+ """},
+ {"reduce", "_stats"}
+ ]}}
+ ]}}
+ ]}
+ )
+ end
+end
diff --git a/src/couch_views/test/exunit/test_helper.exs b/src/couch_views/test/exunit/test_helper.exs
new file mode 100644
index 0000000..3140500
--- /dev/null
+++ b/src/couch_views/test/exunit/test_helper.exs
@@ -0,0 +1,2 @@
+ExUnit.configure(formatters: [JUnitFormatter, ExUnit.CLIFormatter])
+ExUnit.start()
[couchdb] 06/08: startkey/endkey with level0
Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
garren pushed a commit to branch prototype/builtin-reduce
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 457cb7e47bf14839e0d06731d54bee5055db7764
Author: Garren Smith <ga...@gmail.com>
AuthorDate: Tue Dec 3 14:48:14 2019 +0200
startkey/endkey with level0
---
src/couch_views/src/couch_views.erl | 18 ++--
src/couch_views/src/couch_views_reduce.erl | 45 +++++----
src/couch_views/src/couch_views_reduce_fdb.erl | 51 ++--------
.../test/exunit/couch_views_reduce_test.exs | 112 +++++++++++++++++----
4 files changed, 131 insertions(+), 95 deletions(-)
diff --git a/src/couch_views/src/couch_views.erl b/src/couch_views/src/couch_views.erl
index 5b2c76f..5e40670 100644
--- a/src/couch_views/src/couch_views.erl
+++ b/src/couch_views/src/couch_views.erl
@@ -44,14 +44,7 @@ query(Db, DDoc, ViewName, Callback, Acc0, Args0) ->
try
fabric2_fdb:transactional(Db, fun(TxDb) ->
ok = maybe_update_view(TxDb, Mrst, Args3),
- case is_reduce_view(Args3) of
- true ->
- couch_views_reader:read_reduce(Db, Mrst, ViewName,
- Callback, Acc0, Args3);
- false ->
- couch_views_reader:read(Db, Mrst, ViewName,
- Callback, Acc0, Args3)
- end
+ read_view(Db, Mrst, ViewName, Callback, Acc0, Args3)
end)
catch throw:{build_view, WaitSeq} ->
couch_views_jobs:build_view(Db, Mrst, WaitSeq),
@@ -62,7 +55,14 @@ query(Db, DDoc, ViewName, Callback, Acc0, Args0) ->
read_view(Db, Mrst, ViewName, Callback, Acc0, Args) ->
fabric2_fdb:transactional(Db, fun(TxDb) ->
try
- couch_views_reader:read(TxDb, Mrst, ViewName, Callback, Acc0, Args)
+ case is_reduce_view(Args) of
+ true ->
+ couch_views_reader:read_reduce(Db, Mrst, ViewName,
+ Callback, Acc0, Args);
+ false ->
+ couch_views_reader:read(Db, Mrst, ViewName,
+ Callback, Acc0, Args)
+ end
after
UpdateAfter = Args#mrargs.update == lazy,
if UpdateAfter == false -> ok; true ->
diff --git a/src/couch_views/src/couch_views_reduce.erl b/src/couch_views/src/couch_views_reduce.erl
index 04c5cb8..a2e3a93 100644
--- a/src/couch_views/src/couch_views_reduce.erl
+++ b/src/couch_views/src/couch_views_reduce.erl
@@ -66,7 +66,7 @@ read_reduce(Db, Sig, ViewId, Reducer, UserCallback, UserAcc0, Args) ->
},
Fun = fun handle_row/3,
- Acc1 = couch_views_reduce_fdb:fold_level0(TxDb, Sig, ViewId, Reducer, GroupLevel, Opts, Fun, Acc0),
+ Acc1 = couch_views_reduce_fdb:fold_level0(TxDb, Sig, ViewId, Reducer, GroupLevel1, Opts, Fun, Acc0),
#{
user_acc := UserAcc1
} = Acc1,
@@ -79,30 +79,35 @@ read_reduce(Db, Sig, ViewId, Reducer, UserCallback, UserAcc0, Args) ->
args_to_fdb_opts(#mrargs{} = Args, ReduceIdxPrefix) ->
#mrargs{
-%% limit = Limit,
-%% start_key = StartKey,
-%% end_key = EndKey,
- group = Group,
- group_level = GroupLevel
+ start_key = StartKey,
+ end_key = EndKey
} = Args,
- {UStartKey0, EndKey0} = erlfdb_tuple:range({0},
- ReduceIdxPrefix),
-
- StartKey0 = erlfdb_tuple:pack({0, couch_views_encoding:encode(0, key)}, ReduceIdxPrefix),
-
-%% StartKey1 = case StartKey of
-%% undefined -> erlfdb_key:first_greater_than(StartKey0);
-%% StartKey -> create_key(StartKey, 0, Red)
-%% end,
-
- StartKey1 = erlfdb_key:first_greater_than(StartKey0),
+ StartKey1 = case StartKey of
+ undefined ->
+ StartKey0 = encode_key(0, 0, ReduceIdxPrefix),
+ erlfdb_key:first_greater_than(StartKey0);
+ StartKey ->
+ StartKey0 = encode_key(StartKey, 0, ReduceIdxPrefix),
+ erlfdb_key:first_greater_or_equal(StartKey0)
+ end,
- [{streaming_mode, want_all}, {startkey, StartKey1}, {endkey, EndKey0}].
+ EndKey1 = case EndKey of
+ undefined ->
+ {_, EndKey0} = erlfdb_tuple:range({0},
+ ReduceIdxPrefix),
+ EndKey0;
+ EndKey ->
+ io:format("ENDKEY ~n"),
+ EndKey0 = encode_key(EndKey, 0, ReduceIdxPrefix),
+ erlfdb_key:first_greater_than(EndKey0)
+ end,
+ [{streaming_mode, want_all}, {startkey, StartKey1}, {endkey, EndKey1}].
-encode_key(Key, Level) ->
- {Level, couch_views_encoding:encode(Key, key)}.
+encode_key(Key, Level, ReduceIdxPrefix) ->
+ EK = {Level, couch_views_encoding:encode(Key, key)},
+ erlfdb_tuple:pack(EK, ReduceIdxPrefix).
handle_row(Key, Value, Acc) ->
diff --git a/src/couch_views/src/couch_views_reduce_fdb.erl b/src/couch_views/src/couch_views_reduce_fdb.erl
index 7a7e120..77514ed 100644
--- a/src/couch_views/src/couch_views_reduce_fdb.erl
+++ b/src/couch_views/src/couch_views_reduce_fdb.erl
@@ -69,38 +69,6 @@ sum_rows(Rows) ->
end, 0, Rows).
-%%fold(Db, Sig, ViewId, Options, Callback, Acc0) ->
-%% #{
-%% db_prefix := DbPrefix
-%% } = Db,
-%%
-%%%% Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
-%% ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
-%% #mrargs{
-%% limit = Limit
-%% } = Args,
-%%
-%% fabric2_fdb:transactional(Db, fun(TxDb) ->
-%%
-%% Acc0 = #{
-%% sig => Sig,
-%% view_id => ViewId,
-%% user_acc => UserAcc0,
-%% args => Args,
-%% callback => UserCallback,
-%% reduce_idx_prefix => ReduceIdxPrefix,
-%% limit => Limit,
-%% row_count => 0
-%%
-%% },
-%%
-%% Acc1 = read_level0_only(TxDb, Acc0, Callback),
-%% #{
-%% user_acc := UserAcc1
-%% } = Acc1,
-%% {ok, UserAcc1}
-%% end).
-
fold_level0(Db, Sig, ViewId, Reducer, GroupLevel, Opts, UserCallback, UserAcc0) ->
#{
db_prefix := DbPrefix
@@ -158,17 +126,20 @@ fold_fwd_cb({_FullEncodedKey, EV}, Acc) ->
LastKey0
end,
+ GroupLevelKey = group_level_key(Key, GroupLevel),
+ GroupKV = [{GroupLevelKey, Val}],
+
case group_level_equal(Key, LastKey, GroupLevel) of
true ->
Acc#{
- rows := Rows ++ [{Key, Val}]
+ rows := Rows ++ GroupKV
};
false ->
UserAcc1 = rereduce_and_reply(Reducer, Rows, GroupLevel,
Callback, UserAcc),
Acc#{
user_acc := UserAcc1,
- rows := [{Key, Val}]
+ rows := GroupKV
}
end.
@@ -352,6 +323,7 @@ get_previous_key(TxDb, ReduceIdxPrefix, Level, Key) ->
hash_key(Key) ->
+ % TODO: look at alternatives like murmur3 here
erlang:phash2(Key).
@@ -360,7 +332,6 @@ should_add_key_to_level(0, _KeyHash) ->
should_add_key_to_level(Level, KeyHash) ->
(KeyHash band ((1 bsl (Level * ?LEVEL_FAN_POW)) -1)) == 0.
-%% keyHash & ((1 << (level * LEVEL_FAN_POW)) - 1)) != 0
create_key(ReduceIdxPrefix, SkipLevel, Key) ->
@@ -392,13 +363,3 @@ add_kv(TxDb, ReduceIdxPrefix, Level, Key, Val) ->
EV = create_value(Key, Val),
ok = erlfdb:set(Tx, LevelKey, EV).
-
-
-%%rereduce(<<"_stats">>, {PrevKey, PrevVal}, {_Key, Val}) ->
-%% case PrevVal >= Val of
-%% true -> {PrevKey, PrevVal};
-%% false -> {PrevKey, Val}
-%% end.
-
-
-
diff --git a/src/couch_views/test/exunit/couch_views_reduce_test.exs b/src/couch_views/test/exunit/couch_views_reduce_test.exs
index 0812f1f..d6dcc60 100644
--- a/src/couch_views/test/exunit/couch_views_reduce_test.exs
+++ b/src/couch_views/test/exunit/couch_views_reduce_test.exs
@@ -2,9 +2,7 @@ defmodule CouchViewsReduceTest do
use Couch.Test.ExUnit.Case
alias Couch.Test.Utils
-
alias Couch.Test.Setup
-
alias Couch.Test.Setup.Step
setup_all do
@@ -57,26 +55,26 @@ defmodule CouchViewsReduceTest do
# ]
# end
-# test "group_level=1 count reduce", context do
-# args = %{
-# :reduce => true,
-# :group_level => 1
-# }
-#
-# {:ok, res} = run_query(context, args, "dates_count")
-# IO.inspect(res, label: "OUT")
-#
-# assert res == [
-# {:row, [key: [2017], value: 4]},
-# {:row, [key: [2018], value: 3]},
-# {:row, [key: [2019], value: 2]}
-# ]
-# end
-
- test "group_level=1 reduce reduce", context do
+ # test "group_level=1 count reduce", context do
+ # args = %{
+ # :reduce => true,
+ # :group_level => 1
+ # }
+ #
+ # {:ok, res} = run_query(context, args, "dates_count")
+ # IO.inspect(res, label: "OUT")
+ #
+ # assert res == [
+ # {:row, [key: [2017], value: 4]},
+ # {:row, [key: [2018], value: 3]},
+ # {:row, [key: [2019], value: 2]}
+ # ]
+ # end
+
+ test "group_level=1 reduce", context do
args = %{
- :reduce => true,
- :group_level => 1
+ reduce: true,
+ group_level: 1
}
{:ok, res} = run_query(context, args, "dates_sum")
@@ -89,6 +87,78 @@ defmodule CouchViewsReduceTest do
]
end
+ test "group_level=1 reduce with startkey/endkey", context do
+ args = %{
+ reduce: true,
+ group_level: 1,
+ start_key: [2017, 4, 1],
+ end_key: [2018, 3, 1],
+
+ }
+
+ {:ok, res} = run_query(context, args, "dates_sum")
+ IO.inspect(res, label: "OUT")
+
+ assert res == [
+ {:row, [key: [2017], value: 22]},
+ {:row, [key: [2018], value: 6]}
+ ]
+ end
+
+ test "group_level=1 reduce with startkey/endkey take 2", context do
+ args = %{
+ reduce: true,
+ group_level: 1,
+ start_key: [2017, 4, 1],
+ end_key: [2019, 3, 2],
+ }
+
+ {:ok, res} = run_query(context, args, "dates_sum")
+ IO.inspect(res, label: "OUT")
+
+ assert res == [
+ {:row, [key: [2017], value: 22]},
+ {:row, [key: [2018], value: 20]},
+ {:row, [key: [2019], value: 4]}
+ ]
+ end
+
+ test "group_level=1 reduce with startkey/endkey take 3", context do
+ args = %{
+ reduce: true,
+ group_level: 1,
+ start_key: [2017, 4, 1],
+ end_key: [2019, 05, 1],
+ }
+
+ {:ok, res} = run_query(context, args, "dates_sum")
+ IO.inspect(res, label: "OUT")
+
+ assert res == [
+ {:row, [key: [2017], value: 22]},
+ {:row, [key: [2018], value: 20]},
+ {:row, [key: [2019], value: 17]}
+ ]
+ end
+
+ test "group=true reduce with startkey/endkey", context do
+ args = %{
+ reduce: true,
+ group: true,
+ start_key: [2018, 5, 1],
+ end_key: [2019, 04, 1],
+ }
+
+ {:ok, res} = run_query(context, args, "dates_sum")
+ IO.inspect(res, label: "OUT")
+
+ assert res == [
+ {:row, [key: [2018, 5, 1], value: 7]},
+ {:row, [key: [2019, 3, 1], value: 4]},
+ {:row, [key: [2019, 4, 1], value: 6]}
+ ]
+ end
+
# test "group=1 count reduce", context do
# args = %{
# :reduce => true,
[couchdb] 02/08: printing
Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
garren pushed a commit to branch prototype/builtin-reduce
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 88ed4eb5f70c70cfa68270f832589a67ea3fe4f8
Author: Garren Smith <ga...@gmail.com>
AuthorDate: Mon Oct 28 14:34:00 2019 +0200
printing
---
src/couch_views/src/couch_views_fdb.erl | 92 +---------
src/couch_views/src/couch_views_indexer.erl | 4 +-
src/couch_views/src/couch_views_reduce.erl | 84 +++------
.../test/exunit/couch_views_reduce_test.exs | 202 ++++++++++-----------
4 files changed, 133 insertions(+), 249 deletions(-)
diff --git a/src/couch_views/src/couch_views_fdb.erl b/src/couch_views/src/couch_views_fdb.erl
index 07241dd..479a707 100644
--- a/src/couch_views/src/couch_views_fdb.erl
+++ b/src/couch_views/src/couch_views_fdb.erl
@@ -20,7 +20,6 @@
get_kv_size/3,
fold_map_idx/6,
- fold_reduce_idx/6,
write_doc/4
]).
@@ -183,8 +182,7 @@ write_doc(TxDb, Sig, ViewIds, Doc) ->
end,
update_map_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows),
couch_views_reduce:update_reduce_idx(TxDb, Sig, ViewId, DocId,
- ExistingKeys, ReduceResult),
- update_reduce_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, ReduceResult)
+ ExistingKeys, ReduceResult)
end, lists:zip3(ViewIds, Results, ReduceResults)).
@@ -357,53 +355,6 @@ update_map_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows) ->
end, KVsToAdd).
-update_reduce_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows) ->
- #{
- tx := Tx,
- db_prefix := DbPrefix
- } = TxDb,
-
-%% Unique = lists:usort([K || {K, _V} <- NewRows]),
-
-%% KeysToRem = ExistingKeys -- Unique,
-%% lists:foreach(fun(RemKey) ->
-%% {Start, End} = reduce_idx_range(DbPrefix, Sig, ViewId, RemKey, DocId),
-%% ok = erlfdb:clear_range(Tx, Start, End)
-%% end, KeysToRem),
-%%
- {ExactKVsToAdd, GroupKVsToAdd} = process_reduce_rows(NewRows),
- ReduceIdxPrefix = reduce_idx_prefix(DbPrefix, Sig, ViewId),
- add_reduce_kvs(Tx, ReduceIdxPrefix, ExactKVsToAdd, ?VIEW_REDUCE_EXACT),
- add_reduce_kvs(Tx, ReduceIdxPrefix, GroupKVsToAdd, ?VIEW_REDUCE_GROUP).
-
-
-add_reduce_kvs(Tx, ReduceIdxPrefix, KVsToAdd, ReduceType) ->
- lists:foreach(fun({Key1, Key2, Val, GroupLevel}) ->
- KK = reduce_idx_key(ReduceIdxPrefix, Key1, GroupLevel,
- ReduceType, ?VIEW_ROW_KEY),
- VK = reduce_idx_key(ReduceIdxPrefix, Key1, GroupLevel,
- ReduceType, ?VIEW_ROW_VALUE),
- ok = erlfdb:set(Tx, KK, Key2),
- ok = erlfdb:add(Tx, VK, Val)
- end, KVsToAdd).
-
-
-reduce_idx_prefix(DbPrefix, Sig, ViewId) ->
- Key = {?DB_VIEWS, Sig, ?VIEW_REDUCE_RANGE, ViewId},
- erlfdb_tuple:pack(Key, DbPrefix).
-
-
-reduce_idx_key(ReduceIdxPrefix, ReduceKey, GroupLevel, ReduceType, RowType) ->
- Key = {ReduceKey, GroupLevel, ReduceType, RowType},
- erlfdb_tuple:pack(Key, ReduceIdxPrefix).
-
-
-%%reduce_idx_range(DbPrefix, Sig, ViewId, GroupKey, DocId) ->
-%% Encoded = couch_views_encoding:encode(MapKey, key),
-%% Key = {?DB_VIEWS, Sig, ?VIEW_MAP_RANGE, ViewId, {Encoded, DocId}},
-%% erlfdb_tuple:range(Key, DbPrefix).
-
-
get_view_keys(TxDb, Sig, DocId) ->
#{
tx := Tx,
@@ -497,47 +448,6 @@ process_rows(Rows) ->
end, [], Grouped).
-process_reduce_rows(Rows) ->
- ReduceExact = encode_reduce_rows(Rows),
- ReduceGroups = lists:foldl(fun({Key, Val}, Groupings) ->
- Out = create_grouping(Key, Val, [], Groupings),
- Out
- end, #{}, Rows),
- ReduceGroups1 = encode_reduce_rows(maps:to_list(ReduceGroups)),
- {ReduceExact, ReduceGroups1}.
-
-
-encode_reduce_rows(Rows) ->
- lists:map(fun({K, V}) ->
- EK1 = couch_views_encoding:encode(K, key),
- EK2 = couch_views_encoding:encode(K, value),
- {EK1, EK2, V, group_level(K)}
- end, Rows).
-
-
-group_level(Key) when is_list(Key) ->
- length(Key);
-
-group_level(_Key) ->
- 1.
-
-
-create_grouping([], _Val, _, Groupings) ->
- Groupings;
-
-create_grouping([Head | Rest], Val, Key, Groupings) ->
- Key1 = Key ++ [Head],
- Groupings1 = maps:update_with(Key1, fun(OldVal) ->
- OldVal + Val
- end, Val, Groupings),
- create_grouping(Rest, Val, Key1, Groupings1);
-
-create_grouping(Key, Val, _, Groupings) ->
- maps:update_with(Key, fun(OldVal) ->
- OldVal + Val
- end, Val, Groupings).
-
-
calculate_row_size(Rows) ->
lists:foldl(fun({K, V}, Acc) ->
Acc + erlang:external_size(K) + erlang:external_size(V)
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index d440839..4c430c1 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -178,8 +178,8 @@ update(#{} = Db, Mrst0, State0) ->
DocAcc1 = fetch_docs(TxDb, DocAcc),
{Mrst1, MappedDocs} = map_docs(Mrst0, DocAcc1),
- Results = run_reduce(Mrst1, MappedDocs),
- write_docs(TxDb, Mrst1, MappedDocs, State2),
+ MappedReducedDocs = run_reduce(Mrst1, MappedDocs),
+ write_docs(TxDb, Mrst1, MappedReducedDocs, State2),
case Count < Limit of
true ->
diff --git a/src/couch_views/src/couch_views_reduce.erl b/src/couch_views/src/couch_views_reduce.erl
index 1502f38..4cb7416 100644
--- a/src/couch_views/src/couch_views_reduce.erl
+++ b/src/couch_views/src/couch_views_reduce.erl
@@ -26,7 +26,7 @@
-include_lib("fabric/include/fabric2.hrl").
--define(LEVEL_FAN_POW, 4).
+-define(LEVEL_FAN_POW, 1).
-define(MAX_SKIP_LIST_LEVELS, 6).
@@ -84,7 +84,6 @@ read_reduce(Db, Sig, ViewId, UserCallback, UserAcc0, Args) ->
args => Args,
callback => UserCallback,
reduce_idx_prefix => ReduceIdxPrefix,
- next => key,
rows => []
},
@@ -111,31 +110,19 @@ args_to_fdb_opts(#mrargs{} = Args) ->
ok.
-fold_fwd_cb({FullEncodedKey, EV}, #{next := key} = Acc) ->
- #{
- reduce_idx_prefix := ReduceIdxPrefix
- } = Acc,
-
- {Level, EK, ?VIEW_ROW_KEY}
- = erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
-
-%% Key = couch_views_encoding:decode(EV),
- Val = couch_views_encoding:decode(EV),
- Acc#{next := value, key := Val};
-
-fold_fwd_cb({FullEncodedKey, EV}, #{next := value} = Acc) ->
+fold_fwd_cb({FullEncodedKey, EV}, Acc) ->
#{
reduce_idx_prefix := ReduceIdxPrefix,
- rows := Rows,
- key := Key
+ rows := Rows
} = Acc,
- {Level, EK, ?VIEW_ROW_VALUE}
+ {_Level, _EK}
= erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
+ {EK, EV1} = erlfdb_tuple:unpack(EV),
+ Key = couch_views_encoding:decode(EK),
+ Val = couch_views_encoding:decode(EV1),
-%% Key = couch_views_encoding:decode(EV),
- Val = couch_views_encoding:decode(EV),
- Acc#{next := key, key := undefined, rows := Rows ++ [{Key, Val}]}.
+ Acc#{key := Val, rows := Rows ++ [{Key, Val}]}.
run_reduce(#mrst{views = Views } = Mrst, MappedResults) ->
@@ -219,8 +206,8 @@ update_reduce_idx(TxDb, Sig, ViewId, _DocId, _ExistingKeys, ReduceResult) ->
create_skip_list(TxDb, ?MAX_SKIP_LIST_LEVELS, ViewOpts),
lists:foreach(fun ({Key, Val}) ->
- io:format("RESULTS KV ~p ~p ~n", [Key, Val]),
- add_kv_to_skip_list(TxDb, ?MAX_SKIP_LIST_LEVELS, ViewOpts, Key, Val)
+ io:format("RESULTS KV ~p ~p ~n", [Key, Val])
+%% add_kv_to_skip_list(TxDb, ?MAX_SKIP_LIST_LEVELS, ViewOpts, Key, Val)
end, ReduceResult).
@@ -241,21 +228,9 @@ create_skip_list(Db, MaxLevel, #{} = ViewOpts) ->
end, Levels)
end).
-%% This sucks but its simple for now
-should_add_key_to_level(0, _, _) ->
- true;
-
-should_add_key_to_level(?MAX_SKIP_LIST_LEVELS, _, _) ->
- false;
-
-should_add_key_to_level(_, _, false) ->
- false;
-
-should_add_key_to_level(_, _Key, _Prev) ->
- crypto:rand_uniform(0, 2) == 0.
-%%should_add_key_to_level(Level, Key) ->
-%% erlang:phash2(Key) band ((1 bsl (Level * ?LEVEL_FAN_POW)) -1) == 0.
+should_add_key_to_level(Level, Key) ->
+ (erlang:phash2(Key) band ((1 bsl (Level * ?LEVEL_FAN_POW)) -1)) == 0.
%% keyHash & ((1 << (level * LEVEL_FAN_POW)) - 1)) != 0
@@ -270,20 +245,18 @@ add_kv_to_skip_list(Db, MaxLevel, #{} = ViewOpts, Key, Val) ->
ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
fabric2_fdb:transactional(Db, fun(TxDb) ->
- lists:foldl(fun(Level, PrevCoinFlip) ->
+ lists:foldl(fun(Level) ->
io:format("PROCESS ~p ~p ~p ~n", [Level, Key, Val]),
{PrevKey, PrevVal} = get_previous_key(TxDb, ReduceIdxPrefix, Level, Key),
io:format("PREV VALS ~p ~p ~n", [PrevKey, PrevVal]),
- case should_add_key_to_level(Level, Key, PrevCoinFlip) of
+ case should_add_key_to_level(Level, Key) of
true ->
io:format("Adding ~p ~p ~n", [Level, Key]),
- add_kv(Db, ReduceIdxPrefix, Level, Key, Val),
- true;
+ add_kv(Db, ReduceIdxPrefix, Level, Key, Val);
false ->
{PrevKey, NewVal} = rereduce(<<"_stats">>, {PrevKey, PrevVal}, {Key, Val}),
io:format("RE_REDUCE ~p ~p ~p ~p ~n", [Level, Key, PrevKey, NewVal]),
- add_kv(Db, ReduceIdxPrefix, Level, PrevKey, NewVal),
- false
+ add_kv(Db, ReduceIdxPrefix, Level, PrevKey, NewVal)
end
end, true, Levels)
end).
@@ -297,13 +270,20 @@ rereduce(<<"_stats">>, {PrevKey, PrevVal}, {_Key, Val}) ->
reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId) ->
- Key = {?DB_VIEWS, Sig, ?VIEW_REDUCE_SK_RANGE, ViewId},
+ Key = {?DB_VIEWS, Sig, ?VIEW_REDUCE_RANGE, ViewId},
erlfdb_tuple:pack(Key, DbPrefix).
-reduce_idx_key(ReduceIdxPrefix, SkipLevel, ReduceKey, RowType) ->
- Key = {SkipLevel, ReduceKey, RowType},
- erlfdb_tuple:pack(Key, ReduceIdxPrefix).
+create_key(ReduceIdxPrefix, SkipLevel, Key) ->
+ EK = couch_views_encoding:encode(Key, key),
+ LevelKey = {SkipLevel, EK},
+ erlfdb_tuple:pack(LevelKey, ReduceIdxPrefix).
+
+
+create_value(Key, Val) ->
+ EK = couch_views_encoding:encode(Key),
+ EV = couch_views_encoding:encode(Val),
+ erlfdb_tuple:pack({EK, EV}).
add_kv(TxDb, ReduceIdxPrefix, Level, Key, Val) ->
@@ -311,14 +291,10 @@ add_kv(TxDb, ReduceIdxPrefix, Level, Key, Val) ->
tx := Tx
} = TxDb,
- EK = couch_views_encoding:encode(Key, key),
- EVK = couch_views_encoding:encode(Key),
- EV = couch_views_encoding:encode(Val),
+ LevelKey = create_key(ReduceIdxPrefix, Level, Key),
+ EV = create_value(Key, Val),
- KK = reduce_idx_key(ReduceIdxPrefix, Level, EK, ?VIEW_ROW_KEY),
- VK = reduce_idx_key(ReduceIdxPrefix, Level, EK, ?VIEW_ROW_VALUE),
- ok = erlfdb:set(Tx, KK, EVK),
- ok = erlfdb:set(Tx, VK, EV).
+ ok = erlfdb:set(Tx, LevelKey, EV).
get_previous_key(TxDb, ReduceIdxPrefix, Level, Key) ->
diff --git a/src/couch_views/test/exunit/couch_views_reduce_test.exs b/src/couch_views/test/exunit/couch_views_reduce_test.exs
index 3f7a173..a526658 100644
--- a/src/couch_views/test/exunit/couch_views_reduce_test.exs
+++ b/src/couch_views/test/exunit/couch_views_reduce_test.exs
@@ -8,7 +8,7 @@ defmodule CouchViewsReduceTest do
alias Couch.Test.Setup.Step
setup_all do
- test_ctx = :test_util.start_couch([:fabric, :couch_views, :couch_jobs])
+ test_ctx = :test_util.start_couch([:fabric, :couch_js, :couch_views, :couch_jobs])
on_exit(fn ->
:test_util.stop_couch(test_ctx)
@@ -40,30 +40,30 @@ defmodule CouchViewsReduceTest do
}
end
- # test "group=true count reduce", context do
- # args = %{
- # :reduce => true,
- # :group => true
- # # :limit => 9
- # }
- #
- # {:ok, res} = run_query(context, args, "baz")
- # IO.inspect(res, label: "OUT")
- #
- # assert res == [
- # {:row, [key: 1, value: 2]},
- # {:row, [key: 2, value: 2]},
- # {:row, [key: 3, value: 2]},
- # {:row, [key: [1, 1], value: 1]},
- # {:row, [key: [1, 1, 5], value: 1]},
- # {:row, [key: [1, 2, 6], value: 1]},
- # {:row, [key: [2, 1], value: 1]},
- # {:row, [key: [2, 3, 6], value: 1]},
- # {:row, [key: [3, 1], value: 1]},
- # {:row, [key: [3, 1, 5], value: 1]},
- # {:row, [key: [3, 4, 5], value: 1]}
- # ]
- # end
+ test "group=true count reduce", context do
+ args = %{
+ :reduce => true,
+ :group => true
+ # :limit => 9
+ }
+
+ {:ok, res} = run_query(context, args, "baz")
+ IO.inspect(res, label: "OUT")
+
+ assert res == [
+ {:row, [key: 1, value: 2]},
+ {:row, [key: 2, value: 2]},
+ {:row, [key: 3, value: 2]},
+ {:row, [key: [1, 1], value: 1]},
+ {:row, [key: [1, 1, 5], value: 1]},
+ {:row, [key: [1, 2, 6], value: 1]},
+ {:row, [key: [2, 1], value: 1]},
+ {:row, [key: [2, 3, 6], value: 1]},
+ {:row, [key: [3, 1], value: 1]},
+ {:row, [key: [3, 1, 5], value: 1]},
+ {:row, [key: [3, 4, 5], value: 1]}
+ ]
+ end
# test "group=1 count reduce", context do
# args = %{
@@ -150,20 +150,20 @@ defmodule CouchViewsReduceTest do
# ]
# end
- test "group_level=0 _max reduce", context do
- args = %{
- :reduce => true,
- :group_level => 0
- # :limit => 9
- }
-
- {:ok, res} = run_query(context, args, "max")
- IO.inspect(res, label: "OUT")
-
- assert res == [
- {:row, [key: :null, value: 3]}
- ]
- end
+ # test "group_level=0 _sum reduce", context do
+ # args = %{
+ # :reduce => true,
+ # :group_level => 0
+ # # :limit => 9
+ # }
+ #
+ # {:ok, res} = run_query(context, args, "max")
+ # IO.inspect(res, label: "OUT")
+ #
+ # assert res == [
+ # {:row, [key: :null, value: 3]}
+ # ]
+ # end
defp run_query(context, args, view) do
db = context[:db]
@@ -217,70 +217,68 @@ defmodule CouchViewsReduceTest do
end
defp create_ddoc() do
- :couch_doc.from_json_obj(
- {[
- {"_id", "_design/bar"},
- {"views",
- {[
-# {"baz",
-# {[
-# {"map",
-# """
-# function(doc) {
-# emit(doc.value, doc.value);
-# emit(doc.value, doc.value);
-# emit([doc.value, 1], doc.value);
-# emit([doc.value, doc.value + 1, doc.group.length], doc.value);
-#
-# if (doc.value === 3) {
-# emit([1, 1, 5], 1);
-# emit([doc.value, 1, 5], 1);
-# }
-# }
-# """},
-# {"reduce", "_count"}
-# ]}},
-# {"boom",
-# {[
-# {"map",
-# """
-# function(doc) {
-# var month = 1;
-# if (doc.value % 2) {
-# month = 2;
-# }
-# emit([2019, month, doc.value], doc.value);
-# }
-# """},
-# {"reduce", "_count"}
-# ]}},
- {"max",
- {[
- {"map",
- """
- function(doc) {
- //emit(doc.value, doc.value);
- //emit([doc.value, 1], doc.value);
- //emit([doc.value, doc.value + 1, doc.group.length], doc.value);
- emit(1, 1);
- emit(2, 2);
- emit(3, 3);
- emit(4, 4);
+ :couch_doc.from_json_obj({[
+ {"_id", "_design/bar"},
+ {"views",
+ {[
+ {"baz",
+ {[
+ {"map",
+ """
+ function(doc) {
+ emit(doc.value, doc.value);
+ emit(doc.value, doc.value);
+ emit([doc.value, 1], doc.value);
+ emit([doc.value, doc.value + 1, doc.group.length], doc.value);
- emit([2019, 2, 2], 1);
- emit([2019, 3, 3], 2);
- emit([2019, 3, 3], 3);
- emit([2019, 4, 3], 4);
- emit([2019, 5, 3], 6);
- if (doc.value === 3) {
- //emit([doc.value, 1, 5], 1);
- }
+ if (doc.value === 3) {
+ emit([1, 1, 5], 1);
+ emit([doc.value, 1, 5], 1);
}
- """},
- {"reduce", "_stats"}
- ]}}
- ]}}
- ]}
- )
+ }
+ """},
+ {"reduce", "_count"}
+ ]}}
+ # {"boom",
+ # {[
+ # {"map",
+ # """
+ # function(doc) {
+ # var month = 1;
+ # if (doc.value % 2) {
+ # month = 2;
+ # }
+ # emit([2019, month, doc.value], doc.value);
+ # }
+ # """},
+ # {"reduce", "_count"}
+ # ]}},
+ # {"max",
+ # {[
+ # {"map",
+ # """
+ # function(doc) {
+ # //emit(doc.value, doc.value);
+ # //emit([doc.value, 1], doc.value);
+ # //emit([doc.value, doc.value + 1, doc.group.length], doc.value);
+ # emit(1, 1);
+ # emit(2, 2);
+ # emit(3, 3);
+ # emit(4, 4);
+ #
+ # emit([2019, 2, 2], 1);
+ # emit([2019, 3, 3], 2);
+ # emit([2019, 3, 3], 3);
+ # emit([2019, 4, 3], 4);
+ # emit([2019, 5, 3], 6);
+ # if (doc.value === 3) {
+ # //emit([doc.value, 1, 5], 1);
+ # }
+ # }
+ # """},
+ # {"reduce", "_stats"}
+ # ]}}
+ ]}}
+ ]})
end
end
[couchdb] 04/08: can do group_level query
Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
garren pushed a commit to branch prototype/builtin-reduce
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 0ddb80078bc5ba527b875f3e14d51295cacc46f6
Author: Garren Smith <ga...@gmail.com>
AuthorDate: Thu Oct 31 14:09:36 2019 +0200
can do group_level query
---
src/couch_views/src/couch_views_reader.erl | 9 +-
src/couch_views/src/couch_views_reduce.erl | 15 +-
src/couch_views/src/couch_views_reduce_fdb.erl | 175 +++++++++++++++------
.../test/exunit/couch_views_reduce_test.exs | 47 +++---
4 files changed, 168 insertions(+), 78 deletions(-)
diff --git a/src/couch_views/src/couch_views_reader.erl b/src/couch_views/src/couch_views_reader.erl
index 394b3cf..e750a94 100644
--- a/src/couch_views/src/couch_views_reader.erl
+++ b/src/couch_views/src/couch_views_reader.erl
@@ -32,7 +32,8 @@ read_reduce(Db, Mrst, ViewName, UserCallback, UserAcc0, Args) ->
} = Mrst,
ViewId = get_view_id(Lang, Args, ViewName, Views),
- couch_views_reduce:read_reduce(Db, Sig, ViewId, UserCallback,
+ Reducer = get_view_reducer(Lang, Args, ViewName, Views),
+ couch_views_reduce:read_reduce(Db, Sig, ViewId, Reducer, UserCallback,
UserAcc0, Args).
%% Fun = fun handle_reduce_row/3,
%%
@@ -243,6 +244,12 @@ get_view_id(Lang, Args, ViewName, Views) ->
{red, {_Idx, _Lang, View}, _Args} -> View#mrview.id_num
end.
+get_view_reducer(Lang, Args, ViewName, Views) ->
+ case couch_mrview_util:extract_view(Lang, Args, ViewName, Views) of
+ {map, View, _Args} -> throw(no_reduce);
+ View -> couch_mrview_util:extract_view_reduce(View)
+ end.
+
expand_keys_args(#mrargs{keys = undefined} = Args) ->
[Args];
diff --git a/src/couch_views/src/couch_views_reduce.erl b/src/couch_views/src/couch_views_reduce.erl
index ebd2f47..b7eb18e 100644
--- a/src/couch_views/src/couch_views_reduce.erl
+++ b/src/couch_views/src/couch_views_reduce.erl
@@ -15,7 +15,7 @@
-export([
run_reduce/2,
- read_reduce/6,
+ read_reduce/7,
setup_reduce_indexes/3
]).
@@ -30,7 +30,7 @@
-define(MAX_SKIP_LIST_LEVELS, 6).
-read_reduce(Db, Sig, ViewId, UserCallback, UserAcc0, Args) ->
+read_reduce(Db, Sig, ViewId, Reducer, UserCallback, UserAcc0, Args) ->
#{
db_prefix := DbPrefix
} = Db,
@@ -38,9 +38,16 @@ read_reduce(Db, Sig, ViewId, UserCallback, UserAcc0, Args) ->
%% Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
#mrargs{
- limit = Limit
+ limit = Limit,
+ group = Group,
+ group_level = GroupLevel
} = Args,
+ GroupLevel1 = case Group of
+ true -> group_true;
+ _ -> GroupLevel
+ end,
+
Opts = args_to_fdb_opts(Args, ReduceIdxPrefix),
try
@@ -59,7 +66,7 @@ read_reduce(Db, Sig, ViewId, UserCallback, UserAcc0, Args) ->
},
Fun = fun handle_row/3,
- Acc1 = couch_views_reduce_fdb:fold_level0(TxDb, Sig, ViewId, Opts, Fun, Acc0),
+ Acc1 = couch_views_reduce_fdb:fold_level0(TxDb, Sig, ViewId, Reducer, GroupLevel, Opts, Fun, Acc0),
#{
user_acc := UserAcc1
} = Acc1,
diff --git a/src/couch_views/src/couch_views_reduce_fdb.erl b/src/couch_views/src/couch_views_reduce_fdb.erl
index 9683265..5759c42 100644
--- a/src/couch_views/src/couch_views_reduce_fdb.erl
+++ b/src/couch_views/src/couch_views_reduce_fdb.erl
@@ -15,7 +15,7 @@
-export([
- fold_level0/6,
+ fold_level0/8,
create_skip_list/3,
update_reduce_idx/6
]).
@@ -26,7 +26,7 @@
-include_lib("couch_mrview/include/couch_mrview.hrl").
-include_lib("fabric/include/fabric2.hrl").
--define(MAX_SKIP_LIST_LEVELS, 6).
+-define(MAX_SKIP_LIST_LEVELS, 1).
-define(LEVEL_FAN_POW, 1).
log_levels(Db, Sig, ViewId) ->
@@ -34,34 +34,40 @@ log_levels(Db, Sig, ViewId) ->
db_prefix := DbPrefix
} = Db,
- Levels = lists:seq(0, 6),
+ Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
Opts = [{streaming_mode, want_all}],
fabric2_fdb:transactional(Db, fun(#{tx := Tx} = TxDb) ->
- lists:foreach(fun (Level) ->
+ lists:foldl(fun (Level, Level0Total) ->
{StartKey, EndKey} = erlfdb_tuple:range({Level},
ReduceIdxPrefix),
- Acc0 = #{
- sig => Sig,
- view_id => ViewId,
- reduce_idx_prefix => ReduceIdxPrefix,
- user_acc => [],
- callback => fun handle_log_levels/3
- },
-
- Fun = fun fold_fwd_cb/2,
- Acc = erlfdb:fold_range(Tx, StartKey, EndKey, Fun, Acc0, Opts),
- #{
- user_acc := Rows
- } = Acc,
- io:format("~n LEVEL ~p rows ~p ~n", [Level, Rows])
- end, Levels)
+ Future = erlfdb:get_range(Tx, StartKey, EndKey, Opts),
+ Rows = lists:map(fun ({_Key, EV}) ->
+ unpack_key_value(EV)
+ end, erlfdb:wait(Future)),
+
+ io:format("~n LEVEL ~p rows ~p ~n", [Level, Rows]),
+ case Level == 0 of
+ true ->
+ sum_rows(Rows);
+ false ->
+ Total = sum_rows(Rows),
+ if Total == Level0Total -> Level0Total; true ->
+ io:format("~n ~n LEVEL ~p NOT EQUAL ~p ~p ~n", [Level, Level0Total, Total])
+%% throw(level_total_error)
+ end
+ end
+
+ end, 0, Levels)
end).
-handle_log_levels(Key, Value, Acc) ->
- Acc ++ [{Key, Value}].
+sum_rows(Rows) ->
+ lists:foldl(fun ({_, Val}, Sum) ->
+ Val + Sum
+ end, 0, Rows).
+
%%fold(Db, Sig, ViewId, Options, Callback, Acc0) ->
%% #{
@@ -95,7 +101,7 @@ handle_log_levels(Key, Value, Acc) ->
%% {ok, UserAcc1}
%% end).
-fold_level0(Db, Sig, ViewId, Opts, UserCallback, UserAcc0) ->
+fold_level0(Db, Sig, ViewId, Reducer, GroupLevel, Opts, UserCallback, UserAcc0) ->
#{
db_prefix := DbPrefix
} = Db,
@@ -108,7 +114,10 @@ fold_level0(Db, Sig, ViewId, Opts, UserCallback, UserAcc0) ->
user_acc => UserAcc0,
%% args := Args,
callback => UserCallback,
- reduce_idx_prefix => ReduceIdxPrefix
+ reduce_idx_prefix => ReduceIdxPrefix,
+ reducer => Reducer,
+ group_level => GroupLevel,
+ rows => []
},
fabric2_fdb:transactional(Db, fun(TxDb) ->
@@ -118,36 +127,94 @@ fold_level0(Db, Sig, ViewId, Opts, UserCallback, UserAcc0) ->
} = TxDb,
-%% {StartKey, EndKey} = erlfdb_tuple:range({0},
-%% ReduceIdxPrefix),
{startkey, StartKey} = lists:keyfind(startkey, 1, Opts),
{endkey, EndKey} = lists:keyfind(endkey, 1, Opts),
-%% ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
Fun = fun fold_fwd_cb/2,
Acc1 = erlfdb:fold_range(Tx, StartKey, EndKey, Fun, Acc, Opts),
#{
- user_acc := UserAcc1
+ user_acc := UserAcc1,
+ rows := Rows
} = Acc1,
- UserAcc1
+
+ rereduce_and_reply(Reducer, Rows, GroupLevel,
+ UserCallback, UserAcc1)
end).
-fold_fwd_cb({FullEncodedKey, EV}, Acc) ->
+fold_fwd_cb({_FullEncodedKey, EV}, Acc) ->
#{
- reduce_idx_prefix := ReduceIdxPrefix,
callback := Callback,
- user_acc := UserAcc
+ user_acc := UserAcc,
+ group_level := GroupLevel,
+ rows := Rows,
+ reducer := Reducer
} = Acc,
- {_Level, _EK}
- = erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
- {EK, EV1} = erlfdb_tuple:unpack(EV),
- Key = couch_views_encoding:decode(EK),
- Val = couch_views_encoding:decode(EV1),
+ {Key, Val} = unpack_key_value(EV),
+
+ LastKey = if Rows == [] -> false; true ->
+ {LastKey0, _} = lists:last(Rows),
+ LastKey0
+ end,
+
+ case group_level_equal(Key, LastKey, GroupLevel) of
+ true ->
+ Acc#{
+ rows := Rows ++ [{Key, Val}]
+ };
+ false ->
+ UserAcc1 = rereduce_and_reply(Reducer, Rows, GroupLevel,
+ Callback, UserAcc),
+ Acc#{
+ user_acc := UserAcc1,
+ rows := [{Key, Val}]
+ }
+ end.
+
+rereduce_and_reply(_Reducer, [], _GroupLevel, _Callback, Acc) ->
+ Acc;
+
+rereduce_and_reply(Reducer, Rows, GroupLevel, Callback, Acc) ->
+ {ReducedKey, ReducedVal} = rereduce(Reducer, Rows, GroupLevel),
+ Callback(ReducedKey, ReducedVal, Acc).
+
+
+rereduce(_Reducer, [], _GroupLevel) ->
+ no_kvs;
- UserAcc1 = Callback(Key, Val, UserAcc),
- Acc#{user_acc := UserAcc1}.
+rereduce(_Reducer, Rows, GroupLevel) when length(Rows) == 1 ->
+ {Key, Val} = hd(Rows),
+ GroupKey = group_level_key(Key, GroupLevel),
+ {GroupKey, Val};
+
+rereduce(<<"_count">>, Rows, GroupLevel) ->
+ Val = length(Rows),
+ {Key, _} = hd(Rows),
+ GroupKey = group_level_key(Key, GroupLevel),
+ {GroupKey, Val}.
+
+
+group_level_equal(_One, _Two, 0) ->
+ true;
+
+group_level_equal(_One, _Two, group_true) ->
+ false;
+
+group_level_equal(One, Two, GroupLevel) ->
+ GroupOne = group_level_key(One, GroupLevel),
+ GroupTwo = group_level_key(Two, GroupLevel),
+ GroupOne == GroupTwo.
+
+
+group_level_key(_Key, 0) ->
+ null;
+
+group_level_key(Key, GroupLevel) when is_list(Key) ->
+ lists:sublist(Key, GroupLevel);
+
+group_level_key(Key, _GroupLevel) ->
+ Key.
reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId) ->
@@ -155,6 +222,13 @@ reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId) ->
erlfdb_tuple:pack(Key, DbPrefix).
+unpack_key_value(EncodedValue) ->
+ {EK, EV1} = erlfdb_tuple:unpack(EncodedValue),
+ Key = couch_views_encoding:decode(EK),
+ Val = couch_views_encoding:decode(EV1),
+ {Key, Val}.
+
+
%% Inserting
update_reduce_idx(TxDb, Sig, ViewId, _DocId, _ExistingKeys, ReduceResult) ->
#{
@@ -205,15 +279,15 @@ add_kv_to_skip_list(Db, MaxLevel, #{} = ViewOpts, Key, Val) ->
fabric2_fdb:transactional(Db, fun(TxDb) ->
lists:foreach(fun(Level) ->
{PrevKey, PrevVal} = get_previous_key(TxDb, ReduceIdxPrefix, Level, Key),
- io:format("Process ~p ~p ~p PREV VALS ~p ~p ~n", [Level, Key, Val, PrevKey, PrevVal]),
+ io:format("Level ~p K/V ~p ~p PREV KV ~p ~p ~n", [Level, Key, Val, PrevKey, PrevVal]),
case should_add_key_to_level(Level, KeyHash) of
true ->
- io:format("Adding ~p ~p ~n", [Level, Key]),
+ io:format("Adding at ~p ~p ~n", [Level, Key]),
add_kv(Db, ReduceIdxPrefix, Level, Key, Val);
false ->
- {PrevKey, NewVal} = rereduce(<<"_stats">>, {PrevKey, PrevVal}, {Key, Val}),
- io:format("RE_REDUCE ~p ~p ~p ~p ~n", [Level, Key, PrevKey, NewVal]),
- add_kv(Db, ReduceIdxPrefix, Level, PrevKey, NewVal)
+%% {PrevKey, NewVal} = rereduce(<<"_stats">>, {PrevKey, PrevVal}, {Key, Val}),
+%% io:format("RE_REDUCE ~p ~p ~p ~p ~n", [Level, Key, PrevKey, NewVal]),
+ add_kv(Db, ReduceIdxPrefix, Level, PrevKey, PrevVal)
end
end, Levels)
end).
@@ -229,7 +303,7 @@ get_previous_key(TxDb, ReduceIdxPrefix, Level, Key) ->
EK = couch_views_encoding:encode(Key, key),
StartKey = erlfdb_tuple:pack({Level, EK}, ReduceIdxPrefix),
- StartKeySel = erlfdb_key:last_less_or_equal(StartKey),
+ StartKeySel = erlfdb_key:last_less_than(StartKey),
EndKeySel = erlfdb_key:first_greater_or_equal(StartKey),
Future = erlfdb:get_range(Tx, StartKeySel, EndKeySel, Opts),
@@ -241,6 +315,9 @@ hash_key(Key) ->
erlang:phash2(Key).
+should_add_key_to_level(0, _KeyHash) ->
+ true;
+
should_add_key_to_level(Level, KeyHash) ->
(KeyHash band ((1 bsl (Level * ?LEVEL_FAN_POW)) -1)) == 0.
%% keyHash & ((1 << (level * LEVEL_FAN_POW)) - 1)) != 0
@@ -277,11 +354,11 @@ add_kv(TxDb, ReduceIdxPrefix, Level, Key, Val) ->
ok = erlfdb:set(Tx, LevelKey, EV).
-rereduce(<<"_stats">>, {PrevKey, PrevVal}, {_Key, Val}) ->
- case PrevVal >= Val of
- true -> {PrevKey, PrevVal};
- false -> {PrevKey, Val}
- end.
+%%rereduce(<<"_stats">>, {PrevKey, PrevVal}, {_Key, Val}) ->
+%% case PrevVal >= Val of
+%% true -> {PrevKey, PrevVal};
+%% false -> {PrevKey, Val}
+%% end.
diff --git a/src/couch_views/test/exunit/couch_views_reduce_test.exs b/src/couch_views/test/exunit/couch_views_reduce_test.exs
index c1b35e2..488f3ee 100644
--- a/src/couch_views/test/exunit/couch_views_reduce_test.exs
+++ b/src/couch_views/test/exunit/couch_views_reduce_test.exs
@@ -60,16 +60,16 @@ defmodule CouchViewsReduceTest do
test "group_level=1 count reduce", context do
args = %{
:reduce => true,
- :group => true,
+ :group_level => 1,
}
{:ok, res} = run_query(context, args, "dates")
IO.inspect(res, label: "OUT")
assert res == [
- {:row, [key: [2017], value: 1]},
- {:row, [key: [2018], value: 1]},
- {:row, [key: [2019], value: 1]}
+ {:row, [key: [2017], value: 4]},
+ {:row, [key: [2018], value: 3]},
+ {:row, [key: [2019], value: 2]}
]
end
@@ -221,7 +221,7 @@ defmodule CouchViewsReduceTest do
[2019, 4, 1]
]
- for i <- 1..4 do
+ for i <- 1..10 do
group =
if rem(i, 3) == 0 do
"first"
@@ -235,7 +235,6 @@ defmodule CouchViewsReduceTest do
{"some", "field"},
{"group", group},
{"date", Enum.at(dates, i - 1)}
- # {"timestamp", Enum.at(timestamps, i - 1)}
]})
end
end
@@ -254,25 +253,25 @@ defmodule CouchViewsReduceTest do
}
"""},
{"reduce", "_count"}
- ]}},
- {"baz",
- {[
- {"map",
- """
- function(doc) {
- emit(doc.value, doc.value);
- emit(doc.value, doc.value);
- emit([doc.value, 1], doc.value);
- emit([doc.value, doc.value + 1, doc.group.length], doc.value);
-
- if (doc.value === 3) {
- emit([1, 1, 5], 1);
- emit([doc.value, 1, 5], 1);
- }
- }
- """},
- {"reduce", "_count"}
]}}
+# {"baz",
+# {[
+# {"map",
+# """
+# function(doc) {
+# emit(doc.value, doc.value);
+# emit(doc.value, doc.value);
+# emit([doc.value, 1], doc.value);
+# emit([doc.value, doc.value + 1, doc.group.length], doc.value);
+#
+# if (doc.value === 3) {
+# emit([1, 1, 5], 1);
+# emit([doc.value, 1, 5], 1);
+# }
+# }
+# """},
+# {"reduce", "_count"}
+# ]}}
# {"boom",
# {[
# {"map",
[couchdb] 03/08: progress with reading level 0
Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
garren pushed a commit to branch prototype/builtin-reduce
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 0076f15d191bd2fb71a0837e474ce12019be0c8f
Author: Garren Smith <ga...@gmail.com>
AuthorDate: Wed Oct 30 11:37:36 2019 +0200
progress with reading level 0
---
src/couch_views/src/couch_views_fdb.erl | 4 +-
src/couch_views/src/couch_views_indexer.erl | 8 +-
src/couch_views/src/couch_views_reader.erl | 30 --
src/couch_views/src/couch_views_reduce.erl | 306 +++++++--------------
src/couch_views/src/couch_views_reduce_fdb.erl | 274 +++++++++++++++++-
.../test/exunit/couch_views_reduce_test.exs | 94 +++++--
6 files changed, 428 insertions(+), 288 deletions(-)
diff --git a/src/couch_views/src/couch_views_fdb.erl b/src/couch_views/src/couch_views_fdb.erl
index 479a707..8999d76 100644
--- a/src/couch_views/src/couch_views_fdb.erl
+++ b/src/couch_views/src/couch_views_fdb.erl
@@ -133,8 +133,6 @@ fold_map_idx(TxDb, Sig, ViewId, Options, Callback, Acc0) ->
Acc1.
-
-
write_doc(TxDb, Sig, _ViewIds, #{deleted := true} = Doc) ->
#{
id := DocId
@@ -181,7 +179,7 @@ write_doc(TxDb, Sig, ViewIds, Doc) ->
[]
end,
update_map_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows),
- couch_views_reduce:update_reduce_idx(TxDb, Sig, ViewId, DocId,
+ couch_views_reduce_fdb:update_reduce_idx(TxDb, Sig, ViewId, DocId,
ExistingKeys, ReduceResult)
end, lists:zip3(ViewIds, Results, ReduceResults)).
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index 4c430c1..cff15b0 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -298,11 +298,17 @@ write_docs(TxDb, Mrst, Docs, State) ->
} = Mrst,
#{
- last_seq := LastSeq
+ last_seq := LastSeq,
+ view_seq := ViewSeq
} = State,
ViewIds = [View#mrview.id_num || View <- Views],
+ %% First build of the view
+ if ViewSeq /= <<>> -> ok; true ->
+ couch_views_reduce:setup_reduce_indexes(TxDb, Sig, ViewIds)
+ end,
+
lists:foreach(fun(Doc) ->
couch_views_fdb:write_doc(TxDb, Sig, ViewIds, Doc)
end, Docs),
diff --git a/src/couch_views/src/couch_views_reader.erl b/src/couch_views/src/couch_views_reader.erl
index d08515c..394b3cf 100644
--- a/src/couch_views/src/couch_views_reader.erl
+++ b/src/couch_views/src/couch_views_reader.erl
@@ -236,36 +236,6 @@ handle_row(DocId, Key, Value, Acc) ->
UserAcc1 = maybe_stop(UserCallback({row, Row}, UserAcc0)),
Acc#{acc := UserAcc1}.
-handle_reduce_row(_Key, _Value, #{skip := Skip} = Acc) when Skip > 0 ->
- Acc#{skip := Skip - 1};
-
-handle_reduce_row(Key, Value, Acc) ->
- io:format("ACC ~p ~n", [Acc]),
- #{
- callback := UserCallback,
- acc := UserAcc0,
- row_count := RowCount,
- limit := Limit
- } = Acc,
-
- Row = [
- {key, Key},
- {value, Value}
- ],
-
- RowCountNext = RowCount + 1,
-
- UserAcc1 = maybe_stop(UserCallback({row, Row}, UserAcc0)),
- Acc1 = Acc#{acc := UserAcc1, row_count := RowCountNext},
-
- case RowCountNext == Limit of
- true ->
- UserAcc2 = maybe_stop(UserCallback(complete, UserAcc1)),
- maybe_stop({stop, UserAcc2});
- false ->
- Acc1
- end.
-
get_view_id(Lang, Args, ViewName, Views) ->
case couch_mrview_util:extract_view(Lang, Args, ViewName, Views) of
diff --git a/src/couch_views/src/couch_views_reduce.erl b/src/couch_views/src/couch_views_reduce.erl
index 4cb7416..ebd2f47 100644
--- a/src/couch_views/src/couch_views_reduce.erl
+++ b/src/couch_views/src/couch_views_reduce.erl
@@ -15,8 +15,8 @@
-export([
run_reduce/2,
- update_reduce_idx/6,
- read_reduce/6
+ read_reduce/6,
+ setup_reduce_indexes/3
]).
@@ -30,99 +30,120 @@
-define(MAX_SKIP_LIST_LEVELS, 6).
-log_levels(Db, Sig, ViewId) ->
+read_reduce(Db, Sig, ViewId, UserCallback, UserAcc0, Args) ->
#{
db_prefix := DbPrefix
} = Db,
- Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
+%% Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
- Opts = [{streaming_mode, want_all}],
+ #mrargs{
+ limit = Limit
+ } = Args,
- fabric2_fdb:transactional(Db, fun(#{tx := Tx} = TxDb) ->
- lists:foreach(fun (Level) ->
- {StartKey, EndKey} = erlfdb_tuple:range({Level},
- ReduceIdxPrefix),
+ Opts = args_to_fdb_opts(Args, ReduceIdxPrefix),
+
+ try
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+ %% Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
Acc0 = #{
sig => Sig,
view_id => ViewId,
+ user_acc => UserAcc0,
+ args => Args,
+ callback => UserCallback,
reduce_idx_prefix => ReduceIdxPrefix,
- next => key,
- key => undefined,
- rows => []
+ limit => Limit,
+ row_count => 0
},
- Fun = fun fold_fwd_cb/2,
- Acc = erlfdb:fold_range(Tx, StartKey, EndKey, Fun, Acc0, Opts),
+ Fun = fun handle_row/3,
+ Acc1 = couch_views_reduce_fdb:fold_level0(TxDb, Sig, ViewId, Opts, Fun, Acc0),
#{
- rows := Rows
- } = Acc,
- io:format("~n ~n LEVEL ~p rows ~p ~n", [Level, Rows]),
- {ok, Rows}
- end, Levels),
- {ok, []}
- end).
-
+ user_acc := UserAcc1
+ } = Acc1,
+ {ok, maybe_stop(UserCallback(complete, UserAcc1))}
+ end)
+ catch throw:{done, Out} ->
+ {ok, Out}
+ end.
-read_reduce(Db, Sig, ViewId, UserCallback, UserAcc0, Args) ->
- #{
- db_prefix := DbPrefix
- } = Db,
- Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
- ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
- fabric2_fdb:transactional(Db, fun(#{tx := Tx} = TxDb) ->
- log_levels(TxDb, Sig, ViewId),
-%% Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
-
-
- Acc0 = #{
- sig => Sig,
- view_id => ViewId,
- user_acc => UserAcc0,
- args => Args,
- callback => UserCallback,
- reduce_idx_prefix => ReduceIdxPrefix,
- rows => []
- },
-
-
-%% Opts = [{limit, 2}, {streaming_mode, want_all}],
-%% EK = couch_views_encoding:encode(0, key),
-%% {StartKey, EndKey} = erlfdb_tuple:range({?MAX_SKIP_LIST_LEVELS, EK},
-%% ReduceIdxPrefix),
-%%
-%% Fun = fun fold_fwd_cb/2,
-%% Acc = erlfdb:fold_range(Tx, StartKey, EndKey, Fun, Acc0, Opts),
- #{
- rows := Rows
- } = Acc0,
- {ok, Rows}
- end).
-
-args_to_fdb_opts(#mrargs{} = Args) ->
+args_to_fdb_opts(#mrargs{} = Args, ReduceIdxPrefix) ->
#mrargs{
- limit = Limit,
- start_key = StartKey,
- end_key = EndKey
+%% limit = Limit,
+%% start_key = StartKey,
+%% end_key = EndKey,
+ group = Group,
+ group_level = GroupLevel
} = Args,
- ok.
+ {UStartKey0, EndKey0} = erlfdb_tuple:range({0},
+ ReduceIdxPrefix),
+
+ StartKey0 = erlfdb_tuple:pack({0, couch_views_encoding:encode(0, key)}, ReduceIdxPrefix),
+
+%% StartKey1 = case StartKey of
+%% undefined -> erlfdb_key:first_greater_than(StartKey0);
+%% StartKey -> create_key(StartKey, 0, Red)
+%% end,
+
+ StartKey1 = erlfdb_key:first_greater_than(StartKey0),
-fold_fwd_cb({FullEncodedKey, EV}, Acc) ->
+ [{streaming_mode, want_all}, {startkey, StartKey1}, {endkey, EndKey0}].
+
+
+encode_key(Key, Level) ->
+ {Level, couch_views_encoding:encode(Key, key)}.
+
+
+handle_row(Key, Value, Acc) ->
#{
- reduce_idx_prefix := ReduceIdxPrefix,
- rows := Rows
+ callback := UserCallback,
+ user_acc := UserAcc0,
+ row_count := RowCount,
+ limit := Limit
} = Acc,
- {_Level, _EK}
- = erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
- {EK, EV1} = erlfdb_tuple:unpack(EV),
- Key = couch_views_encoding:decode(EK),
- Val = couch_views_encoding:decode(EV1),
+ Row = [
+ {key, Key},
+ {value, Value}
+ ],
- Acc#{key := Val, rows := Rows ++ [{Key, Val}]}.
+ RowCountNext = RowCount + 1,
+
+ UserAcc1 = maybe_stop(UserCallback({row, Row}, UserAcc0)),
+ Acc1 = Acc#{user_acc := UserAcc1, row_count := RowCountNext},
+
+ case RowCountNext == Limit of
+ true ->
+ UserAcc2 = maybe_stop(UserCallback(complete, UserAcc1)),
+ maybe_stop({stop, UserAcc2});
+ false ->
+ Acc1
+ end.
+
+
+maybe_stop({ok, Acc}) -> Acc;
+maybe_stop({stop, Acc}) -> throw({done, Acc}).
+
+setup_reduce_indexes(Db, Sig, ViewIds) ->
+ #{
+ db_prefix := DbPrefix
+ } = Db,
+
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+ lists:foreach(fun (ViewId) ->
+ ViewOpts = #{
+ db_prefix => DbPrefix,
+ sig => Sig,
+ view_id => ViewId
+ },
+ couch_views_reduce_fdb:create_skip_list(TxDb,
+ ?MAX_SKIP_LIST_LEVELS, ViewOpts)
+ end, ViewIds)
+ end).
run_reduce(#mrst{views = Views } = Mrst, MappedResults) ->
@@ -192,149 +213,6 @@ is_builtin(<<"_", _/binary>>) ->
is_builtin(_) ->
false.
-
-update_reduce_idx(TxDb, Sig, ViewId, _DocId, _ExistingKeys, ReduceResult) ->
- #{
- db_prefix := DbPrefix
- } = TxDb,
-
- ViewOpts = #{
- db_prefix => DbPrefix,
- sig => Sig,
- view_id => ViewId
- },
- create_skip_list(TxDb, ?MAX_SKIP_LIST_LEVELS, ViewOpts),
-
- lists:foreach(fun ({Key, Val}) ->
- io:format("RESULTS KV ~p ~p ~n", [Key, Val])
-%% add_kv_to_skip_list(TxDb, ?MAX_SKIP_LIST_LEVELS, ViewOpts, Key, Val)
- end, ReduceResult).
-
-
-create_skip_list(Db, MaxLevel, #{} = ViewOpts) ->
- #{
- db_prefix := DbPrefix,
- sig := Sig,
- view_id := ViewId
- } = ViewOpts,
-
- Levels = lists:seq(0, MaxLevel),
- ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
-
- fabric2_fdb:transactional(Db, fun(TxDb) ->
-
- lists:foreach(fun(Level) ->
- add_kv(TxDb, ReduceIdxPrefix, Level, 0, 0)
- end, Levels)
- end).
-
-
-should_add_key_to_level(Level, Key) ->
- (erlang:phash2(Key) band ((1 bsl (Level * ?LEVEL_FAN_POW)) -1)) == 0.
-%% keyHash & ((1 << (level * LEVEL_FAN_POW)) - 1)) != 0
-
-
-add_kv_to_skip_list(Db, MaxLevel, #{} = ViewOpts, Key, Val) ->
- #{
- db_prefix := DbPrefix,
- sig := Sig,
- view_id := ViewId
- } = ViewOpts,
-
- Levels = lists:seq(0, MaxLevel),
- ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
-
- fabric2_fdb:transactional(Db, fun(TxDb) ->
- lists:foldl(fun(Level) ->
- io:format("PROCESS ~p ~p ~p ~n", [Level, Key, Val]),
- {PrevKey, PrevVal} = get_previous_key(TxDb, ReduceIdxPrefix, Level, Key),
- io:format("PREV VALS ~p ~p ~n", [PrevKey, PrevVal]),
- case should_add_key_to_level(Level, Key) of
- true ->
- io:format("Adding ~p ~p ~n", [Level, Key]),
- add_kv(Db, ReduceIdxPrefix, Level, Key, Val);
- false ->
- {PrevKey, NewVal} = rereduce(<<"_stats">>, {PrevKey, PrevVal}, {Key, Val}),
- io:format("RE_REDUCE ~p ~p ~p ~p ~n", [Level, Key, PrevKey, NewVal]),
- add_kv(Db, ReduceIdxPrefix, Level, PrevKey, NewVal)
- end
- end, true, Levels)
- end).
-
-
-rereduce(<<"_stats">>, {PrevKey, PrevVal}, {_Key, Val}) ->
- case PrevVal >= Val of
- true -> {PrevKey, PrevVal};
- false -> {PrevKey, Val}
- end.
-
-
reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId) ->
Key = {?DB_VIEWS, Sig, ?VIEW_REDUCE_RANGE, ViewId},
erlfdb_tuple:pack(Key, DbPrefix).
-
-
-create_key(ReduceIdxPrefix, SkipLevel, Key) ->
- EK = couch_views_encoding:encode(Key, key),
- LevelKey = {SkipLevel, EK},
- erlfdb_tuple:pack(LevelKey, ReduceIdxPrefix).
-
-
-create_value(Key, Val) ->
- EK = couch_views_encoding:encode(Key),
- EV = couch_views_encoding:encode(Val),
- erlfdb_tuple:pack({EK, EV}).
-
-
-add_kv(TxDb, ReduceIdxPrefix, Level, Key, Val) ->
- #{
- tx := Tx
- } = TxDb,
-
- LevelKey = create_key(ReduceIdxPrefix, Level, Key),
- EV = create_value(Key, Val),
-
- ok = erlfdb:set(Tx, LevelKey, EV).
-
-
-get_previous_key(TxDb, ReduceIdxPrefix, Level, Key) ->
- #{
- tx := Tx
- } = TxDb,
-
- % TODO: see if we need to add in conflict ranges for this for level=0
- Opts = [{limit, 2}, {reverse, true}, {streaming_mode, want_all}],
-%% LevelPrefix = erlfdb_tuple:pack({Level}, ReduceIdxPrefix),
-
- EK = couch_views_encoding:encode(Key, key),
- EndKey0 = erlfdb_tuple:pack({Level, EK}, ReduceIdxPrefix),
-
- {StartKey, EndKey1} = erlfdb_tuple:range({Level}, ReduceIdxPrefix),
-%% EndKey1 = erlfdb_key:first_greater_than(EndKey0),
-
- Callback = fun row_cb/2,
- Out = erlfdb:fold_range(Tx, StartKey, EndKey1, Callback, {val, ReduceIdxPrefix, {}}, Opts),
- io:format("OUT PRV ~p ~p ~p ~n", [Level, Key, Out]),
- Out.
-
-
-row_cb({FullEncodedKey, EV}, {val, ReduceIdxPrefix, Acc}) ->
- io:format("ROW VAL ~p ~n", [erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix)]),
- {_Level, EK, _VIEW_ROW_VALUE}
- = erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
- Val = couch_views_encoding:decode(EV),
-%% io:format("WW ~p ~p ~n", [couch_views_encoding:decode(EK), Val]),
-
- {key, {EK, ReduceIdxPrefix, Val}};
-
-row_cb({FullEncodedKey, EVK}, {key, {EK, ReduceIdxPrefix, Val}}) ->
- io:format("ROW KEY ~p ~n", [erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix)]),
- {_Level, EK, ?VIEW_ROW_KEY}
- = erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
- Key = couch_views_encoding:decode(EVK),
-
- {Key, Val}.
-
-
-
-
diff --git a/src/couch_views/src/couch_views_reduce_fdb.erl b/src/couch_views/src/couch_views_reduce_fdb.erl
index bcaaa30..9683265 100644
--- a/src/couch_views/src/couch_views_reduce_fdb.erl
+++ b/src/couch_views/src/couch_views_reduce_fdb.erl
@@ -15,19 +15,273 @@
-export([
-%% write_doc/4
+ fold_level0/6,
+ create_skip_list/3,
+ update_reduce_idx/6
]).
-% _id keys = {?DB_VIEWS, Sig, ?VIEW_REDUCE_ID_RANGE, DocId, ViewId} = [TotalKeys, TotalSize, UniqueKeys]
-%%write_doc(TxDb, Sig, ViewIds, Doc) ->
+-include("couch_views.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+-include_lib("fabric/include/fabric2.hrl").
+
+-define(MAX_SKIP_LIST_LEVELS, 6).
+-define(LEVEL_FAN_POW, 1).
+
+log_levels(Db, Sig, ViewId) ->
+ #{
+ db_prefix := DbPrefix
+ } = Db,
+
+ Levels = lists:seq(0, 6),
+ ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+ Opts = [{streaming_mode, want_all}],
+
+ fabric2_fdb:transactional(Db, fun(#{tx := Tx} = TxDb) ->
+ lists:foreach(fun (Level) ->
+ {StartKey, EndKey} = erlfdb_tuple:range({Level},
+ ReduceIdxPrefix),
+
+ Acc0 = #{
+ sig => Sig,
+ view_id => ViewId,
+ reduce_idx_prefix => ReduceIdxPrefix,
+ user_acc => [],
+ callback => fun handle_log_levels/3
+ },
+
+ Fun = fun fold_fwd_cb/2,
+ Acc = erlfdb:fold_range(Tx, StartKey, EndKey, Fun, Acc0, Opts),
+ #{
+ user_acc := Rows
+ } = Acc,
+ io:format("~n LEVEL ~p rows ~p ~n", [Level, Rows])
+ end, Levels)
+ end).
+
+handle_log_levels(Key, Value, Acc) ->
+ Acc ++ [{Key, Value}].
+
+%%fold(Db, Sig, ViewId, Options, Callback, Acc0) ->
%% #{
-%% id := DocId,
-%% reduce_results := ReduceResults
-%% } = Doc,
-%% lists:foreach(fun({ViewId, NewRows}) ->
-%% % update reduce index
-%% ok
-%% end, lists:zip(ViewIds, ReduceResults)).
+%% db_prefix := DbPrefix
+%% } = Db,
+%%
+%%%% Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
+%% ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+%% #mrargs{
+%% limit = Limit
+%% } = Args,
+%%
+%% fabric2_fdb:transactional(Db, fun(TxDb) ->
+%%
+%% Acc0 = #{
+%% sig => Sig,
+%% view_id => ViewId,
+%% user_acc => UserAcc0,
+%% args => Args,
+%% callback => UserCallback,
+%% reduce_idx_prefix => ReduceIdxPrefix,
+%% limit => Limit,
+%% row_count => 0
+%%
+%% },
+%%
+%% Acc1 = read_level0_only(TxDb, Acc0, Callback),
+%% #{
+%% user_acc := UserAcc1
+%% } = Acc1,
+%% {ok, UserAcc1}
+%% end).
+
+fold_level0(Db, Sig, ViewId, Opts, UserCallback, UserAcc0) ->
+ #{
+ db_prefix := DbPrefix
+ } = Db,
+
+ Level = 0,
+ ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+ Acc = #{
+ sig => Sig,
+ view_id => ViewId,
+ user_acc => UserAcc0,
+ %% args := Args,
+ callback => UserCallback,
+ reduce_idx_prefix => ReduceIdxPrefix
+ },
+
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+ log_levels(TxDb, Sig, ViewId),
+ #{
+ tx := Tx
+ } = TxDb,
+
+
+%% {StartKey, EndKey} = erlfdb_tuple:range({0},
+%% ReduceIdxPrefix),
+ {startkey, StartKey} = lists:keyfind(startkey, 1, Opts),
+ {endkey, EndKey} = lists:keyfind(endkey, 1, Opts),
+
+%% ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+ Fun = fun fold_fwd_cb/2,
+ Acc1 = erlfdb:fold_range(Tx, StartKey, EndKey, Fun, Acc, Opts),
+ #{
+ user_acc := UserAcc1
+ } = Acc1,
+ UserAcc1
+ end).
+
+
+fold_fwd_cb({FullEncodedKey, EV}, Acc) ->
+ #{
+ reduce_idx_prefix := ReduceIdxPrefix,
+ callback := Callback,
+ user_acc := UserAcc
+ } = Acc,
+
+ {_Level, _EK}
+ = erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
+ {EK, EV1} = erlfdb_tuple:unpack(EV),
+ Key = couch_views_encoding:decode(EK),
+ Val = couch_views_encoding:decode(EV1),
+
+ UserAcc1 = Callback(Key, Val, UserAcc),
+ Acc#{user_acc := UserAcc1}.
+
+
+reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId) ->
+ Key = {?DB_VIEWS, Sig, ?VIEW_REDUCE_RANGE, ViewId},
+ erlfdb_tuple:pack(Key, DbPrefix).
+
+
+%% Inserting
+update_reduce_idx(TxDb, Sig, ViewId, _DocId, _ExistingKeys, ReduceResult) ->
+ #{
+ db_prefix := DbPrefix
+ } = TxDb,
+
+ ViewOpts = #{
+ db_prefix => DbPrefix,
+ sig => Sig,
+ view_id => ViewId
+ },
+
+ lists:foreach(fun ({Key, Val}) ->
+ io:format("RESULTS KV ~p ~p ~n", [Key, Val]),
+ add_kv_to_skip_list(TxDb, ?MAX_SKIP_LIST_LEVELS, ViewOpts, Key, Val)
+ end, ReduceResult).
+
+
+create_skip_list(Db, MaxLevel, #{} = ViewOpts) ->
+ #{
+ db_prefix := DbPrefix,
+ sig := Sig,
+ view_id := ViewId
+ } = ViewOpts,
+
+ Levels = lists:seq(0, MaxLevel),
+ ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+
+ lists:foreach(fun(Level) ->
+ add_kv(TxDb, ReduceIdxPrefix, Level, 0, 0)
+ end, Levels)
+ end).
+
+
+add_kv_to_skip_list(Db, MaxLevel, #{} = ViewOpts, Key, Val) ->
+ #{
+ db_prefix := DbPrefix,
+ sig := Sig,
+ view_id := ViewId
+ } = ViewOpts,
+
+ Levels = lists:seq(0, MaxLevel),
+ ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+ KeyHash = hash_key(Key),
+
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+ lists:foreach(fun(Level) ->
+ {PrevKey, PrevVal} = get_previous_key(TxDb, ReduceIdxPrefix, Level, Key),
+ io:format("Process ~p ~p ~p PREV VALS ~p ~p ~n", [Level, Key, Val, PrevKey, PrevVal]),
+ case should_add_key_to_level(Level, KeyHash) of
+ true ->
+ io:format("Adding ~p ~p ~n", [Level, Key]),
+ add_kv(Db, ReduceIdxPrefix, Level, Key, Val);
+ false ->
+ {PrevKey, NewVal} = rereduce(<<"_stats">>, {PrevKey, PrevVal}, {Key, Val}),
+ io:format("RE_REDUCE ~p ~p ~p ~p ~n", [Level, Key, PrevKey, NewVal]),
+ add_kv(Db, ReduceIdxPrefix, Level, PrevKey, NewVal)
+ end
+ end, Levels)
+ end).
+
+
+get_previous_key(TxDb, ReduceIdxPrefix, Level, Key) ->
+ #{
+ tx := Tx
+ } = TxDb,
+
+ % TODO: see if we need to add in conflict ranges for this for level=0
+ Opts = [{limit, 1}, {streaming_mode, want_all}],
+
+ EK = couch_views_encoding:encode(Key, key),
+ StartKey = erlfdb_tuple:pack({Level, EK}, ReduceIdxPrefix),
+ StartKeySel = erlfdb_key:last_less_or_equal(StartKey),
+ EndKeySel = erlfdb_key:first_greater_or_equal(StartKey),
+
+ Future = erlfdb:get_range(Tx, StartKeySel, EndKeySel, Opts),
+ [{_FullEncodedKey, PackedValue}] = erlfdb:wait(Future),
+ get_key_value(PackedValue).
+
+
+hash_key(Key) ->
+ erlang:phash2(Key).
+
+
+should_add_key_to_level(Level, KeyHash) ->
+ (KeyHash band ((1 bsl (Level * ?LEVEL_FAN_POW)) -1)) == 0.
+%% i.e. add the key at this level when (keyHash & ((1 << (level * LEVEL_FAN_POW)) - 1)) == 0
+
+
+create_key(ReduceIdxPrefix, SkipLevel, Key) ->
+ EK = couch_views_encoding:encode(Key, key),
+ LevelKey = {SkipLevel, EK},
+ erlfdb_tuple:pack(LevelKey, ReduceIdxPrefix).
+
+
+create_value(Key, Val) ->
+ EK = couch_views_encoding:encode(Key),
+ EV = couch_views_encoding:encode(Val),
+ erlfdb_tuple:pack({EK, EV}).
+
+
+get_key_value(PackedValue) ->
+ {EncodedKey, EncodedValue}
+ = erlfdb_tuple:unpack(PackedValue),
+ Key = couch_views_encoding:decode(EncodedKey),
+ Value = couch_views_encoding:decode(EncodedValue),
+ {Key, Value}.
+
+
+add_kv(TxDb, ReduceIdxPrefix, Level, Key, Val) ->
+ #{
+ tx := Tx
+ } = TxDb,
+
+ LevelKey = create_key(ReduceIdxPrefix, Level, Key),
+ EV = create_value(Key, Val),
+
+ ok = erlfdb:set(Tx, LevelKey, EV).
+
+
+rereduce(<<"_stats">>, {PrevKey, PrevVal}, {_Key, Val}) ->
+ case PrevVal >= Val of
+ true -> {PrevKey, PrevVal};
+ false -> {PrevKey, Val}
+ end.
+
diff --git a/src/couch_views/test/exunit/couch_views_reduce_test.exs b/src/couch_views/test/exunit/couch_views_reduce_test.exs
index a526658..c1b35e2 100644
--- a/src/couch_views/test/exunit/couch_views_reduce_test.exs
+++ b/src/couch_views/test/exunit/couch_views_reduce_test.exs
@@ -40,29 +40,37 @@ defmodule CouchViewsReduceTest do
}
end
- test "group=true count reduce", context do
- args = %{
- :reduce => true,
- :group => true
- # :limit => 9
- }
+# test "group=true count reduce with limit", context do
+# args = %{
+# :reduce => true,
+# :group => true,
+# :limit => 3
+# }
+#
+# {:ok, res} = run_query(context, args, "dates")
+# IO.inspect(res, label: "OUT")
+#
+# assert res == [
+# {:row, [key: [2017, 3, 1], value: 1]},
+# {:row, [key: [2017, 4, 1], value: 1]},
+# {:row, [key: [2017, 4, 15], value: 1]}
+# ]
+# end
+
+ test "group_level=1 count reduce", context do
+ args = %{
+ :reduce => true,
+ :group => true,
+ }
- {:ok, res} = run_query(context, args, "baz")
- IO.inspect(res, label: "OUT")
+ {:ok, res} = run_query(context, args, "dates")
+ IO.inspect(res, label: "OUT")
- assert res == [
- {:row, [key: 1, value: 2]},
- {:row, [key: 2, value: 2]},
- {:row, [key: 3, value: 2]},
- {:row, [key: [1, 1], value: 1]},
- {:row, [key: [1, 1, 5], value: 1]},
- {:row, [key: [1, 2, 6], value: 1]},
- {:row, [key: [2, 1], value: 1]},
- {:row, [key: [2, 3, 6], value: 1]},
- {:row, [key: [3, 1], value: 1]},
- {:row, [key: [3, 1, 5], value: 1]},
- {:row, [key: [3, 4, 5], value: 1]}
- ]
+ assert res == [
+ {:row, [key: [2017], value: 1]},
+ {:row, [key: [2018], value: 1]},
+ {:row, [key: [2019], value: 1]}
+ ]
end
# test "group=1 count reduce", context do
@@ -173,6 +181,7 @@ defmodule CouchViewsReduceTest do
end
def default_cb(:complete, acc) do
+ IO.inspect(acc, label: "complete")
{:ok, Enum.reverse(acc)}
end
@@ -197,7 +206,22 @@ defmodule CouchViewsReduceTest do
end
defp create_docs() do
- for i <- 1..1 do
+ dates = [
+ [2017, 3, 1],
+ [2017, 4, 1],
+ # out of order check
+ [2019, 3, 1],
+ [2017, 4, 15],
+ [2018, 4, 1],
+ [2017, 5, 1],
+ [2018, 3, 1],
+ # duplicate check
+ [2018, 4, 1],
+ [2018, 5, 1],
+ [2019, 4, 1]
+ ]
+
+ for i <- 1..4 do
group =
if rem(i, 3) == 0 do
"first"
@@ -205,14 +229,14 @@ defmodule CouchViewsReduceTest do
"second"
end
- :couch_doc.from_json_obj(
- {[
- {"_id", "doc-id-#{i}"},
- {"value", i},
- {"some", "field"},
- {"group", group}
- ]}
- )
+ :couch_doc.from_json_obj({[
+ {"_id", "doc-id-#{i}"},
+ {"value", i},
+ {"some", "field"},
+ {"group", group},
+ {"date", Enum.at(dates, i - 1)}
+ # {"timestamp", Enum.at(timestamps, i - 1)}
+ ]})
end
end
@@ -221,6 +245,16 @@ defmodule CouchViewsReduceTest do
{"_id", "_design/bar"},
{"views",
{[
+ {"dates",
+ {[
+ {"map",
+ """
+ function(doc) {
+ emit(doc.date, doc.value);
+ }
+ """},
+ {"reduce", "_count"}
+ ]}},
{"baz",
{[
{"map",
[couchdb] 05/08: level 0 _sum working
Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
garren pushed a commit to branch prototype/builtin-reduce
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 6f6bedc710df762bf3cc490ea0b8edbc38b4122f
Author: Garren Smith <ga...@gmail.com>
AuthorDate: Thu Oct 31 16:42:36 2019 +0200
level 0 _sum working
---
src/couch_views/src/couch_views.erl.orig | 159 ---------------------
src/couch_views/src/couch_views_fdb.erl | 8 +-
src/couch_views/src/couch_views_indexer.erl | 7 +-
src/couch_views/src/couch_views_reduce.erl | 12 ++
src/couch_views/src/couch_views_reduce_fdb.erl | 54 ++++++-
.../test/exunit/couch_views_reduce_test.exs | 150 +++++++++++--------
6 files changed, 160 insertions(+), 230 deletions(-)
diff --git a/src/couch_views/src/couch_views.erl.orig b/src/couch_views/src/couch_views.erl.orig
deleted file mode 100644
index 1830076..0000000
--- a/src/couch_views/src/couch_views.erl.orig
+++ /dev/null
@@ -1,159 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_views).
-
--export([
- query/6
-]).
-
-
--include_lib("couch_mrview/include/couch_mrview.hrl").
-
-
-query(Db, DDoc, ViewName, Callback, Acc0, Args0) ->
- case fabric2_db:is_users_db(Db) of
- true ->
- fabric2_users_db:after_doc_read(DDoc, Db);
- false ->
- ok
- end,
-
- DbName = fabric2_db:name(Db),
- {ok, Mrst} = couch_views_util:ddoc_to_mrst(DbName, DDoc),
-
- #mrst{
- views = Views
- } = Mrst,
-
- Args1 = to_mrargs(Args0),
- Args2 = couch_mrview_util:set_view_type(Args1, ViewName, Views),
- Args3 = couch_mrview_util:validate_args(Args2),
-
- ok = check_range(Args3),
-
- try
-<<<<<<< HEAD
- fabric2_fdb:transactional(Db, fun(TxDb) ->
- ok = maybe_update_view(TxDb, Mrst, Args3),
- read_view(TxDb, Mrst, ViewName, Callback, Acc0, Args3)
- end)
- catch throw:{build_view, WaitSeq} ->
- couch_views_jobs:build_view(Db, Mrst, WaitSeq),
- read_view(Db, Mrst, ViewName, Callback, Acc0, Args3)
-=======
- case is_reduce_view(Args3) of
- true ->
- couch_views_reader:read_reduce(Db, Mrst, ViewName,
- Callback, Acc0, Args3);
- false ->
- couch_views_reader:read(Db, Mrst, ViewName,
- Callback, Acc0, Args3)
- end
- after
- UpdateAfter = Args3#mrargs.update == lazy,
- if UpdateAfter == false -> ok; true ->
- couch_views_jobs:build_view_async(Db, Mrst)
- end
->>>>>>> Initial work
- end.
-
-
-read_view(Db, Mrst, ViewName, Callback, Acc0, Args) ->
- fabric2_fdb:transactional(Db, fun(TxDb) ->
- try
- couch_views_reader:read(TxDb, Mrst, ViewName, Callback, Acc0, Args)
- after
- UpdateAfter = Args#mrargs.update == lazy,
- if UpdateAfter == false -> ok; true ->
- couch_views_jobs:build_view_async(TxDb, Mrst)
- end
- end
- end).
-
-
-maybe_update_view(_Db, _Mrst, #mrargs{update = false}) ->
- ok;
-
-maybe_update_view(_Db, _Mrst, #mrargs{update = lazy}) ->
- ok;
-
-maybe_update_view(TxDb, Mrst, _Args) ->
- DbSeq = fabric2_db:get_update_seq(TxDb),
- ViewSeq = couch_views_fdb:get_update_seq(TxDb, Mrst),
- case DbSeq == ViewSeq of
- true -> ok;
- false -> throw({build_view, DbSeq})
- end.
-
-
-is_reduce_view(#mrargs{view_type = ViewType}) ->
- ViewType =:= red;
-is_reduce_view({Reduce, _, _}) ->
- Reduce =:= red.
-
-
-to_mrargs(#mrargs{} = Args) ->
- Args;
-
-to_mrargs(#{} = Args) ->
- Fields = record_info(fields, mrargs),
- Indexes = lists:seq(2, record_info(size, mrargs)),
- LU = lists:zip(Fields, Indexes),
-
- maps:fold(fun(Key, Value, Acc) ->
- Index = fabric2_util:get_value(couch_util:to_existing_atom(Key), LU),
- setelement(Index, Acc, Value)
- end, #mrargs{}, Args).
-
-
-check_range(#mrargs{start_key = undefined}) ->
- ok;
-
-check_range(#mrargs{end_key = undefined}) ->
- ok;
-
-check_range(#mrargs{start_key = K, end_key = K}) ->
- ok;
-
-check_range(Args) ->
- #mrargs{
- direction = Dir,
- start_key = SK,
- start_key_docid = SKD,
- end_key = EK,
- end_key_docid = EKD
- } = Args,
-
- case {Dir, view_cmp(SK, SKD, EK, EKD)} of
- {fwd, false} ->
- throw(check_range_error(<<"true">>));
- {rev, true} ->
- throw(check_range_error(<<"false">>));
- _ ->
- ok
- end.
-
-
-check_range_error(Descending) ->
- {query_parse_error,
- <<"No rows can match your key range, reverse your ",
- "start_key and end_key or set descending=",
- Descending/binary>>}.
-
-
-view_cmp(SK, SKD, EK, EKD) ->
- BinSK = couch_views_encoding:encode(SK, key),
- BinEK = couch_views_encoding:encode(EK, key),
- PackedSK = erlfdb_tuple:pack({BinSK, SKD}),
- PackedEK = erlfdb_tuple:pack({BinEK, EKD}),
- PackedSK =< PackedEK.
diff --git a/src/couch_views/src/couch_views_fdb.erl b/src/couch_views/src/couch_views_fdb.erl
index 8999d76..6c81457 100644
--- a/src/couch_views/src/couch_views_fdb.erl
+++ b/src/couch_views/src/couch_views_fdb.erl
@@ -148,7 +148,7 @@ write_doc(TxDb, Sig, _ViewIds, #{deleted := true} = Doc) ->
update_kv_size(TxDb, Sig, ViewId, -TotalSize)
end, ExistingViewKeys);
-write_doc(TxDb, Sig, ViewIds, Doc) ->
+write_doc(TxDb, Sig, ViewsIdFun, Doc) ->
#{
id := DocId,
results := Results,
@@ -161,7 +161,7 @@ write_doc(TxDb, Sig, ViewIds, Doc) ->
%% TODO: handle when there is no reduce
io:format("REDUCE RESULTS ~p ~n", [ReduceResults]),
- lists:foreach(fun({ViewId, NewRows, ReduceResult}) ->
+ lists:foreach(fun({{ViewId, Reducer}, NewRows, ReduceResult}) ->
update_id_idx(TxDb, Sig, ViewId, DocId, NewRows),
ExistingKeys = case lists:keyfind(ViewId, 1, ExistingViewKeys) of
@@ -179,9 +179,9 @@ write_doc(TxDb, Sig, ViewIds, Doc) ->
[]
end,
update_map_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows),
- couch_views_reduce_fdb:update_reduce_idx(TxDb, Sig, ViewId, DocId,
+ couch_views_reduce_fdb:update_reduce_idx(TxDb, Sig, ViewId, Reducer, DocId,
ExistingKeys, ReduceResult)
- end, lists:zip3(ViewIds, Results, ReduceResults)).
+ end, lists:zip3(ViewsIdFun, Results, ReduceResults)).
% For each row in a map view there are two rows stored in
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index cff15b0..51fae9b 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -303,6 +303,11 @@ write_docs(TxDb, Mrst, Docs, State) ->
} = State,
ViewIds = [View#mrview.id_num || View <- Views],
+ ViewsIdFuns = lists:foldl(fun (View, Acc) ->
+ Id = View#mrview.id_num,
+ [{_Name, ReduceFun}] = View#mrview.reduce_funs,
+ Acc ++ [{Id, ReduceFun}]
+ end, [], Views),
%% First build of the view
if ViewSeq /= <<>> -> ok; true ->
@@ -310,7 +315,7 @@ write_docs(TxDb, Mrst, Docs, State) ->
end,
lists:foreach(fun(Doc) ->
- couch_views_fdb:write_doc(TxDb, Sig, ViewIds, Doc)
+ couch_views_fdb:write_doc(TxDb, Sig, ViewsIdFuns, Doc)
end, Docs),
couch_views_fdb:set_update_seq(TxDb, Sig, LastSeq).
diff --git a/src/couch_views/src/couch_views_reduce.erl b/src/couch_views/src/couch_views_reduce.erl
index b7eb18e..04c5cb8 100644
--- a/src/couch_views/src/couch_views_reduce.erl
+++ b/src/couch_views/src/couch_views_reduce.erl
@@ -191,6 +191,18 @@ reduce(<<"_count">>, Results) ->
end, #{}, Results),
maps:to_list(ReduceResults);
+reduce(<<"_sum">>, Results) ->
+ ReduceResults = lists:foldl(fun ({Key, Val}, Acc) ->
+ case maps:is_key(Key, Acc) of
+ true ->
+ #{Key := Sum} = Acc,
+ Acc#{Key := Val + Sum};
+ false ->
+ Acc#{Key => Val}
+ end
+ end, #{}, Results),
+ maps:to_list(ReduceResults);
+
% this isn't a real supported reduce function in CouchDB
% But I want a basic reduce function that when we need to update the index
% we would need to re-read multiple rows instead of being able to do an
diff --git a/src/couch_views/src/couch_views_reduce_fdb.erl b/src/couch_views/src/couch_views_reduce_fdb.erl
index 5759c42..7a7e120 100644
--- a/src/couch_views/src/couch_views_reduce_fdb.erl
+++ b/src/couch_views/src/couch_views_reduce_fdb.erl
@@ -17,7 +17,7 @@
-export([
fold_level0/8,
create_skip_list/3,
- update_reduce_idx/6
+ update_reduce_idx/7
]).
@@ -188,6 +188,14 @@ rereduce(_Reducer, Rows, GroupLevel) when length(Rows) == 1 ->
GroupKey = group_level_key(Key, GroupLevel),
{GroupKey, Val};
+rereduce(<<"_sum">>, Rows, GroupLevel) ->
+ Sum = lists:foldl(fun ({_, Val}, Acc) ->
+ Val + Acc
+ end, 0, Rows),
+ {Key, _} = hd(Rows),
+ GroupKey = group_level_key(Key, GroupLevel),
+ {GroupKey, Sum};
+
rereduce(<<"_count">>, Rows, GroupLevel) ->
Val = length(Rows),
{Key, _} = hd(Rows),
@@ -210,6 +218,9 @@ group_level_equal(One, Two, GroupLevel) ->
group_level_key(_Key, 0) ->
null;
+group_level_key(Key, group_true) ->
+ Key;
+
group_level_key(Key, GroupLevel) when is_list(Key) ->
lists:sublist(Key, GroupLevel);
@@ -230,7 +241,7 @@ unpack_key_value(EncodedValue) ->
%% Inserting
-update_reduce_idx(TxDb, Sig, ViewId, _DocId, _ExistingKeys, ReduceResult) ->
+update_reduce_idx(TxDb, Sig, ViewId, Reducer, _DocId, _ExistingKeys, ReduceResult) ->
#{
db_prefix := DbPrefix
} = TxDb,
@@ -238,7 +249,8 @@ update_reduce_idx(TxDb, Sig, ViewId, _DocId, _ExistingKeys, ReduceResult) ->
ViewOpts = #{
db_prefix => DbPrefix,
sig => Sig,
- view_id => ViewId
+ view_id => ViewId,
+ reducer => Reducer
},
lists:foreach(fun ({Key, Val}) ->
@@ -269,29 +281,57 @@ add_kv_to_skip_list(Db, MaxLevel, #{} = ViewOpts, Key, Val) ->
#{
db_prefix := DbPrefix,
sig := Sig,
- view_id := ViewId
+ view_id := ViewId,
+ reducer := Reducer
} = ViewOpts,
- Levels = lists:seq(0, MaxLevel),
+ Levels = lists:seq(1, MaxLevel),
ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
KeyHash = hash_key(Key),
fabric2_fdb:transactional(Db, fun(TxDb) ->
+ Val1 = case get_value(TxDb, ReduceIdxPrefix, 0, Key) of
+ not_found ->
+ Val;
+ ExistingVal ->
+ {_, ReducedVal} = rereduce(Reducer, [{Key, ExistingVal}, {Key, Val}], group_true),
+ ReducedVal
+ end,
+ io:format("VAL1 ~p ~n", [Val1]),
+ add_kv(TxDb, ReduceIdxPrefix, 0, Key, Val1),
+
lists:foreach(fun(Level) ->
{PrevKey, PrevVal} = get_previous_key(TxDb, ReduceIdxPrefix, Level, Key),
io:format("Level ~p K/V ~p ~p PREV KV ~p ~p ~n", [Level, Key, Val, PrevKey, PrevVal]),
case should_add_key_to_level(Level, KeyHash) of
true ->
io:format("Adding at ~p ~p ~n", [Level, Key]),
- add_kv(Db, ReduceIdxPrefix, Level, Key, Val);
+ add_kv(TxDb, ReduceIdxPrefix, Level, Key, Val);
false ->
%% {PrevKey, NewVal} = rereduce(<<"_stats">>, {PrevKey, PrevVal}, {Key, Val}),
%% io:format("RE_REDUCE ~p ~p ~p ~p ~n", [Level, Key, PrevKey, NewVal]),
- add_kv(Db, ReduceIdxPrefix, Level, PrevKey, PrevVal)
+ add_kv(TxDb, ReduceIdxPrefix, Level, PrevKey, PrevVal)
end
end, Levels)
end).
+get_value(TxDb, ReduceIdxPrefix, Level, Key) ->
+ #{
+ tx := Tx
+ } = TxDb,
+ EK = create_key(ReduceIdxPrefix, Level, Key),
+ io:format("FF ~p ~n", [Key]),
+ Out = case erlfdb:wait(erlfdb:get(Tx, EK)) of
+ not_found ->
+ not_found;
+ PackedValue ->
+ io:format("HERE ~p ~n", [PackedValue]),
+ {_, Value} = get_key_value(PackedValue),
+ Value
+ end,
+ io:format("GETTING ~p ~p ~n", [Key, Out]),
+ Out.
+
get_previous_key(TxDb, ReduceIdxPrefix, Level, Key) ->
#{
diff --git a/src/couch_views/test/exunit/couch_views_reduce_test.exs b/src/couch_views/test/exunit/couch_views_reduce_test.exs
index 488f3ee..0812f1f 100644
--- a/src/couch_views/test/exunit/couch_views_reduce_test.exs
+++ b/src/couch_views/test/exunit/couch_views_reduce_test.exs
@@ -40,37 +40,53 @@ defmodule CouchViewsReduceTest do
}
end
-# test "group=true count reduce with limit", context do
+ # test "group=true count reduce with limit", context do
+ # args = %{
+ # :reduce => true,
+ # :group => true,
+ # :limit => 3
+ # }
+ #
+ # {:ok, res} = run_query(context, args, "dates")
+ # IO.inspect(res, label: "OUT")
+ #
+ # assert res == [
+ # {:row, [key: [2017, 3, 1], value: 1]},
+ # {:row, [key: [2017, 4, 1], value: 1]},
+ # {:row, [key: [2017, 4, 15], value: 1]}
+ # ]
+ # end
+
+# test "group_level=1 count reduce", context do
# args = %{
# :reduce => true,
-# :group => true,
-# :limit => 3
+# :group_level => 1
# }
#
-# {:ok, res} = run_query(context, args, "dates")
+# {:ok, res} = run_query(context, args, "dates_count")
# IO.inspect(res, label: "OUT")
#
# assert res == [
-# {:row, [key: [2017, 3, 1], value: 1]},
-# {:row, [key: [2017, 4, 1], value: 1]},
-# {:row, [key: [2017, 4, 15], value: 1]}
+# {:row, [key: [2017], value: 4]},
+# {:row, [key: [2018], value: 3]},
+# {:row, [key: [2019], value: 2]}
# ]
# end
- test "group_level=1 count reduce", context do
- args = %{
- :reduce => true,
- :group_level => 1,
- }
+ test "group_level=1 reduce reduce", context do
+ args = %{
+ :reduce => true,
+ :group_level => 1
+ }
- {:ok, res} = run_query(context, args, "dates")
- IO.inspect(res, label: "OUT")
+ {:ok, res} = run_query(context, args, "dates_sum")
+ IO.inspect(res, label: "OUT")
- assert res == [
- {:row, [key: [2017], value: 4]},
- {:row, [key: [2018], value: 3]},
- {:row, [key: [2019], value: 2]}
- ]
+ assert res == [
+ {:row, [key: [2017], value: 31]},
+ {:row, [key: [2018], value: 20]},
+ {:row, [key: [2019], value: 17]}
+ ]
end
# test "group=1 count reduce", context do
@@ -207,21 +223,22 @@ defmodule CouchViewsReduceTest do
defp create_docs() do
dates = [
- [2017, 3, 1],
- [2017, 4, 1],
+ {[2017, 3, 1], 9},
+ {[2017, 4, 1], 7},
# out of order check
- [2019, 3, 1],
- [2017, 4, 15],
- [2018, 4, 1],
- [2017, 5, 1],
- [2018, 3, 1],
+ {[2019, 3, 1], 4},
+ {[2017, 4, 15], 6},
+ {[2018, 4, 1], 3},
+ {[2017, 5, 1], 9},
+ {[2018, 3, 1], 6},
# duplicate check
- [2018, 4, 1],
- [2018, 5, 1],
- [2019, 4, 1]
+ {[2018, 4, 1], 4},
+ {[2018, 5, 1], 7},
+ {[2019, 4, 1], 6},
+ {[2019, 5, 1], 7}
]
- for i <- 1..10 do
+ for i <- 1..11 do
group =
if rem(i, 3) == 0 do
"first"
@@ -229,13 +246,18 @@ defmodule CouchViewsReduceTest do
"second"
end
- :couch_doc.from_json_obj({[
- {"_id", "doc-id-#{i}"},
- {"value", i},
- {"some", "field"},
- {"group", group},
- {"date", Enum.at(dates, i - 1)}
- ]})
+ {date_key, date_val} = Enum.at(dates, i - 1)
+
+ :couch_doc.from_json_obj(
+ {[
+ {"_id", "doc-id-#{i}"},
+ {"value", i},
+ {"some", "field"},
+ {"group", group},
+ {"date", date_key},
+ {"date_val", date_val}
+ ]}
+ )
end
end
@@ -244,34 +266,44 @@ defmodule CouchViewsReduceTest do
{"_id", "_design/bar"},
{"views",
{[
- {"dates",
+ # {"dates_count",
+ # {[
+ # {"map",
+ # """
+ # function(doc) {
+ # emit(doc.date, doc.value);
+ # }
+ # """},
+ # {"reduce", "_count"}
+ # ]}}
+ {"dates_sum",
{[
{"map",
"""
function(doc) {
- emit(doc.date, doc.value);
- }
+ emit(doc.date, doc.date_val);
+ }
"""},
- {"reduce", "_count"}
+ {"reduce", "_sum"}
]}}
-# {"baz",
-# {[
-# {"map",
-# """
-# function(doc) {
-# emit(doc.value, doc.value);
-# emit(doc.value, doc.value);
-# emit([doc.value, 1], doc.value);
-# emit([doc.value, doc.value + 1, doc.group.length], doc.value);
-#
-# if (doc.value === 3) {
-# emit([1, 1, 5], 1);
-# emit([doc.value, 1, 5], 1);
-# }
-# }
-# """},
-# {"reduce", "_count"}
-# ]}}
+ # {"baz",
+ # {[
+ # {"map",
+ # """
+ # function(doc) {
+ # emit(doc.value, doc.value);
+ # emit(doc.value, doc.value);
+ # emit([doc.value, 1], doc.value);
+ # emit([doc.value, doc.value + 1, doc.group.length], doc.value);
+ #
+ # if (doc.value === 3) {
+ # emit([1, 1, 5], 1);
+ # emit([doc.value, 1, 5], 1);
+ # }
+ # }
+ # """},
+ # {"reduce", "_count"}
+ # ]}}
# {"boom",
# {[
# {"map",
[couchdb] 07/08: basic skiplist query working
Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
garren pushed a commit to branch prototype/builtin-reduce
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit f01b653e1f8fc0f8be8837e5217ad0f5b9e8fdad
Author: Garren Smith <ga...@gmail.com>
AuthorDate: Thu Dec 5 15:42:52 2019 +0200
basic skiplist query working
---
src/couch_views/src/couch_views_indexer.erl | 2 +-
src/couch_views/src/couch_views_reduce.erl | 117 +++----
src/couch_views/src/couch_views_reduce_fdb.erl | 355 ++++++++++++++++-----
src/couch_views/src/couch_views_reducer.erl | 119 +++++++
.../test/exunit/couch_views_reduce_test.exs | 49 ++-
5 files changed, 447 insertions(+), 195 deletions(-)
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index 51fae9b..096d838 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -311,7 +311,7 @@ write_docs(TxDb, Mrst, Docs, State) ->
%% First build of the view
if ViewSeq /= <<>> -> ok; true ->
- couch_views_reduce:setup_reduce_indexes(TxDb, Sig, ViewIds)
+ couch_views_reduce_fdb:create_reduce_indexes(TxDb, Sig, ViewIds)
end,
lists:foreach(fun(Doc) ->
diff --git a/src/couch_views/src/couch_views_reduce.erl b/src/couch_views/src/couch_views_reduce.erl
index a2e3a93..0e837e3 100644
--- a/src/couch_views/src/couch_views_reduce.erl
+++ b/src/couch_views/src/couch_views_reduce.erl
@@ -15,8 +15,7 @@
-export([
run_reduce/2,
- read_reduce/7,
- setup_reduce_indexes/3
+ read_reduce/7
]).
@@ -36,7 +35,7 @@ read_reduce(Db, Sig, ViewId, Reducer, UserCallback, UserAcc0, Args) ->
} = Db,
%% Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
- ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+ ReduceIdxPrefix = couch_views_reduce_fdb:reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
#mrargs{
limit = Limit,
group = Group,
@@ -48,7 +47,7 @@ read_reduce(Db, Sig, ViewId, Reducer, UserCallback, UserAcc0, Args) ->
_ -> GroupLevel
end,
- Opts = args_to_fdb_opts(Args, ReduceIdxPrefix),
+%% Opts = args_to_fdb_opts(Args, ReduceIdxPrefix),
try
fabric2_fdb:transactional(Db, fun(TxDb) ->
@@ -66,7 +65,10 @@ read_reduce(Db, Sig, ViewId, Reducer, UserCallback, UserAcc0, Args) ->
},
Fun = fun handle_row/3,
- Acc1 = couch_views_reduce_fdb:fold_level0(TxDb, Sig, ViewId, Reducer, GroupLevel1, Opts, Fun, Acc0),
+%% Acc1 = couch_views_reduce_fdb:fold_level0(TxDb, Sig, ViewId, Reducer, GroupLevel1, Opts, Fun, Acc0),
+
+ SkipListOpts = args_to_skiplist_opts(Args),
+ Acc1 = couch_views_reduce_fdb:fold_skip_list(TxDb, Sig, ViewId, Reducer, GroupLevel1, SkipListOpts, Fun, Acc0),
#{
user_acc := UserAcc1
} = Acc1,
@@ -76,6 +78,30 @@ read_reduce(Db, Sig, ViewId, Reducer, UserCallback, UserAcc0, Args) ->
{ok, Out}
end.
+args_to_skiplist_opts(#mrargs{} = Args) ->
+ #mrargs{
+ start_key = StartKey,
+ end_key = EndKey
+ } = Args,
+
+ StartKey1 = case StartKey of
+ undefined ->
+ [0];
+ StartKey ->
+ StartKey
+ end,
+
+ EndKey1 = case EndKey of
+ undefined ->
+ throw(no_end_key_not_working_yet_error);
+ EndKey ->
+ EndKey
+ end,
+ #{
+ startkey => StartKey1,
+ endkey => EndKey1
+ }.
+
args_to_fdb_opts(#mrargs{} = Args, ReduceIdxPrefix) ->
#mrargs{
@@ -85,10 +111,10 @@ args_to_fdb_opts(#mrargs{} = Args, ReduceIdxPrefix) ->
StartKey1 = case StartKey of
undefined ->
- StartKey0 = encode_key(0, 0, ReduceIdxPrefix),
+ StartKey0 = couch_views_reduce_fdb:create_key(0, 0, ReduceIdxPrefix),
erlfdb_key:first_greater_than(StartKey0);
StartKey ->
- StartKey0 = encode_key(StartKey, 0, ReduceIdxPrefix),
+ StartKey0 = couch_views_reduce_fdb:create_key(StartKey, 0, ReduceIdxPrefix),
erlfdb_key:first_greater_or_equal(StartKey0)
end,
@@ -99,17 +125,12 @@ args_to_fdb_opts(#mrargs{} = Args, ReduceIdxPrefix) ->
EndKey0;
EndKey ->
io:format("ENDKEY ~n"),
- EndKey0 = encode_key(EndKey, 0, ReduceIdxPrefix),
+ EndKey0 = couch_views_reduce_fdb:create_key(EndKey, 0, ReduceIdxPrefix),
erlfdb_key:first_greater_than(EndKey0)
end,
[{streaming_mode, want_all}, {startkey, StartKey1}, {endkey, EndKey1}].
-encode_key(Key, Level, ReduceIdxPrefix) ->
- EK = {Level, couch_views_encoding:encode(Key, key)},
- erlfdb_tuple:pack(EK, ReduceIdxPrefix).
-
-
handle_row(Key, Value, Acc) ->
#{
callback := UserCallback,
@@ -118,6 +139,7 @@ handle_row(Key, Value, Acc) ->
limit := Limit
} = Acc,
+ io:format("WOO ROW ~p ~p ~n", [Key, Value]),
Row = [
{key, Key},
{value, Value}
@@ -140,23 +162,6 @@ handle_row(Key, Value, Acc) ->
maybe_stop({ok, Acc}) -> Acc;
maybe_stop({stop, Acc}) -> throw({done, Acc}).
-setup_reduce_indexes(Db, Sig, ViewIds) ->
- #{
- db_prefix := DbPrefix
- } = Db,
-
- fabric2_fdb:transactional(Db, fun(TxDb) ->
- lists:foreach(fun (ViewId) ->
- ViewOpts = #{
- db_prefix => DbPrefix,
- sig => Sig,
- view_id => ViewId
- },
- couch_views_reduce_fdb:create_skip_list(TxDb,
- ?MAX_SKIP_LIST_LEVELS, ViewOpts)
- end, ViewIds)
- end).
-
run_reduce(#mrst{views = Views } = Mrst, MappedResults) ->
ReduceFuns = lists:map(fun(View) ->
@@ -175,7 +180,7 @@ run_reduce(#mrst{views = Views } = Mrst, MappedResults) ->
} = MappedResult,
ReduceResults = lists:map(fun ({ReduceFun, Result}) ->
- reduce(ReduceFun, Result)
+ couch_views_reducer:reduce(ReduceFun, Result)
end, lists:zip(ReduceFuns, Results)),
MappedResult#{
@@ -184,59 +189,9 @@ run_reduce(#mrst{views = Views } = Mrst, MappedResults) ->
end, MappedResults).
-reduce(<<"_count">>, Results) ->
- ReduceResults = lists:foldl(fun ({Key, _}, Acc) ->
- case maps:is_key(Key, Acc) of
- true ->
- #{Key := Val} = Acc,
- Acc#{Key := Val + 1};
- false ->
- Acc#{Key => 1}
- end
- end, #{}, Results),
- maps:to_list(ReduceResults);
-
-reduce(<<"_sum">>, Results) ->
- ReduceResults = lists:foldl(fun ({Key, Val}, Acc) ->
- case maps:is_key(Key, Acc) of
- true ->
- #{Key := Sum} = Acc,
- Acc#{Key := Val + Sum};
- false ->
- Acc#{Key => Val}
- end
- end, #{}, Results),
- maps:to_list(ReduceResults);
-
-% this isn't a real supported reduce function in CouchDB
-% But I want a basic reduce function that when we need to update the index
-% we would need to re-read multiple rows instead of being able to do an
-% atomic update
-reduce(<<"_stats">>, Results) ->
- ReduceResults = lists:foldl(fun ({Key, Val}, Acc) ->
- io:format("MAX ~p ~p ~n", [Key, Val]),
- case maps:is_key(Key, Acc) of
- true ->
- #{Key := Max} = Acc,
- case Max >= Val of
- true ->
- Acc;
- false ->
- Acc#{Key := Val}
- end;
- false ->
- Acc#{Key => Val}
- end
- end, #{}, Results),
- maps:to_list(ReduceResults).
-
-
is_builtin(<<"_", _/binary>>) ->
true;
is_builtin(_) ->
false.
-reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId) ->
- Key = {?DB_VIEWS, Sig, ?VIEW_REDUCE_RANGE, ViewId},
- erlfdb_tuple:pack(Key, DbPrefix).
diff --git a/src/couch_views/src/couch_views_reduce_fdb.erl b/src/couch_views/src/couch_views_reduce_fdb.erl
index 77514ed..72e05b8 100644
--- a/src/couch_views/src/couch_views_reduce_fdb.erl
+++ b/src/couch_views/src/couch_views_reduce_fdb.erl
@@ -15,9 +15,13 @@
-export([
+ fold_skip_list/8,
fold_level0/8,
+ create_reduce_indexes/3,
create_skip_list/3,
- update_reduce_idx/7
+ update_reduce_idx/7,
+ reduce_skip_list_idx_prefix/3,
+ create_key/3
]).
@@ -26,9 +30,11 @@
-include_lib("couch_mrview/include/couch_mrview.hrl").
-include_lib("fabric/include/fabric2.hrl").
--define(MAX_SKIP_LIST_LEVELS, 1).
+
+-define(MAX_SKIP_LIST_LEVELS, 6).
-define(LEVEL_FAN_POW, 1).
+
log_levels(Db, Sig, ViewId) ->
#{
db_prefix := DbPrefix
@@ -55,8 +61,8 @@ log_levels(Db, Sig, ViewId) ->
false ->
Total = sum_rows(Rows),
if Total == Level0Total -> Level0Total; true ->
- io:format("~n ~n LEVEL ~p NOT EQUAL ~p ~p ~n", [Level, Level0Total, Total])
-%% throw(level_total_error)
+ io:format("~n ~n LEVEL ~p NOT EQUAL ~p ~p ~n", [Level, Level0Total, Total]),
+ throw(level_total_error)
end
end
@@ -69,18 +75,234 @@ sum_rows(Rows) ->
end, 0, Rows).
+fold_skip_list(Db, Sig, ViewId, Reducer, GroupLevel, Opts, UserCallback, UserAcc0) ->
+ #{
+ db_prefix := DbPrefix
+ } = Db,
+%% timer:exit_after(40, boom),
+
+ ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+ Acc = #{
+ sig => Sig,
+ view_id => ViewId,
+ user_acc => UserAcc0,
+ callback => UserCallback,
+ reduce_idx_prefix => ReduceIdxPrefix,
+ reducer => Reducer,
+ group_level => GroupLevel,
+ rows => []
+ },
+ #{
+ startkey := StartKey,
+ endkey := EndKey
+ } = Opts,
+
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+ log_levels(TxDb, Sig, ViewId),
+ Acc1 = traverse_skip_list(TxDb, 0, StartKey, EndKey, Acc),
+ #{
+ user_acc := UserAcc1,
+ rows := Rows1
+ } = Acc1,
+ rereduce_and_reply(Reducer, Rows1, GroupLevel, UserCallback, UserAcc1)
+ end).
+
+traverse_skip_list(_TxDb, Level, _CurrentKey, _EndKey, _Acc) when Level < 0 ->
+ throw(skip_list_gone_to_low);
+
+traverse_skip_list(TxDb, _Level, CurrentKey, EndKey, Acc) ->
+ #{
+ user_acc := UserAcc,
+ callback := UserCallback,
+ reduce_idx_prefix := ReduceIdxPrefix,
+ reducer := Reducer,
+ group_level := GroupLevel,
+ rows := Rows
+ } = Acc,
+
+ {RangeLevel, RangeStart, RangeEnd} = get_next_range_and_level(TxDb,
+ ReduceIdxPrefix, GroupLevel, CurrentKey, EndKey),
+ io:format("TRAVERSE Level ~p RangeStart ~p RangeEnd ~p ~n", [RangeLevel, RangeStart, RangeEnd]),
+ Results = get_range_inclusive(TxDb, RangeStart, RangeEnd, RangeLevel, ReduceIdxPrefix),
+ io:format("RESULTS ~p ~n", [Results]),
+
+ NextStart = if Results == [] -> null; true ->
+ {LastKey, _} = lists:last(Results),
+ LastKey
+ end,
+ KeyAfterStart = get_key_after(TxDb, NextStart, EndKey, RangeLevel, ReduceIdxPrefix),
+ io:format("NEXTStart ~p KeyAfter ~p RangeLevel ~p ~n", [NextStart, KeyAfterStart, RangeLevel]),
+
+ {NextStart1, Acc1} = case couch_views_reducer:group_level_equal(NextStart, KeyAfterStart, GroupLevel) of
+ true ->
+ AccNext = Acc#{rows := Rows ++ Results},
+ {NextStart, AccNext};
+ false when RangeLevel == 0 ->
+ AllResults = Rows ++ Results,
+ UserAcc1 = rereduce_and_reply(Reducer, AllResults, GroupLevel,
+ UserCallback, UserAcc),
+ AccNext = Acc#{
+ user_acc := UserAcc1,
+ rows := []
+ },
+ {KeyAfterStart, AccNext};
+ % Need to traverse at level 0 to make sure we have all keys for
+ % the current group_level keys
+ false ->
+ UsableResults = lists:sublist(Results, length(Results) - 1),
+ io:format("USABLE ~p LEVEL ~p ~n", [UsableResults, RangeLevel]),
+ AccNext = Acc#{rows := Rows ++ UsableResults},
+ {NextStart, AccNext}
+ end,
+
+ case RangeEnd == EndKey orelse NextStart1 == null of
+ true when RangeLevel == 0 ->
+ io:format("FINISED ~n"),
+ Acc1;
+ _ ->
+ traverse_skip_list(TxDb, 0, NextStart1, EndKey, Acc1)
+ end.
+
+
+
+get_next_range_and_level(TxDb, ReduceIdxPrefix, GroupLevel, StartKey, EndKey) ->
+ GroupEndKey = get_group_level_endkey(TxDb, GroupLevel, 0, StartKey, ReduceIdxPrefix),
+ % Do not exceed the set endkey
+ GroupEndKey1 = if GroupEndKey < EndKey -> GroupEndKey; true -> EndKey end,
+
+ LevelRanges = [{0, StartKey, GroupEndKey1}],
+ io:format("Get Range StartKey ~p GroupEndKey ~p EndKey ~p ~n", [StartKey, GroupEndKey1, EndKey]),
+ LevelRanges1 = scan_for_level_ranges(TxDb, 0, GroupLevel, StartKey, GroupEndKey1, ReduceIdxPrefix, LevelRanges),
+ lists:last(LevelRanges1).
+
+
+% at end of this specific grouplevel, so have to do final scan at level 0
+scan_for_level_ranges(_TxDb, _Level, _GroupLevel, StartKey, StartKey, _ReduceIdxPrefix, _Acc) ->
+ [{0, StartKey, StartKey}];
+
+scan_for_level_ranges(_TxDb, ?MAX_SKIP_LIST_LEVELS, _GroupLevel, StartKey, StartKey, _ReduceIdxPrefix, Acc) ->
+ Acc;
+
+scan_for_level_ranges(TxDb, Level, GroupLevel, StartKey, EndKey, ReduceIdxPrefix, Acc) ->
+ NextLevel = Level + 1,
+ NearestKey = get_key_or_nearest(TxDb, NextLevel, StartKey, EndKey, ReduceIdxPrefix),
+ io:format("SCAN startkey ~p nearest ~p ~n", [StartKey, NearestKey]),
+ case StartKey =:= NearestKey of
+ true ->
+ GroupLevelEndKey = get_group_level_endkey(TxDb, GroupLevel,
+ NextLevel, StartKey, ReduceIdxPrefix),
+
+ ToFar = GroupLevelEndKey > EndKey,
+ EndOfLevel = GroupLevelEndKey == NearestKey,
+
+ case ToFar orelse EndOfLevel of
+ true ->
+ Acc;
+ false ->
+ Acc1 = Acc ++ [{NextLevel, StartKey, GroupLevelEndKey}],
+ scan_for_level_ranges(TxDb, NextLevel, GroupLevel, StartKey,
+ EndKey, ReduceIdxPrefix, Acc1)
+ end;
+ false ->
+ case couch_views_reducer:group_level_equal(StartKey, NearestKey,
+ GroupLevel) of
+ true ->
+ [{Level, StartKey, NearestKey}];
+ false ->
+ Acc
+ end
+ end.
+
+
+get_key_or_nearest(TxDb, Level, StartKey, EndKey, ReduceIdxPrefix) ->
+ #{
+ tx := Tx
+ } = TxDb,
+
+ StartKey1 = create_key(ReduceIdxPrefix, Level, StartKey),
+ StartKey2 = erlfdb_key:first_greater_or_equal(StartKey1),
+
+ EndKey1 = create_endkey(ReduceIdxPrefix, Level, EndKey),
+ EndKey2 = erlfdb_key:first_greater_than(EndKey1),
+
+ Future = erlfdb:get_range(Tx, StartKey2, EndKey2, [{limit, 1}]),
+ wait_and_get_key(Future).
+
+
+get_group_level_endkey(TxDb, GroupLevel, Level, StartKey, ReduceIdxPrefix) ->
+ #{
+ tx := Tx
+ } = TxDb,
+ GroupLevelKey = couch_views_reducer:group_level_key(StartKey, GroupLevel),
+ StartKey1 = create_key(ReduceIdxPrefix, Level, GroupLevelKey),
+ StartKey2 = erlfdb_key:first_greater_than(StartKey1),
+ EndKey = create_endkey(ReduceIdxPrefix, Level, GroupLevelKey),
+ EndKey1 = erlfdb_key:first_greater_or_equal(EndKey),
+ Future = erlfdb:get_range(Tx, StartKey2, EndKey1, [{reverse, true}, {limit, 1}]),
+ wait_and_get_key(Future).
+
+
+get_key_after(TxDb, StartKey, EndKey, Level, ReduceIdxPrefix) ->
+ #{
+ tx := Tx
+ } = TxDb,
+
+ StartKey1 = create_key(ReduceIdxPrefix, Level, StartKey),
+ StartKey2 = erlfdb_key:first_greater_than(StartKey1),
+
+ EndKey1 = create_endkey(ReduceIdxPrefix, Level, EndKey),
+ EndKey2 = erlfdb_key:first_greater_than(EndKey1),
+
+ Future = erlfdb:get_range(Tx, StartKey2, EndKey2, [{limit, 1}]),
+ wait_and_get_key(Future).
+
+
+wait_and_get_key(Future) ->
+ case erlfdb:wait(Future) of
+ [] ->
+ null;
+ [{_FullEncodedKey, PackedValue}] ->
+ {Key, _} = get_key_value(PackedValue),
+ Key
+ end.
+
+
+get_range_inclusive(TxDb, StartKey, EndKey, Level, ReduceIdxPrefix) ->
+ #{
+ tx := Tx
+ } = TxDb,
+ StartKey1 = create_key(ReduceIdxPrefix, Level, StartKey),
+ StartKey2 = erlfdb_key:first_greater_or_equal(StartKey1),
+
+ EndKey1 = create_key(ReduceIdxPrefix, Level, EndKey),
+ EndKey2 = erlfdb_key:first_greater_than(EndKey1),
+
+ Fun = fun ({_FullEncodedKey, PackedValue}, Acc0) ->
+ KV = get_key_value(PackedValue),
+ Acc0 ++ [KV]
+ end,
+
+ erlfdb:fold_range(Tx, StartKey2, EndKey2, Fun, [], []).
+
+
+% TODO: This needs a better name
+create_endkey(ReduceIdxPrefix, Level, Key) ->
+ Key1 = if Key /= null -> Key; true -> [] end,
+ EK = couch_views_encoding:encode(Key1 ++ [16#FF], key),
+ LevelKey = {Level, EK},
+ erlfdb_tuple:pack(LevelKey, ReduceIdxPrefix).
+
+
fold_level0(Db, Sig, ViewId, Reducer, GroupLevel, Opts, UserCallback, UserAcc0) ->
#{
db_prefix := DbPrefix
} = Db,
- Level = 0,
ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
Acc = #{
sig => Sig,
view_id => ViewId,
user_acc => UserAcc0,
- %% args := Args,
callback => UserCallback,
reduce_idx_prefix => ReduceIdxPrefix,
reducer => Reducer,
@@ -88,16 +310,15 @@ fold_level0(Db, Sig, ViewId, Reducer, GroupLevel, Opts, UserCallback, UserAcc0)
rows => []
},
+ {startkey, StartKey} = lists:keyfind(startkey, 1, Opts),
+ {endkey, EndKey} = lists:keyfind(endkey, 1, Opts),
+
fabric2_fdb:transactional(Db, fun(TxDb) ->
log_levels(TxDb, Sig, ViewId),
#{
tx := Tx
} = TxDb,
-
- {startkey, StartKey} = lists:keyfind(startkey, 1, Opts),
- {endkey, EndKey} = lists:keyfind(endkey, 1, Opts),
-
Fun = fun fold_fwd_cb/2,
Acc1 = erlfdb:fold_range(Tx, StartKey, EndKey, Fun, Acc, Opts),
#{
@@ -126,10 +347,10 @@ fold_fwd_cb({_FullEncodedKey, EV}, Acc) ->
LastKey0
end,
- GroupLevelKey = group_level_key(Key, GroupLevel),
+ GroupLevelKey = couch_views_reducer:group_level_key(Key, GroupLevel),
GroupKV = [{GroupLevelKey, Val}],
- case group_level_equal(Key, LastKey, GroupLevel) of
+ case couch_views_reducer:group_level_equal(Key, LastKey, GroupLevel) of
true ->
Acc#{
rows := Rows ++ GroupKV
@@ -147,58 +368,10 @@ rereduce_and_reply(_Reducer, [], _GroupLevel, _Callback, Acc) ->
Acc;
rereduce_and_reply(Reducer, Rows, GroupLevel, Callback, Acc) ->
- {ReducedKey, ReducedVal} = rereduce(Reducer, Rows, GroupLevel),
+ {ReducedKey, ReducedVal} = couch_views_reducer:rereduce(Reducer, Rows, GroupLevel),
Callback(ReducedKey, ReducedVal, Acc).
-rereduce(_Reducer, [], _GroupLevel) ->
- no_kvs;
-
-rereduce(_Reducer, Rows, GroupLevel) when length(Rows) == 1 ->
- {Key, Val} = hd(Rows),
- GroupKey = group_level_key(Key, GroupLevel),
- {GroupKey, Val};
-
-rereduce(<<"_sum">>, Rows, GroupLevel) ->
- Sum = lists:foldl(fun ({_, Val}, Acc) ->
- Val + Acc
- end, 0, Rows),
- {Key, _} = hd(Rows),
- GroupKey = group_level_key(Key, GroupLevel),
- {GroupKey, Sum};
-
-rereduce(<<"_count">>, Rows, GroupLevel) ->
- Val = length(Rows),
- {Key, _} = hd(Rows),
- GroupKey = group_level_key(Key, GroupLevel),
- {GroupKey, Val}.
-
-
-group_level_equal(_One, _Two, 0) ->
- true;
-
-group_level_equal(_One, _Two, group_true) ->
- false;
-
-group_level_equal(One, Two, GroupLevel) ->
- GroupOne = group_level_key(One, GroupLevel),
- GroupTwo = group_level_key(Two, GroupLevel),
- GroupOne == GroupTwo.
-
-
-group_level_key(_Key, 0) ->
- null;
-
-group_level_key(Key, group_true) ->
- Key;
-
-group_level_key(Key, GroupLevel) when is_list(Key) ->
- lists:sublist(Key, GroupLevel);
-
-group_level_key(Key, _GroupLevel) ->
- Key.
-
-
reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId) ->
Key = {?DB_VIEWS, Sig, ?VIEW_REDUCE_RANGE, ViewId},
erlfdb_tuple:pack(Key, DbPrefix).
@@ -211,23 +384,21 @@ unpack_key_value(EncodedValue) ->
{Key, Val}.
-%% Inserting
-update_reduce_idx(TxDb, Sig, ViewId, Reducer, _DocId, _ExistingKeys, ReduceResult) ->
+create_reduce_indexes(Db, Sig, ViewIds) ->
#{
db_prefix := DbPrefix
- } = TxDb,
-
- ViewOpts = #{
- db_prefix => DbPrefix,
- sig => Sig,
- view_id => ViewId,
- reducer => Reducer
- },
+ } = Db,
- lists:foreach(fun ({Key, Val}) ->
- io:format("RESULTS KV ~p ~p ~n", [Key, Val]),
- add_kv_to_skip_list(TxDb, ?MAX_SKIP_LIST_LEVELS, ViewOpts, Key, Val)
- end, ReduceResult).
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+ lists:foreach(fun (ViewId) ->
+ ViewOpts = #{
+ db_prefix => DbPrefix,
+ sig => Sig,
+ view_id => ViewId
+ },
+ create_skip_list(TxDb, ?MAX_SKIP_LIST_LEVELS, ViewOpts)
+ end, ViewIds)
+ end).
create_skip_list(Db, MaxLevel, #{} = ViewOpts) ->
@@ -248,16 +419,33 @@ create_skip_list(Db, MaxLevel, #{} = ViewOpts) ->
end).
-add_kv_to_skip_list(Db, MaxLevel, #{} = ViewOpts, Key, Val) ->
+%% Inserting
+update_reduce_idx(TxDb, Sig, ViewId, Reducer, _DocId, _ExistingKeys, ReduceResult) ->
+ #{
+ db_prefix := DbPrefix
+ } = TxDb,
+
+ ViewOpts = #{
+ db_prefix => DbPrefix,
+ sig => Sig,
+ view_id => ViewId,
+ reducer => Reducer
+ },
+
+ ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+
+ lists:foreach(fun ({Key, Val}) ->
+ io:format("RESULTS KV ~p ~p ~n", [Key, Val]),
+ add_kv_to_skip_list(TxDb, ReduceIdxPrefix, ?MAX_SKIP_LIST_LEVELS, ViewOpts, Key, Val)
+ end, ReduceResult).
+
+
+add_kv_to_skip_list(Db, ReduceIdxPrefix, MaxLevel, #{} = ViewOpts, Key, Val) ->
#{
- db_prefix := DbPrefix,
- sig := Sig,
- view_id := ViewId,
reducer := Reducer
} = ViewOpts,
Levels = lists:seq(1, MaxLevel),
- ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
KeyHash = hash_key(Key),
fabric2_fdb:transactional(Db, fun(TxDb) ->
@@ -265,7 +453,7 @@ add_kv_to_skip_list(Db, MaxLevel, #{} = ViewOpts, Key, Val) ->
not_found ->
Val;
ExistingVal ->
- {_, ReducedVal} = rereduce(Reducer, [{Key, ExistingVal}, {Key, Val}], group_true),
+ {_, ReducedVal} = couch_views_reducer:rereduce(Reducer, [{Key, ExistingVal}, {Key, Val}], group_true),
ReducedVal
end,
io:format("VAL1 ~p ~n", [Val1]),
@@ -279,9 +467,10 @@ add_kv_to_skip_list(Db, MaxLevel, #{} = ViewOpts, Key, Val) ->
io:format("Adding at ~p ~p ~n", [Level, Key]),
add_kv(TxDb, ReduceIdxPrefix, Level, Key, Val);
false ->
+ {_, NewVal} = couch_views_reducer:rereduce(Reducer, [{PrevKey, PrevVal}, {Key, Val}], 0),
%% {PrevKey, NewVal} = rereduce(<<"_stats">>, {PrevKey, PrevVal}, {Key, Val}),
-%% io:format("RE_REDUCE ~p ~p ~p ~p ~n", [Level, Key, PrevKey, NewVal]),
- add_kv(TxDb, ReduceIdxPrefix, Level, PrevKey, PrevVal)
+ io:format("RE_REDUCE ~p ~p ~p ~p ~n", [Level, Key, PrevKey, NewVal]),
+ add_kv(TxDb, ReduceIdxPrefix, Level, PrevKey, NewVal)
end
end, Levels)
end).
diff --git a/src/couch_views/src/couch_views_reducer.erl b/src/couch_views/src/couch_views_reducer.erl
new file mode 100644
index 0000000..a7ac783
--- /dev/null
+++ b/src/couch_views/src/couch_views_reducer.erl
@@ -0,0 +1,119 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+
+-module(couch_views_reducer).
+
+
+-export([
+ reduce/2,
+ rereduce/3,
+ group_level_equal/3,
+ group_level_key/2
+]).
+
+
+reduce(<<"_count">>, Results) ->
+ ReduceResults = lists:foldl(fun ({Key, _}, Acc) ->
+ case maps:is_key(Key, Acc) of
+ true ->
+ #{Key := Val} = Acc,
+ Acc#{Key := Val + 1};
+ false ->
+ Acc#{Key => 1}
+ end
+ end, #{}, Results),
+ maps:to_list(ReduceResults);
+
+reduce(<<"_sum">>, Results) ->
+ ReduceResults = lists:foldl(fun ({Key, Val}, Acc) ->
+ case maps:is_key(Key, Acc) of
+ true ->
+ #{Key := Sum} = Acc,
+ Acc#{Key := Val + Sum};
+ false ->
+ Acc#{Key => Val}
+ end
+ end, #{}, Results),
+ maps:to_list(ReduceResults);
+
+
+% this isn't a real supported reduce function in CouchDB
+% But I want a basic reduce function that when we need to update the index
+% we would need to re-read multiple rows instead of being able to do an
+% atomic update
+reduce(<<"_stats">>, Results) ->
+ ReduceResults = lists:foldl(fun ({Key, Val}, Acc) ->
+ io:format("MAX ~p ~p ~n", [Key, Val]),
+ case maps:is_key(Key, Acc) of
+ true ->
+ #{Key := Max} = Acc,
+ case Max >= Val of
+ true ->
+ Acc;
+ false ->
+ Acc#{Key := Val}
+ end;
+ false ->
+ Acc#{Key => Val}
+ end
+ end, #{}, Results),
+ maps:to_list(ReduceResults).
+
+
+rereduce(_Reducer, [], _GroupLevel) ->
+ no_kvs;
+
+rereduce(_Reducer, Rows, GroupLevel) when length(Rows) == 1 ->
+ {Key, Val} = hd(Rows),
+ GroupKey = group_level_key(Key, GroupLevel),
+ {GroupKey, Val};
+
+rereduce(<<"_sum">>, Rows, GroupLevel) ->
+ Sum = lists:foldl(fun ({_, Val}, Acc) ->
+ Val + Acc
+ end, 0, Rows),
+ {Key, _} = hd(Rows),
+ GroupKey = group_level_key(Key, GroupLevel),
+ {GroupKey, Sum};
+
+rereduce(<<"_count">>, Rows, GroupLevel) ->
+ Val = length(Rows),
+ {Key, _} = hd(Rows),
+ GroupKey = group_level_key(Key, GroupLevel),
+ {GroupKey, Val}.
+
+
+group_level_equal(_One, _Two, 0) ->
+ true;
+
+group_level_equal(_One, _Two, group_true) ->
+ false;
+
+group_level_equal(One, Two, GroupLevel) ->
+ GroupOne = group_level_key(One, GroupLevel),
+ GroupTwo = group_level_key(Two, GroupLevel),
+ GroupOne == GroupTwo.
+
+
+group_level_key(_Key, 0) ->
+ null;
+
+group_level_key(Key, group_true) ->
+ Key;
+
+group_level_key(Key, GroupLevel) when is_list(Key) ->
+ lists:sublist(Key, GroupLevel);
+
+group_level_key(Key, _GroupLevel) ->
+ Key.
+
diff --git a/src/couch_views/test/exunit/couch_views_reduce_test.exs b/src/couch_views/test/exunit/couch_views_reduce_test.exs
index d6dcc60..e2b9d3b 100644
--- a/src/couch_views/test/exunit/couch_views_reduce_test.exs
+++ b/src/couch_views/test/exunit/couch_views_reduce_test.exs
@@ -46,7 +46,6 @@ defmodule CouchViewsReduceTest do
# }
#
# {:ok, res} = run_query(context, args, "dates")
- # IO.inspect(res, label: "OUT")
#
# assert res == [
# {:row, [key: [2017, 3, 1], value: 1]},
@@ -62,7 +61,6 @@ defmodule CouchViewsReduceTest do
# }
#
# {:ok, res} = run_query(context, args, "dates_count")
- # IO.inspect(res, label: "OUT")
#
# assert res == [
# {:row, [key: [2017], value: 4]},
@@ -74,11 +72,11 @@ defmodule CouchViewsReduceTest do
test "group_level=1 reduce", context do
args = %{
reduce: true,
- group_level: 1
+ group_level: 1,
+ end_key: [2019,5,1]
}
{:ok, res} = run_query(context, args, "dates_sum")
- IO.inspect(res, label: "OUT")
assert res == [
{:row, [key: [2017], value: 31]},
@@ -97,7 +95,6 @@ defmodule CouchViewsReduceTest do
}
{:ok, res} = run_query(context, args, "dates_sum")
- IO.inspect(res, label: "OUT")
assert res == [
{:row, [key: [2017], value: 22]},
@@ -114,7 +111,6 @@ defmodule CouchViewsReduceTest do
}
{:ok, res} = run_query(context, args, "dates_sum")
- IO.inspect(res, label: "OUT")
assert res == [
{:row, [key: [2017], value: 22]},
@@ -132,7 +128,6 @@ defmodule CouchViewsReduceTest do
}
{:ok, res} = run_query(context, args, "dates_sum")
- IO.inspect(res, label: "OUT")
assert res == [
{:row, [key: [2017], value: 22]},
@@ -141,23 +136,22 @@ defmodule CouchViewsReduceTest do
]
end
- test "group=true reduce with startkey/endkey", context do
- args = %{
- reduce: true,
- group: true,
- start_key: [2018, 5, 1],
- end_key: [2019, 04, 1],
- }
-
- {:ok, res} = run_query(context, args, "dates_sum")
- IO.inspect(res, label: "OUT")
-
- assert res == [
- {:row, [key: [2018, 5, 1], value: 7]},
- {:row, [key: [2019, 3, 1], value: 4]},
- {:row, [key: [2019, 4, 1], value: 6]}
- ]
- end
+# test "group=true reduce with startkey/endkey", context do
+# args = %{
+# reduce: true,
+# group: true,
+# start_key: [2018, 5, 1],
+# end_key: [2019, 04, 1],
+# }
+#
+# {:ok, res} = run_query(context, args, "dates_sum")
+#
+# assert res == [
+# {:row, [key: [2018, 5, 1], value: 7]},
+# {:row, [key: [2019, 3, 1], value: 4]},
+# {:row, [key: [2019, 4, 1], value: 6]}
+# ]
+# end
# test "group=1 count reduce", context do
# args = %{
@@ -167,7 +161,6 @@ defmodule CouchViewsReduceTest do
# }
#
# {:ok, res} = run_query(context, args, "baz")
- # IO.inspect(res, label: "OUT")
#
# assert res == [
# {:row, [key: 1, value: 2]},
@@ -187,7 +180,6 @@ defmodule CouchViewsReduceTest do
# }
#
# {:ok, res} = run_query(context, args, "baz")
- # IO.inspect(res, label: "OUT")
#
# assert res == [
# {:row, [key: 1, value: 2]},
@@ -210,7 +202,6 @@ defmodule CouchViewsReduceTest do
# }
#
# {:ok, res} = run_query(context, args, "baz")
- # IO.inspect(res, label: "OUT")
#
# assert res == [
# {:row, [key: 1, value: 2]},
@@ -236,7 +227,6 @@ defmodule CouchViewsReduceTest do
# }
#
# {:ok, res} = run_query(context, args, "boom")
- # IO.inspect(res, label: "OUT")
#
# assert res == [
# {:row, [key: [2019, 1], value: 1]},
@@ -252,7 +242,6 @@ defmodule CouchViewsReduceTest do
# }
#
# {:ok, res} = run_query(context, args, "max")
- # IO.inspect(res, label: "OUT")
#
# assert res == [
# {:row, [key: :null, value: 3]}
@@ -267,7 +256,7 @@ defmodule CouchViewsReduceTest do
end
def default_cb(:complete, acc) do
- IO.inspect(acc, label: "complete")
+ IO.inspect(Enum.reverse(acc), label: "complete")
{:ok, Enum.reverse(acc)}
end