You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ga...@apache.org on 2019/10/31 14:43:03 UTC

[couchdb] 04/05: can do group_level query

This is an automated email from the ASF dual-hosted git repository.

garren pushed a commit to branch prototype/builtin-reduce
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit bff44fb9f7cfc92e21012c705b8cd1a9938fac88
Author: Garren Smith <ga...@gmail.com>
AuthorDate: Thu Oct 31 14:09:36 2019 +0200

    can do group_level query
---
 src/couch_views/src/couch_views_reader.erl         |   9 +-
 src/couch_views/src/couch_views_reduce.erl         |  15 +-
 src/couch_views/src/couch_views_reduce_fdb.erl     | 175 +++++++++++++++------
 .../test/exunit/couch_views_reduce_test.exs        |  47 +++---
 4 files changed, 168 insertions(+), 78 deletions(-)

diff --git a/src/couch_views/src/couch_views_reader.erl b/src/couch_views/src/couch_views_reader.erl
index 394b3cf..e750a94 100644
--- a/src/couch_views/src/couch_views_reader.erl
+++ b/src/couch_views/src/couch_views_reader.erl
@@ -32,7 +32,8 @@ read_reduce(Db, Mrst, ViewName, UserCallback, UserAcc0, Args) ->
     } = Mrst,
 
     ViewId = get_view_id(Lang, Args, ViewName, Views),
-    couch_views_reduce:read_reduce(Db, Sig, ViewId, UserCallback,
+    Reducer = get_view_reducer(Lang, Args, ViewName, Views),
+    couch_views_reduce:read_reduce(Db, Sig, ViewId, Reducer, UserCallback,
         UserAcc0, Args).
 %%    Fun = fun handle_reduce_row/3,
 %%
@@ -243,6 +244,12 @@ get_view_id(Lang, Args, ViewName, Views) ->
         {red, {_Idx, _Lang, View}, _Args} -> View#mrview.id_num
     end.
 
+get_view_reducer(Lang, Args, ViewName, Views) ->  % reducer of the named view
+    case couch_mrview_util:extract_view(Lang, Args, ViewName, Views) of
+        {map, _View, _Args} -> throw(no_reduce);  % map result: no reducer
+        RedView -> couch_mrview_util:extract_view_reduce(RedView)
+    end.
+
 
 expand_keys_args(#mrargs{keys = undefined} = Args) ->
     [Args];
diff --git a/src/couch_views/src/couch_views_reduce.erl b/src/couch_views/src/couch_views_reduce.erl
index ebd2f47..b7eb18e 100644
--- a/src/couch_views/src/couch_views_reduce.erl
+++ b/src/couch_views/src/couch_views_reduce.erl
@@ -15,7 +15,7 @@
 
 -export([
     run_reduce/2,
-    read_reduce/6,
+    read_reduce/7,
     setup_reduce_indexes/3
 ]).
 
@@ -30,7 +30,7 @@
 -define(MAX_SKIP_LIST_LEVELS, 6).
 
 
-read_reduce(Db, Sig, ViewId, UserCallback, UserAcc0, Args) ->
+read_reduce(Db, Sig, ViewId, Reducer, UserCallback, UserAcc0, Args) ->
     #{
         db_prefix := DbPrefix
     } = Db,
@@ -38,9 +38,16 @@ read_reduce(Db, Sig, ViewId, UserCallback, UserAcc0, Args) ->
 %%    Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
     ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
     #mrargs{
-        limit = Limit
+        limit = Limit,
+        group = Group,
+        group_level = GroupLevel
     } = Args,
 
+    GroupLevel1 = case Group of
+        true -> group_true;
+        _ -> GroupLevel
+    end,
+
     Opts = args_to_fdb_opts(Args, ReduceIdxPrefix),
 
     try
@@ -59,7 +66,7 @@ read_reduce(Db, Sig, ViewId, UserCallback, UserAcc0, Args) ->
             },
 
             Fun = fun handle_row/3,
-            Acc1 = couch_views_reduce_fdb:fold_level0(TxDb, Sig, ViewId, Opts, Fun, Acc0),
+            Acc1 = couch_views_reduce_fdb:fold_level0(TxDb, Sig, ViewId, Reducer, GroupLevel, Opts, Fun, Acc0),
             #{
                 user_acc := UserAcc1
             } = Acc1,
diff --git a/src/couch_views/src/couch_views_reduce_fdb.erl b/src/couch_views/src/couch_views_reduce_fdb.erl
index 9683265..5759c42 100644
--- a/src/couch_views/src/couch_views_reduce_fdb.erl
+++ b/src/couch_views/src/couch_views_reduce_fdb.erl
@@ -15,7 +15,7 @@
 
 
 -export([
-    fold_level0/6,
+    fold_level0/8,
     create_skip_list/3,
     update_reduce_idx/6
 ]).
@@ -26,7 +26,7 @@
 -include_lib("couch_mrview/include/couch_mrview.hrl").
 -include_lib("fabric/include/fabric2.hrl").
 
--define(MAX_SKIP_LIST_LEVELS, 6).
+-define(MAX_SKIP_LIST_LEVELS, 1).
 -define(LEVEL_FAN_POW, 1).
 
 log_levels(Db, Sig, ViewId) ->
@@ -34,34 +34,40 @@ log_levels(Db, Sig, ViewId) ->
         db_prefix := DbPrefix
     } = Db,
 
-    Levels = lists:seq(0, 6),
+    Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
     ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
     Opts = [{streaming_mode, want_all}],
 
     fabric2_fdb:transactional(Db, fun(#{tx := Tx} = TxDb) ->
-        lists:foreach(fun (Level) ->
+        lists:foldl(fun (Level, Level0Total) ->
             {StartKey, EndKey} = erlfdb_tuple:range({Level},
                 ReduceIdxPrefix),
 
-            Acc0 = #{
-                sig => Sig,
-                view_id => ViewId,
-                reduce_idx_prefix => ReduceIdxPrefix,
-                user_acc => [],
-                callback => fun handle_log_levels/3
-            },
-
-            Fun = fun fold_fwd_cb/2,
-            Acc = erlfdb:fold_range(Tx, StartKey, EndKey, Fun, Acc0, Opts),
-            #{
-                user_acc := Rows
-            } = Acc,
-            io:format("~n LEVEL ~p rows ~p ~n", [Level, Rows])
-        end, Levels)
+            Future = erlfdb:get_range(Tx, StartKey, EndKey, Opts),
+            Rows = lists:map(fun ({_Key, EV}) ->
+                unpack_key_value(EV)
+            end, erlfdb:wait(Future)),
+
+            io:format("~n LEVEL ~p rows ~p ~n", [Level, Rows]),
+            case Level == 0 of
+                true ->
+                    sum_rows(Rows);
+                false ->
+                    Total = sum_rows(Rows),
+                    if Total == Level0Total -> Level0Total; true ->
+                        io:format("~n ~n LEVEL ~p NOT EQUAL ~p ~p ~n", [Level, Level0Total, Total])
+%%                        throw(level_total_error)
+                    end
+            end
+
+        end, 0, Levels)
     end).
 
-handle_log_levels(Key, Value, Acc) ->
-    Acc ++ [{Key, Value}].
+%% Add up the values of a list of {Key, Value} rows; an empty list gives 0.
+sum_rows(Rows) ->
+    Values = lists:map(fun ({_Key, Val}) -> Val end, Rows),
+    lists:sum(Values).
+
 
 %%fold(Db, Sig, ViewId, Options, Callback, Acc0) ->
 %%    #{
@@ -95,7 +101,7 @@ handle_log_levels(Key, Value, Acc) ->
 %%        {ok, UserAcc1}
 %%    end).
 
-fold_level0(Db, Sig, ViewId, Opts, UserCallback, UserAcc0) ->
+fold_level0(Db, Sig, ViewId, Reducer, GroupLevel, Opts, UserCallback, UserAcc0) ->
     #{
         db_prefix := DbPrefix
     } = Db,
@@ -108,7 +114,10 @@ fold_level0(Db, Sig, ViewId, Opts, UserCallback, UserAcc0) ->
         user_acc => UserAcc0,
         %%            args := Args,
         callback => UserCallback,
-        reduce_idx_prefix => ReduceIdxPrefix
+        reduce_idx_prefix => ReduceIdxPrefix,
+        reducer => Reducer,
+        group_level => GroupLevel,
+        rows => []
     },
 
     fabric2_fdb:transactional(Db, fun(TxDb) ->
@@ -118,36 +127,94 @@ fold_level0(Db, Sig, ViewId, Opts, UserCallback, UserAcc0) ->
         } = TxDb,
 
 
-%%        {StartKey, EndKey} = erlfdb_tuple:range({0},
-%%            ReduceIdxPrefix),
         {startkey, StartKey} = lists:keyfind(startkey, 1, Opts),
         {endkey, EndKey} = lists:keyfind(endkey, 1, Opts),
 
-%%        ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
         Fun = fun fold_fwd_cb/2,
         Acc1 = erlfdb:fold_range(Tx, StartKey, EndKey, Fun, Acc, Opts),
         #{
-            user_acc := UserAcc1
+            user_acc := UserAcc1,
+            rows := Rows
         } = Acc1,
-        UserAcc1
+
+        rereduce_and_reply(Reducer, Rows, GroupLevel,
+            UserCallback, UserAcc1)
     end).
 
 
-fold_fwd_cb({FullEncodedKey, EV}, Acc) ->
+fold_fwd_cb({_FullEncodedKey, EV}, Acc) ->  % fold callback for one range row
     #{
-        reduce_idx_prefix := ReduceIdxPrefix,
         callback := Callback,
-        user_acc := UserAcc
+        user_acc := UserAcc,
+        group_level := GroupLevel,
+        rows := Rows,  % rows buffered so far for the group being built
+        reducer := Reducer
     } = Acc,
 
-    {_Level, _EK}
-        = erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
-    {EK, EV1} = erlfdb_tuple:unpack(EV),
-    Key = couch_views_encoding:decode(EK),
-    Val = couch_views_encoding:decode(EV1),
+    {Key, Val} = unpack_key_value(EV),
+    %% LastKey is the key of the most recently buffered row, or false if none.
+    LastKey = if Rows == [] -> false; true ->
+        {LastKey0, _} = lists:last(Rows),
+        LastKey0
+    end,
+    %% Same group: keep buffering. New group: emit the buffered one first.
+    case group_level_equal(Key, LastKey, GroupLevel) of
+        true ->
+            Acc#{
+                rows := Rows ++ [{Key, Val}]  % NOTE(review): append is O(n)
+            };
+        false ->
+            UserAcc1 = rereduce_and_reply(Reducer, Rows, GroupLevel,
+                Callback, UserAcc),
+            Acc#{
+                user_acc := UserAcc1,
+                rows := [{Key, Val}]  % start the next group with this row
+            }
+    end.
+
+rereduce_and_reply(_Reducer, [], _GroupLevel, _Callback, Acc) ->
+    Acc;  % nothing buffered: the callback is not invoked
+%% Otherwise collapse Rows to one {Key, Value} pair and hand it to Callback.
+rereduce_and_reply(Reducer, Rows, GroupLevel, Callback, Acc) ->
+    {GroupKey, GroupVal} = rereduce(Reducer, Rows, GroupLevel),
+    Callback(GroupKey, GroupVal, Acc).
+
+
+rereduce(_Reducer, [], _GroupLevel) ->  % no rows: there is nothing to emit
+    no_kvs;
 
-    UserAcc1 = Callback(Key, Val, UserAcc),
-    Acc#{user_acc := UserAcc1}.
+%% A lone row needs no combining; only its key is cut to the group level.
+rereduce(_Reducer, [{Key, Val}], GroupLevel) ->
+    {group_level_key(Key, GroupLevel), Val};
+
+%% Re-reducing _count adds up the rows' values (each is presumably already a
+%% count; log_levels totals them the same way). Counting the rows instead
+%% would drop any value greater than one. The first row supplies the key.
+rereduce(<<"_count">>, [{Key, _} | _] = Rows, GroupLevel) ->
+    Total = lists:sum([Val || {_, Val} <- Rows]),
+    {group_level_key(Key, GroupLevel), Total}.
+
+
+group_level_equal(_One, _Two, 0) ->  % level 0: every key is in one group
+    true;
+
+group_level_equal(_One, _Two, group_true) ->  % group_true: never merge rows
+    false;
+
+%% Any other level: keys belong together when their group-level keys are ==.
+group_level_equal(One, Two, GroupLevel) ->
+    Left = group_level_key(One, GroupLevel),
+    Left == group_level_key(Two, GroupLevel).
+
+
+group_level_key(_Key, 0) ->  % level 0 folds every key into one null group
+    null;
+group_level_key(Key, GroupLevel) when is_list(Key), is_integer(GroupLevel) ->
+    lists:sublist(Key, GroupLevel);  % keep the first GroupLevel elements
+
+%% Non-list keys, and non-integer levels such as group_true, keep the full key.
+group_level_key(Key, _GroupLevel) ->
+    Key.
 
 
 reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId) ->
@@ -155,6 +222,13 @@ reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId) ->
     erlfdb_tuple:pack(Key, DbPrefix).
 
 
+%% Unpack a stored value tuple into its decoded {Key, Value} pair.
+unpack_key_value(EncodedValue) ->
+    {PackedKey, PackedVal} = erlfdb_tuple:unpack(EncodedValue),
+    Decode = fun couch_views_encoding:decode/1,
+    {Decode(PackedKey), Decode(PackedVal)}.
+
+
 %% Inserting
 update_reduce_idx(TxDb, Sig, ViewId, _DocId, _ExistingKeys, ReduceResult) ->
     #{
@@ -205,15 +279,15 @@ add_kv_to_skip_list(Db, MaxLevel, #{} = ViewOpts, Key, Val) ->
     fabric2_fdb:transactional(Db, fun(TxDb) ->
         lists:foreach(fun(Level) ->
             {PrevKey, PrevVal} = get_previous_key(TxDb, ReduceIdxPrefix, Level, Key),
-            io:format("Process ~p ~p ~p PREV VALS ~p ~p ~n", [Level, Key, Val, PrevKey, PrevVal]),
+            io:format("Level ~p K/V ~p ~p PREV KV ~p ~p ~n", [Level, Key, Val, PrevKey, PrevVal]),
             case should_add_key_to_level(Level, KeyHash) of
                 true ->
-                    io:format("Adding ~p ~p ~n", [Level, Key]),
+                    io:format("Adding at ~p ~p ~n", [Level, Key]),
                     add_kv(Db, ReduceIdxPrefix, Level, Key, Val);
                 false ->
-                    {PrevKey, NewVal} = rereduce(<<"_stats">>, {PrevKey, PrevVal}, {Key, Val}),
-                    io:format("RE_REDUCE ~p ~p ~p ~p ~n", [Level, Key, PrevKey, NewVal]),
-                    add_kv(Db, ReduceIdxPrefix, Level, PrevKey, NewVal)
+%%                    {PrevKey, NewVal} = rereduce(<<"_stats">>, {PrevKey, PrevVal}, {Key, Val}),
+%%                    io:format("RE_REDUCE ~p ~p ~p ~p ~n", [Level, Key, PrevKey, NewVal]),
+                    add_kv(Db, ReduceIdxPrefix, Level, PrevKey, PrevVal)
             end
         end, Levels)
     end).
@@ -229,7 +303,7 @@ get_previous_key(TxDb, ReduceIdxPrefix, Level, Key) ->
 
     EK = couch_views_encoding:encode(Key, key),
     StartKey = erlfdb_tuple:pack({Level, EK}, ReduceIdxPrefix),
-    StartKeySel = erlfdb_key:last_less_or_equal(StartKey),
+    StartKeySel = erlfdb_key:last_less_than(StartKey),
     EndKeySel = erlfdb_key:first_greater_or_equal(StartKey),
 
     Future = erlfdb:get_range(Tx, StartKeySel, EndKeySel, Opts),
@@ -241,6 +315,9 @@ hash_key(Key) ->
     erlang:phash2(Key).
 
 
+should_add_key_to_level(0, _KeyHash) ->
+    true;
+
 should_add_key_to_level(Level, KeyHash) ->
     (KeyHash band ((1 bsl (Level * ?LEVEL_FAN_POW)) -1)) == 0.
 %%    keyHash & ((1 << (level * LEVEL_FAN_POW)) - 1)) != 0
@@ -277,11 +354,11 @@ add_kv(TxDb, ReduceIdxPrefix, Level, Key, Val) ->
     ok = erlfdb:set(Tx, LevelKey, EV).
 
 
-rereduce(<<"_stats">>, {PrevKey, PrevVal}, {_Key, Val}) ->
-    case PrevVal >= Val of
-        true -> {PrevKey, PrevVal};
-        false -> {PrevKey, Val}
-    end.
+%%rereduce(<<"_stats">>, {PrevKey, PrevVal}, {_Key, Val}) ->
+%%    case PrevVal >= Val of
+%%        true -> {PrevKey, PrevVal};
+%%        false -> {PrevKey, Val}
+%%    end.
 
 
 
diff --git a/src/couch_views/test/exunit/couch_views_reduce_test.exs b/src/couch_views/test/exunit/couch_views_reduce_test.exs
index c1b35e2..488f3ee 100644
--- a/src/couch_views/test/exunit/couch_views_reduce_test.exs
+++ b/src/couch_views/test/exunit/couch_views_reduce_test.exs
@@ -60,16 +60,16 @@ defmodule CouchViewsReduceTest do
   test "group_level=1 count reduce", context do
       args = %{
           :reduce => true,
-          :group => true,
+          :group_level => 1,
       }
 
       {:ok, res} = run_query(context, args, "dates")
       IO.inspect(res, label: "OUT")
 
       assert res == [
-                 {:row, [key: [2017], value: 1]},
-                 {:row, [key: [2018], value: 1]},
-                 {:row, [key: [2019], value: 1]}
+                 {:row, [key: [2017], value: 4]},
+                 {:row, [key: [2018], value: 3]},
+                 {:row, [key: [2019], value: 2]}
              ]
   end
 
@@ -221,7 +221,7 @@ defmodule CouchViewsReduceTest do
       [2019, 4, 1]
     ]
 
-    for i <- 1..4 do
+    for i <- 1..10 do
       group =
         if rem(i, 3) == 0 do
           "first"
@@ -235,7 +235,6 @@ defmodule CouchViewsReduceTest do
          {"some", "field"},
          {"group", group},
          {"date", Enum.at(dates, i - 1)}
-         #           {"timestamp", Enum.at(timestamps, i - 1)}
        ]})
     end
   end
@@ -254,25 +253,25 @@ defmodule CouchViewsReduceTest do
                  }
                 """},
                {"reduce", "_count"}
-             ]}},
-           {"baz",
-            {[
-               {"map",
-                """
-                function(doc) {
-                  emit(doc.value, doc.value);
-                  emit(doc.value, doc.value);
-                  emit([doc.value, 1], doc.value);
-                  emit([doc.value, doc.value + 1, doc.group.length], doc.value);
-
-                  if (doc.value === 3) {
-                    emit([1, 1, 5], 1);
-                    emit([doc.value, 1, 5], 1);
-                  }
-                 }
-                """},
-               {"reduce", "_count"}
              ]}}
+#           {"baz",
+#            {[
+#               {"map",
+#                """
+#                function(doc) {
+#                  emit(doc.value, doc.value);
+#                  emit(doc.value, doc.value);
+#                  emit([doc.value, 1], doc.value);
+#                  emit([doc.value, doc.value + 1, doc.group.length], doc.value);
+#
+#                  if (doc.value === 3) {
+#                    emit([1, 1, 5], 1);
+#                    emit([doc.value, 1, 5], 1);
+#                  }
+#                 }
+#                """},
+#               {"reduce", "_count"}
+#             ]}}
            #             {"boom",
            #              {[
            #                 {"map",