You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ga...@apache.org on 2019/09/12 15:34:05 UTC

[couchdb] branch prototype/builtin-reduce updated: group=true and group_levels working

This is an automated email from the ASF dual-hosted git repository.

garren pushed a commit to branch prototype/builtin-reduce
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/prototype/builtin-reduce by this push:
     new 2cd1db7  group=true and group_levels working
2cd1db7 is described below

commit 2cd1db767a88f02de4595f64780c4d37f8f98456
Author: Garren Smith <ga...@gmail.com>
AuthorDate: Thu Sep 12 17:33:52 2019 +0200

    group=true and group_levels working
---
 src/couch_views/src/couch_views_fdb.erl            | 88 ++++++++++++++++++----
 src/couch_views/src/couch_views_reader.erl         | 39 ++++++++--
 .../test/exunit/couch_views_reduce_test.exs        | 83 ++++++++++++++------
 3 files changed, 166 insertions(+), 44 deletions(-)

diff --git a/src/couch_views/src/couch_views_fdb.erl b/src/couch_views/src/couch_views_fdb.erl
index d13adc1..7bfcac0 100644
--- a/src/couch_views/src/couch_views_fdb.erl
+++ b/src/couch_views/src/couch_views_fdb.erl
@@ -53,7 +53,7 @@
 % Type = ?VIEW_KEY | ?VIEW_ROW
 
 % Reduce Range
-%{<db> ?DB_VIEWS, Sig, ?VIEW_REDUCE_RANGE, ViewId, GroupLevel, Key,
+%{<db> ?DB_VIEWS, Sig, ?VIEW_REDUCE_RANGE, ViewId, Key, GroupLevel,
 %   ReduceType, RowType} = Value | UnEncodedKey
 % ReduceType = VIEW_REDUCE_EXACT | ?VIEW_REDUCE_GROUP
 % RowType = ?VIEW_KEY | ?VIEW_ROW
@@ -145,15 +145,31 @@ fold_reduce_idx(TxDb, Sig, ViewId, Options, Callback, Acc0) ->
         db_prefix := DbPrefix
     } = TxDb,
 
+    #{
+        mrargs := MrArgs
+    } = Acc0,
+
+    #mrargs{
+        group_level = GroupLevel,
+        group = Group
+    } = MrArgs,
+
     ReduceIdxPrefix = reduce_idx_prefix(DbPrefix, Sig, ViewId),
 
+    Group1 = case {GroupLevel, Group} of
+        {0, true} -> exact;
+        _ -> grouping
+    end,
+
     FoldAcc = #{
         prefix => ReduceIdxPrefix,
         sort_key => undefined,
         docid => undefined,
         callback => Callback,
         acc => Acc0,
-        group_level => undefined,
+        group_level => GroupLevel,
+        group => Group1,
+        prev_exact_key => undefined,
         reduce_type => undefined,
 
         next => key,
@@ -173,13 +189,12 @@ reduce_fold_fwd({RowKey, EncodedOriginalKey}, #{next := key} = Acc) ->
         prefix := Prefix
     } = Acc,
 
-    {GroupLevel, SortKey, ReduceType, ?VIEW_ROW_KEY} =
+    {SortKey, _RowGroupLevel, ReduceType, ?VIEW_ROW_KEY} =
         erlfdb_tuple:unpack(RowKey, Prefix),
     Acc#{
         next := value,
         key := couch_views_encoding:decode(EncodedOriginalKey),
         sort_key := SortKey,
-        group_level := GroupLevel,
         reduce_type := ReduceType
     };
 
@@ -189,32 +204,74 @@ reduce_fold_fwd({RowKey, EncodedValue}, #{next := value} = Acc) ->
         key := Key,
         sort_key := SortKey,
         group_level := GroupLevel,
+        group := Group,
         reduce_type := ReduceType,
         callback := UserCallback,
-        acc := UserAcc0
+        acc := UserAcc0,
+        prev_exact_key := PrevExactKey
     } = Acc,
 
+
     % We're asserting there that this row is paired
     % correctly with the previous row by relying on
     % a badmatch if any of these values don't match.
-    {GroupLevel, SortKey, ReduceType, ?VIEW_ROW_VALUE} =
+    {SortKey, RowGroupLevel, ReduceType, ?VIEW_ROW_VALUE} =
         erlfdb_tuple:unpack(RowKey, Prefix),
 
     % TODO: Handle more than uint
     Value = ?bin2uint(EncodedValue),
-    io:format("FWD VAL ~p ~p ~p ~n", [Key, GroupLevel, Value]),
-    UserAcc1 = UserCallback(Key, Value, UserAcc0),
+    io:format("FWD VAL ~p ~p ~p ~p ~n", [Key, RowGroupLevel, Value, ReduceType]),
+    io:format("GROUP SETTINGS ~p ~p ~n", [Group, GroupLevel]),
+    UserAcc1 = case should_return_row(PrevExactKey, Key, Group, GroupLevel, RowGroupLevel, ReduceType) of
+        true ->
+            UserCallback(Key, Value, UserAcc0);
+        false ->
+            UserAcc0
+    end,
+
+    PrevExactKey1 = maybe_update_prev_exact_key(PrevExactKey, Key, ReduceType),
 
     Acc#{
         next := key,
         key := undefined,
         sort_key := undefined,
-        group_level := undefined,
         reduce_type := undefined,
-        acc := UserAcc1
+        acc := UserAcc1,
+        prev_exact_key := PrevExactKey1
     }.
 
 
+should_return_row(_PrevExactKey, _CurrentKey, exact, _GroupLevel,
+    _RowGroupLevel, ?VIEW_REDUCE_EXACT) ->
+    true;
+
+should_return_row(_PrevExactKey, _CurrentKey, exact, _GroupLevel,
+    _RowGroupLevel, ?VIEW_REDUCE_GROUP) ->
+    false;
+
+should_return_row(_PrevExactKey, _CurrentKey, _Group, GroupLevel,
+    RowGroupLevel, ?VIEW_REDUCE_EXACT) when RowGroupLevel =< GroupLevel ->
+    true;
+
+should_return_row(PrevExactKey, PrevExactKey, _Group, GroupLevel, GroupLevel,
+    ?VIEW_REDUCE_GROUP) ->
+    false;
+
+should_return_row(_PrevExactKey, _CurrentKey, _Group, GroupLevel, GroupLevel,
+    ?VIEW_REDUCE_GROUP) ->
+    true;
+
+should_return_row(_, _, _, _, _, _) ->
+    false.
+
+
+maybe_update_prev_exact_key(PrevExactKey, _NewKey, ?VIEW_REDUCE_GROUP) ->
+    PrevExactKey;
+
+maybe_update_prev_exact_key(_PrevExactKey, NewKey, ?VIEW_REDUCE_EXACT) ->
+    NewKey.
+
+
 write_doc(TxDb, Sig, _ViewIds, #{deleted := true} = Doc) ->
     #{
         id := DocId
@@ -449,15 +506,15 @@ update_reduce_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows) ->
 %%
     {ExactKVsToAdd, GroupKVsToAdd} = process_reduce_rows(NewRows),
     ReduceIdxPrefix = reduce_idx_prefix(DbPrefix, Sig, ViewId),
-%%    add_reduce_kvs(Tx, ReduceIdxPrefix, ExactKVsToAdd, ?VIEW_REDUCE_EXACT),
+    add_reduce_kvs(Tx, ReduceIdxPrefix, ExactKVsToAdd, ?VIEW_REDUCE_EXACT),
     add_reduce_kvs(Tx, ReduceIdxPrefix, GroupKVsToAdd, ?VIEW_REDUCE_GROUP).
 
 
 add_reduce_kvs(Tx, ReduceIdxPrefix, KVsToAdd, ReduceType) ->
     lists:foreach(fun({Key1, Key2, Val, GroupLevel}) ->
-        KK = reduce_idx_key(ReduceIdxPrefix, GroupLevel, Key1,
+        KK = reduce_idx_key(ReduceIdxPrefix, Key1, GroupLevel,
             ReduceType, ?VIEW_ROW_KEY),
-        VK = reduce_idx_key(ReduceIdxPrefix, GroupLevel, Key1,
+        VK = reduce_idx_key(ReduceIdxPrefix, Key1, GroupLevel,
             ReduceType, ?VIEW_ROW_VALUE),
         ok = erlfdb:set(Tx, KK, Key2),
         ok = erlfdb:add(Tx, VK, Val)
@@ -469,9 +526,8 @@ reduce_idx_prefix(DbPrefix, Sig, ViewId) ->
     erlfdb_tuple:pack(Key, DbPrefix).
 
 
-reduce_idx_key(ReduceIdxPrefix, GroupLevel, ReduceKey, ReduceType, RowType) ->
-    io:format("GROUP LEVEL ~p ~n", [GroupLevel]),
-    Key = {GroupLevel, ReduceKey, ReduceType, RowType},
+reduce_idx_key(ReduceIdxPrefix, ReduceKey, GroupLevel, ReduceType, RowType) ->
+    Key = {ReduceKey, GroupLevel, ReduceType, RowType},
     erlfdb_tuple:pack(Key, ReduceIdxPrefix).
 
 
diff --git a/src/couch_views/src/couch_views_reader.erl b/src/couch_views/src/couch_views_reader.erl
index 7f1d530..59c648f 100644
--- a/src/couch_views/src/couch_views_reader.erl
+++ b/src/couch_views/src/couch_views_reader.erl
@@ -39,12 +39,18 @@ read_reduce(Db, Mrst, ViewName, UserCallback, UserAcc0, Args) ->
             Meta = get_meta(TxDb, Mrst, ViewId, Args),
             UserAcc1 = maybe_stop(UserCallback(Meta, UserAcc0)),
 
+            #mrargs{
+                limit = Limit
+            } = Args,
+
             Acc0 = #{
                 db => TxDb,
                 skip => Args#mrargs.skip,
                 mrargs => undefined,
                 callback => UserCallback,
-                acc => UserAcc1
+                acc => UserAcc1,
+                row_count => 0,
+                limit => Limit
             },
 
             Acc1 = lists:foldl(fun(KeyArgs, KeyAcc0) ->
@@ -81,12 +87,17 @@ reduce_mrargs_to_fdb_options(Args) ->
         direction = Direction,
         limit = Limit,
         skip = Skip,
-        group_level = GroupLevel
+        group_level = GroupLevel,
+        group = Group
 %%        inclusive_end = InclusiveEnd
     } = Args,
 
-%%    GroupLevelEnd = [{end_key, couch_views_encoding:encode(GroupLevel + 1, key)}],
-    GroupLevelEnd = [{end_key, GroupLevel + 1}],
+    GroupExact = Group == true andalso GroupLevel == 0,
+
+    GroupLevelEnd = case GroupExact of
+        true -> [];
+        false -> [{end_key, {<<255>>, GroupLevel + 1}}]
+    end,
 
 %%    StartKey1 = if StartKey0 == undefined -> undefined; true ->
 %%        couch_views_encoding:encode(StartKey0, key)
@@ -126,7 +137,8 @@ reduce_mrargs_to_fdb_options(Args) ->
 
     [
         {dir, Direction},
-        {limit, Limit * 2 + Skip * 2},
+%%        {limit, Limit * 2 + Skip * 2},
+%%        {streaming_mode, stream_large}
         {streaming_mode, want_all}
     ] ++ GroupLevelEnd.
 %%    ] ++ StartKeyOpts ++ EndKeyOpts.
@@ -226,9 +238,12 @@ handle_reduce_row(_Key, _Value, #{skip := Skip} = Acc) when Skip > 0 ->
     Acc#{skip := Skip - 1};
 
 handle_reduce_row(Key, Value, Acc) ->
+    io:format("ACC ~p ~n", [Acc]),
     #{
         callback := UserCallback,
-        acc := UserAcc0
+        acc := UserAcc0,
+        row_count := RowCount,
+        limit := Limit
     } = Acc,
 
     Row = [
@@ -236,8 +251,18 @@ handle_reduce_row(Key, Value, Acc) ->
         {value, Value}
     ],
 
+    RowCountNext = RowCount + 1,
+
     UserAcc1 = maybe_stop(UserCallback({row, Row}, UserAcc0)),
-    Acc#{acc := UserAcc1}.
+    Acc1 = Acc#{acc := UserAcc1, row_count := RowCountNext},
+
+    case RowCountNext == Limit of
+        true ->
+            UserAcc2 = maybe_stop(UserCallback(complete, UserAcc1)),
+            maybe_stop({stop, UserAcc2});
+        false ->
+            Acc1
+    end.
 
 
 get_view_id(Lang, Args, ViewName, Views) ->
diff --git a/src/couch_views/test/exunit/couch_views_reduce_test.exs b/src/couch_views/test/exunit/couch_views_reduce_test.exs
index 5f4242b..48887d0 100644
--- a/src/couch_views/test/exunit/couch_views_reduce_test.exs
+++ b/src/couch_views/test/exunit/couch_views_reduce_test.exs
@@ -40,28 +40,28 @@ defmodule CouchViewsReduceTest do
     }
   end
 
-#  test "group=true count reduce", context do
-#        args = %{
-#            :reduce => true,
-#            :group => true,
+  test "group=true count reduce", context do
+        args = %{
+            :reduce => true,
+            :group => true
 #            :limit => 9
-#        }
-#
-#        {:ok, res} = run_query(context, args)
-#        IO.inspect(res, label: "OUT")
-#
-#        assert res == [
-#                   {:row, [key: 1, value: 2]},
-#                   {:row, [key: 2, value: 2]},
-#                   {:row, [key: 3, value: 2]},
-#                   {:row, [key: [1, 1], value: 1]},
-#                   {:row, [key: [1, 2, 6], value: 1]},
-#                   {:row, [key: [2, 1], value: 1]},
-#                   {:row, [key: [2, 3, 6], value: 1]},
-#                   {:row, [key: [3, 1], value: 1]},
-#                   {:row, [key: [3, 4, 5], value: 1]}
-#               ]
-#    end
+        }
+
+        {:ok, res} = run_query(context, args)
+        IO.inspect(res, label: "OUT")
+
+        assert res == [
+                   {:row, [key: 1, value: 2]},
+                   {:row, [key: 2, value: 2]},
+                   {:row, [key: 3, value: 2]},
+                   {:row, [key: [1, 1], value: 1]},
+                   {:row, [key: [1, 2, 6], value: 1]},
+                   {:row, [key: [2, 1], value: 1]},
+                   {:row, [key: [2, 3, 6], value: 1]},
+                   {:row, [key: [3, 1], value: 1]},
+                   {:row, [key: [3, 4, 5], value: 1]}
+               ]
+    end
 
   test "group=1 count reduce", context do
       args = %{
@@ -83,6 +83,47 @@ defmodule CouchViewsReduceTest do
              ]
   end
 
+  test "group=2 count reduce", context do
+      args = %{
+          :reduce => true,
+          :group_level => 2
+#          :limit => 9
+      }
+
+      {:ok, res} = run_query(context, args)
+      IO.inspect(res, label: "OUT")
+
+      assert res == [
+                 {:row, [key: 1, value: 2]},
+                 {:row, [key: 2, value: 2]},
+                 {:row, [key: 3, value: 2]},
+                 {:row, [key: [1, 1], value: 1]},
+                 {:row, [key: [1, 2], value: 1]},
+                 {:row, [key: [2, 1], value: 1]},
+                 {:row, [key: [2, 3], value: 1]},
+                 {:row, [key: [3, 1], value: 1]},
+                 {:row, [key: [3, 4], value: 1]}
+             ]
+  end
+
+  test "group=2 count reduce with limit = 4", context do
+      args = %{
+          :reduce => true,
+          :group_level => 2,
+          :limit => 4
+      }
+
+      {:ok, res} = run_query(context, args)
+      IO.inspect(res, label: "OUT")
+
+      assert res == [
+                 {:row, [key: 1, value: 2]},
+                 {:row, [key: 2, value: 2]},
+                 {:row, [key: 3, value: 2]},
+                 {:row, [key: [1, 1], value: 1]}
+             ]
+  end
+
   defp run_query(context, args) do
     db = context[:db]
     ddoc = context[:ddoc]