You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@couchdb.apache.org by ga...@apache.org on 2019/12/05 17:03:43 UTC
[couchdb] 05/08: level 0 _sum working
This is an automated email from the ASF dual-hosted git repository.
garren pushed a commit to branch prototype/builtin-reduce
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 6f6bedc710df762bf3cc490ea0b8edbc38b4122f
Author: Garren Smith <ga...@gmail.com>
AuthorDate: Thu Oct 31 16:42:36 2019 +0200
level 0 _sum working
---
src/couch_views/src/couch_views.erl.orig | 159 ---------------------
src/couch_views/src/couch_views_fdb.erl | 8 +-
src/couch_views/src/couch_views_indexer.erl | 7 +-
src/couch_views/src/couch_views_reduce.erl | 12 ++
src/couch_views/src/couch_views_reduce_fdb.erl | 54 ++++++-
.../test/exunit/couch_views_reduce_test.exs | 150 +++++++++++--------
6 files changed, 160 insertions(+), 230 deletions(-)
diff --git a/src/couch_views/src/couch_views.erl.orig b/src/couch_views/src/couch_views.erl.orig
deleted file mode 100644
index 1830076..0000000
--- a/src/couch_views/src/couch_views.erl.orig
+++ /dev/null
@@ -1,159 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_views).
-
--export([
- query/6
-]).
-
-
--include_lib("couch_mrview/include/couch_mrview.hrl").
-
-
-query(Db, DDoc, ViewName, Callback, Acc0, Args0) ->
- case fabric2_db:is_users_db(Db) of
- true ->
- fabric2_users_db:after_doc_read(DDoc, Db);
- false ->
- ok
- end,
-
- DbName = fabric2_db:name(Db),
- {ok, Mrst} = couch_views_util:ddoc_to_mrst(DbName, DDoc),
-
- #mrst{
- views = Views
- } = Mrst,
-
- Args1 = to_mrargs(Args0),
- Args2 = couch_mrview_util:set_view_type(Args1, ViewName, Views),
- Args3 = couch_mrview_util:validate_args(Args2),
-
- ok = check_range(Args3),
-
- try
-<<<<<<< HEAD
- fabric2_fdb:transactional(Db, fun(TxDb) ->
- ok = maybe_update_view(TxDb, Mrst, Args3),
- read_view(TxDb, Mrst, ViewName, Callback, Acc0, Args3)
- end)
- catch throw:{build_view, WaitSeq} ->
- couch_views_jobs:build_view(Db, Mrst, WaitSeq),
- read_view(Db, Mrst, ViewName, Callback, Acc0, Args3)
-=======
- case is_reduce_view(Args3) of
- true ->
- couch_views_reader:read_reduce(Db, Mrst, ViewName,
- Callback, Acc0, Args3);
- false ->
- couch_views_reader:read(Db, Mrst, ViewName,
- Callback, Acc0, Args3)
- end
- after
- UpdateAfter = Args3#mrargs.update == lazy,
- if UpdateAfter == false -> ok; true ->
- couch_views_jobs:build_view_async(Db, Mrst)
- end
->>>>>>> Initial work
- end.
-
-
-read_view(Db, Mrst, ViewName, Callback, Acc0, Args) ->
- fabric2_fdb:transactional(Db, fun(TxDb) ->
- try
- couch_views_reader:read(TxDb, Mrst, ViewName, Callback, Acc0, Args)
- after
- UpdateAfter = Args#mrargs.update == lazy,
- if UpdateAfter == false -> ok; true ->
- couch_views_jobs:build_view_async(TxDb, Mrst)
- end
- end
- end).
-
-
-maybe_update_view(_Db, _Mrst, #mrargs{update = false}) ->
- ok;
-
-maybe_update_view(_Db, _Mrst, #mrargs{update = lazy}) ->
- ok;
-
-maybe_update_view(TxDb, Mrst, _Args) ->
- DbSeq = fabric2_db:get_update_seq(TxDb),
- ViewSeq = couch_views_fdb:get_update_seq(TxDb, Mrst),
- case DbSeq == ViewSeq of
- true -> ok;
- false -> throw({build_view, DbSeq})
- end.
-
-
-is_reduce_view(#mrargs{view_type = ViewType}) ->
- ViewType =:= red;
-is_reduce_view({Reduce, _, _}) ->
- Reduce =:= red.
-
-
-to_mrargs(#mrargs{} = Args) ->
- Args;
-
-to_mrargs(#{} = Args) ->
- Fields = record_info(fields, mrargs),
- Indexes = lists:seq(2, record_info(size, mrargs)),
- LU = lists:zip(Fields, Indexes),
-
- maps:fold(fun(Key, Value, Acc) ->
- Index = fabric2_util:get_value(couch_util:to_existing_atom(Key), LU),
- setelement(Index, Acc, Value)
- end, #mrargs{}, Args).
-
-
-check_range(#mrargs{start_key = undefined}) ->
- ok;
-
-check_range(#mrargs{end_key = undefined}) ->
- ok;
-
-check_range(#mrargs{start_key = K, end_key = K}) ->
- ok;
-
-check_range(Args) ->
- #mrargs{
- direction = Dir,
- start_key = SK,
- start_key_docid = SKD,
- end_key = EK,
- end_key_docid = EKD
- } = Args,
-
- case {Dir, view_cmp(SK, SKD, EK, EKD)} of
- {fwd, false} ->
- throw(check_range_error(<<"true">>));
- {rev, true} ->
- throw(check_range_error(<<"false">>));
- _ ->
- ok
- end.
-
-
-check_range_error(Descending) ->
- {query_parse_error,
- <<"No rows can match your key range, reverse your ",
- "start_key and end_key or set descending=",
- Descending/binary>>}.
-
-
-view_cmp(SK, SKD, EK, EKD) ->
- BinSK = couch_views_encoding:encode(SK, key),
- BinEK = couch_views_encoding:encode(EK, key),
- PackedSK = erlfdb_tuple:pack({BinSK, SKD}),
- PackedEK = erlfdb_tuple:pack({BinEK, EKD}),
- PackedSK =< PackedEK.
diff --git a/src/couch_views/src/couch_views_fdb.erl b/src/couch_views/src/couch_views_fdb.erl
index 8999d76..6c81457 100644
--- a/src/couch_views/src/couch_views_fdb.erl
+++ b/src/couch_views/src/couch_views_fdb.erl
@@ -148,7 +148,7 @@ write_doc(TxDb, Sig, _ViewIds, #{deleted := true} = Doc) ->
update_kv_size(TxDb, Sig, ViewId, -TotalSize)
end, ExistingViewKeys);
-write_doc(TxDb, Sig, ViewIds, Doc) ->
+write_doc(TxDb, Sig, ViewsIdFun, Doc) ->
#{
id := DocId,
results := Results,
@@ -161,7 +161,7 @@ write_doc(TxDb, Sig, ViewIds, Doc) ->
%% TODO: handle when there is no reduce
io:format("REDUCE RESULTS ~p ~n", [ReduceResults]),
- lists:foreach(fun({ViewId, NewRows, ReduceResult}) ->
+ lists:foreach(fun({{ViewId, Reducer}, NewRows, ReduceResult}) ->
update_id_idx(TxDb, Sig, ViewId, DocId, NewRows),
ExistingKeys = case lists:keyfind(ViewId, 1, ExistingViewKeys) of
@@ -179,9 +179,9 @@ write_doc(TxDb, Sig, ViewIds, Doc) ->
[]
end,
update_map_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows),
- couch_views_reduce_fdb:update_reduce_idx(TxDb, Sig, ViewId, DocId,
+ couch_views_reduce_fdb:update_reduce_idx(TxDb, Sig, ViewId, Reducer, DocId,
ExistingKeys, ReduceResult)
- end, lists:zip3(ViewIds, Results, ReduceResults)).
+ end, lists:zip3(ViewsIdFun, Results, ReduceResults)).
% For each row in a map view there are two rows stored in
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index cff15b0..51fae9b 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -303,6 +303,11 @@ write_docs(TxDb, Mrst, Docs, State) ->
} = State,
ViewIds = [View#mrview.id_num || View <- Views],
+ ViewsIdFuns = lists:foldl(fun (View, Acc) ->
+ Id = View#mrview.id_num,
+ [{_Name, ReduceFun}] = View#mrview.reduce_funs,
+ Acc ++ [{Id, ReduceFun}]
+ end, [], Views),
%% First build of the view
if ViewSeq /= <<>> -> ok; true ->
@@ -310,7 +315,7 @@ write_docs(TxDb, Mrst, Docs, State) ->
end,
lists:foreach(fun(Doc) ->
- couch_views_fdb:write_doc(TxDb, Sig, ViewIds, Doc)
+ couch_views_fdb:write_doc(TxDb, Sig, ViewsIdFuns, Doc)
end, Docs),
couch_views_fdb:set_update_seq(TxDb, Sig, LastSeq).
diff --git a/src/couch_views/src/couch_views_reduce.erl b/src/couch_views/src/couch_views_reduce.erl
index b7eb18e..04c5cb8 100644
--- a/src/couch_views/src/couch_views_reduce.erl
+++ b/src/couch_views/src/couch_views_reduce.erl
@@ -191,6 +191,18 @@ reduce(<<"_count">>, Results) ->
end, #{}, Results),
maps:to_list(ReduceResults);
+reduce(<<"_sum">>, Results) ->
+ ReduceResults = lists:foldl(fun ({Key, Val}, Acc) ->
+ case maps:is_key(Key, Acc) of
+ true ->
+ #{Key := Sum} = Acc,
+ Acc#{Key := Val + Sum};
+ false ->
+ Acc#{Key => Val}
+ end
+ end, #{}, Results),
+ maps:to_list(ReduceResults);
+
% this isn't a real supported reduce function in CouchDB
% But I want a basic reduce function that when we need to update the index
% we would need to re-read multiple rows instead of being able to do an
diff --git a/src/couch_views/src/couch_views_reduce_fdb.erl b/src/couch_views/src/couch_views_reduce_fdb.erl
index 5759c42..7a7e120 100644
--- a/src/couch_views/src/couch_views_reduce_fdb.erl
+++ b/src/couch_views/src/couch_views_reduce_fdb.erl
@@ -17,7 +17,7 @@
-export([
fold_level0/8,
create_skip_list/3,
- update_reduce_idx/6
+ update_reduce_idx/7
]).
@@ -188,6 +188,14 @@ rereduce(_Reducer, Rows, GroupLevel) when length(Rows) == 1 ->
GroupKey = group_level_key(Key, GroupLevel),
{GroupKey, Val};
+rereduce(<<"_sum">>, Rows, GroupLevel) ->
+ Sum = lists:foldl(fun ({_, Val}, Acc) ->
+ Val + Acc
+ end, 0, Rows),
+ {Key, _} = hd(Rows),
+ GroupKey = group_level_key(Key, GroupLevel),
+ {GroupKey, Sum};
+
rereduce(<<"_count">>, Rows, GroupLevel) ->
Val = length(Rows),
{Key, _} = hd(Rows),
@@ -210,6 +218,9 @@ group_level_equal(One, Two, GroupLevel) ->
group_level_key(_Key, 0) ->
null;
+group_level_key(Key, group_true) ->
+ Key;
+
group_level_key(Key, GroupLevel) when is_list(Key) ->
lists:sublist(Key, GroupLevel);
@@ -230,7 +241,7 @@ unpack_key_value(EncodedValue) ->
%% Inserting
-update_reduce_idx(TxDb, Sig, ViewId, _DocId, _ExistingKeys, ReduceResult) ->
+update_reduce_idx(TxDb, Sig, ViewId, Reducer, _DocId, _ExistingKeys, ReduceResult) ->
#{
db_prefix := DbPrefix
} = TxDb,
@@ -238,7 +249,8 @@ update_reduce_idx(TxDb, Sig, ViewId, _DocId, _ExistingKeys, ReduceResult) ->
ViewOpts = #{
db_prefix => DbPrefix,
sig => Sig,
- view_id => ViewId
+ view_id => ViewId,
+ reducer => Reducer
},
lists:foreach(fun ({Key, Val}) ->
@@ -269,29 +281,57 @@ add_kv_to_skip_list(Db, MaxLevel, #{} = ViewOpts, Key, Val) ->
#{
db_prefix := DbPrefix,
sig := Sig,
- view_id := ViewId
+ view_id := ViewId,
+ reducer := Reducer
} = ViewOpts,
- Levels = lists:seq(0, MaxLevel),
+ Levels = lists:seq(1, MaxLevel),
ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
KeyHash = hash_key(Key),
fabric2_fdb:transactional(Db, fun(TxDb) ->
+ Val1 = case get_value(TxDb, ReduceIdxPrefix, 0, Key) of
+ not_found ->
+ Val;
+ ExistingVal ->
+ {_, ReducedVal} = rereduce(Reducer, [{Key, ExistingVal}, {Key, Val}], group_true),
+ ReducedVal
+ end,
+ io:format("VAL1 ~p ~n", [Val1]),
+ add_kv(TxDb, ReduceIdxPrefix, 0, Key, Val1),
+
lists:foreach(fun(Level) ->
{PrevKey, PrevVal} = get_previous_key(TxDb, ReduceIdxPrefix, Level, Key),
io:format("Level ~p K/V ~p ~p PREV KV ~p ~p ~n", [Level, Key, Val, PrevKey, PrevVal]),
case should_add_key_to_level(Level, KeyHash) of
true ->
io:format("Adding at ~p ~p ~n", [Level, Key]),
- add_kv(Db, ReduceIdxPrefix, Level, Key, Val);
+ add_kv(TxDb, ReduceIdxPrefix, Level, Key, Val);
false ->
%% {PrevKey, NewVal} = rereduce(<<"_stats">>, {PrevKey, PrevVal}, {Key, Val}),
%% io:format("RE_REDUCE ~p ~p ~p ~p ~n", [Level, Key, PrevKey, NewVal]),
- add_kv(Db, ReduceIdxPrefix, Level, PrevKey, PrevVal)
+ add_kv(TxDb, ReduceIdxPrefix, Level, PrevKey, PrevVal)
end
end, Levels)
end).
+get_value(TxDb, ReduceIdxPrefix, Level, Key) ->
+ #{
+ tx := Tx
+ } = TxDb,
+ EK = create_key(ReduceIdxPrefix, Level, Key),
+ io:format("FF ~p ~n", [Key]),
+ Out = case erlfdb:wait(erlfdb:get(Tx, EK)) of
+ not_found ->
+ not_found;
+ PackedValue ->
+ io:format("HERE ~p ~n", [PackedValue]),
+ {_, Value} = get_key_value(PackedValue),
+ Value
+ end,
+ io:format("GETTING ~p ~p ~n", [Key, Out]),
+ Out.
+
get_previous_key(TxDb, ReduceIdxPrefix, Level, Key) ->
#{
diff --git a/src/couch_views/test/exunit/couch_views_reduce_test.exs b/src/couch_views/test/exunit/couch_views_reduce_test.exs
index 488f3ee..0812f1f 100644
--- a/src/couch_views/test/exunit/couch_views_reduce_test.exs
+++ b/src/couch_views/test/exunit/couch_views_reduce_test.exs
@@ -40,37 +40,53 @@ defmodule CouchViewsReduceTest do
}
end
-# test "group=true count reduce with limit", context do
+ # test "group=true count reduce with limit", context do
+ # args = %{
+ # :reduce => true,
+ # :group => true,
+ # :limit => 3
+ # }
+ #
+ # {:ok, res} = run_query(context, args, "dates")
+ # IO.inspect(res, label: "OUT")
+ #
+ # assert res == [
+ # {:row, [key: [2017, 3, 1], value: 1]},
+ # {:row, [key: [2017, 4, 1], value: 1]},
+ # {:row, [key: [2017, 4, 15], value: 1]}
+ # ]
+ # end
+
+# test "group_level=1 count reduce", context do
# args = %{
# :reduce => true,
-# :group => true,
-# :limit => 3
+# :group_level => 1
# }
#
-# {:ok, res} = run_query(context, args, "dates")
+# {:ok, res} = run_query(context, args, "dates_count")
# IO.inspect(res, label: "OUT")
#
# assert res == [
-# {:row, [key: [2017, 3, 1], value: 1]},
-# {:row, [key: [2017, 4, 1], value: 1]},
-# {:row, [key: [2017, 4, 15], value: 1]}
+# {:row, [key: [2017], value: 4]},
+# {:row, [key: [2018], value: 3]},
+# {:row, [key: [2019], value: 2]}
# ]
# end
- test "group_level=1 count reduce", context do
- args = %{
- :reduce => true,
- :group_level => 1,
- }
+ test "group_level=1 reduce reduce", context do
+ args = %{
+ :reduce => true,
+ :group_level => 1
+ }
- {:ok, res} = run_query(context, args, "dates")
- IO.inspect(res, label: "OUT")
+ {:ok, res} = run_query(context, args, "dates_sum")
+ IO.inspect(res, label: "OUT")
- assert res == [
- {:row, [key: [2017], value: 4]},
- {:row, [key: [2018], value: 3]},
- {:row, [key: [2019], value: 2]}
- ]
+ assert res == [
+ {:row, [key: [2017], value: 31]},
+ {:row, [key: [2018], value: 20]},
+ {:row, [key: [2019], value: 17]}
+ ]
end
# test "group=1 count reduce", context do
@@ -207,21 +223,22 @@ defmodule CouchViewsReduceTest do
defp create_docs() do
dates = [
- [2017, 3, 1],
- [2017, 4, 1],
+ {[2017, 3, 1], 9},
+ {[2017, 4, 1], 7},
# out of order check
- [2019, 3, 1],
- [2017, 4, 15],
- [2018, 4, 1],
- [2017, 5, 1],
- [2018, 3, 1],
+ {[2019, 3, 1], 4},
+ {[2017, 4, 15], 6},
+ {[2018, 4, 1], 3},
+ {[2017, 5, 1], 9},
+ {[2018, 3, 1], 6},
# duplicate check
- [2018, 4, 1],
- [2018, 5, 1],
- [2019, 4, 1]
+ {[2018, 4, 1], 4},
+ {[2018, 5, 1], 7},
+ {[2019, 4, 1], 6},
+ {[2019, 5, 1], 7}
]
- for i <- 1..10 do
+ for i <- 1..11 do
group =
if rem(i, 3) == 0 do
"first"
@@ -229,13 +246,18 @@ defmodule CouchViewsReduceTest do
"second"
end
- :couch_doc.from_json_obj({[
- {"_id", "doc-id-#{i}"},
- {"value", i},
- {"some", "field"},
- {"group", group},
- {"date", Enum.at(dates, i - 1)}
- ]})
+ {date_key, date_val} = Enum.at(dates, i - 1)
+
+ :couch_doc.from_json_obj(
+ {[
+ {"_id", "doc-id-#{i}"},
+ {"value", i},
+ {"some", "field"},
+ {"group", group},
+ {"date", date_key},
+ {"date_val", date_val}
+ ]}
+ )
end
end
@@ -244,34 +266,44 @@ defmodule CouchViewsReduceTest do
{"_id", "_design/bar"},
{"views",
{[
- {"dates",
+ # {"dates_count",
+ # {[
+ # {"map",
+ # """
+ # function(doc) {
+ # emit(doc.date, doc.value);
+ # }
+ # """},
+ # {"reduce", "_count"}
+ # ]}}
+ {"dates_sum",
{[
{"map",
"""
function(doc) {
- emit(doc.date, doc.value);
- }
+ emit(doc.date, doc.date_val);
+ }
"""},
- {"reduce", "_count"}
+ {"reduce", "_sum"}
]}}
-# {"baz",
-# {[
-# {"map",
-# """
-# function(doc) {
-# emit(doc.value, doc.value);
-# emit(doc.value, doc.value);
-# emit([doc.value, 1], doc.value);
-# emit([doc.value, doc.value + 1, doc.group.length], doc.value);
-#
-# if (doc.value === 3) {
-# emit([1, 1, 5], 1);
-# emit([doc.value, 1, 5], 1);
-# }
-# }
-# """},
-# {"reduce", "_count"}
-# ]}}
+ # {"baz",
+ # {[
+ # {"map",
+ # """
+ # function(doc) {
+ # emit(doc.value, doc.value);
+ # emit(doc.value, doc.value);
+ # emit([doc.value, 1], doc.value);
+ # emit([doc.value, doc.value + 1, doc.group.length], doc.value);
+ #
+ # if (doc.value === 3) {
+ # emit([1, 1, 5], 1);
+ # emit([doc.value, 1, 5], 1);
+ # }
+ # }
+ # """},
+ # {"reduce", "_count"}
+ # ]}}
# {"boom",
# {[
# {"map",