You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text version.
Posted to commits@couchdb.apache.org by ga...@apache.org on 2019/10/31 14:43:04 UTC
[couchdb] 05/05: level 0 _sum working
This is an automated email from the ASF dual-hosted git repository.
garren pushed a commit to branch prototype/builtin-reduce
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 7ca52023ae5e2c697ae5258277d0aceaea2c00b1
Author: Garren Smith <ga...@gmail.com>
AuthorDate: Thu Oct 31 16:42:36 2019 +0200
level 0 _sum working
---
src/couch_views/src/couch_views_fdb.erl | 8 +-
src/couch_views/src/couch_views_indexer.erl | 7 +-
src/couch_views/src/couch_views_reduce.erl | 12 ++
src/couch_views/src/couch_views_reduce_fdb.erl | 54 +++++++-
.../test/exunit/couch_views_reduce_test.exs | 150 +++++++++++++--------
5 files changed, 160 insertions(+), 71 deletions(-)
diff --git a/src/couch_views/src/couch_views_fdb.erl b/src/couch_views/src/couch_views_fdb.erl
index 8999d76..6c81457 100644
--- a/src/couch_views/src/couch_views_fdb.erl
+++ b/src/couch_views/src/couch_views_fdb.erl
@@ -148,7 +148,7 @@ write_doc(TxDb, Sig, _ViewIds, #{deleted := true} = Doc) ->
update_kv_size(TxDb, Sig, ViewId, -TotalSize)
end, ExistingViewKeys);
-write_doc(TxDb, Sig, ViewIds, Doc) ->
+write_doc(TxDb, Sig, ViewsIdFun, Doc) ->
#{
id := DocId,
results := Results,
@@ -161,7 +161,7 @@ write_doc(TxDb, Sig, ViewIds, Doc) ->
%% TODO: handle when there is no reduce
io:format("REDUCE RESULTS ~p ~n", [ReduceResults]),
- lists:foreach(fun({ViewId, NewRows, ReduceResult}) ->
+ lists:foreach(fun({{ViewId, Reducer}, NewRows, ReduceResult}) ->
update_id_idx(TxDb, Sig, ViewId, DocId, NewRows),
ExistingKeys = case lists:keyfind(ViewId, 1, ExistingViewKeys) of
@@ -179,9 +179,9 @@ write_doc(TxDb, Sig, ViewIds, Doc) ->
[]
end,
update_map_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows),
- couch_views_reduce_fdb:update_reduce_idx(TxDb, Sig, ViewId, DocId,
+ couch_views_reduce_fdb:update_reduce_idx(TxDb, Sig, ViewId, Reducer, DocId,
ExistingKeys, ReduceResult)
- end, lists:zip3(ViewIds, Results, ReduceResults)).
+ end, lists:zip3(ViewsIdFun, Results, ReduceResults)).
% For each row in a map view there are two rows stored in
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index 3c60743..f01a58b 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -226,6 +226,11 @@ write_docs(TxDb, Mrst, Docs, State) ->
} = State,
ViewIds = [View#mrview.id_num || View <- Views],
+ ViewsIdFuns = lists:foldl(fun (View, Acc) ->
+ Id = View#mrview.id_num,
+ [{_Name, ReduceFun}] = View#mrview.reduce_funs,
+ Acc ++ [{Id, ReduceFun}]
+ end, [], Views),
%% First build of the view
if ViewSeq /= <<>> -> ok; true ->
@@ -233,7 +238,7 @@ write_docs(TxDb, Mrst, Docs, State) ->
end,
lists:foreach(fun(Doc) ->
- couch_views_fdb:write_doc(TxDb, Sig, ViewIds, Doc)
+ couch_views_fdb:write_doc(TxDb, Sig, ViewsIdFuns, Doc)
end, Docs),
couch_views_fdb:set_update_seq(TxDb, Sig, LastSeq).
diff --git a/src/couch_views/src/couch_views_reduce.erl b/src/couch_views/src/couch_views_reduce.erl
index b7eb18e..04c5cb8 100644
--- a/src/couch_views/src/couch_views_reduce.erl
+++ b/src/couch_views/src/couch_views_reduce.erl
@@ -191,6 +191,18 @@ reduce(<<"_count">>, Results) ->
end, #{}, Results),
maps:to_list(ReduceResults);
+reduce(<<"_sum">>, Results) ->
+    %% Sum the emitted values per key. The first value seen for a key
+    %% seeds its total; later values are added onto the running total.
+    Totals = lists:foldl(fun({Key, Val}, Acc) ->
+        maps:update_with(Key, fun(Sum) -> Val + Sum end, Val, Acc)
+    end, #{}, Results),
+    maps:to_list(Totals);
+
% this isn't a real supported reduce function in CouchDB
% But I want a basic reduce function that when we need to update the index
% we would need to re-read multiple rows instead of being able to do an
diff --git a/src/couch_views/src/couch_views_reduce_fdb.erl b/src/couch_views/src/couch_views_reduce_fdb.erl
index 5759c42..7a7e120 100644
--- a/src/couch_views/src/couch_views_reduce_fdb.erl
+++ b/src/couch_views/src/couch_views_reduce_fdb.erl
@@ -17,7 +17,7 @@
-export([
fold_level0/8,
create_skip_list/3,
- update_reduce_idx/6
+ update_reduce_idx/7
]).
@@ -188,6 +188,14 @@ rereduce(_Reducer, Rows, GroupLevel) when length(Rows) == 1 ->
GroupKey = group_level_key(Key, GroupLevel),
{GroupKey, Val};
+rereduce(<<"_sum">>, Rows, GroupLevel) ->
+    %% Rows are {Key, Value} pairs; their values are added together and the
+    %% result is keyed by the first row's key truncated to GroupLevel.
+    {_Keys, Values} = lists:unzip(Rows),
+    Total = lists:sum(Values),
+    {FirstKey, _} = hd(Rows),
+    {group_level_key(FirstKey, GroupLevel), Total};
+
rereduce(<<"_count">>, Rows, GroupLevel) ->
Val = length(Rows),
{Key, _} = hd(Rows),
@@ -210,6 +218,9 @@ group_level_equal(One, Two, GroupLevel) ->
group_level_key(_Key, 0) ->
null;
+%% group_true keeps the whole key unchanged (used when rereducing values stored under the exact same key).
+group_level_key(Key, group_true) -> Key;
+
group_level_key(Key, GroupLevel) when is_list(Key) ->
lists:sublist(Key, GroupLevel);
@@ -230,7 +241,7 @@ unpack_key_value(EncodedValue) ->
%% Inserting
-update_reduce_idx(TxDb, Sig, ViewId, _DocId, _ExistingKeys, ReduceResult) ->
+update_reduce_idx(TxDb, Sig, ViewId, Reducer, _DocId, _ExistingKeys, ReduceResult) ->
#{
db_prefix := DbPrefix
} = TxDb,
@@ -238,7 +249,8 @@ update_reduce_idx(TxDb, Sig, ViewId, _DocId, _ExistingKeys, ReduceResult) ->
ViewOpts = #{
db_prefix => DbPrefix,
sig => Sig,
- view_id => ViewId
+ view_id => ViewId,
+ reducer => Reducer
},
lists:foreach(fun ({Key, Val}) ->
@@ -269,29 +281,57 @@ add_kv_to_skip_list(Db, MaxLevel, #{} = ViewOpts, Key, Val) ->
#{
db_prefix := DbPrefix,
sig := Sig,
- view_id := ViewId
+ view_id := ViewId,
+ reducer := Reducer
} = ViewOpts,
- Levels = lists:seq(0, MaxLevel),
+ Levels = lists:seq(1, MaxLevel),
ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
KeyHash = hash_key(Key),
fabric2_fdb:transactional(Db, fun(TxDb) ->
+ Val1 = case get_value(TxDb, ReduceIdxPrefix, 0, Key) of
+ not_found ->
+ Val;
+ ExistingVal ->
+ {_, ReducedVal} = rereduce(Reducer, [{Key, ExistingVal}, {Key, Val}], group_true),
+ ReducedVal
+ end,
+ io:format("VAL1 ~p ~n", [Val1]),
+ add_kv(TxDb, ReduceIdxPrefix, 0, Key, Val1),
+
lists:foreach(fun(Level) ->
{PrevKey, PrevVal} = get_previous_key(TxDb, ReduceIdxPrefix, Level, Key),
io:format("Level ~p K/V ~p ~p PREV KV ~p ~p ~n", [Level, Key, Val, PrevKey, PrevVal]),
case should_add_key_to_level(Level, KeyHash) of
true ->
io:format("Adding at ~p ~p ~n", [Level, Key]),
- add_kv(Db, ReduceIdxPrefix, Level, Key, Val);
+ add_kv(TxDb, ReduceIdxPrefix, Level, Key, Val);
false ->
%% {PrevKey, NewVal} = rereduce(<<"_stats">>, {PrevKey, PrevVal}, {Key, Val}),
%% io:format("RE_REDUCE ~p ~p ~p ~p ~n", [Level, Key, PrevKey, NewVal]),
- add_kv(Db, ReduceIdxPrefix, Level, PrevKey, PrevVal)
+ add_kv(TxDb, ReduceIdxPrefix, Level, PrevKey, PrevVal)
end
end, Levels)
end).
+%% Look up Key in the reduce skip list at the given Level inside the
+%% transaction held by TxDb.
+%% Returns the stored value (the second element of the pair decoded by
+%% get_key_value/1), or the atom not_found when there is no row for Key
+%% at that level.
+get_value(TxDb, ReduceIdxPrefix, Level, Key) ->
+    #{tx := Tx} = TxDb,
+    EK = create_key(ReduceIdxPrefix, Level, Key),
+    case erlfdb:wait(erlfdb:get(Tx, EK)) of
+        not_found ->
+            not_found;
+        PackedValue ->
+            {_, Value} = get_key_value(PackedValue),
+            Value
+    end.
+
get_previous_key(TxDb, ReduceIdxPrefix, Level, Key) ->
#{
diff --git a/src/couch_views/test/exunit/couch_views_reduce_test.exs b/src/couch_views/test/exunit/couch_views_reduce_test.exs
index 488f3ee..0812f1f 100644
--- a/src/couch_views/test/exunit/couch_views_reduce_test.exs
+++ b/src/couch_views/test/exunit/couch_views_reduce_test.exs
@@ -40,37 +40,53 @@ defmodule CouchViewsReduceTest do
}
end
-# test "group=true count reduce with limit", context do
+ # test "group=true count reduce with limit", context do
+ # args = %{
+ # :reduce => true,
+ # :group => true,
+ # :limit => 3
+ # }
+ #
+ # {:ok, res} = run_query(context, args, "dates")
+ # IO.inspect(res, label: "OUT")
+ #
+ # assert res == [
+ # {:row, [key: [2017, 3, 1], value: 1]},
+ # {:row, [key: [2017, 4, 1], value: 1]},
+ # {:row, [key: [2017, 4, 15], value: 1]}
+ # ]
+ # end
+
+# test "group_level=1 count reduce", context do
# args = %{
# :reduce => true,
-# :group => true,
-# :limit => 3
+# :group_level => 1
# }
#
-# {:ok, res} = run_query(context, args, "dates")
+# {:ok, res} = run_query(context, args, "dates_count")
# IO.inspect(res, label: "OUT")
#
# assert res == [
-# {:row, [key: [2017, 3, 1], value: 1]},
-# {:row, [key: [2017, 4, 1], value: 1]},
-# {:row, [key: [2017, 4, 15], value: 1]}
+# {:row, [key: [2017], value: 4]},
+# {:row, [key: [2018], value: 3]},
+# {:row, [key: [2019], value: 2]}
# ]
# end
- test "group_level=1 count reduce", context do
- args = %{
- :reduce => true,
- :group_level => 1,
- }
+ test "group_level=1 reduce reduce", context do
+ args = %{
+ :reduce => true,
+ :group_level => 1
+ }
- {:ok, res} = run_query(context, args, "dates")
- IO.inspect(res, label: "OUT")
+ {:ok, res} = run_query(context, args, "dates_sum")
+ IO.inspect(res, label: "OUT")
- assert res == [
- {:row, [key: [2017], value: 4]},
- {:row, [key: [2018], value: 3]},
- {:row, [key: [2019], value: 2]}
- ]
+ assert res == [
+ {:row, [key: [2017], value: 31]},
+ {:row, [key: [2018], value: 20]},
+ {:row, [key: [2019], value: 17]}
+ ]
end
# test "group=1 count reduce", context do
@@ -207,21 +223,22 @@ defmodule CouchViewsReduceTest do
defp create_docs() do
dates = [
- [2017, 3, 1],
- [2017, 4, 1],
+ {[2017, 3, 1], 9},
+ {[2017, 4, 1], 7},
# out of order check
- [2019, 3, 1],
- [2017, 4, 15],
- [2018, 4, 1],
- [2017, 5, 1],
- [2018, 3, 1],
+ {[2019, 3, 1], 4},
+ {[2017, 4, 15], 6},
+ {[2018, 4, 1], 3},
+ {[2017, 5, 1], 9},
+ {[2018, 3, 1], 6},
# duplicate check
- [2018, 4, 1],
- [2018, 5, 1],
- [2019, 4, 1]
+ {[2018, 4, 1], 4},
+ {[2018, 5, 1], 7},
+ {[2019, 4, 1], 6},
+ {[2019, 5, 1], 7}
]
- for i <- 1..10 do
+ for i <- 1..11 do
group =
if rem(i, 3) == 0 do
"first"
@@ -229,13 +246,18 @@ defmodule CouchViewsReduceTest do
"second"
end
- :couch_doc.from_json_obj({[
- {"_id", "doc-id-#{i}"},
- {"value", i},
- {"some", "field"},
- {"group", group},
- {"date", Enum.at(dates, i - 1)}
- ]})
+ {date_key, date_val} = Enum.at(dates, i - 1)
+
+ :couch_doc.from_json_obj(
+ {[
+ {"_id", "doc-id-#{i}"},
+ {"value", i},
+ {"some", "field"},
+ {"group", group},
+ {"date", date_key},
+ {"date_val", date_val}
+ ]}
+ )
end
end
@@ -244,34 +266,44 @@ defmodule CouchViewsReduceTest do
{"_id", "_design/bar"},
{"views",
{[
- {"dates",
+ # {"dates_count",
+ # {[
+ # {"map",
+ # """
+ # function(doc) {
+ # emit(doc.date, doc.value);
+ # }
+ # """},
+ # {"reduce", "_count"}
+ # ]}}
+ {"dates_sum",
{[
{"map",
"""
function(doc) {
- emit(doc.date, doc.value);
- }
+ emit(doc.date, doc.date_val);
+ }
"""},
- {"reduce", "_count"}
+ {"reduce", "_sum"}
]}}
-# {"baz",
-# {[
-# {"map",
-# """
-# function(doc) {
-# emit(doc.value, doc.value);
-# emit(doc.value, doc.value);
-# emit([doc.value, 1], doc.value);
-# emit([doc.value, doc.value + 1, doc.group.length], doc.value);
-#
-# if (doc.value === 3) {
-# emit([1, 1, 5], 1);
-# emit([doc.value, 1, 5], 1);
-# }
-# }
-# """},
-# {"reduce", "_count"}
-# ]}}
+ # {"baz",
+ # {[
+ # {"map",
+ # """
+ # function(doc) {
+ # emit(doc.value, doc.value);
+ # emit(doc.value, doc.value);
+ # emit([doc.value, 1], doc.value);
+ # emit([doc.value, doc.value + 1, doc.group.length], doc.value);
+ #
+ # if (doc.value === 3) {
+ # emit([1, 1, 5], 1);
+ # emit([doc.value, 1, 5], 1);
+ # }
+ # }
+ # """},
+ # {"reduce", "_count"}
+ # ]}}
# {"boom",
# {[
# {"map",