You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ga...@apache.org on 2019/10/31 14:43:04 UTC

[couchdb] 05/05: level 0 _sum working

This is an automated email from the ASF dual-hosted git repository.

garren pushed a commit to branch prototype/builtin-reduce
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 7ca52023ae5e2c697ae5258277d0aceaea2c00b1
Author: Garren Smith <ga...@gmail.com>
AuthorDate: Thu Oct 31 16:42:36 2019 +0200

    level 0 _sum working
---
 src/couch_views/src/couch_views_fdb.erl            |   8 +-
 src/couch_views/src/couch_views_indexer.erl        |   7 +-
 src/couch_views/src/couch_views_reduce.erl         |  12 ++
 src/couch_views/src/couch_views_reduce_fdb.erl     |  54 +++++++-
 .../test/exunit/couch_views_reduce_test.exs        | 150 +++++++++++++--------
 5 files changed, 160 insertions(+), 71 deletions(-)

diff --git a/src/couch_views/src/couch_views_fdb.erl b/src/couch_views/src/couch_views_fdb.erl
index 8999d76..6c81457 100644
--- a/src/couch_views/src/couch_views_fdb.erl
+++ b/src/couch_views/src/couch_views_fdb.erl
@@ -148,7 +148,7 @@ write_doc(TxDb, Sig, _ViewIds, #{deleted := true} = Doc) ->
         update_kv_size(TxDb, Sig, ViewId, -TotalSize)
     end, ExistingViewKeys);
 
-write_doc(TxDb, Sig, ViewIds, Doc) ->
+write_doc(TxDb, Sig, ViewsIdFun, Doc) ->
     #{
         id := DocId,
         results := Results,
@@ -161,7 +161,7 @@ write_doc(TxDb, Sig, ViewIds, Doc) ->
 
     %% TODO: handle when there is no reduce
     io:format("REDUCE RESULTS ~p ~n", [ReduceResults]),
-    lists:foreach(fun({ViewId, NewRows, ReduceResult}) ->
+    lists:foreach(fun({{ViewId, Reducer}, NewRows, ReduceResult}) ->
         update_id_idx(TxDb, Sig, ViewId, DocId, NewRows),
 
         ExistingKeys = case lists:keyfind(ViewId, 1, ExistingViewKeys) of
@@ -179,9 +179,9 @@ write_doc(TxDb, Sig, ViewIds, Doc) ->
                 []
         end,
         update_map_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows),
-        couch_views_reduce_fdb:update_reduce_idx(TxDb, Sig, ViewId, DocId,
+        couch_views_reduce_fdb:update_reduce_idx(TxDb, Sig, ViewId, Reducer, DocId,
             ExistingKeys, ReduceResult)
-    end, lists:zip3(ViewIds, Results, ReduceResults)).
+    end, lists:zip3(ViewsIdFun, Results, ReduceResults)).
 
 
 % For each row in a map view there are two rows stored in
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index 3c60743..f01a58b 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -226,6 +226,11 @@ write_docs(TxDb, Mrst, Docs, State) ->
     } = State,
 
     ViewIds = [View#mrview.id_num || View <- Views],
+    ViewsIdFuns = lists:foldl(fun (View, Acc) ->
+        Id = View#mrview.id_num,
+        [{_Name, ReduceFun}] = View#mrview.reduce_funs,
+        Acc ++ [{Id, ReduceFun}]
+    end, [], Views),
 
     %%  First build of the view
     if ViewSeq /= <<>> -> ok; true ->
@@ -233,7 +238,7 @@ write_docs(TxDb, Mrst, Docs, State) ->
     end,
 
     lists:foreach(fun(Doc) ->
-        couch_views_fdb:write_doc(TxDb, Sig, ViewIds, Doc)
+        couch_views_fdb:write_doc(TxDb, Sig, ViewsIdFuns, Doc)
     end, Docs),
 
     couch_views_fdb:set_update_seq(TxDb, Sig, LastSeq).
diff --git a/src/couch_views/src/couch_views_reduce.erl b/src/couch_views/src/couch_views_reduce.erl
index b7eb18e..04c5cb8 100644
--- a/src/couch_views/src/couch_views_reduce.erl
+++ b/src/couch_views/src/couch_views_reduce.erl
@@ -191,6 +191,18 @@ reduce(<<"_count">>, Results) ->
     end, #{}, Results),
     maps:to_list(ReduceResults);
 
+% _sum: total the values emitted under each key; one {Key, Sum} per key.
+reduce(<<"_sum">>, Results) ->
+    SumsByKey = lists:foldl(
+        fun({Key, Val}, Sums) ->
+            % First sighting seeds the sum with Val; later ones add to it.
+            maps:update_with(Key, fun(Sum) -> Val + Sum end, Val, Sums)
+        end,
+        #{},
+        Results
+    ),
+    maps:to_list(SumsByKey);
+
 % this isn't a real supported reduce function in CouchDB
 % But I want a basic reduce function that when we need to update the index
 % we would need to re-read multiple rows instead of being able to do an
diff --git a/src/couch_views/src/couch_views_reduce_fdb.erl b/src/couch_views/src/couch_views_reduce_fdb.erl
index 5759c42..7a7e120 100644
--- a/src/couch_views/src/couch_views_reduce_fdb.erl
+++ b/src/couch_views/src/couch_views_reduce_fdb.erl
@@ -17,7 +17,7 @@
 -export([
     fold_level0/8,
     create_skip_list/3,
-    update_reduce_idx/6
+    update_reduce_idx/7
 ]).
 
 
@@ -188,6 +188,14 @@ rereduce(_Reducer, Rows, GroupLevel) when length(Rows) == 1 ->
     GroupKey = group_level_key(Key, GroupLevel),
     {GroupKey, Val};
 
+% _sum rereduce: total every row value; the result is keyed by the
+% group-level key of the first row.
+rereduce(<<"_sum">>, Rows, GroupLevel) ->
+    AddValue = fun({_Key, Val}, Total) -> Val + Total end,
+    Total = lists:foldl(AddValue, 0, Rows),
+    {FirstKey, _} = hd(Rows),
+    {group_level_key(FirstKey, GroupLevel), Total};
+
 rereduce(<<"_count">>, Rows, GroupLevel) ->
     Val = length(Rows),
     {Key, _} = hd(Rows),
@@ -210,6 +218,9 @@ group_level_equal(One, Two, GroupLevel) ->
 group_level_key(_Key, 0) ->
     null;
 
+group_level_key(Key, group_true) ->
+    Key; % group_true keeps the whole key (no group-level truncation)
+
 group_level_key(Key, GroupLevel) when is_list(Key) ->
     lists:sublist(Key, GroupLevel);
 
@@ -230,7 +241,7 @@ unpack_key_value(EncodedValue) ->
 
 
 %% Inserting
-update_reduce_idx(TxDb, Sig, ViewId, _DocId, _ExistingKeys, ReduceResult) ->
+update_reduce_idx(TxDb, Sig, ViewId, Reducer, _DocId, _ExistingKeys, ReduceResult) ->
     #{
         db_prefix := DbPrefix
     } = TxDb,
@@ -238,7 +249,8 @@ update_reduce_idx(TxDb, Sig, ViewId, _DocId, _ExistingKeys, ReduceResult) ->
     ViewOpts = #{
         db_prefix => DbPrefix,
         sig => Sig,
-        view_id => ViewId
+        view_id => ViewId,
+        reducer => Reducer
     },
 
     lists:foreach(fun ({Key, Val}) ->
@@ -269,29 +281,57 @@ add_kv_to_skip_list(Db, MaxLevel, #{} = ViewOpts, Key, Val) ->
     #{
         db_prefix := DbPrefix,
         sig := Sig,
-        view_id := ViewId
+        view_id := ViewId,
+        reducer := Reducer
     } = ViewOpts,
 
-    Levels = lists:seq(0, MaxLevel),
+    Levels = lists:seq(1, MaxLevel),
     ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
     KeyHash = hash_key(Key),
 
     fabric2_fdb:transactional(Db, fun(TxDb) ->
+        Val1 = case get_value(TxDb, ReduceIdxPrefix, 0, Key) of
+            not_found ->
+                Val;
+            ExistingVal ->
+                {_, ReducedVal} = rereduce(Reducer, [{Key, ExistingVal}, {Key, Val}], group_true),
+                ReducedVal
+        end,
+        io:format("VAL1 ~p ~n", [Val1]),
+        add_kv(TxDb, ReduceIdxPrefix, 0, Key, Val1),
+
         lists:foreach(fun(Level) ->
             {PrevKey, PrevVal} = get_previous_key(TxDb, ReduceIdxPrefix, Level, Key),
             io:format("Level ~p K/V ~p ~p PREV KV ~p ~p ~n", [Level, Key, Val, PrevKey, PrevVal]),
             case should_add_key_to_level(Level, KeyHash) of
                 true ->
                     io:format("Adding at ~p ~p ~n", [Level, Key]),
-                    add_kv(Db, ReduceIdxPrefix, Level, Key, Val);
+                    add_kv(TxDb, ReduceIdxPrefix, Level, Key, Val);
                 false ->
 %%                    {PrevKey, NewVal} = rereduce(<<"_stats">>, {PrevKey, PrevVal}, {Key, Val}),
 %%                    io:format("RE_REDUCE ~p ~p ~p ~p ~n", [Level, Key, PrevKey, NewVal]),
-                    add_kv(Db, ReduceIdxPrefix, Level, PrevKey, PrevVal)
+                    add_kv(TxDb, ReduceIdxPrefix, Level, PrevKey, PrevVal)
             end
         end, Levels)
     end).
 
+%% Look up the value stored for Key at the given Level of the reduce
+%% skip list. Returns the stored value, or the atom not_found when
+%% erlfdb:get/2 reports not_found for the encoded key.
+get_value(TxDb, ReduceIdxPrefix, Level, Key) ->
+    #{
+        tx := Tx
+    } = TxDb,
+    EK = create_key(ReduceIdxPrefix, Level, Key),
+    case erlfdb:wait(erlfdb:get(Tx, EK)) of
+        not_found ->
+            not_found;
+        PackedValue ->
+            % get_key_value/1 yields a 2-tuple; only its second element is used.
+            {_, Value} = get_key_value(PackedValue),
+            Value
+    end.
+
 
 get_previous_key(TxDb, ReduceIdxPrefix, Level, Key) ->
     #{
diff --git a/src/couch_views/test/exunit/couch_views_reduce_test.exs b/src/couch_views/test/exunit/couch_views_reduce_test.exs
index 488f3ee..0812f1f 100644
--- a/src/couch_views/test/exunit/couch_views_reduce_test.exs
+++ b/src/couch_views/test/exunit/couch_views_reduce_test.exs
@@ -40,37 +40,53 @@ defmodule CouchViewsReduceTest do
     }
   end
 
-#  test "group=true count reduce with limit", context do
+  #  test "group=true count reduce with limit", context do
+  #    args = %{
+  #      :reduce => true,
+  #      :group => true,
+  #      :limit => 3
+  #    }
+  #
+  #    {:ok, res} = run_query(context, args, "dates")
+  #    IO.inspect(res, label: "OUT")
+  #
+  #    assert res == [
+  #             {:row, [key: [2017, 3, 1], value: 1]},
+  #             {:row, [key: [2017, 4, 1], value: 1]},
+  #             {:row, [key: [2017, 4, 15], value: 1]}
+  #           ]
+  #  end
+
+#  test "group_level=1 count reduce", context do
 #    args = %{
 #      :reduce => true,
-#      :group => true,
-#      :limit => 3
+#      :group_level => 1
 #    }
 #
-#    {:ok, res} = run_query(context, args, "dates")
+#    {:ok, res} = run_query(context, args, "dates_count")
 #    IO.inspect(res, label: "OUT")
 #
 #    assert res == [
-#             {:row, [key: [2017, 3, 1], value: 1]},
-#             {:row, [key: [2017, 4, 1], value: 1]},
-#             {:row, [key: [2017, 4, 15], value: 1]}
+#             {:row, [key: [2017], value: 4]},
+#             {:row, [key: [2018], value: 3]},
+#             {:row, [key: [2019], value: 2]}
 #           ]
 #  end
 
-  test "group_level=1 count reduce", context do
-      args = %{
-          :reduce => true,
-          :group_level => 1,
-      }
+  test "group_level=1 reduce reduce", context do
+    args = %{
+      :reduce => true,
+      :group_level => 1
+    }
 
-      {:ok, res} = run_query(context, args, "dates")
-      IO.inspect(res, label: "OUT")
+    {:ok, res} = run_query(context, args, "dates_sum")
+    IO.inspect(res, label: "OUT")
 
-      assert res == [
-                 {:row, [key: [2017], value: 4]},
-                 {:row, [key: [2018], value: 3]},
-                 {:row, [key: [2019], value: 2]}
-             ]
+    assert res == [
+             {:row, [key: [2017], value: 31]},
+             {:row, [key: [2018], value: 20]},
+             {:row, [key: [2019], value: 17]}
+           ]
   end
 
   #  test "group=1 count reduce", context do
@@ -207,21 +223,22 @@ defmodule CouchViewsReduceTest do
 
   defp create_docs() do
     dates = [
-      [2017, 3, 1],
-      [2017, 4, 1],
+      {[2017, 3, 1], 9},
+      {[2017, 4, 1], 7},
       # out of order check
-      [2019, 3, 1],
-      [2017, 4, 15],
-      [2018, 4, 1],
-      [2017, 5, 1],
-      [2018, 3, 1],
+      {[2019, 3, 1], 4},
+      {[2017, 4, 15], 6},
+      {[2018, 4, 1], 3},
+      {[2017, 5, 1], 9},
+      {[2018, 3, 1], 6},
       # duplicate check
-      [2018, 4, 1],
-      [2018, 5, 1],
-      [2019, 4, 1]
+      {[2018, 4, 1], 4},
+      {[2018, 5, 1], 7},
+      {[2019, 4, 1], 6},
+      {[2019, 5, 1], 7}
     ]
 
-    for i <- 1..10 do
+    for i <- 1..11 do
       group =
         if rem(i, 3) == 0 do
           "first"
@@ -229,13 +246,18 @@ defmodule CouchViewsReduceTest do
           "second"
         end
 
-      :couch_doc.from_json_obj({[
-         {"_id", "doc-id-#{i}"},
-         {"value", i},
-         {"some", "field"},
-         {"group", group},
-         {"date", Enum.at(dates, i - 1)}
-       ]})
+      {date_key, date_val} = Enum.at(dates, i - 1)
+
+      :couch_doc.from_json_obj(
+        {[
+           {"_id", "doc-id-#{i}"},
+           {"value", i},
+           {"some", "field"},
+           {"group", group},
+           {"date", date_key},
+           {"date_val", date_val}
+         ]}
+      )
     end
   end
 
@@ -244,34 +266,44 @@ defmodule CouchViewsReduceTest do
        {"_id", "_design/bar"},
        {"views",
         {[
-           {"dates",
+           #           {"dates_count",
+           #            {[
+           #               {"map",
+           #                """
+           #                function(doc) {
+           #                  emit(doc.date, doc.value);
+           #                 }
+           #                """},
+           #               {"reduce", "_count"}
+           #             ]}}
+           {"dates_sum",
             {[
                {"map",
                 """
                 function(doc) {
-                  emit(doc.date, doc.value);
-                 }
+                    emit(doc.date, doc.date_val);
+                }
                 """},
-               {"reduce", "_count"}
+               {"reduce", "_sum"}
              ]}}
-#           {"baz",
-#            {[
-#               {"map",
-#                """
-#                function(doc) {
-#                  emit(doc.value, doc.value);
-#                  emit(doc.value, doc.value);
-#                  emit([doc.value, 1], doc.value);
-#                  emit([doc.value, doc.value + 1, doc.group.length], doc.value);
-#
-#                  if (doc.value === 3) {
-#                    emit([1, 1, 5], 1);
-#                    emit([doc.value, 1, 5], 1);
-#                  }
-#                 }
-#                """},
-#               {"reduce", "_count"}
-#             ]}}
+           #           {"baz",
+           #            {[
+           #               {"map",
+           #                """
+           #                function(doc) {
+           #                  emit(doc.value, doc.value);
+           #                  emit(doc.value, doc.value);
+           #                  emit([doc.value, 1], doc.value);
+           #                  emit([doc.value, doc.value + 1, doc.group.length], doc.value);
+           #
+           #                  if (doc.value === 3) {
+           #                    emit([1, 1, 5], 1);
+           #                    emit([doc.value, 1, 5], 1);
+           #                  }
+           #                 }
+           #                """},
+           #               {"reduce", "_count"}
+           #             ]}}
            #             {"boom",
            #              {[
            #                 {"map",