You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ga...@apache.org on 2019/12/05 17:03:43 UTC

[couchdb] 05/08: level 0 _sum working

This is an automated email from the ASF dual-hosted git repository.

garren pushed a commit to branch prototype/builtin-reduce
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 6f6bedc710df762bf3cc490ea0b8edbc38b4122f
Author: Garren Smith <ga...@gmail.com>
AuthorDate: Thu Oct 31 16:42:36 2019 +0200

    level 0 _sum working
---
 src/couch_views/src/couch_views.erl.orig           | 159 ---------------------
 src/couch_views/src/couch_views_fdb.erl            |   8 +-
 src/couch_views/src/couch_views_indexer.erl        |   7 +-
 src/couch_views/src/couch_views_reduce.erl         |  12 ++
 src/couch_views/src/couch_views_reduce_fdb.erl     |  54 ++++++-
 .../test/exunit/couch_views_reduce_test.exs        | 150 +++++++++++--------
 6 files changed, 160 insertions(+), 230 deletions(-)

diff --git a/src/couch_views/src/couch_views.erl.orig b/src/couch_views/src/couch_views.erl.orig
deleted file mode 100644
index 1830076..0000000
--- a/src/couch_views/src/couch_views.erl.orig
+++ /dev/null
@@ -1,159 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_views).
-
--export([
-    query/6
-]).
-
-
--include_lib("couch_mrview/include/couch_mrview.hrl").
-
-
-query(Db, DDoc, ViewName, Callback, Acc0, Args0) ->
-    case fabric2_db:is_users_db(Db) of
-        true ->
-            fabric2_users_db:after_doc_read(DDoc, Db);
-        false ->
-            ok
-    end,
-
-    DbName = fabric2_db:name(Db),
-    {ok, Mrst} = couch_views_util:ddoc_to_mrst(DbName, DDoc),
-
-    #mrst{
-        views = Views
-    } = Mrst,
-
-    Args1 = to_mrargs(Args0),
-    Args2 = couch_mrview_util:set_view_type(Args1, ViewName, Views),
-    Args3 = couch_mrview_util:validate_args(Args2),
-
-    ok = check_range(Args3),
-
-    try
-<<<<<<< HEAD
-        fabric2_fdb:transactional(Db, fun(TxDb) ->
-            ok = maybe_update_view(TxDb, Mrst, Args3),
-            read_view(TxDb, Mrst, ViewName, Callback, Acc0, Args3)
-        end)
-    catch throw:{build_view, WaitSeq} ->
-        couch_views_jobs:build_view(Db, Mrst, WaitSeq),
-        read_view(Db, Mrst, ViewName, Callback, Acc0, Args3)
-=======
-        case is_reduce_view(Args3) of
-            true ->
-                couch_views_reader:read_reduce(Db, Mrst, ViewName,
-                    Callback, Acc0, Args3);
-            false ->
-                couch_views_reader:read(Db, Mrst, ViewName,
-                    Callback, Acc0, Args3)
-        end
-    after
-        UpdateAfter = Args3#mrargs.update == lazy,
-        if UpdateAfter == false -> ok; true ->
-            couch_views_jobs:build_view_async(Db, Mrst)
-        end
->>>>>>> Initial work
-    end.
-
-
-read_view(Db, Mrst, ViewName, Callback, Acc0, Args) ->
-    fabric2_fdb:transactional(Db, fun(TxDb) ->
-        try
-            couch_views_reader:read(TxDb, Mrst, ViewName, Callback, Acc0, Args)
-        after
-            UpdateAfter = Args#mrargs.update == lazy,
-            if UpdateAfter == false -> ok; true ->
-                couch_views_jobs:build_view_async(TxDb, Mrst)
-            end
-        end
-    end).
-
-
-maybe_update_view(_Db, _Mrst, #mrargs{update = false}) ->
-    ok;
-
-maybe_update_view(_Db, _Mrst, #mrargs{update = lazy}) ->
-    ok;
-
-maybe_update_view(TxDb, Mrst, _Args) ->
-    DbSeq = fabric2_db:get_update_seq(TxDb),
-    ViewSeq = couch_views_fdb:get_update_seq(TxDb, Mrst),
-    case DbSeq == ViewSeq of
-        true -> ok;
-        false -> throw({build_view, DbSeq})
-    end.
-
-
-is_reduce_view(#mrargs{view_type = ViewType}) ->
-    ViewType =:= red;
-is_reduce_view({Reduce, _, _}) ->
-    Reduce =:= red.
-
-
-to_mrargs(#mrargs{} = Args) ->
-    Args;
-
-to_mrargs(#{} = Args) ->
-    Fields = record_info(fields, mrargs),
-    Indexes = lists:seq(2, record_info(size, mrargs)),
-    LU = lists:zip(Fields, Indexes),
-
-    maps:fold(fun(Key, Value, Acc) ->
-        Index = fabric2_util:get_value(couch_util:to_existing_atom(Key), LU),
-        setelement(Index, Acc, Value)
-    end, #mrargs{}, Args).
-
-
-check_range(#mrargs{start_key = undefined}) ->
-    ok;
-
-check_range(#mrargs{end_key = undefined}) ->
-    ok;
-
-check_range(#mrargs{start_key = K, end_key = K}) ->
-    ok;
-
-check_range(Args) ->
-    #mrargs{
-        direction = Dir,
-        start_key = SK,
-        start_key_docid = SKD,
-        end_key = EK,
-        end_key_docid = EKD
-    } = Args,
-
-    case {Dir, view_cmp(SK, SKD, EK, EKD)} of
-        {fwd, false} ->
-            throw(check_range_error(<<"true">>));
-        {rev, true} ->
-            throw(check_range_error(<<"false">>));
-        _ ->
-            ok
-    end.
-
-
-check_range_error(Descending) ->
-    {query_parse_error,
-        <<"No rows can match your key range, reverse your ",
-            "start_key and end_key or set descending=",
-            Descending/binary>>}.
-
-
-view_cmp(SK, SKD, EK, EKD) ->
-    BinSK = couch_views_encoding:encode(SK, key),
-    BinEK = couch_views_encoding:encode(EK, key),
-    PackedSK = erlfdb_tuple:pack({BinSK, SKD}),
-    PackedEK = erlfdb_tuple:pack({BinEK, EKD}),
-    PackedSK =< PackedEK.
diff --git a/src/couch_views/src/couch_views_fdb.erl b/src/couch_views/src/couch_views_fdb.erl
index 8999d76..6c81457 100644
--- a/src/couch_views/src/couch_views_fdb.erl
+++ b/src/couch_views/src/couch_views_fdb.erl
@@ -148,7 +148,9 @@ write_doc(TxDb, Sig, _ViewIds, #{deleted := true} = Doc) ->
         update_kv_size(TxDb, Sig, ViewId, -TotalSize)
     end, ExistingViewKeys);
 
-write_doc(TxDb, Sig, ViewIds, Doc) ->
+% ViewIdReducers holds one {ViewId, Reducer} pair per view, in the same
+% order as the doc's Results and ReduceResults (the three are zipped below).
+write_doc(TxDb, Sig, ViewIdReducers, Doc) ->
     #{
         id := DocId,
         results := Results,
@@ -161,7 +163,6 @@ write_doc(TxDb, Sig, ViewIds, Doc) ->
 
     %% TODO: handle when there is no reduce
-    io:format("REDUCE RESULTS ~p ~n", [ReduceResults]),
-    lists:foreach(fun({ViewId, NewRows, ReduceResult}) ->
+    lists:foreach(fun({{ViewId, Reducer}, NewRows, ReduceResult}) ->
         update_id_idx(TxDb, Sig, ViewId, DocId, NewRows),
 
         ExistingKeys = case lists:keyfind(ViewId, 1, ExistingViewKeys) of
@@ -179,9 +180,9 @@ write_doc(TxDb, Sig, ViewIds, Doc) ->
                 []
         end,
         update_map_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows),
-        couch_views_reduce_fdb:update_reduce_idx(TxDb, Sig, ViewId, DocId,
-            ExistingKeys, ReduceResult)
-    end, lists:zip3(ViewIds, Results, ReduceResults)).
+        couch_views_reduce_fdb:update_reduce_idx(TxDb, Sig, ViewId, Reducer,
+            DocId, ExistingKeys, ReduceResult)
+    end, lists:zip3(ViewIdReducers, Results, ReduceResults)).
 
 
 % For each row in a map view there are two rows stored in
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index cff15b0..51fae9b 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -303,6 +303,17 @@ write_docs(TxDb, Mrst, Docs, State) ->
     } = State,
 
     ViewIds = [View#mrview.id_num || View <- Views],
+    % Pair each view id with its reduce function (e.g. <<"_sum">>). Map-only
+    % views get `undefined`; several reduce functions on one view are not
+    % supported yet and fail with a case_clause.
+    ViewIdReducers = lists:map(fun(View) ->
+        #mrview{id_num = Id, reduce_funs = ReduceFuns} = View,
+        Reducer = case ReduceFuns of
+            [] -> undefined;
+            [{_Name, ReduceFun}] -> ReduceFun
+        end,
+        {Id, Reducer}
+    end, Views),
 
     %%  First build of the view
     if ViewSeq /= <<>> -> ok; true ->
@@ -310,7 +321,7 @@ write_docs(TxDb, Mrst, Docs, State) ->
     end,
 
     lists:foreach(fun(Doc) ->
-        couch_views_fdb:write_doc(TxDb, Sig, ViewIds, Doc)
+        couch_views_fdb:write_doc(TxDb, Sig, ViewIdReducers, Doc)
     end, Docs),
 
     couch_views_fdb:set_update_seq(TxDb, Sig, LastSeq).
diff --git a/src/couch_views/src/couch_views_reduce.erl b/src/couch_views/src/couch_views_reduce.erl
index b7eb18e..04c5cb8 100644
--- a/src/couch_views/src/couch_views_reduce.erl
+++ b/src/couch_views/src/couch_views_reduce.erl
@@ -191,6 +191,14 @@ reduce(<<"_count">>, Results) ->
     end, #{}, Results),
     maps:to_list(ReduceResults);
 
+% Sum the values in Results per key, returning {Key, Sum} pairs. Values are
+% combined with `+`, so they are expected to be numbers.
+reduce(<<"_sum">>, Results) ->
+    ReduceResults = lists:foldl(fun({Key, Val}, Acc) ->
+        maps:update_with(Key, fun(Sum) -> Val + Sum end, Val, Acc)
+    end, #{}, Results),
+    maps:to_list(ReduceResults);
+
 % this isn't a real supported reduce function in CouchDB
 % But I want a basic reduce function that when we need to update the index
 % we would need to re-read multiple rows instead of being able to do an
diff --git a/src/couch_views/src/couch_views_reduce_fdb.erl b/src/couch_views/src/couch_views_reduce_fdb.erl
index 5759c42..7a7e120 100644
--- a/src/couch_views/src/couch_views_reduce_fdb.erl
+++ b/src/couch_views/src/couch_views_reduce_fdb.erl
@@ -17,7 +17,7 @@
 -export([
     fold_level0/8,
     create_skip_list/3,
-    update_reduce_idx/6
+    update_reduce_idx/7
 ]).
 
 
@@ -188,6 +188,14 @@ rereduce(_Reducer, Rows, GroupLevel) when length(Rows) == 1 ->
     GroupKey = group_level_key(Key, GroupLevel),
     {GroupKey, Val};
 
+rereduce(<<"_sum">>, Rows, GroupLevel) ->
+    Sum = lists:sum([Val || {_, Val} <- Rows]),
+    {Key, _} = hd(Rows),
+    GroupKey = group_level_key(Key, GroupLevel),
+    {GroupKey, Sum};
+
 rereduce(<<"_count">>, Rows, GroupLevel) ->
-    Val = length(Rows),
+    % Rows may carry partial counts greater than 1 (level 0 merges repeated
+    % keys), so add them up instead of counting rows.
+    Val = lists:sum([Count || {_, Count} <- Rows]),
     {Key, _} = hd(Rows),
@@ -210,6 +218,9 @@ group_level_equal(One, Two, GroupLevel) ->
 group_level_key(_Key, 0) ->
     null;
 
+group_level_key(Key, group_true) ->
+    Key;
+
 group_level_key(Key, GroupLevel) when is_list(Key) ->
     lists:sublist(Key, GroupLevel);
 
@@ -230,7 +241,7 @@ unpack_key_value(EncodedValue) ->
 
 
 %% Inserting
-update_reduce_idx(TxDb, Sig, ViewId, _DocId, _ExistingKeys, ReduceResult) ->
+update_reduce_idx(TxDb, Sig, ViewId, Reducer, _DocId, _ExistingKeys, ReduceResult) ->
     #{
         db_prefix := DbPrefix
     } = TxDb,
@@ -238,7 +249,8 @@ update_reduce_idx(TxDb, Sig, ViewId, _DocId, _ExistingKeys, ReduceResult) ->
     ViewOpts = #{
         db_prefix => DbPrefix,
         sig => Sig,
-        view_id => ViewId
+        view_id => ViewId,
+        reducer => Reducer
     },
 
     lists:foreach(fun ({Key, Val}) ->
@@ -269,29 +281,58 @@ add_kv_to_skip_list(Db, MaxLevel, #{} = ViewOpts, Key, Val) ->
     #{
         db_prefix := DbPrefix,
         sig := Sig,
-        view_id := ViewId
+        view_id := ViewId,
+        reducer := Reducer
     } = ViewOpts,
 
-    Levels = lists:seq(0, MaxLevel),
+    % Level 0 is written separately inside the transaction below.
+    Levels = lists:seq(1, MaxLevel),
     ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
     KeyHash = hash_key(Key),
 
     fabric2_fdb:transactional(Db, fun(TxDb) ->
+        % Level 0 is keyed by Key: fold the new value into whatever is
+        % already stored there, using the view's reducer.
+        Val1 = case get_value(TxDb, ReduceIdxPrefix, 0, Key) of
+            not_found ->
+                Val;
+            {ok, ExistingVal} ->
+                Rows = [{Key, ExistingVal}, {Key, Val}],
+                {_, ReducedVal} = rereduce(Reducer, Rows, group_true),
+                ReducedVal
+        end,
+        add_kv(TxDb, ReduceIdxPrefix, 0, Key, Val1),
+
+        % TODO: levels >= 1 still write the raw Val (or re-write the previous
+        % key unchanged); they do not aggregate values yet.
         lists:foreach(fun(Level) ->
             {PrevKey, PrevVal} = get_previous_key(TxDb, ReduceIdxPrefix, Level, Key),
-            io:format("Level ~p K/V ~p ~p PREV KV ~p ~p ~n", [Level, Key, Val, PrevKey, PrevVal]),
             case should_add_key_to_level(Level, KeyHash) of
                 true ->
-                    io:format("Adding at ~p ~p ~n", [Level, Key]),
-                    add_kv(Db, ReduceIdxPrefix, Level, Key, Val);
+                    add_kv(TxDb, ReduceIdxPrefix, Level, Key, Val);
                 false ->
 %%                    {PrevKey, NewVal} = rereduce(<<"_stats">>, {PrevKey, PrevVal}, {Key, Val}),
 %%                    io:format("RE_REDUCE ~p ~p ~p ~p ~n", [Level, Key, PrevKey, NewVal]),
-                    add_kv(Db, ReduceIdxPrefix, Level, PrevKey, PrevVal)
+                    add_kv(TxDb, ReduceIdxPrefix, Level, PrevKey, PrevVal)
             end
         end, Levels)
     end).
 
+% Read the value stored for Key at the given skip-list Level.
+% Returns {ok, Value}, or not_found when no row exists.
+get_value(TxDb, ReduceIdxPrefix, Level, Key) ->
+    #{
+        tx := Tx
+    } = TxDb,
+    EK = create_key(ReduceIdxPrefix, Level, Key),
+    case erlfdb:wait(erlfdb:get(Tx, EK)) of
+        not_found ->
+            not_found;
+        PackedValue ->
+            {_, Value} = get_key_value(PackedValue),
+            {ok, Value}
+    end.
+
 
 get_previous_key(TxDb, ReduceIdxPrefix, Level, Key) ->
     #{
diff --git a/src/couch_views/test/exunit/couch_views_reduce_test.exs b/src/couch_views/test/exunit/couch_views_reduce_test.exs
index 488f3ee..0812f1f 100644
--- a/src/couch_views/test/exunit/couch_views_reduce_test.exs
+++ b/src/couch_views/test/exunit/couch_views_reduce_test.exs
@@ -40,37 +40,53 @@ defmodule CouchViewsReduceTest do
     }
   end
 
-#  test "group=true count reduce with limit", context do
+  #  test "group=true count reduce with limit", context do
+  #    args = %{
+  #      :reduce => true,
+  #      :group => true,
+  #      :limit => 3
+  #    }
+  #
+  #    {:ok, res} = run_query(context, args, "dates")
+  #    IO.inspect(res, label: "OUT")
+  #
+  #    assert res == [
+  #             {:row, [key: [2017, 3, 1], value: 1]},
+  #             {:row, [key: [2017, 4, 1], value: 1]},
+  #             {:row, [key: [2017, 4, 15], value: 1]}
+  #           ]
+  #  end
+
+#  test "group_level=1 count reduce", context do
 #    args = %{
 #      :reduce => true,
-#      :group => true,
-#      :limit => 3
+#      :group_level => 1
 #    }
 #
-#    {:ok, res} = run_query(context, args, "dates")
+#    {:ok, res} = run_query(context, args, "dates_count")
 #    IO.inspect(res, label: "OUT")
 #
 #    assert res == [
-#             {:row, [key: [2017, 3, 1], value: 1]},
-#             {:row, [key: [2017, 4, 1], value: 1]},
-#             {:row, [key: [2017, 4, 15], value: 1]}
+#             {:row, [key: [2017], value: 4]},
+#             {:row, [key: [2018], value: 4]},
+#             {:row, [key: [2019], value: 3]}
 #           ]
 #  end
 
-  test "group_level=1 count reduce", context do
-      args = %{
-          :reduce => true,
-          :group_level => 1,
-      }
+  test "group_level=1 sum reduce", context do
+    args = %{
+      :reduce => true,
+      :group_level => 1
+    }
 
-      {:ok, res} = run_query(context, args, "dates")
-      IO.inspect(res, label: "OUT")
+    {:ok, res} = run_query(context, args, "dates_sum")
+    IO.inspect(res, label: "OUT")
 
-      assert res == [
-                 {:row, [key: [2017], value: 4]},
-                 {:row, [key: [2018], value: 3]},
-                 {:row, [key: [2019], value: 2]}
-             ]
+    assert res == [
+             {:row, [key: [2017], value: 31]},
+             {:row, [key: [2018], value: 20]},
+             {:row, [key: [2019], value: 17]}
+           ]
   end
 
   #  test "group=1 count reduce", context do
@@ -207,21 +223,22 @@ defmodule CouchViewsReduceTest do
 
   defp create_docs() do
     dates = [
-      [2017, 3, 1],
-      [2017, 4, 1],
+      {[2017, 3, 1], 9},
+      {[2017, 4, 1], 7},
       # out of order check
-      [2019, 3, 1],
-      [2017, 4, 15],
-      [2018, 4, 1],
-      [2017, 5, 1],
-      [2018, 3, 1],
+      {[2019, 3, 1], 4},
+      {[2017, 4, 15], 6},
+      {[2018, 4, 1], 3},
+      {[2017, 5, 1], 9},
+      {[2018, 3, 1], 6},
       # duplicate check
-      [2018, 4, 1],
-      [2018, 5, 1],
-      [2019, 4, 1]
+      {[2018, 4, 1], 4},
+      {[2018, 5, 1], 7},
+      {[2019, 4, 1], 6},
+      {[2019, 5, 1], 7}
     ]
 
-    for i <- 1..10 do
+    for i <- 1..length(dates) do
       group =
         if rem(i, 3) == 0 do
           "first"
@@ -229,13 +246,18 @@ defmodule CouchViewsReduceTest do
           "second"
         end
 
-      :couch_doc.from_json_obj({[
-         {"_id", "doc-id-#{i}"},
-         {"value", i},
-         {"some", "field"},
-         {"group", group},
-         {"date", Enum.at(dates, i - 1)}
-       ]})
+      {date_key, date_val} = Enum.at(dates, i - 1)
+
+      :couch_doc.from_json_obj(
+        {[
+           {"_id", "doc-id-#{i}"},
+           {"value", i},
+           {"some", "field"},
+           {"group", group},
+           {"date", date_key},
+           {"date_val", date_val}
+         ]}
+      )
     end
   end
 
@@ -244,34 +266,44 @@ defmodule CouchViewsReduceTest do
        {"_id", "_design/bar"},
        {"views",
         {[
-           {"dates",
+           #           {"dates_count",
+           #            {[
+           #               {"map",
+           #                """
+           #                function(doc) {
+           #                  emit(doc.date, doc.value);
+           #                 }
+           #                """},
+           #               {"reduce", "_count"}
+           #             ]}}
+           {"dates_sum",
             {[
                {"map",
                 """
                 function(doc) {
-                  emit(doc.date, doc.value);
-                 }
+                    emit(doc.date, doc.date_val);
+                }
                 """},
-               {"reduce", "_count"}
+               {"reduce", "_sum"}
              ]}}
-#           {"baz",
-#            {[
-#               {"map",
-#                """
-#                function(doc) {
-#                  emit(doc.value, doc.value);
-#                  emit(doc.value, doc.value);
-#                  emit([doc.value, 1], doc.value);
-#                  emit([doc.value, doc.value + 1, doc.group.length], doc.value);
-#
-#                  if (doc.value === 3) {
-#                    emit([1, 1, 5], 1);
-#                    emit([doc.value, 1, 5], 1);
-#                  }
-#                 }
-#                """},
-#               {"reduce", "_count"}
-#             ]}}
+           #           {"baz",
+           #            {[
+           #               {"map",
+           #                """
+           #                function(doc) {
+           #                  emit(doc.value, doc.value);
+           #                  emit(doc.value, doc.value);
+           #                  emit([doc.value, 1], doc.value);
+           #                  emit([doc.value, doc.value + 1, doc.group.length], doc.value);
+           #
+           #                  if (doc.value === 3) {
+           #                    emit([1, 1, 5], 1);
+           #                    emit([doc.value, 1, 5], 1);
+           #                  }
+           #                 }
+           #                """},
+           #               {"reduce", "_count"}
+           #             ]}}
           #             {"boom",
           #              {[
           #                 {"map",