Posted to commits@couchdb.apache.org by ga...@apache.org on 2019/12/05 17:03:38 UTC

[couchdb] branch prototype/builtin-reduce updated (7ca5202 -> da6fa09)

This is an automated email from the ASF dual-hosted git repository.

garren pushed a change to branch prototype/builtin-reduce
in repository https://gitbox.apache.org/repos/asf/couchdb.git.


 discard 7ca5202  level 0 _sum working
 discard bff44fb  can do group_level query
 discard 533fdb6  progress with reading level 0
 discard e399fb3  printing
 discard 03cbc41  Initial work
     add 987efb3  add test to prove we can view swap
     add 8d5c107  Use "\xFF/metadataVersion" key for checking metadata
     add 8bb0718  Abandon a view job if the db or ddoc is deleted
     add 583d7fe  Pass contexts to fabric2_db functions
     add 8d28d85  Merge pull request #2279 from cloudant/refactor-user-ctx-handling
     add 3db0ba7  Ensure we can create partitioned design docs with FDB
     add aaae564  Check security properties in the main transaction
     add b71cbe2  Before starting a db transaction, refresh the db handle from the cache
     add 44f660f  Update fabric2_fdb's set_config to take un-encoded values
     add b58dc30  Assert Db handle field existence in `load_config/1` in fabric2_fdb
     add 706acca  Check membership when calling get_security/1 in fabric2_db
     add 2d3737c  Support regexp based blacklist in config
     add c9b8e25  Implement fabric2_server:fdb_cluster/0
     add e93d1b4  Add ctrace application
     add 98bc5ea  Trace http endpoints
     add 4680884  Trace fdb transactions
     add 5e47f50  Implement node types
     add be22ef9  Add operation names for all HTTP endpoints
     add 3c2b92c  Change end-point /_up to check fdb connectivity
     add 0cea6a4  Optimize view read latency when the view is ready
     add 2247c80  Retry failed index builds
     new 5736d57  Initial work
     new 88ed4eb  printing
     new 0076f15  progress with reading level 0
     new 0ddb800  can do group_level query
     new 6f6bedc  level 0 _sum working
     new 457cb7e  startkey/endkey with level0
     new f01b653  basic skiplist query working
     new da6fa09  basic weird reduce view support

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (7ca5202)
            \
             N -- N -- N   refs/heads/prototype/builtin-reduce (da6fa09)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.
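
The divergence pictured above can be reproduced locally with a throwaway repository. This is a hedged sketch: the directory, branch name, and commit messages below are illustrative placeholders, not taken from the couchdb repository itself.

```shell
set -e
cd "$(mktemp -d)"
git init -q demo && cd demo
git config user.email dev@example.com
git config user.name dev

git commit -q --allow-empty -m "B (common base)"
git commit -q --allow-empty -m "O1 (old work)"
git branch old-tip                 # remember the O line before rewriting

git reset -q --hard HEAD~1         # back to B, discarding O1 locally
git commit -q --allow-empty -m "N1 (rewritten work)"

# The two tips now share only B; replacing O1 with N1 on a remote
# would require `git push --force`.
git merge-base HEAD old-tip        # prints the hash of B
```

After the reset, `git log` on each tip shows two histories that agree only up to B, which is exactly the shape the notification describes.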

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 8 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
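
For reference, the set of revisions "entirely new to this repository" is what git's range syntax `B..tip` enumerates. A minimal sketch, again with placeholder commits rather than the real hashes from this email:

```shell
set -e
cd "$(mktemp -d)"
git init -q demo && cd demo
git config user.email dev@example.com
git config user.name dev

git commit -q --allow-empty -m "B"
git commit -q --allow-empty -m "N1"
git commit -q --allow-empty -m "N2"

base=$(git rev-list --max-parents=0 HEAD)   # the common base B
git log --oneline "$base"..HEAD             # lists only N2 and N1, not B
```

In the real case, substituting the hash of B and `da6fa09` into `git log B..da6fa09` would list the eight "new" commits described in the follow-up emails.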


Summary of changes:
 .gitignore                                         |   5 +
 rebar.config.script                                |   5 +
 rel/overlay/etc/default.ini                        |  50 ++-
 src/chttpd/src/chttpd.app.src                      |   1 +
 src/chttpd/src/chttpd.erl                          | 118 +++++-
 src/chttpd/src/chttpd_app.erl                      |   4 +-
 src/chttpd/src/chttpd_auth_request.erl             |   5 +-
 src/chttpd/src/chttpd_db.erl                       | 103 ++---
 src/chttpd/src/chttpd_handlers.erl                 |  23 +-
 src/chttpd/src/chttpd_httpd_handlers.erl           | 442 ++++++++++++++++++++-
 src/chttpd/src/chttpd_misc.erl                     |  11 +-
 src/chttpd/src/chttpd_sup.erl                      |  16 +-
 src/couch/src/couch_util.erl                       |  37 +-
 src/couch_jobs/src/couch_jobs.hrl                  |   5 +-
 src/couch_mrview/src/couch_mrview.erl              |  14 +-
 src/couch_views/src/couch_views.erl                |  60 +--
 src/couch_views/src/couch_views_app.erl            |   4 +-
 src/couch_views/src/couch_views_indexer.erl        |  99 ++++-
 src/couch_views/src/couch_views_jobs.erl           |   3 +-
 src/couch_views/src/couch_views_reduce.erl         | 148 +++----
 src/couch_views/src/couch_views_reduce_fdb.erl     | 403 +++++++++++++------
 src/couch_views/src/couch_views_reducer.erl        | 119 ++++++
 src/couch_views/src/couch_views_sup.erl            |  32 +-
 .../test/exunit/couch_views_reduce_test.exs        | 236 +++++------
 src/ctrace/README.md                               | 291 ++++++++++++++
 src/{fabric => ctrace}/rebar.config                |   0
 .../couch_js.app.src => ctrace/src/ctrace.app.src} |  16 +-
 src/ctrace/src/ctrace.erl                          | 361 +++++++++++++++++
 .../eunit.config => src/ctrace/src/ctrace.hrl      |   8 +-
 .../couch_js_app.erl => ctrace/src/ctrace_app.erl} |  11 +-
 src/ctrace/src/ctrace_config.erl                   | 133 +++++++
 src/ctrace/src/ctrace_dsl.erl                      | 106 +++++
 .../couch_js_sup.erl => ctrace/src/ctrace_sup.erl} |  26 +-
 src/ctrace/test/ctrace_config_test.erl             | 153 +++++++
 src/ctrace/test/ctrace_dsl_test.erl                | 123 ++++++
 src/ctrace/test/ctrace_test.erl                    | 412 +++++++++++++++++++
 src/fabric/include/fabric2.hrl                     |  13 +-
 src/fabric/src/fabric.app.src                      |   1 +
 src/fabric/src/fabric2_db.erl                      |  68 ++--
 src/fabric/src/fabric2_fdb.erl                     | 159 +++++---
 src/fabric/src/fabric2_node_types.erl              |  52 +++
 src/fabric/src/fabric2_server.erl                  |  38 +-
 src/fabric/test/fabric2_db_security_tests.erl      | 141 +++----
 src/fabric/test/fabric2_dir_prefix_tests.erl       |   2 +-
 src/fabric/test/fabric2_doc_crud_tests.erl         |  18 +
 src/fabric/test/fabric2_node_types_tests.erl       |  73 ++++
 .../src/global_changes_httpd_handlers.erl          |   8 +-
 src/mango/src/mango_httpd_handlers.erl             |  31 +-
 src/mem3/src/mem3_httpd_handlers.erl               |  38 +-
 src/setup/src/setup_httpd_handlers.erl             |  12 +-
 test/elixir/test/basics_test.exs                   |   6 +
 test/elixir/test/map_test.exs                      |  99 +++++
 52 files changed, 3619 insertions(+), 723 deletions(-)
 create mode 100644 src/couch_views/src/couch_views_reducer.erl
 create mode 100644 src/ctrace/README.md
 copy src/{fabric => ctrace}/rebar.config (100%)
 copy src/{couch_js/src/couch_js.app.src => ctrace/src/ctrace.app.src} (78%)
 create mode 100644 src/ctrace/src/ctrace.erl
 copy rel/files/eunit.config => src/ctrace/src/ctrace.hrl (80%)
 copy src/{couch_js/src/couch_js_app.erl => ctrace/src/ctrace_app.erl} (90%)
 create mode 100644 src/ctrace/src/ctrace_config.erl
 create mode 100644 src/ctrace/src/ctrace_dsl.erl
 copy src/{couch_js/src/couch_js_sup.erl => ctrace/src/ctrace_sup.erl} (71%)
 create mode 100644 src/ctrace/test/ctrace_config_test.erl
 create mode 100644 src/ctrace/test/ctrace_dsl_test.erl
 create mode 100644 src/ctrace/test/ctrace_test.erl
 create mode 100644 src/fabric/src/fabric2_node_types.erl
 create mode 100644 src/fabric/test/fabric2_node_types_tests.erl


[couchdb] 08/08: basic weird reduce view support


garren pushed a commit to branch prototype/builtin-reduce
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit da6fa0925354d8ca2d95c0bfbcdd3660eb2f1244
Author: Garren Smith <ga...@gmail.com>
AuthorDate: Thu Dec 5 19:02:21 2019 +0200

    basic weird reduce view support
---
 src/couch_views/src/couch_views_reduce.erl         |  20 +-
 src/couch_views/src/couch_views_reduce_fdb.erl     |   3 +-
 src/couch_views/src/couch_views_reducer.erl        |  10 +-
 .../test/exunit/couch_views_reduce_test.exs        | 237 +++++++++------------
 4 files changed, 112 insertions(+), 158 deletions(-)

diff --git a/src/couch_views/src/couch_views_reduce.erl b/src/couch_views/src/couch_views_reduce.erl
index 0e837e3..4eae5b9 100644
--- a/src/couch_views/src/couch_views_reduce.erl
+++ b/src/couch_views/src/couch_views_reduce.erl
@@ -25,8 +25,8 @@
 -include_lib("fabric/include/fabric2.hrl").
 
 
--define(LEVEL_FAN_POW, 1).
--define(MAX_SKIP_LIST_LEVELS, 6).
+%%-define(LEVEL_FAN_POW, 1).
+%%-define(MAX_SKIP_LIST_LEVELS, 6).
 
 
 read_reduce(Db, Sig, ViewId, Reducer, UserCallback, UserAcc0, Args) ->
@@ -34,7 +34,6 @@ read_reduce(Db, Sig, ViewId, Reducer, UserCallback, UserAcc0, Args) ->
         db_prefix := DbPrefix
     } = Db,
 
-%%    Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
     ReduceIdxPrefix = couch_views_reduce_fdb:reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
     #mrargs{
         limit = Limit,
@@ -47,11 +46,10 @@ read_reduce(Db, Sig, ViewId, Reducer, UserCallback, UserAcc0, Args) ->
         _ -> GroupLevel
     end,
 
-%%    Opts = args_to_fdb_opts(Args, ReduceIdxPrefix),
+    Opts = args_to_fdb_opts(Args, ReduceIdxPrefix),
 
     try
         fabric2_fdb:transactional(Db, fun(TxDb) ->
-    %%        Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
 
             Acc0 = #{
                 sig => Sig,
@@ -65,10 +63,10 @@ read_reduce(Db, Sig, ViewId, Reducer, UserCallback, UserAcc0, Args) ->
             },
 
             Fun = fun handle_row/3,
-%%            Acc1 = couch_views_reduce_fdb:fold_level0(TxDb, Sig, ViewId, Reducer, GroupLevel1, Opts, Fun, Acc0),
+            Acc1 = couch_views_reduce_fdb:fold_level0(TxDb, Sig, ViewId, Reducer, GroupLevel1, Opts, Fun, Acc0),
 
-            SkipListOpts = args_to_skiplist_opts(Args),
-            Acc1 = couch_views_reduce_fdb:fold_skip_list(TxDb, Sig, ViewId, Reducer, GroupLevel1, SkipListOpts, Fun, Acc0),
+%%            SkipListOpts = args_to_skiplist_opts(Args),
+%%            Acc1 = couch_views_reduce_fdb:fold_skip_list(TxDb, Sig, ViewId, Reducer, GroupLevel1, SkipListOpts, Fun, Acc0),
             #{
                 user_acc := UserAcc1
             } = Acc1,
@@ -111,10 +109,10 @@ args_to_fdb_opts(#mrargs{} = Args, ReduceIdxPrefix) ->
 
     StartKey1 = case StartKey of
         undefined ->
-            StartKey0 = couch_views_reduce_fdb:create_key(0, 0, ReduceIdxPrefix),
+            StartKey0 = couch_views_reduce_fdb:create_key(ReduceIdxPrefix, 0, 0),
             erlfdb_key:first_greater_than(StartKey0);
         StartKey ->
-            StartKey0 = couch_views_reduce_fdb:create_key(StartKey, 0, ReduceIdxPrefix),
+            StartKey0 = couch_views_reduce_fdb:create_key(ReduceIdxPrefix, 0, StartKey),
             erlfdb_key:first_greater_or_equal(StartKey0)
     end,
 
@@ -125,7 +123,7 @@ args_to_fdb_opts(#mrargs{} = Args, ReduceIdxPrefix) ->
             EndKey0;
         EndKey ->
             io:format("ENDKEY ~n"),
-            EndKey0 = couch_views_reduce_fdb:create_key(EndKey, 0, ReduceIdxPrefix),
+            EndKey0 = couch_views_reduce_fdb:create_key(ReduceIdxPrefix, 0, EndKey),
             erlfdb_key:first_greater_than(EndKey0)
     end,
     [{streaming_mode, want_all}, {startkey, StartKey1}, {endkey, EndKey1}].
diff --git a/src/couch_views/src/couch_views_reduce_fdb.erl b/src/couch_views/src/couch_views_reduce_fdb.erl
index 72e05b8..be2ed4f 100644
--- a/src/couch_views/src/couch_views_reduce_fdb.erl
+++ b/src/couch_views/src/couch_views_reduce_fdb.erl
@@ -350,7 +350,8 @@ fold_fwd_cb({_FullEncodedKey, EV}, Acc) ->
     GroupLevelKey = couch_views_reducer:group_level_key(Key, GroupLevel),
     GroupKV = [{GroupLevelKey, Val}],
 
-    case couch_views_reducer:group_level_equal(Key, LastKey, GroupLevel) of
+%%    case couch_views_reducer:group_level_equal(Key, LastKey, GroupLevel) of
+    case GroupLevelKey == LastKey of
         true ->
             Acc#{
                 rows := Rows ++ GroupKV
diff --git a/src/couch_views/src/couch_views_reducer.erl b/src/couch_views/src/couch_views_reducer.erl
index a7ac783..7a24661 100644
--- a/src/couch_views/src/couch_views_reducer.erl
+++ b/src/couch_views/src/couch_views_reducer.erl
@@ -46,7 +46,6 @@ reduce(<<"_sum">>, Results) ->
     end, #{}, Results),
     maps:to_list(ReduceResults);
 
-
 % this isn't a real supported reduce function in CouchDB
 % But I want a basic reduce function that when we need to update the index
 % we would need to re-read multiple rows instead of being able to do an
@@ -87,10 +86,11 @@ rereduce(<<"_sum">>, Rows, GroupLevel) ->
     {GroupKey, Sum};
 
 rereduce(<<"_count">>, Rows, GroupLevel) ->
-    Val = length(Rows),
-    {Key, _} = hd(Rows),
-    GroupKey = group_level_key(Key, GroupLevel),
-    {GroupKey, Val}.
+    rereduce(<<"_sum">>, Rows, GroupLevel).
+%%    Val = length(Rows),
+%%    {Key, _} = hd(Rows),
+%%    GroupKey = group_level_key(Key, GroupLevel),
+%%    {GroupKey, Val}.
 
 
 group_level_equal(_One, _Two, 0) ->
diff --git a/src/couch_views/test/exunit/couch_views_reduce_test.exs b/src/couch_views/test/exunit/couch_views_reduce_test.exs
index e2b9d3b..49334fd 100644
--- a/src/couch_views/test/exunit/couch_views_reduce_test.exs
+++ b/src/couch_views/test/exunit/couch_views_reduce_test.exs
@@ -73,7 +73,7 @@ defmodule CouchViewsReduceTest do
     args = %{
       reduce: true,
       group_level: 1,
-      end_key: [2019,5,1]
+      end_key: [2019, 5, 1]
     }
 
     {:ok, res} = run_query(context, args, "dates_sum")
@@ -86,91 +86,92 @@ defmodule CouchViewsReduceTest do
   end
 
   test "group_level=1 reduce with startkey/endkey", context do
-      args = %{
-          reduce: true,
-          group_level: 1,
-          start_key: [2017, 4, 1],
-          end_key: [2018, 3, 1],
-
-      }
+    args = %{
+      reduce: true,
+      group_level: 1,
+      start_key: [2017, 4, 1],
+      end_key: [2018, 3, 1]
+    }
 
-      {:ok, res} = run_query(context, args, "dates_sum")
+    {:ok, res} = run_query(context, args, "dates_sum")
 
-      assert res == [
-                 {:row, [key: [2017], value: 22]},
-                 {:row, [key: [2018], value: 6]}
-             ]
+    assert res == [
+             {:row, [key: [2017], value: 22]},
+             {:row, [key: [2018], value: 6]}
+           ]
   end
 
   test "group_level=1 reduce with startkey/endkey take 2", context do
-      args = %{
-          reduce: true,
-          group_level: 1,
-          start_key: [2017, 4, 1],
-          end_key: [2019, 3, 2],
-      }
-
-      {:ok, res} = run_query(context, args, "dates_sum")
-
-      assert res == [
-                 {:row, [key: [2017], value: 22]},
-                 {:row, [key: [2018], value: 20]},
-                 {:row, [key: [2019], value: 4]}
-             ]
+    args = %{
+      reduce: true,
+      group_level: 1,
+      start_key: [2017, 4, 1],
+      end_key: [2019, 3, 2]
+    }
+
+    {:ok, res} = run_query(context, args, "dates_sum")
+
+    assert res == [
+             {:row, [key: [2017], value: 22]},
+             {:row, [key: [2018], value: 20]},
+             {:row, [key: [2019], value: 4]}
+           ]
   end
 
   test "group_level=1 reduce with startkey/endkey take 3", context do
-      args = %{
-          reduce: true,
-          group_level: 1,
-          start_key: [2017, 4, 1],
-          end_key: [2019, 05, 1],
-      }
-
-      {:ok, res} = run_query(context, args, "dates_sum")
-
-      assert res == [
-                 {:row, [key: [2017], value: 22]},
-                 {:row, [key: [2018], value: 20]},
-                 {:row, [key: [2019], value: 17]}
-             ]
+    args = %{
+      reduce: true,
+      group_level: 1,
+      start_key: [2017, 4, 1],
+      end_key: [2019, 05, 1]
+    }
+
+    {:ok, res} = run_query(context, args, "dates_sum")
+
+    assert res == [
+             {:row, [key: [2017], value: 22]},
+             {:row, [key: [2018], value: 20]},
+             {:row, [key: [2019], value: 17]}
+           ]
   end
 
-#  test "group=true reduce with startkey/endkey", context do
-#      args = %{
-#          reduce: true,
-#          group: true,
-#          start_key: [2018, 5, 1],
-#          end_key: [2019, 04, 1],
-#      }
-#
-#      {:ok, res} = run_query(context, args, "dates_sum")
-#
-#      assert res == [
-#                 {:row, [key: [2018, 5, 1], value: 7]},
-#                 {:row, [key: [2019, 3, 1], value: 4]},
-#                 {:row, [key: [2019, 4, 1], value: 6]}
-#             ]
-#  end
-
-  #  test "group=1 count reduce", context do
-  #    args = %{
-  #      :reduce => true,
-  #      :group_level => 1
-  #      #          :limit => 6
-  #    }
+  #  test "group=true reduce with startkey/endkey", context do
+  #      args = %{
+  #          reduce: true,
+  #          group: true,
+  #          start_key: [2018, 5, 1],
+  #          end_key: [2019, 04, 1],
+  #      }
   #
-  #    {:ok, res} = run_query(context, args, "baz")
+  #      {:ok, res} = run_query(context, args, "dates_sum")
   #
-  #    assert res == [
-  #             {:row, [key: 1, value: 2]},
-  #             {:row, [key: 2, value: 2]},
-  #             {:row, [key: 3, value: 2]},
-  #             {:row, [key: [1], value: 2]},
-  #             {:row, [key: [2], value: 2]},
-  #             {:row, [key: [3], value: 2]}
-  #           ]
+  #      assert res == [
+  #                 {:row, [key: [2018, 5, 1], value: 7]},
+  #                 {:row, [key: [2019, 3, 1], value: 4]},
+  #                 {:row, [key: [2019, 4, 1], value: 6]}
+  #             ]
   #  end
+
+  test "group_level=1 _count reduce", context do
+    args = %{
+      :reduce => true,
+      :group_level => 1,
+      :limit => 6
+    }
+
+    {:ok, res} = run_query(context, args, "baz")
+
+    IO.inspect res, label: "OUT", charlists: :as_lists
+    assert res == [
+             {:row, [key: 1, value: 2]},
+             {:row, [key: 2, value: 2]},
+             {:row, [key: 3, value: 2]},
+             {:row, [key: [1], value: 2]},
+             {:row, [key: [2], value: 2]},
+             {:row, [key: [3], value: 2]}
+           ]
+  end
+
   #
   #  test "group=2 count reduce", context do
   #    args = %{
@@ -256,7 +257,7 @@ defmodule CouchViewsReduceTest do
   end
 
   def default_cb(:complete, acc) do
-    IO.inspect(Enum.reverse(acc), label: "complete")
+    IO.inspect(Enum.reverse(acc), label: "complete", charlists: :as_lists)
     {:ok, Enum.reverse(acc)}
   end
 
@@ -325,16 +326,6 @@ defmodule CouchViewsReduceTest do
        {"_id", "_design/bar"},
        {"views",
         {[
-           #           {"dates_count",
-           #            {[
-           #               {"map",
-           #                """
-           #                function(doc) {
-           #                  emit(doc.date, doc.value);
-           #                 }
-           #                """},
-           #               {"reduce", "_count"}
-           #             ]}}
            {"dates_sum",
             {[
                {"map",
@@ -344,64 +335,28 @@ defmodule CouchViewsReduceTest do
                 }
                 """},
                {"reduce", "_sum"}
+             ]}},
+           {"baz",
+            {[
+               {"map",
+                """
+                function(doc) {
+                  if (doc.value > 3) {
+                    return;
+                  }
+                  emit(doc.value, doc.value);
+                  emit(doc.value, doc.value);
+                  emit([doc.value, 1], doc.value);
+                  emit([doc.value, doc.value + 1, doc.group.length], doc.value);
+
+                  if (doc.value === 3) {
+                   // emit([1, 1, 5], 1);
+                   // emit([doc.value, 1, 5], 1);
+                  }
+                 }
+                """},
+               {"reduce", "_count"}
              ]}}
-           #           {"baz",
-           #            {[
-           #               {"map",
-           #                """
-           #                function(doc) {
-           #                  emit(doc.value, doc.value);
-           #                  emit(doc.value, doc.value);
-           #                  emit([doc.value, 1], doc.value);
-           #                  emit([doc.value, doc.value + 1, doc.group.length], doc.value);
-           #
-           #                  if (doc.value === 3) {
-           #                    emit([1, 1, 5], 1);
-           #                    emit([doc.value, 1, 5], 1);
-           #                  }
-           #                 }
-           #                """},
-           #               {"reduce", "_count"}
-           #             ]}}
-           #             {"boom",
-           #              {[
-           #                 {"map",
-           #                  """
-           #                  function(doc) {
-           #                      var month = 1;
-           #                      if (doc.value % 2) {
-           #                          month = 2;
-           #                      }
-           #                      emit([2019, month, doc.value], doc.value);
-           #                  }
-           #                  """},
-           #                 {"reduce", "_count"}
-           #               ]}},
-           #             {"max",
-           #              {[
-           #                 {"map",
-           #                  """
-           #                  function(doc) {
-           #                      //emit(doc.value, doc.value);
-           #                      //emit([doc.value, 1], doc.value);
-           #                      //emit([doc.value, doc.value + 1, doc.group.length], doc.value);
-           #                        emit(1, 1);
-           #                        emit(2, 2);
-           #                        emit(3, 3);
-           #                        emit(4, 4);
-           #
-           #                       emit([2019, 2, 2], 1);
-           #                       emit([2019, 3, 3], 2);
-           #                       emit([2019, 3, 3], 3);
-           #                       emit([2019, 4, 3], 4);
-           #                       emit([2019, 5, 3], 6);
-           #                      if (doc.value === 3) {
-           #                       //emit([doc.value, 1, 5], 1);
-           #                      }
-           #                  }
-           #                  """},
-           #                 {"reduce", "_stats"}
-           #               ]}}
          ]}}
      ]})
   end


[couchdb] 01/08: Initial work


garren pushed a commit to branch prototype/builtin-reduce
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 5736d5752c3d7e2b6eef2cdf8480f4bf03462482
Author: Garren Smith <ga...@gmail.com>
AuthorDate: Wed Sep 11 17:01:08 2019 +0200

    Initial work
---
 src/couch_views/include/couch_views.hrl            |   1 +
 src/couch_views/src/couch_views.erl                |  14 +-
 .../src/{couch_views.erl => couch_views.erl.orig}  |  21 +-
 src/couch_views/src/couch_views_fdb.erl            | 116 ++++++-
 src/couch_views/src/couch_views_indexer.erl        |   5 +
 src/couch_views/src/couch_views_reader.erl         | 155 ++++++++-
 src/couch_views/src/couch_views_reduce.erl         | 364 +++++++++++++++++++++
 .../couch_views_reduce_fdb.erl}                    |  29 +-
 .../test/exunit/couch_views_reduce_test.exs        | 286 ++++++++++++++++
 src/couch_views/test/exunit/test_helper.exs        |   2 +
 10 files changed, 967 insertions(+), 26 deletions(-)

diff --git a/src/couch_views/include/couch_views.hrl b/src/couch_views/include/couch_views.hrl
index 2e443eb..e97d777 100644
--- a/src/couch_views/include/couch_views.hrl
+++ b/src/couch_views/include/couch_views.hrl
@@ -15,6 +15,7 @@
 -define(VIEW_ID_INFO, 1).
 -define(VIEW_ID_RANGE, 2).
 -define(VIEW_MAP_RANGE, 3).
+-define(VIEW_REDUCE_RANGE, 4).
 
 -define(VIEW_ROW_COUNT, 0).
 -define(VIEW_KV_SIZE, 1).
diff --git a/src/couch_views/src/couch_views.erl b/src/couch_views/src/couch_views.erl
index 322415b..5b2c76f 100644
--- a/src/couch_views/src/couch_views.erl
+++ b/src/couch_views/src/couch_views.erl
@@ -38,16 +38,20 @@ query(Db, DDoc, ViewName, Callback, Acc0, Args0) ->
     Args1 = to_mrargs(Args0),
     Args2 = couch_mrview_util:set_view_type(Args1, ViewName, Views),
     Args3 = couch_mrview_util:validate_args(Args2),
+
     ok = check_range(Args3),
-    case is_reduce_view(Args3) of
-        true -> throw({not_implemented});
-        false -> ok
-    end,
 
     try
         fabric2_fdb:transactional(Db, fun(TxDb) ->
             ok = maybe_update_view(TxDb, Mrst, Args3),
-            read_view(TxDb, Mrst, ViewName, Callback, Acc0, Args3)
+            case is_reduce_view(Args3) of
+                true ->
+                    couch_views_reader:read_reduce(Db, Mrst, ViewName,
+                        Callback, Acc0, Args3);
+                false ->
+                    couch_views_reader:read(Db, Mrst, ViewName,
+                        Callback, Acc0, Args3)
+            end
         end)
     catch throw:{build_view, WaitSeq} ->
         couch_views_jobs:build_view(Db, Mrst, WaitSeq),
diff --git a/src/couch_views/src/couch_views.erl b/src/couch_views/src/couch_views.erl.orig
similarity index 88%
copy from src/couch_views/src/couch_views.erl
copy to src/couch_views/src/couch_views.erl.orig
index 322415b..1830076 100644
--- a/src/couch_views/src/couch_views.erl
+++ b/src/couch_views/src/couch_views.erl.orig
@@ -38,13 +38,11 @@ query(Db, DDoc, ViewName, Callback, Acc0, Args0) ->
     Args1 = to_mrargs(Args0),
     Args2 = couch_mrview_util:set_view_type(Args1, ViewName, Views),
     Args3 = couch_mrview_util:validate_args(Args2),
+
     ok = check_range(Args3),
-    case is_reduce_view(Args3) of
-        true -> throw({not_implemented});
-        false -> ok
-    end,
 
     try
+<<<<<<< HEAD
         fabric2_fdb:transactional(Db, fun(TxDb) ->
             ok = maybe_update_view(TxDb, Mrst, Args3),
             read_view(TxDb, Mrst, ViewName, Callback, Acc0, Args3)
@@ -52,6 +50,21 @@ query(Db, DDoc, ViewName, Callback, Acc0, Args0) ->
     catch throw:{build_view, WaitSeq} ->
         couch_views_jobs:build_view(Db, Mrst, WaitSeq),
         read_view(Db, Mrst, ViewName, Callback, Acc0, Args3)
+=======
+        case is_reduce_view(Args3) of
+            true ->
+                couch_views_reader:read_reduce(Db, Mrst, ViewName,
+                    Callback, Acc0, Args3);
+            false ->
+                couch_views_reader:read(Db, Mrst, ViewName,
+                    Callback, Acc0, Args3)
+        end
+    after
+        UpdateAfter = Args3#mrargs.update == lazy,
+        if UpdateAfter == false -> ok; true ->
+            couch_views_jobs:build_view_async(Db, Mrst)
+        end
+>>>>>>> Initial work
     end.
 
 
diff --git a/src/couch_views/src/couch_views_fdb.erl b/src/couch_views/src/couch_views_fdb.erl
index 60ce300..07241dd 100644
--- a/src/couch_views/src/couch_views_fdb.erl
+++ b/src/couch_views/src/couch_views_fdb.erl
@@ -20,6 +20,7 @@
     get_kv_size/3,
 
     fold_map_idx/6,
+    fold_reduce_idx/6,
 
     write_doc/4
 ]).
@@ -42,6 +43,15 @@
 % View Build Sequence Access
 % (<db>, ?DB_VIEWS, Sig, ?VIEW_UPDATE_SEQ) = Sequence
 
+% Id Range
+% {<db>, ?DB_VIEWS, Sig, ?VIEW_ID_RANGE, DocId, ViewId}
+%   = [TotalKeys, TotalSize, UniqueKeys]
+
+% Map Range
+%{<db>, ?DB_VIEWS, Sig, ?VIEW_MAP_RANGE, ViewId, {{Key, DocId}, DupeId, Type}}
+%   = Value | UnEncodedKey
+% Type = ?VIEW_KEY | ?VIEW_ROW
+
 
 get_update_seq(TxDb, #mrst{sig = Sig}) ->
     #{
@@ -124,6 +134,8 @@ fold_map_idx(TxDb, Sig, ViewId, Options, Callback, Acc0) ->
     Acc1.
 
 
+
+
 write_doc(TxDb, Sig, _ViewIds, #{deleted := true} = Doc) ->
     #{
         id := DocId
@@ -134,6 +146,7 @@ write_doc(TxDb, Sig, _ViewIds, #{deleted := true} = Doc) ->
     clear_id_idx(TxDb, Sig, DocId),
     lists:foreach(fun({ViewId, TotalKeys, TotalSize, UniqueKeys}) ->
         clear_map_idx(TxDb, Sig, ViewId, DocId, UniqueKeys),
+        %clear_reduce_idx
         update_row_count(TxDb, Sig, ViewId, -TotalKeys),
         update_kv_size(TxDb, Sig, ViewId, -TotalSize)
     end, ExistingViewKeys);
@@ -141,14 +154,17 @@ write_doc(TxDb, Sig, _ViewIds, #{deleted := true} = Doc) ->
 write_doc(TxDb, Sig, ViewIds, Doc) ->
     #{
         id := DocId,
-        results := Results
+        results := Results,
+        reduce_results := ReduceResults
     } = Doc,
 
     ExistingViewKeys = get_view_keys(TxDb, Sig, DocId),
 
     clear_id_idx(TxDb, Sig, DocId),
 
-    lists:foreach(fun({ViewId, NewRows}) ->
+    %% TODO: handle when there is no reduce
+    io:format("REDUCE RESULTS ~p ~n", [ReduceResults]),
+    lists:foreach(fun({ViewId, NewRows, ReduceResult}) ->
         update_id_idx(TxDb, Sig, ViewId, DocId, NewRows),
 
         ExistingKeys = case lists:keyfind(ViewId, 1, ExistingViewKeys) of
@@ -165,8 +181,11 @@ write_doc(TxDb, Sig, ViewIds, Doc) ->
                 update_kv_size(TxDb, Sig, ViewId, SizeChange),
                 []
         end,
-        update_map_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows)
-    end, lists:zip(ViewIds, Results)).
+        update_map_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows),
+        couch_views_reduce:update_reduce_idx(TxDb, Sig, ViewId, DocId,
+            ExistingKeys, ReduceResult),
+        update_reduce_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, ReduceResult)
+    end, lists:zip3(ViewIds, Results, ReduceResults)).
 
 
 % For each row in a map view there are two rows stored in
@@ -338,6 +357,53 @@ update_map_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows) ->
     end, KVsToAdd).
 
 
+update_reduce_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows) ->
+    #{
+        tx := Tx,
+        db_prefix := DbPrefix
+    } = TxDb,
+
+%%    Unique = lists:usort([K || {K, _V} <- NewRows]),
+
+%%    KeysToRem = ExistingKeys -- Unique,
+%%    lists:foreach(fun(RemKey) ->
+%%        {Start, End} = reduce_idx_range(DbPrefix, Sig, ViewId, RemKey, DocId),
+%%        ok = erlfdb:clear_range(Tx, Start, End)
+%%    end, KeysToRem),
+%%
+    {ExactKVsToAdd, GroupKVsToAdd} = process_reduce_rows(NewRows),
+    ReduceIdxPrefix = reduce_idx_prefix(DbPrefix, Sig, ViewId),
+    add_reduce_kvs(Tx, ReduceIdxPrefix, ExactKVsToAdd, ?VIEW_REDUCE_EXACT),
+    add_reduce_kvs(Tx, ReduceIdxPrefix, GroupKVsToAdd, ?VIEW_REDUCE_GROUP).
+
+
+add_reduce_kvs(Tx, ReduceIdxPrefix, KVsToAdd, ReduceType) ->
+    lists:foreach(fun({Key1, Key2, Val, GroupLevel}) ->
+        KK = reduce_idx_key(ReduceIdxPrefix, Key1, GroupLevel,
+            ReduceType, ?VIEW_ROW_KEY),
+        VK = reduce_idx_key(ReduceIdxPrefix, Key1, GroupLevel,
+            ReduceType, ?VIEW_ROW_VALUE),
+        ok = erlfdb:set(Tx, KK, Key2),
+        ok = erlfdb:add(Tx, VK, Val)
+    end, KVsToAdd).
+
+
+reduce_idx_prefix(DbPrefix, Sig, ViewId) ->
+    Key = {?DB_VIEWS, Sig, ?VIEW_REDUCE_RANGE, ViewId},
+    erlfdb_tuple:pack(Key, DbPrefix).
+
+
+reduce_idx_key(ReduceIdxPrefix, ReduceKey, GroupLevel, ReduceType, RowType) ->
+    Key = {ReduceKey, GroupLevel, ReduceType, RowType},
+    erlfdb_tuple:pack(Key, ReduceIdxPrefix).
+
+
+%%reduce_idx_range(DbPrefix, Sig, ViewId, GroupKey, DocId) ->
+%%    Encoded = couch_views_encoding:encode(MapKey, key),
+%%    Key = {?DB_VIEWS, Sig, ?VIEW_MAP_RANGE, ViewId, {Encoded, DocId}},
+%%    erlfdb_tuple:range(Key, DbPrefix).
+
+
 get_view_keys(TxDb, Sig, DocId) ->
     #{
         tx := Tx,
@@ -394,7 +460,6 @@ id_idx_range(DbPrefix, Sig, DocId) ->
     Key = {?DB_VIEWS, Sig, ?VIEW_ID_RANGE, DocId},
     erlfdb_tuple:range(Key, DbPrefix).
 
-
 map_idx_prefix(DbPrefix, Sig, ViewId) ->
     Key = {?DB_VIEWS, Sig, ?VIEW_MAP_RANGE, ViewId},
     erlfdb_tuple:pack(Key, DbPrefix).
@@ -432,6 +497,47 @@ process_rows(Rows) ->
     end, [], Grouped).
 
 
+process_reduce_rows(Rows) ->
+    ReduceExact = encode_reduce_rows(Rows),
+    ReduceGroups = lists:foldl(fun({Key, Val}, Groupings) ->
+        create_grouping(Key, Val, [], Groupings)
+    end, #{}, Rows),
+    ReduceGroups1 = encode_reduce_rows(maps:to_list(ReduceGroups)),
+    {ReduceExact, ReduceGroups1}.
+
+
+encode_reduce_rows(Rows) ->
+    lists:map(fun({K, V}) ->
+        EK1 = couch_views_encoding:encode(K, key),
+        EK2 = couch_views_encoding:encode(K, value),
+        {EK1, EK2, V, group_level(K)}
+    end, Rows).
+
+
+group_level(Key) when is_list(Key) ->
+    length(Key);
+
+group_level(_Key) ->
+    1.
+
+
+create_grouping([], _Val, _, Groupings) ->
+    Groupings;
+
+create_grouping([Head | Rest], Val, Key, Groupings) ->
+    Key1 = Key ++ [Head],
+    Groupings1 = maps:update_with(Key1, fun(OldVal) ->
+        OldVal + Val
+    end, Val, Groupings),
+    create_grouping(Rest, Val, Key1, Groupings1);
+
+create_grouping(Key, Val, _, Groupings) ->
+    maps:update_with(Key, fun(OldVal) ->
+        OldVal + Val
+    end, Val, Groupings).
+
+
 calculate_row_size(Rows) ->
     lists:foldl(fun({K, V}, Acc) ->
         Acc + erlang:external_size(K) + erlang:external_size(V)
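The create_grouping/4 helper above accumulates each row's value into every prefix of its composite key, which is what later serves group_level queries with per-prefix totals. A rough Python illustration of that accumulation (the function name and dict-based grouping are stand-ins for the Erlang map logic, not part of the patch):

```python
def create_grouping(key, val, groupings):
    """Add val to every prefix of key, mirroring create_grouping/4 above.

    Composite keys are lists; a scalar key is treated as a single-element
    group, like the group_level/1 clauses in the patch.
    """
    if not isinstance(key, list):
        key = [key]
    prefix = []
    for part in key:
        prefix = prefix + [part]
        # tuple() because Python lists are unhashable as dict keys
        k = tuple(prefix)
        groupings[k] = groupings.get(k, 0) + val
    return groupings

groups = {}
for k, v in [([2019, 3, 3], 1), ([2019, 3, 4], 1), ([2019, 4, 1], 1)]:
    create_grouping(k, v, groups)
# every prefix accumulates, so (2019,) sums all three rows
```

This is why a group_level=1 query can be answered from the stored groupings without re-running the reduce over the exact keys.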
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index 75e4b36..d440839 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -178,6 +178,7 @@ update(#{} = Db, Mrst0, State0) ->
 
         DocAcc1 = fetch_docs(TxDb, DocAcc),
         {Mrst1, MappedDocs} = map_docs(Mrst0, DocAcc1),
+        Results = run_reduce(Mrst1, MappedDocs),
         write_docs(TxDb, Mrst1, MappedDocs, State2),
 
         case Count < Limit of
@@ -286,6 +287,10 @@ map_docs(Mrst, Docs) ->
     {Mrst1, MappedDocs}.
 
 
+run_reduce(Mrst, MappedResults) ->
+    couch_views_reduce:run_reduce(Mrst, MappedResults).
+
+
 write_docs(TxDb, Mrst, Docs, State) ->
     #mrst{
         views = Views,
diff --git a/src/couch_views/src/couch_views_reader.erl b/src/couch_views/src/couch_views_reader.erl
index 27671fb..d08515c 100644
--- a/src/couch_views/src/couch_views_reader.erl
+++ b/src/couch_views/src/couch_views_reader.erl
@@ -13,6 +13,7 @@
 -module(couch_views_reader).
 
 -export([
+    read_reduce/6,
     read/6
 ]).
 
@@ -23,6 +24,128 @@
 -include_lib("fabric/include/fabric2.hrl").
 
 
+read_reduce(Db, Mrst, ViewName, UserCallback, UserAcc0, Args) ->
+    #mrst{
+        language = Lang,
+        sig = Sig,
+        views = Views
+    } = Mrst,
+
+    ViewId = get_view_id(Lang, Args, ViewName, Views),
+    couch_views_reduce:read_reduce(Db, Sig, ViewId, UserCallback,
+        UserAcc0, Args).
+%%    Fun = fun handle_reduce_row/3,
+%%
+%%    try
+%%        fabric2_fdb:transactional(Db, fun(TxDb) ->
+%%            Meta = get_meta(TxDb, Mrst, ViewId, Args),
+%%            UserAcc1 = maybe_stop(UserCallback(Meta, UserAcc0)),
+%%
+%%            #mrargs{
+%%                limit = Limit
+%%            } = Args,
+%%
+%%            Acc0 = #{
+%%                db => TxDb,
+%%                skip => Args#mrargs.skip,
+%%                mrargs => undefined,
+%%                callback => UserCallback,
+%%                acc => UserAcc1,
+%%                row_count => 0,
+%%                limit => Limit
+%%            },
+%%
+%%            Acc1 = lists:foldl(fun(KeyArgs, KeyAcc0) ->
+%%                Opts = reduce_mrargs_to_fdb_options(KeyArgs),
+%%                KeyAcc1 = KeyAcc0#{
+%%                    mrargs := KeyArgs
+%%                },
+%%                couch_views_fdb:fold_reduce_idx(
+%%                    TxDb,
+%%                    Sig,
+%%                    ViewId,
+%%                    Opts,
+%%                    Fun,
+%%                    KeyAcc1
+%%                )
+%%            end, Acc0, expand_keys_args(Args)),
+%%
+%%            #{
+%%                acc := UserAcc2
+%%            } = Acc1,
+%%            {ok, maybe_stop(UserCallback(complete, UserAcc2))}
+%%        end)
+%%    catch throw:{done, Out} ->
+%%        {ok, Out}
+%%    end.
+
+
+reduce_mrargs_to_fdb_options(Args) ->
+    #mrargs{
+%%        start_key = StartKey0,
+%%        start_key_docid = StartKeyDocId,
+%%        end_key = EndKey0,
+%%        end_key_docid = EndKeyDocId,
+        direction = Direction,
+        limit = Limit,
+        skip = Skip,
+        group_level = GroupLevel,
+        group = Group
+%%        inclusive_end = InclusiveEnd
+    } = Args,
+
+    GroupExact = Group == true andalso GroupLevel == 0,
+
+    GroupLevelEnd = case GroupExact of
+        true -> [];
+        false -> [{end_key, {<<255>>, GroupLevel + 1}}]
+    end,
+
+%%    StartKey1 = if StartKey0 == undefined -> undefined; true ->
+%%        couch_views_encoding:encode(StartKey0, key)
+%%    end,
+%%
+%%    StartKeyOpts = case {StartKey1, StartKeyDocId} of
+%%        {undefined, _} ->
+%%            [];
+%%        {StartKey1, StartKeyDocId} ->
+%%            [{start_key, {StartKey1, StartKeyDocId}}]
+%%    end,
+%%
+%%    EndKey1 = if EndKey0 == undefined -> undefined; true ->
+%%        couch_views_encoding:encode(EndKey0, key)
+%%    end,
+%%
+%%    EndKeyOpts = case {EndKey1, EndKeyDocId, Direction} of
+%%        {undefined, _, _} ->
+%%            [];
+%%        {EndKey1, <<>>, rev} when not InclusiveEnd ->
+%%            % When we iterate in reverse with
+%%            % inclusive_end=false we have to set the
+%%            % EndKeyDocId to <<255>> so that we don't
+%%            % include matching rows.
+%%            [{end_key_gt, {EndKey1, <<255>>}}];
+%%        {EndKey1, <<255>>, _} when not InclusiveEnd ->
+%%            % When inclusive_end=false we need to
+%%            % elide the default end_key_docid so as
+%%            % to not sort past the docids with the
+%%            % given end key.
+%%            [{end_key_gt, {EndKey1}}];
+%%        {EndKey1, EndKeyDocId, _} when not InclusiveEnd ->
+%%            [{end_key_gt, {EndKey1, EndKeyDocId}}];
+%%        {EndKey1, EndKeyDocId, _} when InclusiveEnd ->
+%%            [{end_key, {EndKey1, EndKeyDocId}}]
+%%    end,
+
+    [
+        {dir, Direction},
+%%        {limit, Limit * 2 + Skip * 2},
+        {streaming_mode, large}
+%%        {streaming_mode, want_all}
+    ] ++ GroupLevelEnd.
+%%    ] ++ StartKeyOpts ++ EndKeyOpts.
+
+
 read(Db, Mrst, ViewName, UserCallback, UserAcc0, Args) ->
     #mrst{
         language = Lang,
@@ -113,11 +236,41 @@ handle_row(DocId, Key, Value, Acc) ->
     UserAcc1 = maybe_stop(UserCallback({row, Row}, UserAcc0)),
     Acc#{acc := UserAcc1}.
 
+handle_reduce_row(_Key, _Value, #{skip := Skip} = Acc) when Skip > 0 ->
+    Acc#{skip := Skip - 1};
+
+handle_reduce_row(Key, Value, Acc) ->
+    io:format("ACC ~p ~n", [Acc]),
+    #{
+        callback := UserCallback,
+        acc := UserAcc0,
+        row_count := RowCount,
+        limit := Limit
+    } = Acc,
+
+    Row = [
+        {key, Key},
+        {value, Value}
+    ],
+
+    RowCountNext = RowCount + 1,
+
+    UserAcc1 = maybe_stop(UserCallback({row, Row}, UserAcc0)),
+    Acc1 = Acc#{acc := UserAcc1, row_count := RowCountNext},
+
+    case RowCountNext == Limit of
+        true ->
+            UserAcc2 = maybe_stop(UserCallback(complete, UserAcc1)),
+            maybe_stop({stop, UserAcc2});
+        false ->
+            Acc1
+    end.
+
 
 get_view_id(Lang, Args, ViewName, Views) ->
     case couch_mrview_util:extract_view(Lang, Args, ViewName, Views) of
         {map, View, _Args} -> View#mrview.id_num;
-        {red, {_Idx, _Lang, View}} -> View#mrview.id_num
+        {red, {_Idx, _Lang, View}, _Args} -> View#mrview.id_num
     end.
 
 
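The handle_reduce_row/3 clauses above first consume the skip count, then stream rows to the user callback and stop once the row count reaches the limit, flushing a final complete event. A small Python model of that accumulator loop (the callback protocol and names are simplified stand-ins for the Erlang version):

```python
def stream_rows(rows, callback, limit, skip=0):
    """Model of handle_reduce_row/3: honor skip, emit up to `limit`
    rows through the callback, then finish with a complete event."""
    acc = []
    emitted = 0
    for key, value in rows:
        if skip > 0:
            skip -= 1          # skip clause: decrement and drop the row
            continue
        acc = callback(("row", key, value), acc)
        emitted += 1
        if emitted == limit:   # RowCountNext == Limit -> stop early
            break
    return callback(("complete",), acc)

def collect(event, acc):
    # toy callback: prepend rows, reverse on complete (like the tests' default_cb)
    if event[0] == "row":
        return [(event[1], event[2])] + acc
    return list(reversed(acc))

out = stream_rows([(1, "a"), (2, "b"), (3, "c"), (4, "d")],
                  collect, limit=2, skip=1)
# → [(2, "b"), (3, "c")]
```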
diff --git a/src/couch_views/src/couch_views_reduce.erl b/src/couch_views/src/couch_views_reduce.erl
new file mode 100644
index 0000000..1502f38
--- /dev/null
+++ b/src/couch_views/src/couch_views_reduce.erl
@@ -0,0 +1,364 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_views_reduce).
+
+
+-export([
+    run_reduce/2,
+    update_reduce_idx/6,
+    read_reduce/6
+]).
+
+
+-include("couch_views.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+-include_lib("fabric/include/fabric2.hrl").
+
+
+-define(LEVEL_FAN_POW, 4).
+-define(MAX_SKIP_LIST_LEVELS, 6).
+
+
+log_levels(Db, Sig, ViewId) ->
+    #{
+        db_prefix := DbPrefix
+    } = Db,
+
+    Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
+    ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+    Opts = [{streaming_mode, want_all}],
+
+    fabric2_fdb:transactional(Db, fun(#{tx := Tx} = TxDb) ->
+        lists:foreach(fun (Level) ->
+            {StartKey, EndKey} = erlfdb_tuple:range({Level},
+                ReduceIdxPrefix),
+
+            Acc0 = #{
+                sig => Sig,
+                view_id => ViewId,
+                reduce_idx_prefix => ReduceIdxPrefix,
+                next => key,
+                key => undefined,
+                rows => []
+            },
+
+            Fun = fun fold_fwd_cb/2,
+            Acc = erlfdb:fold_range(Tx, StartKey, EndKey, Fun, Acc0, Opts),
+            #{
+                rows := Rows
+            } = Acc,
+            io:format("~n ~n LEVEL ~p rows ~p ~n", [Level, Rows]),
+            {ok, Rows}
+        end, Levels),
+        {ok, []}
+    end).
+
+
+read_reduce(Db, Sig, ViewId, UserCallback, UserAcc0, Args) ->
+    #{
+        db_prefix := DbPrefix
+    } = Db,
+
+    Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
+    ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+    fabric2_fdb:transactional(Db, fun(#{tx := Tx} = TxDb) ->
+        log_levels(TxDb, Sig, ViewId),
+%%        Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
+
+
+        Acc0 = #{
+            sig => Sig,
+            view_id => ViewId,
+            user_acc => UserAcc0,
+            args => Args,
+            callback => UserCallback,
+            reduce_idx_prefix => ReduceIdxPrefix,
+            next => key,
+            rows => []
+        },
+
+
+%%        Opts = [{limit, 2}, {streaming_mode, want_all}],
+%%        EK = couch_views_encoding:encode(0, key),
+%%        {StartKey, EndKey} = erlfdb_tuple:range({?MAX_SKIP_LIST_LEVELS, EK},
+%%            ReduceIdxPrefix),
+%%
+%%        Fun = fun fold_fwd_cb/2,
+%%        Acc = erlfdb:fold_range(Tx, StartKey, EndKey, Fun, Acc0, Opts),
+        #{
+            rows := Rows
+        } = Acc0,
+        {ok, Rows}
+    end).
+
+args_to_fdb_opts(#mrargs{} = Args) ->
+    #mrargs{
+        limit = Limit,
+        start_key = StartKey,
+        end_key = EndKey
+    } = Args,
+    ok.
+
+
+fold_fwd_cb({FullEncodedKey, EV}, #{next := key} = Acc) ->
+    #{
+        reduce_idx_prefix := ReduceIdxPrefix
+    } = Acc,
+
+    {Level, EK, ?VIEW_ROW_KEY}
+        = erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
+
+%%    Key = couch_views_encoding:decode(EV),
+    Val = couch_views_encoding:decode(EV),
+    Acc#{next := value, key := Val};
+
+fold_fwd_cb({FullEncodedKey, EV}, #{next := value} = Acc) ->
+    #{
+        reduce_idx_prefix := ReduceIdxPrefix,
+        rows := Rows,
+        key := Key
+    } = Acc,
+
+    {Level, EK, ?VIEW_ROW_VALUE}
+        = erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
+
+%%    Key = couch_views_encoding:decode(EV),
+    Val = couch_views_encoding:decode(EV),
+    Acc#{next := key, key := undefined, rows := Rows ++ [{Key, Val}]}.
+
+
+run_reduce(#mrst{views = Views } = Mrst, MappedResults) ->
+    ReduceFuns = lists:map(fun(View) ->
+        #mrview{
+            id_num = Id,
+            reduce_funs = ViewReduceFuns
+        } = View,
+
+        [{_, Fun}] = ViewReduceFuns,
+        Fun
+    end, Views),
+
+    lists:map(fun (MappedResult) ->
+        #{
+            results := Results
+        } = MappedResult,
+
+        ReduceResults = lists:map(fun ({ReduceFun, Result}) ->
+            reduce(ReduceFun, Result)
+        end, lists:zip(ReduceFuns, Results)),
+
+        MappedResult#{
+            reduce_results => ReduceResults
+        }
+    end, MappedResults).
+
+
+reduce(<<"_count">>, Results) ->
+    ReduceResults = lists:foldl(fun ({Key, _}, Acc) ->
+        case maps:is_key(Key, Acc) of
+            true ->
+                #{Key := Val} = Acc,
+                Acc#{Key := Val + 1};
+            false ->
+                Acc#{Key => 1}
+        end
+    end, #{}, Results),
+    maps:to_list(ReduceResults);
+
+% Note: this is not the real _stats reduce function that CouchDB supports.
+% It is a stand-in max-style reducer, chosen so that updating the index
+% forces re-reading multiple rows rather than a single atomic update.
+reduce(<<"_stats">>, Results) ->
+    ReduceResults = lists:foldl(fun ({Key, Val}, Acc) ->
+        io:format("MAX ~p ~p ~n", [Key, Val]),
+        case maps:is_key(Key, Acc) of
+            true ->
+                #{Key := Max} = Acc,
+                case Max >= Val of
+                    true ->
+                        Acc;
+                    false ->
+                        Acc#{Key := Val}
+                end;
+            false ->
+                Acc#{Key => Val}
+        end
+    end, #{}, Results),
+    maps:to_list(ReduceResults).
+
+
+is_builtin(<<"_", _/binary>>) ->
+    true;
+
+is_builtin(_) ->
+    false.
+
+
+update_reduce_idx(TxDb, Sig, ViewId, _DocId, _ExistingKeys, ReduceResult) ->
+    #{
+        db_prefix := DbPrefix
+    } = TxDb,
+
+    ViewOpts = #{
+        db_prefix => DbPrefix,
+        sig => Sig,
+        view_id => ViewId
+    },
+    create_skip_list(TxDb, ?MAX_SKIP_LIST_LEVELS, ViewOpts),
+
+    lists:foreach(fun ({Key, Val}) ->
+        io:format("RESULTS KV ~p ~p ~n", [Key, Val]),
+        add_kv_to_skip_list(TxDb, ?MAX_SKIP_LIST_LEVELS, ViewOpts, Key, Val)
+    end, ReduceResult).
+
+
+create_skip_list(Db, MaxLevel, #{} = ViewOpts) ->
+    #{
+        db_prefix := DbPrefix,
+        sig := Sig,
+        view_id := ViewId
+    } = ViewOpts,
+
+    Levels = lists:seq(0, MaxLevel),
+    ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+
+    fabric2_fdb:transactional(Db, fun(TxDb) ->
+
+        lists:foreach(fun(Level) ->
+            add_kv(TxDb, ReduceIdxPrefix, Level, 0, 0)
+        end, Levels)
+    end).
+
+%% This is crude, but it's simple for now
+should_add_key_to_level(0, _, _) ->
+    true;
+
+should_add_key_to_level(?MAX_SKIP_LIST_LEVELS, _, _) ->
+    false;
+
+should_add_key_to_level(_, _, false) ->
+    false;
+
+should_add_key_to_level(_, _Key, _Prev) ->
+    crypto:rand_uniform(0, 2) == 0.
+
+%%should_add_key_to_level(Level, Key) ->
+%%    erlang:phash2(Key) band ((1 bsl (Level * ?LEVEL_FAN_POW)) -1) == 0.
+%%    keyHash & ((1 << (level * LEVEL_FAN_POW)) - 1)) != 0
+
+
+add_kv_to_skip_list(Db, MaxLevel, #{} = ViewOpts, Key, Val) ->
+    #{
+        db_prefix := DbPrefix,
+        sig := Sig,
+        view_id := ViewId
+    } = ViewOpts,
+
+    Levels = lists:seq(0, MaxLevel),
+    ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+
+    fabric2_fdb:transactional(Db, fun(TxDb) ->
+        lists:foldl(fun(Level, PrevCoinFlip) ->
+            io:format("PROCESS ~p ~p ~p ~n", [Level, Key, Val]),
+            {PrevKey, PrevVal} = get_previous_key(TxDb, ReduceIdxPrefix, Level, Key),
+            io:format("PREV VALS ~p ~p ~n", [PrevKey, PrevVal]),
+            case should_add_key_to_level(Level, Key, PrevCoinFlip) of
+                true ->
+                    io:format("Adding ~p ~p ~n", [Level, Key]),
+                    add_kv(Db, ReduceIdxPrefix, Level, Key, Val),
+                    true;
+                false ->
+                    {PrevKey, NewVal} = rereduce(<<"_stats">>, {PrevKey, PrevVal}, {Key, Val}),
+                    io:format("RE_REDUCE ~p ~p ~p ~p ~n", [Level, Key, PrevKey, NewVal]),
+                    add_kv(Db, ReduceIdxPrefix, Level, PrevKey, NewVal),
+                    false
+            end
+        end, true, Levels)
+    end).
+
+
+rereduce(<<"_stats">>, {PrevKey, PrevVal}, {_Key, Val}) ->
+    case PrevVal >= Val of
+        true -> {PrevKey, PrevVal};
+        false -> {PrevKey, Val}
+    end.
+
+
+reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId) ->
+    Key = {?DB_VIEWS, Sig, ?VIEW_REDUCE_SK_RANGE, ViewId},
+    erlfdb_tuple:pack(Key, DbPrefix).
+
+
+reduce_idx_key(ReduceIdxPrefix, SkipLevel, ReduceKey, RowType) ->
+    Key = {SkipLevel, ReduceKey, RowType},
+    erlfdb_tuple:pack(Key, ReduceIdxPrefix).
+
+
+add_kv(TxDb, ReduceIdxPrefix, Level, Key, Val) ->
+    #{
+        tx := Tx
+    } = TxDb,
+
+    EK = couch_views_encoding:encode(Key, key),
+    EVK = couch_views_encoding:encode(Key),
+    EV = couch_views_encoding:encode(Val),
+
+    KK = reduce_idx_key(ReduceIdxPrefix, Level, EK, ?VIEW_ROW_KEY),
+    VK = reduce_idx_key(ReduceIdxPrefix, Level, EK, ?VIEW_ROW_VALUE),
+    ok = erlfdb:set(Tx, KK, EVK),
+    ok = erlfdb:set(Tx, VK, EV).
+
+
+get_previous_key(TxDb, ReduceIdxPrefix, Level, Key) ->
+    #{
+        tx := Tx
+    } = TxDb,
+
+    % TODO: check whether we need to add conflict ranges for this at level=0
+    Opts = [{limit, 2}, {reverse, true}, {streaming_mode, want_all}],
+%%    LevelPrefix = erlfdb_tuple:pack({Level}, ReduceIdxPrefix),
+
+    EK = couch_views_encoding:encode(Key, key),
+    EndKey0 = erlfdb_tuple:pack({Level, EK}, ReduceIdxPrefix),
+
+    {StartKey, EndKey1} = erlfdb_tuple:range({Level}, ReduceIdxPrefix),
+%%    EndKey1 = erlfdb_key:first_greater_than(EndKey0),
+
+    Callback = fun row_cb/2,
+    Out = erlfdb:fold_range(Tx, StartKey, EndKey1, Callback, {val, ReduceIdxPrefix, {}}, Opts),
+    io:format("OUT PRV ~p ~p ~p ~n", [Level, Key, Out]),
+    Out.
+
+
+row_cb({FullEncodedKey, EV}, {val, ReduceIdxPrefix, Acc}) ->
+    io:format("ROW VAL ~p ~n", [erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix)]),
+    {_Level, EK, _VIEW_ROW_VALUE}
+        = erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
+    Val = couch_views_encoding:decode(EV),
+%%    io:format("WW ~p ~p ~n", [couch_views_encoding:decode(EK), Val]),
+
+    {key, {EK, ReduceIdxPrefix, Val}};
+
+row_cb({FullEncodedKey, EVK}, {key, {EK, ReduceIdxPrefix, Val}}) ->
+    io:format("ROW KEY ~p ~n", [erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix)]),
+    {_Level, EK, ?VIEW_ROW_KEY}
+        = erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
+    Key = couch_views_encoding:decode(EVK),
+
+    {Key, Val}.
+
+
+
+
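The commented-out should_add_key_to_level/2 above hints at the intended deterministic scheme: a key is promoted to a skip-list level when the low Level * LEVEL_FAN_POW bits of its hash are zero, so each level holds roughly 1/2^LEVEL_FAN_POW of the keys of the level below. A Python sketch of that promotion rule (the hash function here is a placeholder for erlang:phash2/1, and the constants mirror the macros in the patch):

```python
import zlib

LEVEL_FAN_POW = 4   # mirrors ?LEVEL_FAN_POW
MAX_LEVELS = 6      # mirrors ?MAX_SKIP_LIST_LEVELS

def key_hash(key):
    # stand-in for erlang:phash2/1; any stable integer hash works
    return zlib.crc32(repr(key).encode())

def should_add_key_to_level(level, key):
    """True when the low level * LEVEL_FAN_POW bits of the hash are
    zero, so about 1 in 2**LEVEL_FAN_POW keys of level N-1 reach N."""
    return key_hash(key) & ((1 << (level * LEVEL_FAN_POW)) - 1) == 0

# level 0 holds every key; higher levels thin out geometrically
keys = range(10000)
per_level = [sum(should_add_key_to_level(lvl, k) for k in keys)
             for lvl in range(MAX_LEVELS)]
```

Unlike the coin-flip version in the patch, this rule is deterministic per key, so concurrent indexers make the same promotion decision without coordination.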
diff --git a/src/couch_views/include/couch_views.hrl b/src/couch_views/src/couch_views_reduce_fdb.erl
similarity index 55%
copy from src/couch_views/include/couch_views.hrl
copy to src/couch_views/src/couch_views_reduce_fdb.erl
index 2e443eb..bcaaa30 100644
--- a/src/couch_views/include/couch_views.hrl
+++ b/src/couch_views/src/couch_views_reduce_fdb.erl
@@ -10,17 +10,24 @@
 % License for the specific language governing permissions and limitations under
 % the License.
 
-% indexing
--define(VIEW_UPDATE_SEQ, 0).
--define(VIEW_ID_INFO, 1).
--define(VIEW_ID_RANGE, 2).
--define(VIEW_MAP_RANGE, 3).
 
--define(VIEW_ROW_COUNT, 0).
--define(VIEW_KV_SIZE, 1).
+-module(couch_views_reduce_fdb).
+
+
+-export([
+%%    write_doc/4
+]).
+
+% _id keys = {?DB_VIEWS, Sig, ?VIEW_REDUCE_ID_RANGE, DocId, ViewId}
+%          = [TotalKeys, TotalSize, UniqueKeys]
+
+%%write_doc(TxDb, Sig, ViewIds, Doc) ->
+%%    #{
+%%        id := DocId,
+%%        reduce_results := ReduceResults
+%%    } = Doc,
+%%    lists:foreach(fun({ViewId, NewRows}) ->
+%%        % update reduce index
+%%        ok
+%%    end, lists:zip(ViewIds, ReduceResults)).
 
--define(VIEW_ROW_KEY, 0).
--define(VIEW_ROW_VALUE, 1).
 
-% jobs api
--define(INDEX_JOB_TYPE, <<"views">>).
diff --git a/src/couch_views/test/exunit/couch_views_reduce_test.exs b/src/couch_views/test/exunit/couch_views_reduce_test.exs
new file mode 100644
index 0000000..3f7a173
--- /dev/null
+++ b/src/couch_views/test/exunit/couch_views_reduce_test.exs
@@ -0,0 +1,286 @@
+defmodule CouchViewsReduceTest do
+  use Couch.Test.ExUnit.Case
+
+  alias Couch.Test.Utils
+
+  alias Couch.Test.Setup
+
+  alias Couch.Test.Setup.Step
+
+  setup_all do
+    test_ctx = :test_util.start_couch([:fabric, :couch_views, :couch_jobs])
+
+    on_exit(fn ->
+      :test_util.stop_couch(test_ctx)
+    end)
+  end
+
+  setup do
+    db_name = Utils.random_name("db")
+
+    admin_ctx =
+      {:user_ctx,
+       Utils.erlang_record(:user_ctx, "couch/include/couch_db.hrl", roles: ["_admin"])}
+
+    {:ok, db} = :fabric2_db.create(db_name, [admin_ctx])
+
+    docs = create_docs()
+    ddoc = create_ddoc()
+
+    {:ok, _} = :fabric2_db.update_docs(db, [ddoc | docs])
+
+    on_exit(fn ->
+      :fabric2_db.delete(db_name, [admin_ctx])
+    end)
+
+    %{
+      :db_name => db_name,
+      :db => db,
+      :ddoc => ddoc
+    }
+  end
+
+  #  test "group=true count reduce", context do
+  #    args = %{
+  #      :reduce => true,
+  #      :group => true
+  #      #            :limit => 9
+  #    }
+  #
+  #    {:ok, res} = run_query(context, args, "baz")
+  #    IO.inspect(res, label: "OUT")
+  #
+  #    assert res == [
+  #             {:row, [key: 1, value: 2]},
+  #             {:row, [key: 2, value: 2]},
+  #             {:row, [key: 3, value: 2]},
+  #             {:row, [key: [1, 1], value: 1]},
+  #             {:row, [key: [1, 1, 5], value: 1]},
+  #             {:row, [key: [1, 2, 6], value: 1]},
+  #             {:row, [key: [2, 1], value: 1]},
+  #             {:row, [key: [2, 3, 6], value: 1]},
+  #             {:row, [key: [3, 1], value: 1]},
+  #             {:row, [key: [3, 1, 5], value: 1]},
+  #             {:row, [key: [3, 4, 5], value: 1]}
+  #           ]
+  #  end
+
+  #  test "group=1 count reduce", context do
+  #    args = %{
+  #      :reduce => true,
+  #      :group_level => 1
+  #      #          :limit => 6
+  #    }
+  #
+  #    {:ok, res} = run_query(context, args, "baz")
+  #    IO.inspect(res, label: "OUT")
+  #
+  #    assert res == [
+  #             {:row, [key: 1, value: 2]},
+  #             {:row, [key: 2, value: 2]},
+  #             {:row, [key: 3, value: 2]},
+  #             {:row, [key: [1], value: 2]},
+  #             {:row, [key: [2], value: 2]},
+  #             {:row, [key: [3], value: 2]}
+  #           ]
+  #  end
+  #
+  #  test "group=2 count reduce", context do
+  #    args = %{
+  #      :reduce => true,
+  #      :group_level => 2,
+  #      :limit => 9
+  #    }
+  #
+  #    {:ok, res} = run_query(context, args, "baz")
+  #    IO.inspect(res, label: "OUT")
+  #
+  #    assert res == [
+  #             {:row, [key: 1, value: 2]},
+  #             {:row, [key: 2, value: 2]},
+  #             {:row, [key: 3, value: 2]},
+  #             {:row, [key: [1, 1], value: 2]},
+  #             {:row, [key: [1, 2], value: 1]},
+  #             {:row, [key: [2, 1], value: 1]},
+  #             {:row, [key: [2, 3], value: 1]},
+  #             {:row, [key: [3, 1], value: 2]},
+  #             {:row, [key: [3, 4], value: 1]}
+  #           ]
+  #  end
+  #
+  #  test "group=2 count reduce with limit = 3", context do
+  #    args = %{
+  #      :reduce => true,
+  #      :group_level => 2,
+  #      :limit => 4
+  #    }
+  #
+  #    {:ok, res} = run_query(context, args, "baz")
+  #    IO.inspect(res, label: "OUT")
+  #
+  #    assert res == [
+  #             {:row, [key: 1, value: 2]},
+  #             {:row, [key: 2, value: 2]},
+  #             {:row, [key: 3, value: 2]},
+  #             {:row, [key: [1, 1], value: 1]}
+  #           ]
+  #  end
+  #
+  #  # [
+  #  #  row: [key: [2019, 1, 2], value: 1],
+  #  #  row: [key: [2019, 1, 4], value: 1],
+  #  #  row: [key: [2019, 2, 1], value: 1],
+  #  #  row: [key: [2019, 2, 3], value: 1]
+  #  # ]
+  #
+  #  test "group=2 count reduce with startkey", context do
+  #    args = %{
+  #      #          :reduce => true,
+  #      #          :group_level => 2,
+  #      :start_key => [2019, 1, 4]
+  #      #          :limit => 4
+  #    }
+  #
+  #    {:ok, res} = run_query(context, args, "boom")
+  #    IO.inspect(res, label: "OUT")
+  #
+  #    assert res == [
+  #             {:row, [key: [2019, 1], value: 1]},
+  #             {:row, [key: [2019, 2], value: 2]}
+  #           ]
+  #  end
+
+  test "group_level=0 _max reduce", context do
+    args = %{
+      :reduce => true,
+      :group_level => 0
+      #            :limit => 9
+    }
+
+    {:ok, res} = run_query(context, args, "max")
+    IO.inspect(res, label: "OUT")
+
+    assert res == [
+             {:row, [key: :null, value: 3]}
+           ]
+  end
+
+  defp run_query(context, args, view) do
+    db = context[:db]
+    ddoc = context[:ddoc]
+
+    :couch_views.query(db, ddoc, view, &__MODULE__.default_cb/2, [], args)
+  end
+
+  def default_cb(:complete, acc) do
+    {:ok, Enum.reverse(acc)}
+  end
+
+  def default_cb({:final, info}, []) do
+    {:ok, [info]}
+  end
+
+  def default_cb({:final, _}, acc) do
+    {:ok, acc}
+  end
+
+  def default_cb({:meta, _}, acc) do
+    {:ok, acc}
+  end
+
+  def default_cb(:ok, :ddoc_updated) do
+    {:ok, :ddoc_updated}
+  end
+
+  def default_cb(row, acc) do
+    {:ok, [row | acc]}
+  end
+
+  defp create_docs() do
+    for i <- 1..1 do
+      group =
+        if rem(i, 3) == 0 do
+          "first"
+        else
+          "second"
+        end
+
+      :couch_doc.from_json_obj(
+        {[
+           {"_id", "doc-id-#{i}"},
+           {"value", i},
+           {"some", "field"},
+           {"group", group}
+         ]}
+      )
+    end
+  end
+
+  defp create_ddoc() do
+    :couch_doc.from_json_obj(
+      {[
+         {"_id", "_design/bar"},
+         {"views",
+          {[
+#             {"baz",
+#              {[
+#                 {"map",
+#                  """
+#                  function(doc) {
+#                    emit(doc.value, doc.value);
+#                    emit(doc.value, doc.value);
+#                    emit([doc.value, 1], doc.value);
+#                    emit([doc.value, doc.value + 1, doc.group.length], doc.value);
+#
+#                    if (doc.value === 3) {
+#                      emit([1, 1, 5], 1);
+#                      emit([doc.value, 1, 5], 1);
+#                    }
+#                   }
+#                  """},
+#                 {"reduce", "_count"}
+#               ]}},
+#             {"boom",
+#              {[
+#                 {"map",
+#                  """
+#                  function(doc) {
+#                      var month = 1;
+#                      if (doc.value % 2) {
+#                          month = 2;
+#                      }
+#                      emit([2019, month, doc.value], doc.value);
+#                  }
+#                  """},
+#                 {"reduce", "_count"}
+#               ]}},
+             {"max",
+              {[
+                 {"map",
+                  """
+                  function(doc) {
+                      //emit(doc.value, doc.value);
+                      //emit([doc.value, 1], doc.value);
+                      //emit([doc.value, doc.value + 1, doc.group.length], doc.value);
+                        emit(1, 1);
+                        emit(2, 2);
+                        emit(3, 3);
+                        emit(4, 4);
+
+                       emit([2019, 2, 2], 1);
+                       emit([2019, 3, 3], 2);
+                       emit([2019, 3, 3], 3);
+                       emit([2019, 4, 3], 4);
+                       emit([2019, 5, 3], 6);
+                      if (doc.value === 3) {
+                       //emit([doc.value, 1, 5], 1);
+                      }
+                  }
+                  """},
+                 {"reduce", "_stats"}
+               ]}}
+           ]}}
+       ]}
+    )
+  end
+end
diff --git a/src/couch_views/test/exunit/test_helper.exs b/src/couch_views/test/exunit/test_helper.exs
new file mode 100644
index 0000000..3140500
--- /dev/null
+++ b/src/couch_views/test/exunit/test_helper.exs
@@ -0,0 +1,2 @@
+ExUnit.configure(formatters: [JUnitFormatter, ExUnit.CLIFormatter])
+ExUnit.start()
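The _count branch of reduce/2 in couch_views_reduce.erl above folds mapped rows into a per-key tally before the grouped totals are written. A minimal Python equivalent of that fold (dict-based; the unspecified ordering of maps:to_list/1 is not modeled here):

```python
def count_reduce(rows):
    """Per-key row count, mirroring the reduce(<<"_count">>, Results)
    clause: increment an existing key's tally or start it at 1."""
    counts = {}
    for key, _value in rows:
        counts[key] = counts.get(key, 0) + 1
    return counts

rows = [(1, "a"), (1, "b"), (2, "c"), (3, "d"), (3, "e"), (3, "f")]
result = count_reduce(rows)
# → {1: 2, 2: 1, 3: 3}
```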


[couchdb] 06/08: startkey/endkey with level0

Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

garren pushed a commit to branch prototype/builtin-reduce
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 457cb7e47bf14839e0d06731d54bee5055db7764
Author: Garren Smith <ga...@gmail.com>
AuthorDate: Tue Dec 3 14:48:14 2019 +0200

    startkey/endkey with level0
---
 src/couch_views/src/couch_views.erl                |  18 ++--
 src/couch_views/src/couch_views_reduce.erl         |  45 +++++----
 src/couch_views/src/couch_views_reduce_fdb.erl     |  51 ++--------
 .../test/exunit/couch_views_reduce_test.exs        | 112 +++++++++++++++++----
 4 files changed, 131 insertions(+), 95 deletions(-)

diff --git a/src/couch_views/src/couch_views.erl b/src/couch_views/src/couch_views.erl
index 5b2c76f..5e40670 100644
--- a/src/couch_views/src/couch_views.erl
+++ b/src/couch_views/src/couch_views.erl
@@ -44,14 +44,7 @@ query(Db, DDoc, ViewName, Callback, Acc0, Args0) ->
     try
         fabric2_fdb:transactional(Db, fun(TxDb) ->
             ok = maybe_update_view(TxDb, Mrst, Args3),
-            case is_reduce_view(Args3) of
-                true ->
-                    couch_views_reader:read_reduce(Db, Mrst, ViewName,
-                        Callback, Acc0, Args3);
-                false ->
-                    couch_views_reader:read(Db, Mrst, ViewName,
-                        Callback, Acc0, Args3)
-            end
+            read_view(Db, Mrst, ViewName, Callback, Acc0, Args3)
         end)
     catch throw:{build_view, WaitSeq} ->
         couch_views_jobs:build_view(Db, Mrst, WaitSeq),
@@ -62,7 +55,14 @@ query(Db, DDoc, ViewName, Callback, Acc0, Args0) ->
 read_view(Db, Mrst, ViewName, Callback, Acc0, Args) ->
     fabric2_fdb:transactional(Db, fun(TxDb) ->
         try
-            couch_views_reader:read(TxDb, Mrst, ViewName, Callback, Acc0, Args)
+            case is_reduce_view(Args) of
+                true ->
+                    couch_views_reader:read_reduce(Db, Mrst, ViewName,
+                        Callback, Acc0, Args);
+                false ->
+                    couch_views_reader:read(Db, Mrst, ViewName,
+                        Callback, Acc0, Args)
+            end
         after
             UpdateAfter = Args#mrargs.update == lazy,
             if UpdateAfter == false -> ok; true ->
diff --git a/src/couch_views/src/couch_views_reduce.erl b/src/couch_views/src/couch_views_reduce.erl
index 04c5cb8..a2e3a93 100644
--- a/src/couch_views/src/couch_views_reduce.erl
+++ b/src/couch_views/src/couch_views_reduce.erl
@@ -66,7 +66,7 @@ read_reduce(Db, Sig, ViewId, Reducer, UserCallback, UserAcc0, Args) ->
             },
 
             Fun = fun handle_row/3,
-            Acc1 = couch_views_reduce_fdb:fold_level0(TxDb, Sig, ViewId, Reducer, GroupLevel, Opts, Fun, Acc0),
+            Acc1 = couch_views_reduce_fdb:fold_level0(TxDb, Sig, ViewId, Reducer, GroupLevel1, Opts, Fun, Acc0),
             #{
                 user_acc := UserAcc1
             } = Acc1,
@@ -79,30 +79,35 @@ read_reduce(Db, Sig, ViewId, Reducer, UserCallback, UserAcc0, Args) ->
 
 args_to_fdb_opts(#mrargs{} = Args, ReduceIdxPrefix) ->
     #mrargs{
-%%        limit = Limit,
-%%        start_key = StartKey,
-%%        end_key = EndKey,
-        group = Group,
-        group_level = GroupLevel
+        start_key = StartKey,
+        end_key = EndKey
     } = Args,
 
-    {UStartKey0, EndKey0} = erlfdb_tuple:range({0},
-        ReduceIdxPrefix),
-
-    StartKey0 = erlfdb_tuple:pack({0, couch_views_encoding:encode(0, key)}, ReduceIdxPrefix),
-
-%%    StartKey1 = case StartKey of
-%%        undefined -> erlfdb_key:first_greater_than(StartKey0);
-%%        StartKey -> create_key(StartKey, 0, Red)
-%%    end,
-
-    StartKey1 = erlfdb_key:first_greater_than(StartKey0),
+    StartKey1 = case StartKey of
+        undefined ->
+            StartKey0 = encode_key(0, 0, ReduceIdxPrefix),
+            erlfdb_key:first_greater_than(StartKey0);
+        StartKey ->
+            StartKey0 = encode_key(StartKey, 0, ReduceIdxPrefix),
+            erlfdb_key:first_greater_or_equal(StartKey0)
+    end,
 
-    [{streaming_mode, want_all}, {startkey, StartKey1}, {endkey, EndKey0}].
+    EndKey1 = case EndKey of
+        undefined ->
+            {_, EndKey0} = erlfdb_tuple:range({0},
+                ReduceIdxPrefix),
+            EndKey0;
+        EndKey ->
+            io:format("ENDKEY ~n"),
+            EndKey0 = encode_key(EndKey, 0, ReduceIdxPrefix),
+            erlfdb_key:first_greater_than(EndKey0)
+    end,
+    [{streaming_mode, want_all}, {startkey, StartKey1}, {endkey, EndKey1}].
 
 
-encode_key(Key, Level) ->
-    {Level, couch_views_encoding:encode(Key, key)}.
+encode_key(Key, Level, ReduceIdxPrefix) ->
+    EK = {Level, couch_views_encoding:encode(Key, key)},
+    erlfdb_tuple:pack(EK, ReduceIdxPrefix).
 
 
 handle_row(Key, Value, Acc) ->
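The `args_to_fdb_opts/2` change above maps user-supplied `start_key`/`end_key` onto FoundationDB key selectors: `first_greater_or_equal` on the packed start key makes the start inclusive, and `first_greater_than` on the packed end key keeps the end inclusive. Assuming those standard selector semantics, the resulting range scan can be modeled over a sorted list of encoded keys:

```python
# Model of an inclusive start_key/end_key range read over sorted encoded keys.
# bisect_left plays the role of first_greater_or_equal(start), and
# bisect_right plays the role of first_greater_than(end) used as an end bound.
import bisect

def range_scan(sorted_keys, start_key=None, end_key=None):
    lo = 0 if start_key is None else bisect.bisect_left(sorted_keys, start_key)
    hi = len(sorted_keys) if end_key is None else bisect.bisect_right(sorted_keys, end_key)
    return sorted_keys[lo:hi]
```

With `start_key=None`/`end_key=None` the scan covers the whole level, matching the `erlfdb_tuple:range({0}, ...)` default above.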
diff --git a/src/couch_views/src/couch_views_reduce_fdb.erl b/src/couch_views/src/couch_views_reduce_fdb.erl
index 7a7e120..77514ed 100644
--- a/src/couch_views/src/couch_views_reduce_fdb.erl
+++ b/src/couch_views/src/couch_views_reduce_fdb.erl
@@ -69,38 +69,6 @@ sum_rows(Rows) ->
     end, 0, Rows).
 
 
-%%fold(Db, Sig, ViewId, Options, Callback, Acc0) ->
-%%    #{
-%%        db_prefix := DbPrefix
-%%    } = Db,
-%%
-%%%%    Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
-%%    ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
-%%    #mrargs{
-%%        limit = Limit
-%%    } = Args,
-%%
-%%    fabric2_fdb:transactional(Db, fun(TxDb) ->
-%%
-%%        Acc0 = #{
-%%            sig => Sig,
-%%            view_id => ViewId,
-%%            user_acc => UserAcc0,
-%%            args => Args,
-%%            callback => UserCallback,
-%%            reduce_idx_prefix => ReduceIdxPrefix,
-%%            limit => Limit,
-%%            row_count => 0
-%%
-%%        },
-%%
-%%        Acc1 = read_level0_only(TxDb, Acc0, Callback),
-%%        #{
-%%            user_acc := UserAcc1
-%%        } = Acc1,
-%%        {ok, UserAcc1}
-%%    end).
-
 fold_level0(Db, Sig, ViewId, Reducer, GroupLevel, Opts, UserCallback, UserAcc0) ->
     #{
         db_prefix := DbPrefix
@@ -158,17 +126,20 @@ fold_fwd_cb({_FullEncodedKey, EV}, Acc) ->
         LastKey0
     end,
 
+    GroupLevelKey = group_level_key(Key, GroupLevel),
+    GroupKV = [{GroupLevelKey, Val}],
+
     case group_level_equal(Key, LastKey, GroupLevel) of
         true ->
             Acc#{
-                rows := Rows ++ [{Key, Val}]
+                rows := Rows ++ GroupKV
             };
         false ->
             UserAcc1 = rereduce_and_reply(Reducer, Rows, GroupLevel,
                 Callback, UserAcc),
             Acc#{
                 user_acc := UserAcc1,
-                rows := [{Key, Val}]
+                rows := GroupKV
             }
     end.
 
@@ -352,6 +323,7 @@ get_previous_key(TxDb, ReduceIdxPrefix, Level, Key) ->
 
 
 hash_key(Key) ->
+    % TODO: look at alternatives like murmur3 here
     erlang:phash2(Key).
 
 
@@ -360,7 +332,6 @@ should_add_key_to_level(0, _KeyHash) ->
 
 should_add_key_to_level(Level, KeyHash) ->
     (KeyHash band ((1 bsl (Level * ?LEVEL_FAN_POW)) -1)) == 0.
-%%    keyHash & ((1 << (level * LEVEL_FAN_POW)) - 1)) != 0
 
 
 create_key(ReduceIdxPrefix, SkipLevel, Key) ->
@@ -392,13 +363,3 @@ add_kv(TxDb, ReduceIdxPrefix, Level, Key, Val) ->
     EV = create_value(Key, Val),
 
     ok = erlfdb:set(Tx, LevelKey, EV).
-
-
-%%rereduce(<<"_stats">>, {PrevKey, PrevVal}, {_Key, Val}) ->
-%%    case PrevVal >= Val of
-%%        true -> {PrevKey, PrevVal};
-%%        false -> {PrevKey, Val}
-%%    end.
-
-
-
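The `should_add_key_to_level/2` clauses above assign skip-list levels deterministically from a key hash: a key lives at every level whose low `Level * ?LEVEL_FAN_POW` hash bits are zero, so each level keeps roughly `1 / 2^LEVEL_FAN_POW` of the keys in the level below. A direct Python transcription of that test (with `hash_key` standing in for `erlang:phash2/1`):

```python
# Deterministic skip-list level membership, as in should_add_key_to_level/2.
# Level 0 holds every key; higher levels require progressively more trailing
# zero bits in the key hash.
LEVEL_FAN_POW = 4

def should_add_key_to_level(level, key_hash):
    if level == 0:
        return True
    return key_hash & ((1 << (level * LEVEL_FAN_POW)) - 1) == 0
```

Because membership depends only on the hash, the same key always lands on the same levels, which is what makes incremental updates to the reduce index possible without a coin flip per insert.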
diff --git a/src/couch_views/test/exunit/couch_views_reduce_test.exs b/src/couch_views/test/exunit/couch_views_reduce_test.exs
index 0812f1f..d6dcc60 100644
--- a/src/couch_views/test/exunit/couch_views_reduce_test.exs
+++ b/src/couch_views/test/exunit/couch_views_reduce_test.exs
@@ -2,9 +2,7 @@ defmodule CouchViewsReduceTest do
   use Couch.Test.ExUnit.Case
 
   alias Couch.Test.Utils
-
   alias Couch.Test.Setup
-
   alias Couch.Test.Setup.Step
 
   setup_all do
@@ -57,26 +55,26 @@ defmodule CouchViewsReduceTest do
   #           ]
   #  end
 
-#  test "group_level=1 count reduce", context do
-#    args = %{
-#      :reduce => true,
-#      :group_level => 1
-#    }
-#
-#    {:ok, res} = run_query(context, args, "dates_count")
-#    IO.inspect(res, label: "OUT")
-#
-#    assert res == [
-#             {:row, [key: [2017], value: 4]},
-#             {:row, [key: [2018], value: 3]},
-#             {:row, [key: [2019], value: 2]}
-#           ]
-#  end
-
-  test "group_level=1 reduce reduce", context do
+  #  test "group_level=1 count reduce", context do
+  #    args = %{
+  #      :reduce => true,
+  #      :group_level => 1
+  #    }
+  #
+  #    {:ok, res} = run_query(context, args, "dates_count")
+  #    IO.inspect(res, label: "OUT")
+  #
+  #    assert res == [
+  #             {:row, [key: [2017], value: 4]},
+  #             {:row, [key: [2018], value: 3]},
+  #             {:row, [key: [2019], value: 2]}
+  #           ]
+  #  end
+
+  test "group_level=1 reduce", context do
     args = %{
-      :reduce => true,
-      :group_level => 1
+      reduce: true,
+      group_level: 1
     }
 
     {:ok, res} = run_query(context, args, "dates_sum")
@@ -89,6 +87,78 @@ defmodule CouchViewsReduceTest do
            ]
   end
 
+  test "group_level=1 reduce with startkey/endkey", context do
+    args = %{
+      reduce: true,
+      group_level: 1,
+      start_key: [2017, 4, 1],
+      end_key: [2018, 3, 1]
+
+    }
+
+    {:ok, res} = run_query(context, args, "dates_sum")
+    IO.inspect(res, label: "OUT")
+
+    assert res == [
+             {:row, [key: [2017], value: 22]},
+             {:row, [key: [2018], value: 6]}
+           ]
+  end
+
+  test "group_level=1 reduce with startkey/endkey take 2", context do
+    args = %{
+      reduce: true,
+      group_level: 1,
+      start_key: [2017, 4, 1],
+      end_key: [2019, 3, 2]
+    }
+
+    {:ok, res} = run_query(context, args, "dates_sum")
+    IO.inspect(res, label: "OUT")
+
+    assert res == [
+             {:row, [key: [2017], value: 22]},
+             {:row, [key: [2018], value: 20]},
+             {:row, [key: [2019], value: 4]}
+           ]
+  end
+
+  test "group_level=1 reduce with startkey/endkey take 3", context do
+    args = %{
+      reduce: true,
+      group_level: 1,
+      start_key: [2017, 4, 1],
+      end_key: [2019, 5, 1]
+    }
+
+    {:ok, res} = run_query(context, args, "dates_sum")
+    IO.inspect(res, label: "OUT")
+
+    assert res == [
+             {:row, [key: [2017], value: 22]},
+             {:row, [key: [2018], value: 20]},
+             {:row, [key: [2019], value: 17]}
+           ]
+  end
+
+  test "group=true reduce with startkey/endkey", context do
+    args = %{
+      reduce: true,
+      group: true,
+      start_key: [2018, 5, 1],
+      end_key: [2019, 4, 1]
+    }
+
+    {:ok, res} = run_query(context, args, "dates_sum")
+    IO.inspect(res, label: "OUT")
+
+    assert res == [
+             {:row, [key: [2018, 5, 1], value: 7]},
+             {:row, [key: [2019, 3, 1], value: 4]},
+             {:row, [key: [2019, 4, 1], value: 6]}
+           ]
+  end
+
   #  test "group=1 count reduce", context do
   #    args = %{
   #      :reduce => true,

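The group_level tests above all follow the same rule: each emitted key (a list such as `[year, month, day]`) is truncated to its first `group_level` elements, and values within a run of equal truncated keys are combined by the reducer. Assuming `_sum` semantics and sorted input rows, the grouping can be sketched as:

```python
# Group sorted (key, value) rows by the first group_level elements of each
# list key, summing values within each group (the _sum reducer case).
def group_rows(rows, group_level):
    out = []
    for key, value in rows:
        gkey = key[:group_level] if isinstance(key, list) else key
        if out and out[-1][0] == gkey:
            out[-1] = (gkey, out[-1][1] + value)
        else:
            out.append((gkey, value))
    return out
```

Scalar keys pass through unchanged, which matches the treatment of non-list keys in the Erlang code.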

[couchdb] 02/08: printing

Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

garren pushed a commit to branch prototype/builtin-reduce
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 88ed4eb5f70c70cfa68270f832589a67ea3fe4f8
Author: Garren Smith <ga...@gmail.com>
AuthorDate: Mon Oct 28 14:34:00 2019 +0200

    printing
---
 src/couch_views/src/couch_views_fdb.erl            |  92 +---------
 src/couch_views/src/couch_views_indexer.erl        |   4 +-
 src/couch_views/src/couch_views_reduce.erl         |  84 +++------
 .../test/exunit/couch_views_reduce_test.exs        | 202 ++++++++++-----------
 4 files changed, 133 insertions(+), 249 deletions(-)

diff --git a/src/couch_views/src/couch_views_fdb.erl b/src/couch_views/src/couch_views_fdb.erl
index 07241dd..479a707 100644
--- a/src/couch_views/src/couch_views_fdb.erl
+++ b/src/couch_views/src/couch_views_fdb.erl
@@ -20,7 +20,6 @@
     get_kv_size/3,
 
     fold_map_idx/6,
-    fold_reduce_idx/6,
 
     write_doc/4
 ]).
@@ -183,8 +182,7 @@ write_doc(TxDb, Sig, ViewIds, Doc) ->
         end,
         update_map_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows),
         couch_views_reduce:update_reduce_idx(TxDb, Sig, ViewId, DocId,
-            ExistingKeys, ReduceResult),
-        update_reduce_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, ReduceResult)
+            ExistingKeys, ReduceResult)
     end, lists:zip3(ViewIds, Results, ReduceResults)).
 
 
@@ -357,53 +355,6 @@ update_map_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows) ->
     end, KVsToAdd).
 
 
-update_reduce_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows) ->
-    #{
-        tx := Tx,
-        db_prefix := DbPrefix
-    } = TxDb,
-
-%%    Unique = lists:usort([K || {K, _V} <- NewRows]),
-
-%%    KeysToRem = ExistingKeys -- Unique,
-%%    lists:foreach(fun(RemKey) ->
-%%        {Start, End} = reduce_idx_range(DbPrefix, Sig, ViewId, RemKey, DocId),
-%%        ok = erlfdb:clear_range(Tx, Start, End)
-%%    end, KeysToRem),
-%%
-    {ExactKVsToAdd, GroupKVsToAdd} = process_reduce_rows(NewRows),
-    ReduceIdxPrefix = reduce_idx_prefix(DbPrefix, Sig, ViewId),
-    add_reduce_kvs(Tx, ReduceIdxPrefix, ExactKVsToAdd, ?VIEW_REDUCE_EXACT),
-    add_reduce_kvs(Tx, ReduceIdxPrefix, GroupKVsToAdd, ?VIEW_REDUCE_GROUP).
-
-
-add_reduce_kvs(Tx, ReduceIdxPrefix, KVsToAdd, ReduceType) ->
-    lists:foreach(fun({Key1, Key2, Val, GroupLevel}) ->
-        KK = reduce_idx_key(ReduceIdxPrefix, Key1, GroupLevel,
-            ReduceType, ?VIEW_ROW_KEY),
-        VK = reduce_idx_key(ReduceIdxPrefix, Key1, GroupLevel,
-            ReduceType, ?VIEW_ROW_VALUE),
-        ok = erlfdb:set(Tx, KK, Key2),
-        ok = erlfdb:add(Tx, VK, Val)
-    end, KVsToAdd).
-
-
-reduce_idx_prefix(DbPrefix, Sig, ViewId) ->
-    Key = {?DB_VIEWS, Sig, ?VIEW_REDUCE_RANGE, ViewId},
-    erlfdb_tuple:pack(Key, DbPrefix).
-
-
-reduce_idx_key(ReduceIdxPrefix, ReduceKey, GroupLevel, ReduceType, RowType) ->
-    Key = {ReduceKey, GroupLevel, ReduceType, RowType},
-    erlfdb_tuple:pack(Key, ReduceIdxPrefix).
-
-
-%%reduce_idx_range(DbPrefix, Sig, ViewId, GroupKey, DocId) ->
-%%    Encoded = couch_views_encoding:encode(MapKey, key),
-%%    Key = {?DB_VIEWS, Sig, ?VIEW_MAP_RANGE, ViewId, {Encoded, DocId}},
-%%    erlfdb_tuple:range(Key, DbPrefix).
-
-
 get_view_keys(TxDb, Sig, DocId) ->
     #{
         tx := Tx,
@@ -497,47 +448,6 @@ process_rows(Rows) ->
     end, [], Grouped).
 
 
-process_reduce_rows(Rows) ->
-    ReduceExact = encode_reduce_rows(Rows),
-    ReduceGroups = lists:foldl(fun({Key, Val}, Groupings) ->
-        Out = create_grouping(Key, Val, [], Groupings),
-        Out
-    end, #{}, Rows),
-    ReduceGroups1 = encode_reduce_rows(maps:to_list(ReduceGroups)),
-    {ReduceExact, ReduceGroups1}.
-
-
-encode_reduce_rows(Rows) ->
-    lists:map(fun({K, V}) ->
-        EK1 = couch_views_encoding:encode(K, key),
-        EK2 = couch_views_encoding:encode(K, value),
-        {EK1, EK2, V, group_level(K)}
-    end, Rows).
-
-
-group_level(Key) when is_list(Key) ->
-    length(Key);
-
-group_level(_Key) ->
-    1.
-
-
-create_grouping([], _Val, _, Groupings) ->
-    Groupings;
-
-create_grouping([Head | Rest], Val, Key, Groupings) ->
-    Key1 = Key ++ [Head],
-    Groupings1 = maps:update_with(Key1, fun(OldVal) ->
-        OldVal + Val
-        end, Val, Groupings),
-    create_grouping(Rest, Val, Key1, Groupings1);
-
-create_grouping(Key, Val, _, Groupings) ->
-    maps:update_with(Key, fun(OldVal) ->
-        OldVal + Val
-    end, Val, Groupings).
-
-
 calculate_row_size(Rows) ->
     lists:foldl(fun({K, V}, Acc) ->
         Acc + erlang:external_size(K) + erlang:external_size(V)
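The removed `process_reduce_rows/1` and `create_grouping/4` above precomputed, for each emitted key such as `[1, 2, 6]`, a running `_sum` for every key prefix (`[1]`, `[1, 2]`, `[1, 2, 6]`) at write time; the skip-list approach replaces this. For reference, the rollup those helpers implemented looks roughly like:

```python
# Prefix rollup in the style of the removed create_grouping/4: every prefix of
# a list key accumulates the value, so each group_level can later be answered
# by a direct lookup. Scalar keys are treated as single-element keys.
def prefix_groupings(rows):
    groups = {}
    for key, value in rows:
        parts = key if isinstance(key, list) else [key]
        for i in range(1, len(parts) + 1):
            prefix = tuple(parts[:i])
            groups[prefix] = groups.get(prefix, 0) + value
    return groups
```

The drawback, and a likely reason for its removal, is write amplification: a key of length N costs N index writes per row.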
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index d440839..4c430c1 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -178,8 +178,8 @@ update(#{} = Db, Mrst0, State0) ->
 
         DocAcc1 = fetch_docs(TxDb, DocAcc),
         {Mrst1, MappedDocs} = map_docs(Mrst0, DocAcc1),
-        Results = run_reduce(Mrst1, MappedDocs),
-        write_docs(TxDb, Mrst1, MappedDocs, State2),
+        MappedReducedDocs = run_reduce(Mrst1, MappedDocs),
+        write_docs(TxDb, Mrst1, MappedReducedDocs, State2),
 
         case Count < Limit of
             true ->
diff --git a/src/couch_views/src/couch_views_reduce.erl b/src/couch_views/src/couch_views_reduce.erl
index 1502f38..4cb7416 100644
--- a/src/couch_views/src/couch_views_reduce.erl
+++ b/src/couch_views/src/couch_views_reduce.erl
@@ -26,7 +26,7 @@
 -include_lib("fabric/include/fabric2.hrl").
 
 
--define(LEVEL_FAN_POW, 4).
+-define(LEVEL_FAN_POW, 1).
 -define(MAX_SKIP_LIST_LEVELS, 6).
 
 
@@ -84,7 +84,6 @@ read_reduce(Db, Sig, ViewId, UserCallback, UserAcc0, Args) ->
             args => Args,
             callback => UserCallback,
             reduce_idx_prefix => ReduceIdxPrefix,
-            next => key,
             rows => []
         },
 
@@ -111,31 +110,19 @@ args_to_fdb_opts(#mrargs{} = Args) ->
     ok.
 
 
-fold_fwd_cb({FullEncodedKey, EV}, #{next := key} = Acc) ->
-    #{
-        reduce_idx_prefix := ReduceIdxPrefix
-    } = Acc,
-
-    {Level, EK, ?VIEW_ROW_KEY}
-        = erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
-
-%%    Key = couch_views_encoding:decode(EV),
-    Val = couch_views_encoding:decode(EV),
-    Acc#{next := value, key := Val};
-
-fold_fwd_cb({FullEncodedKey, EV}, #{next := value} = Acc) ->
+fold_fwd_cb({FullEncodedKey, EV}, Acc) ->
     #{
         reduce_idx_prefix := ReduceIdxPrefix,
-        rows := Rows,
-        key := Key
+        rows := Rows
     } = Acc,
 
-    {Level, EK, ?VIEW_ROW_VALUE}
+    {_Level, _EK}
         = erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
+    {EK, EV1} = erlfdb_tuple:unpack(EV),
+    Key = couch_views_encoding:decode(EK),
+    Val = couch_views_encoding:decode(EV1),
 
-%%    Key = couch_views_encoding:decode(EV),
-    Val = couch_views_encoding:decode(EV),
-    Acc#{next := key, key := undefined, rows := Rows ++ [{Key, Val}]}.
+    Acc#{key := Val, rows := Rows ++ [{Key, Val}]}.
 
 
 run_reduce(#mrst{views = Views } = Mrst, MappedResults) ->
@@ -219,8 +206,8 @@ update_reduce_idx(TxDb, Sig, ViewId, _DocId, _ExistingKeys, ReduceResult) ->
     create_skip_list(TxDb, ?MAX_SKIP_LIST_LEVELS, ViewOpts),
 
     lists:foreach(fun ({Key, Val}) ->
-        io:format("RESULTS KV ~p ~p ~n", [Key, Val]),
-        add_kv_to_skip_list(TxDb, ?MAX_SKIP_LIST_LEVELS, ViewOpts, Key, Val)
+        io:format("RESULTS KV ~p ~p ~n", [Key, Val])
+%%        add_kv_to_skip_list(TxDb, ?MAX_SKIP_LIST_LEVELS, ViewOpts, Key, Val)
     end, ReduceResult).
 
 
@@ -241,21 +228,9 @@ create_skip_list(Db, MaxLevel, #{} = ViewOpts) ->
         end, Levels)
     end).
 
-%% This sucks but its simple for now
-should_add_key_to_level(0, _, _) ->
-    true;
-
-should_add_key_to_level(?MAX_SKIP_LIST_LEVELS, _, _) ->
-    false;
-
-should_add_key_to_level(_, _, false) ->
-    false;
-
-should_add_key_to_level(_, _Key, _Prev) ->
-    crypto:rand_uniform(0, 2) == 0.
 
-%%should_add_key_to_level(Level, Key) ->
-%%    erlang:phash2(Key) band ((1 bsl (Level * ?LEVEL_FAN_POW)) -1) == 0.
+should_add_key_to_level(Level, Key) ->
+    (erlang:phash2(Key) band ((1 bsl (Level * ?LEVEL_FAN_POW)) -1)) == 0.
 %%    keyHash & ((1 << (level * LEVEL_FAN_POW)) - 1)) != 0
 
 
@@ -270,20 +245,18 @@ add_kv_to_skip_list(Db, MaxLevel, #{} = ViewOpts, Key, Val) ->
     ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
 
     fabric2_fdb:transactional(Db, fun(TxDb) ->
-        lists:foldl(fun(Level, PrevCoinFlip) ->
+        lists:foldl(fun(Level, _PrevAcc) ->
             io:format("PROCESS ~p ~p ~p ~n", [Level, Key, Val]),
             {PrevKey, PrevVal} = get_previous_key(TxDb, ReduceIdxPrefix, Level, Key),
             io:format("PREV VALS ~p ~p ~n", [PrevKey, PrevVal]),
-            case should_add_key_to_level(Level, Key, PrevCoinFlip) of
+            case should_add_key_to_level(Level, Key) of
                 true ->
                     io:format("Adding ~p ~p ~n", [Level, Key]),
-                    add_kv(Db, ReduceIdxPrefix, Level, Key, Val),
-                    true;
+                    add_kv(Db, ReduceIdxPrefix, Level, Key, Val);
                 false ->
                     {PrevKey, NewVal} = rereduce(<<"_stats">>, {PrevKey, PrevVal}, {Key, Val}),
                     io:format("RE_REDUCE ~p ~p ~p ~p ~n", [Level, Key, PrevKey, NewVal]),
-                    add_kv(Db, ReduceIdxPrefix, Level, PrevKey, NewVal),
-                    false
+                    add_kv(Db, ReduceIdxPrefix, Level, PrevKey, NewVal)
             end
         end, true, Levels)
     end).
@@ -297,13 +270,20 @@ rereduce(<<"_stats">>, {PrevKey, PrevVal}, {_Key, Val}) ->
 
 
 reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId) ->
-    Key = {?DB_VIEWS, Sig, ?VIEW_REDUCE_SK_RANGE, ViewId},
+    Key = {?DB_VIEWS, Sig, ?VIEW_REDUCE_RANGE, ViewId},
     erlfdb_tuple:pack(Key, DbPrefix).
 
 
-reduce_idx_key(ReduceIdxPrefix, SkipLevel, ReduceKey, RowType) ->
-    Key = {SkipLevel, ReduceKey, RowType},
-    erlfdb_tuple:pack(Key, ReduceIdxPrefix).
+create_key(ReduceIdxPrefix, SkipLevel, Key) ->
+    EK = couch_views_encoding:encode(Key, key),
+    LevelKey = {SkipLevel, EK},
+    erlfdb_tuple:pack(LevelKey, ReduceIdxPrefix).
+
+
+create_value(Key, Val) ->
+    EK = couch_views_encoding:encode(Key),
+    EV = couch_views_encoding:encode(Val),
+    erlfdb_tuple:pack({EK, EV}).
 
 
 add_kv(TxDb, ReduceIdxPrefix, Level, Key, Val) ->
@@ -311,14 +291,10 @@ add_kv(TxDb, ReduceIdxPrefix, Level, Key, Val) ->
         tx := Tx
     } = TxDb,
 
-    EK = couch_views_encoding:encode(Key, key),
-    EVK = couch_views_encoding:encode(Key),
-    EV = couch_views_encoding:encode(Val),
+    LevelKey = create_key(ReduceIdxPrefix, Level, Key),
+    EV = create_value(Key, Val),
 
-    KK = reduce_idx_key(ReduceIdxPrefix, Level, EK, ?VIEW_ROW_KEY),
-    VK = reduce_idx_key(ReduceIdxPrefix, Level, EK, ?VIEW_ROW_VALUE),
-    ok = erlfdb:set(Tx, KK, EVK),
-    ok = erlfdb:set(Tx, VK, EV).
+    ok = erlfdb:set(Tx, LevelKey, EV).
 
 
 get_previous_key(TxDb, ReduceIdxPrefix, Level, Key) ->
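The `add_kv_to_skip_list/5` loop above applies one step per level: either the new key is stored directly at that level (when the hash test admits it), or its value is folded into the nearest preceding key's aggregate via `rereduce`. A sketch of that per-level step, with each level modeled as a dict of key to aggregate and `_sum` standing in for the reducer (the committed code experiments with `_stats`):

```python
# Per-level skip-list insert: store the key at levels that admit it, otherwise
# rereduce its value into the closest preceding key at that level. Each level
# is a dict standing in for that level's FDB key range.
def skip_list_insert(levels, should_add, key, val):
    for level, kvs in enumerate(levels):
        if should_add(level, key):
            kvs[key] = val
        else:
            prev = max(k for k in kvs if k <= key)  # nearest preceding key
            kvs[prev] = kvs[prev] + val             # rereduce (_sum)
```

Level 0 always admits the key, so the exact rows survive there while higher levels hold progressively coarser aggregates.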
diff --git a/src/couch_views/test/exunit/couch_views_reduce_test.exs b/src/couch_views/test/exunit/couch_views_reduce_test.exs
index 3f7a173..a526658 100644
--- a/src/couch_views/test/exunit/couch_views_reduce_test.exs
+++ b/src/couch_views/test/exunit/couch_views_reduce_test.exs
@@ -8,7 +8,7 @@ defmodule CouchViewsReduceTest do
   alias Couch.Test.Setup.Step
 
   setup_all do
-    test_ctx = :test_util.start_couch([:fabric, :couch_views, :couch_jobs])
+    test_ctx = :test_util.start_couch([:fabric, :couch_js, :couch_views, :couch_jobs])
 
     on_exit(fn ->
       :test_util.stop_couch(test_ctx)
@@ -40,30 +40,30 @@ defmodule CouchViewsReduceTest do
     }
   end
 
-  #  test "group=true count reduce", context do
-  #    args = %{
-  #      :reduce => true,
-  #      :group => true
-  #      #            :limit => 9
-  #    }
-  #
-  #    {:ok, res} = run_query(context, args, "baz")
-  #    IO.inspect(res, label: "OUT")
-  #
-  #    assert res == [
-  #             {:row, [key: 1, value: 2]},
-  #             {:row, [key: 2, value: 2]},
-  #             {:row, [key: 3, value: 2]},
-  #             {:row, [key: [1, 1], value: 1]},
-  #             {:row, [key: [1, 1, 5], value: 1]},
-  #             {:row, [key: [1, 2, 6], value: 1]},
-  #             {:row, [key: [2, 1], value: 1]},
-  #             {:row, [key: [2, 3, 6], value: 1]},
-  #             {:row, [key: [3, 1], value: 1]},
-  #             {:row, [key: [3, 1, 5], value: 1]},
-  #             {:row, [key: [3, 4, 5], value: 1]}
-  #           ]
-  #  end
+  test "group=true count reduce", context do
+    args = %{
+      :reduce => true,
+      :group => true
+      #            :limit => 9
+    }
+
+    {:ok, res} = run_query(context, args, "baz")
+    IO.inspect(res, label: "OUT")
+
+    assert res == [
+             {:row, [key: 1, value: 2]},
+             {:row, [key: 2, value: 2]},
+             {:row, [key: 3, value: 2]},
+             {:row, [key: [1, 1], value: 1]},
+             {:row, [key: [1, 1, 5], value: 1]},
+             {:row, [key: [1, 2, 6], value: 1]},
+             {:row, [key: [2, 1], value: 1]},
+             {:row, [key: [2, 3, 6], value: 1]},
+             {:row, [key: [3, 1], value: 1]},
+             {:row, [key: [3, 1, 5], value: 1]},
+             {:row, [key: [3, 4, 5], value: 1]}
+           ]
+  end
 
   #  test "group=1 count reduce", context do
   #    args = %{
@@ -150,20 +150,20 @@ defmodule CouchViewsReduceTest do
   #           ]
   #  end
 
-  test "group_level=0 _max reduce", context do
-    args = %{
-      :reduce => true,
-      :group_level => 0
-      #            :limit => 9
-    }
-
-    {:ok, res} = run_query(context, args, "max")
-    IO.inspect(res, label: "OUT")
-
-    assert res == [
-             {:row, [key: :null, value: 3]}
-           ]
-  end
+  #  test "group_level=0 _sum reduce", context do
+  #    args = %{
+  #      :reduce => true,
+  #      :group_level => 0
+  #      #            :limit => 9
+  #    }
+  #
+  #    {:ok, res} = run_query(context, args, "max")
+  #    IO.inspect(res, label: "OUT")
+  #
+  #    assert res == [
+  #             {:row, [key: :null, value: 3]}
+  #           ]
+  #  end
 
   defp run_query(context, args, view) do
     db = context[:db]
@@ -217,70 +217,68 @@ defmodule CouchViewsReduceTest do
   end
 
   defp create_ddoc() do
-    :couch_doc.from_json_obj(
-      {[
-         {"_id", "_design/bar"},
-         {"views",
-          {[
-#             {"baz",
-#              {[
-#                 {"map",
-#                  """
-#                  function(doc) {
-#                    emit(doc.value, doc.value);
-#                    emit(doc.value, doc.value);
-#                    emit([doc.value, 1], doc.value);
-#                    emit([doc.value, doc.value + 1, doc.group.length], doc.value);
-#
-#                    if (doc.value === 3) {
-#                      emit([1, 1, 5], 1);
-#                      emit([doc.value, 1, 5], 1);
-#                    }
-#                   }
-#                  """},
-#                 {"reduce", "_count"}
-#               ]}},
-#             {"boom",
-#              {[
-#                 {"map",
-#                  """
-#                  function(doc) {
-#                      var month = 1;
-#                      if (doc.value % 2) {
-#                          month = 2;
-#                      }
-#                      emit([2019, month, doc.value], doc.value);
-#                  }
-#                  """},
-#                 {"reduce", "_count"}
-#               ]}},
-             {"max",
-              {[
-                 {"map",
-                  """
-                  function(doc) {
-                      //emit(doc.value, doc.value);
-                      //emit([doc.value, 1], doc.value);
-                      //emit([doc.value, doc.value + 1, doc.group.length], doc.value);
-                        emit(1, 1);
-                        emit(2, 2);
-                        emit(3, 3);
-                        emit(4, 4);
+    :couch_doc.from_json_obj({[
+       {"_id", "_design/bar"},
+       {"views",
+        {[
+           {"baz",
+            {[
+               {"map",
+                """
+                function(doc) {
+                  emit(doc.value, doc.value);
+                  emit(doc.value, doc.value);
+                  emit([doc.value, 1], doc.value);
+                  emit([doc.value, doc.value + 1, doc.group.length], doc.value);
 
-                       emit([2019, 2, 2], 1);
-                       emit([2019, 3, 3], 2);
-                       emit([2019, 3, 3], 3);
-                       emit([2019, 4, 3], 4);
-                       emit([2019, 5, 3], 6);
-                      if (doc.value === 3) {
-                       //emit([doc.value, 1, 5], 1);
-                      }
+                  if (doc.value === 3) {
+                    emit([1, 1, 5], 1);
+                    emit([doc.value, 1, 5], 1);
                   }
-                  """},
-                 {"reduce", "_stats"}
-               ]}}
-           ]}}
-       ]}
-    )
+                 }
+                """},
+               {"reduce", "_count"}
+             ]}}
+           #             {"boom",
+           #              {[
+           #                 {"map",
+           #                  """
+           #                  function(doc) {
+           #                      var month = 1;
+           #                      if (doc.value % 2) {
+           #                          month = 2;
+           #                      }
+           #                      emit([2019, month, doc.value], doc.value);
+           #                  }
+           #                  """},
+           #                 {"reduce", "_count"}
+           #               ]}},
+           #             {"max",
+           #              {[
+           #                 {"map",
+           #                  """
+           #                  function(doc) {
+           #                      //emit(doc.value, doc.value);
+           #                      //emit([doc.value, 1], doc.value);
+           #                      //emit([doc.value, doc.value + 1, doc.group.length], doc.value);
+           #                        emit(1, 1);
+           #                        emit(2, 2);
+           #                        emit(3, 3);
+           #                        emit(4, 4);
+           #
+           #                       emit([2019, 2, 2], 1);
+           #                       emit([2019, 3, 3], 2);
+           #                       emit([2019, 3, 3], 3);
+           #                       emit([2019, 4, 3], 4);
+           #                       emit([2019, 5, 3], 6);
+           #                      if (doc.value === 3) {
+           #                       //emit([doc.value, 1, 5], 1);
+           #                      }
+           #                  }
+           #                  """},
+           #                 {"reduce", "_stats"}
+           #               ]}}
+         ]}}
+     ]})
   end
 end
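The re-enabled "group=true count reduce" test above expects one row per distinct emitted key, each carrying a `_count` total, with scalar keys collating before array keys. The expected rows can be reproduced by a simple count over the keys the `baz` map function emits:

```python
# _count with group=true: count occurrences of each distinct emitted key.
# List keys are tupled so they can serve as dict keys; collation order is
# not modeled here.
def count_reduce_group_true(emitted_keys):
    counts = {}
    for k in emitted_keys:
        kk = tuple(k) if isinstance(k, list) else k
        counts[kk] = counts.get(kk, 0) + 1
    return counts
```

For the `baz` view, `doc.value` in 1..3 with the duplicate `emit(doc.value, doc.value)` calls yields the `value: 2` counts on the scalar keys in the assertion.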


[couchdb] 04/08: can do group_level query

Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

garren pushed a commit to branch prototype/builtin-reduce
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 0ddb80078bc5ba527b875f3e14d51295cacc46f6
Author: Garren Smith <ga...@gmail.com>
AuthorDate: Thu Oct 31 14:09:36 2019 +0200

    can do group_level query
---
 src/couch_views/src/couch_views_reader.erl         |   9 +-
 src/couch_views/src/couch_views_reduce.erl         |  15 +-
 src/couch_views/src/couch_views_reduce_fdb.erl     | 175 +++++++++++++++------
 .../test/exunit/couch_views_reduce_test.exs        |  47 +++---
 4 files changed, 168 insertions(+), 78 deletions(-)

diff --git a/src/couch_views/src/couch_views_reader.erl b/src/couch_views/src/couch_views_reader.erl
index 394b3cf..e750a94 100644
--- a/src/couch_views/src/couch_views_reader.erl
+++ b/src/couch_views/src/couch_views_reader.erl
@@ -32,7 +32,8 @@ read_reduce(Db, Mrst, ViewName, UserCallback, UserAcc0, Args) ->
     } = Mrst,
 
     ViewId = get_view_id(Lang, Args, ViewName, Views),
-    couch_views_reduce:read_reduce(Db, Sig, ViewId, UserCallback,
+    Reducer = get_view_reducer(Lang, Args, ViewName, Views),
+    couch_views_reduce:read_reduce(Db, Sig, ViewId, Reducer, UserCallback,
         UserAcc0, Args).
 %%    Fun = fun handle_reduce_row/3,
 %%
@@ -243,6 +244,12 @@ get_view_id(Lang, Args, ViewName, Views) ->
         {red, {_Idx, _Lang, View}, _Args} -> View#mrview.id_num
     end.
 
+get_view_reducer(Lang, Args, ViewName, Views) ->
+    case couch_mrview_util:extract_view(Lang, Args, ViewName, Views) of
+        {map, _View, _Args} -> throw(no_reduce);
+        View -> couch_mrview_util:extract_view_reduce(View)
+    end.
+
 
 expand_keys_args(#mrargs{keys = undefined} = Args) ->
     [Args];
diff --git a/src/couch_views/src/couch_views_reduce.erl b/src/couch_views/src/couch_views_reduce.erl
index ebd2f47..b7eb18e 100644
--- a/src/couch_views/src/couch_views_reduce.erl
+++ b/src/couch_views/src/couch_views_reduce.erl
@@ -15,7 +15,7 @@
 
 -export([
     run_reduce/2,
-    read_reduce/6,
+    read_reduce/7,
     setup_reduce_indexes/3
 ]).
 
@@ -30,7 +30,7 @@
 -define(MAX_SKIP_LIST_LEVELS, 6).
 
 
-read_reduce(Db, Sig, ViewId, UserCallback, UserAcc0, Args) ->
+read_reduce(Db, Sig, ViewId, Reducer, UserCallback, UserAcc0, Args) ->
     #{
         db_prefix := DbPrefix
     } = Db,
@@ -38,9 +38,16 @@ read_reduce(Db, Sig, ViewId, UserCallback, UserAcc0, Args) ->
 %%    Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
     ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
     #mrargs{
-        limit = Limit
+        limit = Limit,
+        group = Group,
+        group_level = GroupLevel
     } = Args,
 
+    GroupLevel1 = case Group of
+        true -> group_true;
+        _ -> GroupLevel
+    end,
+
     Opts = args_to_fdb_opts(Args, ReduceIdxPrefix),
 
     try
@@ -59,7 +66,7 @@ read_reduce(Db, Sig, ViewId, UserCallback, UserAcc0, Args) ->
             },
 
             Fun = fun handle_row/3,
-            Acc1 = couch_views_reduce_fdb:fold_level0(TxDb, Sig, ViewId, Opts, Fun, Acc0),
+            Acc1 = couch_views_reduce_fdb:fold_level0(TxDb, Sig, ViewId, Reducer, GroupLevel, Opts, Fun, Acc0),
             #{
                 user_acc := UserAcc1
             } = Acc1,
diff --git a/src/couch_views/src/couch_views_reduce_fdb.erl b/src/couch_views/src/couch_views_reduce_fdb.erl
index 9683265..5759c42 100644
--- a/src/couch_views/src/couch_views_reduce_fdb.erl
+++ b/src/couch_views/src/couch_views_reduce_fdb.erl
@@ -15,7 +15,7 @@
 
 
 -export([
-    fold_level0/6,
+    fold_level0/8,
     create_skip_list/3,
     update_reduce_idx/6
 ]).
@@ -26,7 +26,7 @@
 -include_lib("couch_mrview/include/couch_mrview.hrl").
 -include_lib("fabric/include/fabric2.hrl").
 
--define(MAX_SKIP_LIST_LEVELS, 6).
+-define(MAX_SKIP_LIST_LEVELS, 1).
 -define(LEVEL_FAN_POW, 1).
 
 log_levels(Db, Sig, ViewId) ->
@@ -34,34 +34,40 @@ log_levels(Db, Sig, ViewId) ->
         db_prefix := DbPrefix
     } = Db,
 
-    Levels = lists:seq(0, 6),
+    Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
     ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
     Opts = [{streaming_mode, want_all}],
 
     fabric2_fdb:transactional(Db, fun(#{tx := Tx} = TxDb) ->
-        lists:foreach(fun (Level) ->
+        lists:foldl(fun (Level, Level0Total) ->
             {StartKey, EndKey} = erlfdb_tuple:range({Level},
                 ReduceIdxPrefix),
 
-            Acc0 = #{
-                sig => Sig,
-                view_id => ViewId,
-                reduce_idx_prefix => ReduceIdxPrefix,
-                user_acc => [],
-                callback => fun handle_log_levels/3
-            },
-
-            Fun = fun fold_fwd_cb/2,
-            Acc = erlfdb:fold_range(Tx, StartKey, EndKey, Fun, Acc0, Opts),
-            #{
-                user_acc := Rows
-            } = Acc,
-            io:format("~n LEVEL ~p rows ~p ~n", [Level, Rows])
-        end, Levels)
+            Future = erlfdb:get_range(Tx, StartKey, EndKey, Opts),
+            Rows = lists:map(fun ({_Key, EV}) ->
+                unpack_key_value(EV)
+            end, erlfdb:wait(Future)),
+
+            io:format("~n LEVEL ~p rows ~p ~n", [Level, Rows]),
+            case Level == 0 of
+                true ->
+                    sum_rows(Rows);
+                false ->
+                    Total = sum_rows(Rows),
+                    if Total == Level0Total -> Level0Total; true ->
+                        io:format("~n ~n LEVEL ~p NOT EQUAL ~p ~p ~n", [Level, Level0Total, Total])
+%%                        throw(level_total_error)
+                    end
+            end
+
+        end, 0, Levels)
     end).
 
-handle_log_levels(Key, Value, Acc) ->
-    Acc ++ [{Key, Value}].
+sum_rows(Rows) ->
+    lists:foldl(fun ({_, Val}, Sum) ->
+        Val + Sum
+    end, 0, Rows).
+
 
 %%fold(Db, Sig, ViewId, Options, Callback, Acc0) ->
 %%    #{
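
The log_levels/sum_rows change above encodes a skip-list invariant: every level stores a pre-reduced cover of level 0, so re-summing any level's rows must reproduce the level-0 total. A minimal Python sketch of that check, with made-up per-level contents (the level layout here is illustrative, not taken from the commit):

```python
def sum_rows(rows):
    # rows are (key, value) pairs; only the values are aggregated,
    # mirroring the Erlang sum_rows/1 fold
    return sum(value for _key, value in rows)

# hypothetical contents of a tiny _sum skip list:
# level 0 holds every emitted key, higher levels hold pre-reduced runs
levels = {
    0: [("a", 1), ("b", 2), ("c", 3)],
    1: [("a", 3), ("c", 3)],
}

level0_total = sum_rows(levels[0])
for level, rows in levels.items():
    # the invariant log_levels asserts: each level re-aggregates
    # to the same grand total as level 0
    assert sum_rows(rows) == level0_total
```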
@@ -95,7 +101,7 @@ handle_log_levels(Key, Value, Acc) ->
 %%        {ok, UserAcc1}
 %%    end).
 
-fold_level0(Db, Sig, ViewId, Opts, UserCallback, UserAcc0) ->
+fold_level0(Db, Sig, ViewId, Reducer, GroupLevel, Opts, UserCallback, UserAcc0) ->
     #{
         db_prefix := DbPrefix
     } = Db,
@@ -108,7 +114,10 @@ fold_level0(Db, Sig, ViewId, Opts, UserCallback, UserAcc0) ->
         user_acc => UserAcc0,
         %%            args := Args,
         callback => UserCallback,
-        reduce_idx_prefix => ReduceIdxPrefix
+        reduce_idx_prefix => ReduceIdxPrefix,
+        reducer => Reducer,
+        group_level => GroupLevel,
+        rows => []
     },
 
     fabric2_fdb:transactional(Db, fun(TxDb) ->
@@ -118,36 +127,94 @@ fold_level0(Db, Sig, ViewId, Opts, UserCallback, UserAcc0) ->
         } = TxDb,
 
 
-%%        {StartKey, EndKey} = erlfdb_tuple:range({0},
-%%            ReduceIdxPrefix),
         {startkey, StartKey} = lists:keyfind(startkey, 1, Opts),
         {endkey, EndKey} = lists:keyfind(endkey, 1, Opts),
 
-%%        ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
         Fun = fun fold_fwd_cb/2,
         Acc1 = erlfdb:fold_range(Tx, StartKey, EndKey, Fun, Acc, Opts),
         #{
-            user_acc := UserAcc1
+            user_acc := UserAcc1,
+            rows := Rows
         } = Acc1,
-        UserAcc1
+
+        rereduce_and_reply(Reducer, Rows, GroupLevel,
+            UserCallback, UserAcc1)
     end).
 
 
-fold_fwd_cb({FullEncodedKey, EV}, Acc) ->
+fold_fwd_cb({_FullEncodedKey, EV}, Acc) ->
     #{
-        reduce_idx_prefix := ReduceIdxPrefix,
         callback := Callback,
-        user_acc := UserAcc
+        user_acc := UserAcc,
+        group_level := GroupLevel,
+        rows := Rows,
+        reducer := Reducer
     } = Acc,
 
-    {_Level, _EK}
-        = erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
-    {EK, EV1} = erlfdb_tuple:unpack(EV),
-    Key = couch_views_encoding:decode(EK),
-    Val = couch_views_encoding:decode(EV1),
+    {Key, Val} = unpack_key_value(EV),
+
+    LastKey = if Rows == [] -> false; true ->
+        {LastKey0, _} = lists:last(Rows),
+        LastKey0
+    end,
+
+    case group_level_equal(Key, LastKey, GroupLevel) of
+        true ->
+            Acc#{
+                rows := Rows ++ [{Key, Val}]
+            };
+        false ->
+            UserAcc1 = rereduce_and_reply(Reducer, Rows, GroupLevel,
+                Callback, UserAcc),
+            Acc#{
+                user_acc := UserAcc1,
+                rows := [{Key, Val}]
+            }
+    end.
+
+rereduce_and_reply(_Reducer, [], _GroupLevel, _Callback, Acc) ->
+    Acc;
+
+rereduce_and_reply(Reducer, Rows, GroupLevel, Callback, Acc) ->
+    {ReducedKey, ReducedVal} = rereduce(Reducer, Rows, GroupLevel),
+    Callback(ReducedKey, ReducedVal, Acc).
+
+
+rereduce(_Reducer, [], _GroupLevel) ->
+    no_kvs;
 
-    UserAcc1 = Callback(Key, Val, UserAcc),
-    Acc#{user_acc := UserAcc1}.
+rereduce(_Reducer, Rows, GroupLevel) when length(Rows) == 1 ->
+    {Key, Val} = hd(Rows),
+    GroupKey = group_level_key(Key, GroupLevel),
+    {GroupKey, Val};
+
+rereduce(<<"_count">>, Rows, GroupLevel) ->
+    Val = length(Rows),
+    {Key, _} = hd(Rows),
+    GroupKey = group_level_key(Key, GroupLevel),
+    {GroupKey, Val}.
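
The rereduce/3 clauses above only cover the `_count` builtin, and they count buffered rows directly (`Val = length(Rows)`) — valid only while each row still represents exactly one emitted key/value, as is the case when folding level 0. A Python sketch of the same step, assuming `group_level >= 1` (function name and shapes are illustrative):

```python
def rereduce_count(rows, group_level):
    # rows: list of (key, value) pairs that share one group key;
    # group_level >= 1 is assumed in this sketch
    if not rows:
        return None  # mirrors the Erlang no_kvs clause
    first_key = rows[0][0]
    # array keys are truncated to the group level; scalars pass through
    group_key = first_key[:group_level] if isinstance(first_key, list) else first_key
    if len(rows) == 1:
        # a single row passes its value through unchanged
        return (group_key, rows[0][1])
    # _count: one unit per buffered level-0 row
    return (group_key, len(rows))
```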
+
+
+group_level_equal(_One, _Two, 0) ->
+    true;
+
+group_level_equal(_One, _Two, group_true) ->
+    false;
+
+group_level_equal(One, Two, GroupLevel) ->
+    GroupOne = group_level_key(One, GroupLevel),
+    GroupTwo = group_level_key(Two, GroupLevel),
+    GroupOne == GroupTwo.
+
+
+group_level_key(_Key, 0) ->
+    null;
+
+group_level_key(Key, GroupLevel) when is_list(Key) ->
+    lists:sublist(Key, GroupLevel);
+
+group_level_key(Key, _GroupLevel) ->
+    Key.
 
 
 reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId) ->
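
The group_level_key/group_level_equal pair above implements CouchDB's group_level semantics: `group_level=0` collapses every row under a null key, `group=true` (the `group_true` atom) keeps each buffered row separate, and `group_level=N` truncates array keys to their first N elements. A Python sketch of the same logic, using the string `"group_true"` in place of the Erlang atom:

```python
def group_level_key(key, group_level):
    # group_level=0 collapses all rows into a single null-keyed group
    if group_level == 0:
        return None
    # array keys are truncated to the first N elements; scalars pass through
    if isinstance(key, list):
        return key[:group_level]
    return key

def group_level_equal(one, two, group_level):
    if group_level == 0:
        return True            # everything belongs to one group
    if group_level == "group_true":
        # mirrors the Erlang prototype: exact grouping never merges
        # buffered rows at this stage
        return False
    return group_level_key(one, group_level) == group_level_key(two, group_level)
```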
@@ -155,6 +222,13 @@ reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId) ->
     erlfdb_tuple:pack(Key, DbPrefix).
 
 
+unpack_key_value(EncodedValue) ->
+    {EK, EV1} = erlfdb_tuple:unpack(EncodedValue),
+    Key = couch_views_encoding:decode(EK),
+    Val = couch_views_encoding:decode(EV1),
+    {Key, Val}.
+
+
 %% Inserting
 update_reduce_idx(TxDb, Sig, ViewId, _DocId, _ExistingKeys, ReduceResult) ->
     #{
@@ -205,15 +279,15 @@ add_kv_to_skip_list(Db, MaxLevel, #{} = ViewOpts, Key, Val) ->
     fabric2_fdb:transactional(Db, fun(TxDb) ->
         lists:foreach(fun(Level) ->
             {PrevKey, PrevVal} = get_previous_key(TxDb, ReduceIdxPrefix, Level, Key),
-            io:format("Process ~p ~p ~p PREV VALS ~p ~p ~n", [Level, Key, Val, PrevKey, PrevVal]),
+            io:format("Level ~p K/V ~p ~p PREV KV ~p ~p ~n", [Level, Key, Val, PrevKey, PrevVal]),
             case should_add_key_to_level(Level, KeyHash) of
                 true ->
-                    io:format("Adding ~p ~p ~n", [Level, Key]),
+                    io:format("Adding at ~p ~p ~n", [Level, Key]),
                     add_kv(Db, ReduceIdxPrefix, Level, Key, Val);
                 false ->
-                    {PrevKey, NewVal} = rereduce(<<"_stats">>, {PrevKey, PrevVal}, {Key, Val}),
-                    io:format("RE_REDUCE ~p ~p ~p ~p ~n", [Level, Key, PrevKey, NewVal]),
-                    add_kv(Db, ReduceIdxPrefix, Level, PrevKey, NewVal)
+%%                    {PrevKey, NewVal} = rereduce(<<"_stats">>, {PrevKey, PrevVal}, {Key, Val}),
+%%                    io:format("RE_REDUCE ~p ~p ~p ~p ~n", [Level, Key, PrevKey, NewVal]),
+                    add_kv(Db, ReduceIdxPrefix, Level, PrevKey, PrevVal)
             end
         end, Levels)
     end).
@@ -229,7 +303,7 @@ get_previous_key(TxDb, ReduceIdxPrefix, Level, Key) ->
 
     EK = couch_views_encoding:encode(Key, key),
     StartKey = erlfdb_tuple:pack({Level, EK}, ReduceIdxPrefix),
-    StartKeySel = erlfdb_key:last_less_or_equal(StartKey),
+    StartKeySel = erlfdb_key:last_less_than(StartKey),
     EndKeySel = erlfdb_key:first_greater_or_equal(StartKey),
 
     Future = erlfdb:get_range(Tx, StartKeySel, EndKeySel, Opts),
@@ -241,6 +315,9 @@ hash_key(Key) ->
     erlang:phash2(Key).
 
 
+should_add_key_to_level(0, _KeyHash) ->
+    true;
+
 should_add_key_to_level(Level, KeyHash) ->
     (KeyHash band ((1 bsl (Level * ?LEVEL_FAN_POW)) -1)) == 0.
 %%    (keyHash & ((1 << (level * LEVEL_FAN_POW)) - 1)) == 0
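
should_add_key_to_level/2 decides skip-list membership from the key hash: level 0 stores every key, and a key is promoted to level L when the low `L * LEVEL_FAN_POW` bits of its hash are zero, so each level keeps roughly `1/2^LEVEL_FAN_POW` of the keys on the level below. The same test in Python:

```python
LEVEL_FAN_POW = 1  # each level keeps ~1/2 of the keys from the level below

def should_add_key_to_level(level, key_hash):
    # level 0 stores every key; higher levels require the low
    # level * LEVEL_FAN_POW bits of the hash to be zero
    if level == 0:
        return True
    return key_hash & ((1 << (level * LEVEL_FAN_POW)) - 1) == 0
```

For example, a key whose hash ends in `0b100` reaches levels 0 through 2 but not level 3.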
@@ -277,11 +354,11 @@ add_kv(TxDb, ReduceIdxPrefix, Level, Key, Val) ->
     ok = erlfdb:set(Tx, LevelKey, EV).
 
 
-rereduce(<<"_stats">>, {PrevKey, PrevVal}, {_Key, Val}) ->
-    case PrevVal >= Val of
-        true -> {PrevKey, PrevVal};
-        false -> {PrevKey, Val}
-    end.
+%%rereduce(<<"_stats">>, {PrevKey, PrevVal}, {_Key, Val}) ->
+%%    case PrevVal >= Val of
+%%        true -> {PrevKey, PrevVal};
+%%        false -> {PrevKey, Val}
+%%    end.
 
 
 
diff --git a/src/couch_views/test/exunit/couch_views_reduce_test.exs b/src/couch_views/test/exunit/couch_views_reduce_test.exs
index c1b35e2..488f3ee 100644
--- a/src/couch_views/test/exunit/couch_views_reduce_test.exs
+++ b/src/couch_views/test/exunit/couch_views_reduce_test.exs
@@ -60,16 +60,16 @@ defmodule CouchViewsReduceTest do
   test "group_level=1 count reduce", context do
       args = %{
           :reduce => true,
-          :group => true,
+          :group_level => 1,
       }
 
       {:ok, res} = run_query(context, args, "dates")
       IO.inspect(res, label: "OUT")
 
       assert res == [
-                 {:row, [key: [2017], value: 1]},
-                 {:row, [key: [2018], value: 1]},
-                 {:row, [key: [2019], value: 1]}
+                 {:row, [key: [2017], value: 4]},
+                 {:row, [key: [2018], value: 3]},
+                 {:row, [key: [2019], value: 2]}
              ]
   end
 
@@ -221,7 +221,7 @@ defmodule CouchViewsReduceTest do
       [2019, 4, 1]
     ]
 
-    for i <- 1..4 do
+    for i <- 1..10 do
       group =
         if rem(i, 3) == 0 do
           "first"
@@ -235,7 +235,6 @@ defmodule CouchViewsReduceTest do
          {"some", "field"},
          {"group", group},
          {"date", Enum.at(dates, i - 1)}
-         #           {"timestamp", Enum.at(timestamps, i - 1)}
        ]})
     end
   end
@@ -254,25 +253,25 @@ defmodule CouchViewsReduceTest do
                  }
                 """},
                {"reduce", "_count"}
-             ]}},
-           {"baz",
-            {[
-               {"map",
-                """
-                function(doc) {
-                  emit(doc.value, doc.value);
-                  emit(doc.value, doc.value);
-                  emit([doc.value, 1], doc.value);
-                  emit([doc.value, doc.value + 1, doc.group.length], doc.value);
-
-                  if (doc.value === 3) {
-                    emit([1, 1, 5], 1);
-                    emit([doc.value, 1, 5], 1);
-                  }
-                 }
-                """},
-               {"reduce", "_count"}
              ]}}
+#           {"baz",
+#            {[
+#               {"map",
+#                """
+#                function(doc) {
+#                  emit(doc.value, doc.value);
+#                  emit(doc.value, doc.value);
+#                  emit([doc.value, 1], doc.value);
+#                  emit([doc.value, doc.value + 1, doc.group.length], doc.value);
+#
+#                  if (doc.value === 3) {
+#                    emit([1, 1, 5], 1);
+#                    emit([doc.value, 1, 5], 1);
+#                  }
+#                 }
+#                """},
+#               {"reduce", "_count"}
+#             ]}}
            #             {"boom",
            #              {[
            #                 {"map",


[couchdb] 03/08: progress with reading level 0

Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

garren pushed a commit to branch prototype/builtin-reduce
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 0076f15d191bd2fb71a0837e474ce12019be0c8f
Author: Garren Smith <ga...@gmail.com>
AuthorDate: Wed Oct 30 11:37:36 2019 +0200

    progress with reading level 0
---
 src/couch_views/src/couch_views_fdb.erl            |   4 +-
 src/couch_views/src/couch_views_indexer.erl        |   8 +-
 src/couch_views/src/couch_views_reader.erl         |  30 --
 src/couch_views/src/couch_views_reduce.erl         | 306 +++++++--------------
 src/couch_views/src/couch_views_reduce_fdb.erl     | 274 +++++++++++++++++-
 .../test/exunit/couch_views_reduce_test.exs        |  94 +++++--
 6 files changed, 428 insertions(+), 288 deletions(-)

diff --git a/src/couch_views/src/couch_views_fdb.erl b/src/couch_views/src/couch_views_fdb.erl
index 479a707..8999d76 100644
--- a/src/couch_views/src/couch_views_fdb.erl
+++ b/src/couch_views/src/couch_views_fdb.erl
@@ -133,8 +133,6 @@ fold_map_idx(TxDb, Sig, ViewId, Options, Callback, Acc0) ->
     Acc1.
 
 
-
-
 write_doc(TxDb, Sig, _ViewIds, #{deleted := true} = Doc) ->
     #{
         id := DocId
@@ -181,7 +179,7 @@ write_doc(TxDb, Sig, ViewIds, Doc) ->
                 []
         end,
         update_map_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows),
-        couch_views_reduce:update_reduce_idx(TxDb, Sig, ViewId, DocId,
+        couch_views_reduce_fdb:update_reduce_idx(TxDb, Sig, ViewId, DocId,
             ExistingKeys, ReduceResult)
     end, lists:zip3(ViewIds, Results, ReduceResults)).
 
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index 4c430c1..cff15b0 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -298,11 +298,17 @@ write_docs(TxDb, Mrst, Docs, State) ->
     } = Mrst,
 
     #{
-        last_seq := LastSeq
+        last_seq := LastSeq,
+        view_seq := ViewSeq
     } = State,
 
     ViewIds = [View#mrview.id_num || View <- Views],
 
+    %%  First build of the view
+    if ViewSeq /= <<>> -> ok; true ->
+        couch_views_reduce:setup_reduce_indexes(TxDb, Sig, ViewIds)
+    end,
+
     lists:foreach(fun(Doc) ->
         couch_views_fdb:write_doc(TxDb, Sig, ViewIds, Doc)
     end, Docs),
diff --git a/src/couch_views/src/couch_views_reader.erl b/src/couch_views/src/couch_views_reader.erl
index d08515c..394b3cf 100644
--- a/src/couch_views/src/couch_views_reader.erl
+++ b/src/couch_views/src/couch_views_reader.erl
@@ -236,36 +236,6 @@ handle_row(DocId, Key, Value, Acc) ->
     UserAcc1 = maybe_stop(UserCallback({row, Row}, UserAcc0)),
     Acc#{acc := UserAcc1}.
 
-handle_reduce_row(_Key, _Value, #{skip := Skip} = Acc) when Skip > 0 ->
-    Acc#{skip := Skip - 1};
-
-handle_reduce_row(Key, Value, Acc) ->
-    io:format("ACC ~p ~n", [Acc]),
-    #{
-        callback := UserCallback,
-        acc := UserAcc0,
-        row_count := RowCount,
-        limit := Limit
-    } = Acc,
-
-    Row = [
-        {key, Key},
-        {value, Value}
-    ],
-
-    RowCountNext = RowCount + 1,
-
-    UserAcc1 = maybe_stop(UserCallback({row, Row}, UserAcc0)),
-    Acc1 = Acc#{acc := UserAcc1, row_count := RowCountNext},
-
-    case RowCountNext == Limit of
-        true ->
-            UserAcc2 = maybe_stop(UserCallback(complete, UserAcc1)),
-            maybe_stop({stop, UserAcc2});
-        false ->
-            Acc1
-    end.
-
 
 get_view_id(Lang, Args, ViewName, Views) ->
     case couch_mrview_util:extract_view(Lang, Args, ViewName, Views) of
diff --git a/src/couch_views/src/couch_views_reduce.erl b/src/couch_views/src/couch_views_reduce.erl
index 4cb7416..ebd2f47 100644
--- a/src/couch_views/src/couch_views_reduce.erl
+++ b/src/couch_views/src/couch_views_reduce.erl
@@ -15,8 +15,8 @@
 
 -export([
     run_reduce/2,
-    update_reduce_idx/6,
-    read_reduce/6
+    read_reduce/6,
+    setup_reduce_indexes/3
 ]).
 
 
@@ -30,99 +30,120 @@
 -define(MAX_SKIP_LIST_LEVELS, 6).
 
 
-log_levels(Db, Sig, ViewId) ->
+read_reduce(Db, Sig, ViewId, UserCallback, UserAcc0, Args) ->
     #{
         db_prefix := DbPrefix
     } = Db,
 
-    Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
+%%    Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
     ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
-    Opts = [{streaming_mode, want_all}],
+    #mrargs{
+        limit = Limit
+    } = Args,
 
-    fabric2_fdb:transactional(Db, fun(#{tx := Tx} = TxDb) ->
-        lists:foreach(fun (Level) ->
-            {StartKey, EndKey} = erlfdb_tuple:range({Level},
-                ReduceIdxPrefix),
+    Opts = args_to_fdb_opts(Args, ReduceIdxPrefix),
+
+    try
+        fabric2_fdb:transactional(Db, fun(TxDb) ->
+    %%        Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
 
             Acc0 = #{
                 sig => Sig,
                 view_id => ViewId,
+                user_acc => UserAcc0,
+                args => Args,
+                callback => UserCallback,
                 reduce_idx_prefix => ReduceIdxPrefix,
-                next => key,
-                key => undefined,
-                rows => []
+                limit => Limit,
+                row_count => 0
             },
 
-            Fun = fun fold_fwd_cb/2,
-            Acc = erlfdb:fold_range(Tx, StartKey, EndKey, Fun, Acc0, Opts),
+            Fun = fun handle_row/3,
+            Acc1 = couch_views_reduce_fdb:fold_level0(TxDb, Sig, ViewId, Opts, Fun, Acc0),
             #{
-                rows := Rows
-            } = Acc,
-            io:format("~n ~n LEVEL ~p rows ~p ~n", [Level, Rows]),
-            {ok, Rows}
-        end, Levels),
-        {ok, []}
-    end).
-
+                user_acc := UserAcc1
+            } = Acc1,
+            {ok, maybe_stop(UserCallback(complete, UserAcc1))}
+        end)
+    catch throw:{done, Out} ->
+        {ok, Out}
+    end.
 
-read_reduce(Db, Sig, ViewId, UserCallback, UserAcc0, Args) ->
-    #{
-        db_prefix := DbPrefix
-    } = Db,
 
-    Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
-    ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
-    fabric2_fdb:transactional(Db, fun(#{tx := Tx} = TxDb) ->
-        log_levels(TxDb, Sig, ViewId),
-%%        Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
-
-
-        Acc0 = #{
-            sig => Sig,
-            view_id => ViewId,
-            user_acc => UserAcc0,
-            args => Args,
-            callback => UserCallback,
-            reduce_idx_prefix => ReduceIdxPrefix,
-            rows => []
-        },
-
-
-%%        Opts = [{limit, 2}, {streaming_mode, want_all}],
-%%        EK = couch_views_encoding:encode(0, key),
-%%        {StartKey, EndKey} = erlfdb_tuple:range({?MAX_SKIP_LIST_LEVELS, EK},
-%%            ReduceIdxPrefix),
-%%
-%%        Fun = fun fold_fwd_cb/2,
-%%        Acc = erlfdb:fold_range(Tx, StartKey, EndKey, Fun, Acc0, Opts),
-        #{
-            rows := Rows
-        } = Acc0,
-        {ok, Rows}
-    end).
-
-args_to_fdb_opts(#mrargs{} = Args) ->
+args_to_fdb_opts(#mrargs{} = Args, ReduceIdxPrefix) ->
     #mrargs{
-        limit = Limit,
-        start_key = StartKey,
-        end_key = EndKey
+%%        limit = Limit,
+%%        start_key = StartKey,
+%%        end_key = EndKey,
+        group = Group,
+        group_level = GroupLevel
     } = Args,
-    ok.
 
+    {UStartKey0, EndKey0} = erlfdb_tuple:range({0},
+        ReduceIdxPrefix),
+
+    StartKey0 = erlfdb_tuple:pack({0, couch_views_encoding:encode(0, key)}, ReduceIdxPrefix),
+
+%%    StartKey1 = case StartKey of
+%%        undefined -> erlfdb_key:first_greater_than(StartKey0);
+%%        StartKey -> create_key(StartKey, 0, Red)
+%%    end,
+
+    StartKey1 = erlfdb_key:first_greater_than(StartKey0),
 
-fold_fwd_cb({FullEncodedKey, EV}, Acc) ->
+    [{streaming_mode, want_all}, {startkey, StartKey1}, {endkey, EndKey0}].
+
+
+encode_key(Key, Level) ->
+    {Level, couch_views_encoding:encode(Key, key)}.
+
+
+handle_row(Key, Value, Acc) ->
     #{
-        reduce_idx_prefix := ReduceIdxPrefix,
-        rows := Rows
+        callback := UserCallback,
+        user_acc := UserAcc0,
+        row_count := RowCount,
+        limit := Limit
     } = Acc,
 
-    {_Level, _EK}
-        = erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
-    {EK, EV1} = erlfdb_tuple:unpack(EV),
-    Key = couch_views_encoding:decode(EK),
-    Val = couch_views_encoding:decode(EV1),
+    Row = [
+        {key, Key},
+        {value, Value}
+    ],
 
-    Acc#{key := Val, rows := Rows ++ [{Key, Val}]}.
+    RowCountNext = RowCount + 1,
+
+    UserAcc1 = maybe_stop(UserCallback({row, Row}, UserAcc0)),
+    Acc1 = Acc#{user_acc := UserAcc1, row_count := RowCountNext},
+
+    case RowCountNext == Limit of
+        true ->
+            UserAcc2 = maybe_stop(UserCallback(complete, UserAcc1)),
+            maybe_stop({stop, UserAcc2});
+        false ->
+            Acc1
+    end.
+
+
+maybe_stop({ok, Acc}) -> Acc;
+maybe_stop({stop, Acc}) -> throw({done, Acc}).
+
+setup_reduce_indexes(Db, Sig, ViewIds) ->
+    #{
+        db_prefix := DbPrefix
+    } = Db,
+
+    fabric2_fdb:transactional(Db, fun(TxDb) ->
+        lists:foreach(fun (ViewId) ->
+            ViewOpts = #{
+                db_prefix => DbPrefix,
+                sig => Sig,
+                view_id => ViewId
+            },
+            couch_views_reduce_fdb:create_skip_list(TxDb,
+                ?MAX_SKIP_LIST_LEVELS, ViewOpts)
+        end, ViewIds)
+    end).
 
 
 run_reduce(#mrst{views = Views } = Mrst, MappedResults) ->
@@ -192,149 +213,6 @@ is_builtin(<<"_", _/binary>>) ->
 is_builtin(_) ->
     false.
 
-
-update_reduce_idx(TxDb, Sig, ViewId, _DocId, _ExistingKeys, ReduceResult) ->
-    #{
-        db_prefix := DbPrefix
-    } = TxDb,
-
-    ViewOpts = #{
-        db_prefix => DbPrefix,
-        sig => Sig,
-        view_id => ViewId
-    },
-    create_skip_list(TxDb, ?MAX_SKIP_LIST_LEVELS, ViewOpts),
-
-    lists:foreach(fun ({Key, Val}) ->
-        io:format("RESULTS KV ~p ~p ~n", [Key, Val])
-%%        add_kv_to_skip_list(TxDb, ?MAX_SKIP_LIST_LEVELS, ViewOpts, Key, Val)
-    end, ReduceResult).
-
-
-create_skip_list(Db, MaxLevel, #{} = ViewOpts) ->
-    #{
-        db_prefix := DbPrefix,
-        sig := Sig,
-        view_id := ViewId
-    } = ViewOpts,
-
-    Levels = lists:seq(0, MaxLevel),
-    ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
-
-    fabric2_fdb:transactional(Db, fun(TxDb) ->
-
-        lists:foreach(fun(Level) ->
-            add_kv(TxDb, ReduceIdxPrefix, Level, 0, 0)
-        end, Levels)
-    end).
-
-
-should_add_key_to_level(Level, Key) ->
-    (erlang:phash2(Key) band ((1 bsl (Level * ?LEVEL_FAN_POW)) -1)) == 0.
-%%    keyHash & ((1 << (level * LEVEL_FAN_POW)) - 1)) != 0
-
-
-add_kv_to_skip_list(Db, MaxLevel, #{} = ViewOpts, Key, Val) ->
-    #{
-        db_prefix := DbPrefix,
-        sig := Sig,
-        view_id := ViewId
-    } = ViewOpts,
-
-    Levels = lists:seq(0, MaxLevel),
-    ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
-
-    fabric2_fdb:transactional(Db, fun(TxDb) ->
-        lists:foldl(fun(Level) ->
-            io:format("PROCESS ~p ~p ~p ~n", [Level, Key, Val]),
-            {PrevKey, PrevVal} = get_previous_key(TxDb, ReduceIdxPrefix, Level, Key),
-            io:format("PREV VALS ~p ~p ~n", [PrevKey, PrevVal]),
-            case should_add_key_to_level(Level, Key) of
-                true ->
-                    io:format("Adding ~p ~p ~n", [Level, Key]),
-                    add_kv(Db, ReduceIdxPrefix, Level, Key, Val);
-                false ->
-                    {PrevKey, NewVal} = rereduce(<<"_stats">>, {PrevKey, PrevVal}, {Key, Val}),
-                    io:format("RE_REDUCE ~p ~p ~p ~p ~n", [Level, Key, PrevKey, NewVal]),
-                    add_kv(Db, ReduceIdxPrefix, Level, PrevKey, NewVal)
-            end
-        end, true, Levels)
-    end).
-
-
-rereduce(<<"_stats">>, {PrevKey, PrevVal}, {_Key, Val}) ->
-    case PrevVal >= Val of
-        true -> {PrevKey, PrevVal};
-        false -> {PrevKey, Val}
-    end.
-
-
 reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId) ->
     Key = {?DB_VIEWS, Sig, ?VIEW_REDUCE_RANGE, ViewId},
     erlfdb_tuple:pack(Key, DbPrefix).
-
-
-create_key(ReduceIdxPrefix, SkipLevel, Key) ->
-    EK = couch_views_encoding:encode(Key, key),
-    LevelKey = {SkipLevel, EK},
-    erlfdb_tuple:pack(LevelKey, ReduceIdxPrefix).
-
-
-create_value(Key, Val) ->
-    EK = couch_views_encoding:encode(Key),
-    EV = couch_views_encoding:encode(Val),
-    erlfdb_tuple:pack({EK, EV}).
-
-
-add_kv(TxDb, ReduceIdxPrefix, Level, Key, Val) ->
-    #{
-        tx := Tx
-    } = TxDb,
-
-    LevelKey = create_key(ReduceIdxPrefix, Level, Key),
-    EV = create_value(Key, Val),
-
-    ok = erlfdb:set(Tx, LevelKey, EV).
-
-
-get_previous_key(TxDb, ReduceIdxPrefix, Level, Key) ->
-    #{
-        tx := Tx
-    } = TxDb,
-
-    % TODO: see if we need to add in conflict ranges for this for level=0
-    Opts = [{limit, 2}, {reverse, true}, {streaming_mode, want_all}],
-%%    LevelPrefix = erlfdb_tuple:pack({Level}, ReduceIdxPrefix),
-
-    EK = couch_views_encoding:encode(Key, key),
-    EndKey0 = erlfdb_tuple:pack({Level, EK}, ReduceIdxPrefix),
-
-    {StartKey, EndKey1} = erlfdb_tuple:range({Level}, ReduceIdxPrefix),
-%%    EndKey1 = erlfdb_key:first_greater_than(EndKey0),
-
-    Callback = fun row_cb/2,
-    Out = erlfdb:fold_range(Tx, StartKey, EndKey1, Callback, {val, ReduceIdxPrefix, {}}, Opts),
-    io:format("OUT PRV ~p ~p ~p ~n", [Level, Key, Out]),
-    Out.
-
-
-row_cb({FullEncodedKey, EV}, {val, ReduceIdxPrefix, Acc}) ->
-    io:format("ROW VAL ~p ~n", [erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix)]),
-    {_Level, EK, _VIEW_ROW_VALUE}
-        = erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
-    Val = couch_views_encoding:decode(EV),
-%%    io:format("WW ~p ~p ~n", [couch_views_encoding:decode(EK), Val]),
-
-    {key, {EK, ReduceIdxPrefix, Val}};
-
-row_cb({FullEncodedKey, EVK}, {key, {EK, ReduceIdxPrefix, Val}}) ->
-    io:format("ROW KEY ~p ~n", [erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix)]),
-    {_Level, EK, ?VIEW_ROW_KEY}
-        = erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
-    Key = couch_views_encoding:decode(EVK),
-
-    {Key, Val}.
-
-
-
-
diff --git a/src/couch_views/src/couch_views_reduce_fdb.erl b/src/couch_views/src/couch_views_reduce_fdb.erl
index bcaaa30..9683265 100644
--- a/src/couch_views/src/couch_views_reduce_fdb.erl
+++ b/src/couch_views/src/couch_views_reduce_fdb.erl
@@ -15,19 +15,273 @@
 
 
 -export([
-%%    write_doc/4
+    fold_level0/6,
+    create_skip_list/3,
+    update_reduce_idx/6
 ]).
 
-% _id keys = {?DB_VIEWS, Sig, ?VIEW_REDUCE_ID_RANGE, DocId, ViewId} = [TotalKeys, TotalSize, UniqueKeys]
 
-%%write_doc(TxDb, Sig, ViewIds, Doc) ->
+-include("couch_views.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+-include_lib("fabric/include/fabric2.hrl").
+
+-define(MAX_SKIP_LIST_LEVELS, 6).
+-define(LEVEL_FAN_POW, 1).
+
+log_levels(Db, Sig, ViewId) ->
+    #{
+        db_prefix := DbPrefix
+    } = Db,
+
+    Levels = lists:seq(0, 6),
+    ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+    Opts = [{streaming_mode, want_all}],
+
+    fabric2_fdb:transactional(Db, fun(#{tx := Tx} = TxDb) ->
+        lists:foreach(fun (Level) ->
+            {StartKey, EndKey} = erlfdb_tuple:range({Level},
+                ReduceIdxPrefix),
+
+            Acc0 = #{
+                sig => Sig,
+                view_id => ViewId,
+                reduce_idx_prefix => ReduceIdxPrefix,
+                user_acc => [],
+                callback => fun handle_log_levels/3
+            },
+
+            Fun = fun fold_fwd_cb/2,
+            Acc = erlfdb:fold_range(Tx, StartKey, EndKey, Fun, Acc0, Opts),
+            #{
+                user_acc := Rows
+            } = Acc,
+            io:format("~n LEVEL ~p rows ~p ~n", [Level, Rows])
+        end, Levels)
+    end).
+
+handle_log_levels(Key, Value, Acc) ->
+    Acc ++ [{Key, Value}].
+
+%%fold(Db, Sig, ViewId, Options, Callback, Acc0) ->
 %%    #{
-%%        id := DocId,
-%%        reduce_results := ReduceResults
-%%    } = Doc,
-%%    lists:foreach(fun({ViewId, NewRows}) ->
-%%        % update reduce index
-%%        ok
-%%    end, lists:zip(ViewIds, ReduceResults)).
+%%        db_prefix := DbPrefix
+%%    } = Db,
+%%
+%%%%    Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
+%%    ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+%%    #mrargs{
+%%        limit = Limit
+%%    } = Args,
+%%
+%%    fabric2_fdb:transactional(Db, fun(TxDb) ->
+%%
+%%        Acc0 = #{
+%%            sig => Sig,
+%%            view_id => ViewId,
+%%            user_acc => UserAcc0,
+%%            args => Args,
+%%            callback => UserCallback,
+%%            reduce_idx_prefix => ReduceIdxPrefix,
+%%            limit => Limit,
+%%            row_count => 0
+%%
+%%        },
+%%
+%%        Acc1 = read_level0_only(TxDb, Acc0, Callback),
+%%        #{
+%%            user_acc := UserAcc1
+%%        } = Acc1,
+%%        {ok, UserAcc1}
+%%    end).
+
+fold_level0(Db, Sig, ViewId, Opts, UserCallback, UserAcc0) ->
+    #{
+        db_prefix := DbPrefix
+    } = Db,
+
+    Level = 0,
+    ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+    Acc = #{
+        sig => Sig,
+        view_id => ViewId,
+        user_acc => UserAcc0,
+        %%            args := Args,
+        callback => UserCallback,
+        reduce_idx_prefix => ReduceIdxPrefix
+    },
+
+    fabric2_fdb:transactional(Db, fun(TxDb) ->
+        log_levels(TxDb, Sig, ViewId),
+        #{
+            tx := Tx
+        } = TxDb,
+
+
+%%        {StartKey, EndKey} = erlfdb_tuple:range({0},
+%%            ReduceIdxPrefix),
+        {startkey, StartKey} = lists:keyfind(startkey, 1, Opts),
+        {endkey, EndKey} = lists:keyfind(endkey, 1, Opts),
+
+%%        ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+        Fun = fun fold_fwd_cb/2,
+        Acc1 = erlfdb:fold_range(Tx, StartKey, EndKey, Fun, Acc, Opts),
+        #{
+            user_acc := UserAcc1
+        } = Acc1,
+        UserAcc1
+    end).
+
+
+fold_fwd_cb({FullEncodedKey, EV}, Acc) ->
+    #{
+        reduce_idx_prefix := ReduceIdxPrefix,
+        callback := Callback,
+        user_acc := UserAcc
+    } = Acc,
+
+    {_Level, _EK}
+        = erlfdb_tuple:unpack(FullEncodedKey, ReduceIdxPrefix),
+    {EK, EV1} = erlfdb_tuple:unpack(EV),
+    Key = couch_views_encoding:decode(EK),
+    Val = couch_views_encoding:decode(EV1),
+
+    UserAcc1 = Callback(Key, Val, UserAcc),
+    Acc#{user_acc := UserAcc1}.
+
+
+reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId) ->
+    Key = {?DB_VIEWS, Sig, ?VIEW_REDUCE_RANGE, ViewId},
+    erlfdb_tuple:pack(Key, DbPrefix).
+
+
+%% Inserting
+update_reduce_idx(TxDb, Sig, ViewId, _DocId, _ExistingKeys, ReduceResult) ->
+    #{
+        db_prefix := DbPrefix
+    } = TxDb,
+
+    ViewOpts = #{
+        db_prefix => DbPrefix,
+        sig => Sig,
+        view_id => ViewId
+    },
+
+    lists:foreach(fun ({Key, Val}) ->
+        io:format("RESULTS KV ~p ~p ~n", [Key, Val]),
+        add_kv_to_skip_list(TxDb, ?MAX_SKIP_LIST_LEVELS, ViewOpts, Key, Val)
+    end, ReduceResult).
+
+
+create_skip_list(Db, MaxLevel, #{} = ViewOpts) ->
+    #{
+        db_prefix := DbPrefix,
+        sig := Sig,
+        view_id := ViewId
+    } = ViewOpts,
+
+    Levels = lists:seq(0, MaxLevel),
+    ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+
+    fabric2_fdb:transactional(Db, fun(TxDb) ->
+
+        lists:foreach(fun(Level) ->
+            add_kv(TxDb, ReduceIdxPrefix, Level, 0, 0)
+        end, Levels)
+    end).
+
+
+add_kv_to_skip_list(Db, MaxLevel, #{} = ViewOpts, Key, Val) ->
+    #{
+        db_prefix := DbPrefix,
+        sig := Sig,
+        view_id := ViewId
+    } = ViewOpts,
+
+    Levels = lists:seq(0, MaxLevel),
+    ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+    KeyHash = hash_key(Key),
+
+    fabric2_fdb:transactional(Db, fun(TxDb) ->
+        lists:foreach(fun(Level) ->
+            {PrevKey, PrevVal} = get_previous_key(TxDb, ReduceIdxPrefix, Level, Key),
+            io:format("Process ~p ~p ~p PREV VALS ~p ~p ~n", [Level, Key, Val, PrevKey, PrevVal]),
+            case should_add_key_to_level(Level, KeyHash) of
+                true ->
+                    io:format("Adding ~p ~p ~n", [Level, Key]),
+                    add_kv(Db, ReduceIdxPrefix, Level, Key, Val);
+                false ->
+                    {PrevKey, NewVal} = rereduce(<<"_stats">>, {PrevKey, PrevVal}, {Key, Val}),
+                    io:format("RE_REDUCE ~p ~p ~p ~p ~n", [Level, Key, PrevKey, NewVal]),
+                    add_kv(Db, ReduceIdxPrefix, Level, PrevKey, NewVal)
+            end
+        end, Levels)
+    end).
+
+
+get_previous_key(TxDb, ReduceIdxPrefix, Level, Key) ->
+    #{
+        tx := Tx
+    } = TxDb,
+
+    % TODO: see if we need to add in conflict ranges for this for level=0
+    Opts = [{limit, 1}, {streaming_mode, want_all}],
+
+    EK = couch_views_encoding:encode(Key, key),
+    StartKey = erlfdb_tuple:pack({Level, EK}, ReduceIdxPrefix),
+    StartKeySel = erlfdb_key:last_less_or_equal(StartKey),
+    EndKeySel = erlfdb_key:first_greater_or_equal(StartKey),
+
+    Future = erlfdb:get_range(Tx, StartKeySel, EndKeySel, Opts),
+    [{_FullEncodedKey, PackedValue}] = erlfdb:wait(Future),
+    get_key_value(PackedValue).
+
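The selector pair above (last_less_or_equal as the start, first_greater_or_equal as the end, limit 1) asks FDB for the nearest stored key at or before the packed key on that level. A minimal in-memory sketch of the same lookup over a sorted list, using Python's bisect (names are illustrative, not part of the patch):

```python
import bisect

def previous_key(sorted_keys, key):
    # Index of the right-most entry <= key, i.e. what the
    # last_less_or_equal selector resolves to on one skip-list level.
    i = bisect.bisect_right(sorted_keys, key) - 1
    if i < 0:
        raise LookupError("no key at or before %r" % (key,))
    return sorted_keys[i]
```

The seeded 0/0 row written by create_skip_list guarantees the lookup never falls off the left edge of a level.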
+
+hash_key(Key) ->
+    erlang:phash2(Key).
+
+
+should_add_key_to_level(Level, KeyHash) ->
+    (KeyHash band ((1 bsl (Level * ?LEVEL_FAN_POW)) - 1)) == 0.
+%% i.e. (keyHash & ((1 << (level * LEVEL_FAN_POW)) - 1)) == 0
+
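Since the check keeps a key at level L only when the low L * LEVEL_FAN_POW bits of its hash are zero, each level thins the one below it by a factor of 2^LEVEL_FAN_POW. A quick sketch of that distribution (Python, illustrative only):

```python
LEVEL_FAN_POW = 1  # mirrors the macro in this module

def should_add_key_to_level(level, key_hash):
    # True when the low (level * LEVEL_FAN_POW) bits of the hash are zero.
    return (key_hash & ((1 << (level * LEVEL_FAN_POW)) - 1)) == 0

# Every key lands on level 0; roughly 1 in 2**level survive higher up.
per_level = [sum(1 for h in range(1024) if should_add_key_to_level(l, h))
             for l in range(4)]
# per_level == [1024, 512, 256, 128]
```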
+
+create_key(ReduceIdxPrefix, SkipLevel, Key) ->
+    EK = couch_views_encoding:encode(Key, key),
+    LevelKey = {SkipLevel, EK},
+    erlfdb_tuple:pack(LevelKey, ReduceIdxPrefix).
+
+
+create_value(Key, Val) ->
+    EK = couch_views_encoding:encode(Key),
+    EV = couch_views_encoding:encode(Val),
+    erlfdb_tuple:pack({EK, EV}).
+
+
+get_key_value(PackedValue) ->
+    {EncodedKey, EncodedValue} = erlfdb_tuple:unpack(PackedValue),
+    Key = couch_views_encoding:decode(EncodedKey),
+    Value = couch_views_encoding:decode(EncodedValue),
+    {Key, Value}.
+
+
+add_kv(TxDb, ReduceIdxPrefix, Level, Key, Val) ->
+    #{
+        tx := Tx
+    } = TxDb,
+
+    LevelKey = create_key(ReduceIdxPrefix, Level, Key),
+    EV = create_value(Key, Val),
+
+    ok = erlfdb:set(Tx, LevelKey, EV).
+
+
+rereduce(<<"_stats">>, {PrevKey, PrevVal}, {_Key, Val}) ->
+    case PrevVal >= Val of
+        true -> {PrevKey, PrevVal};
+        false -> {PrevKey, Val}
+    end.
+
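Putting the pieces together, add_kv_to_skip_list walks every level: a key that qualifies for the level is stored as its own node, otherwise its value is folded into the nearest preceding node via rereduce. An in-memory sketch of that loop (Python; the dict of sorted row lists stands in for the FDB subspace, and the seeded [0, 0] rows mirror create_skip_list):

```python
import bisect

LEVEL_FAN_POW = 1
MAX_SKIP_LIST_LEVELS = 6

def should_add(level, key_hash):
    return (key_hash & ((1 << (level * LEVEL_FAN_POW)) - 1)) == 0

def insert(levels, key, val, key_hash, rereduce):
    # levels: {level: sorted [key, value] rows}, pre-seeded with a
    # [0, 0] sentinel per level as create_skip_list does.
    for level in range(MAX_SKIP_LIST_LEVELS + 1):
        rows = levels[level]
        if should_add(level, key_hash):
            bisect.insort(rows, [key, val])
        else:
            # Fold the new value into the nearest preceding node.
            prev = rows[bisect.bisect_right(rows, [key, val]) - 1]
            prev[1] = rereduce(prev[1], val)
    return levels

levels = {l: [[0, 0]] for l in range(MAX_SKIP_LIST_LEVELS + 1)}
insert(levels, 3, 10, key_hash=1, rereduce=lambda a, b: a + b)
# level 0 keeps [3, 10] as its own node; levels 1..6 fold 10 into [0, 10]
```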
 
 
diff --git a/src/couch_views/test/exunit/couch_views_reduce_test.exs b/src/couch_views/test/exunit/couch_views_reduce_test.exs
index a526658..c1b35e2 100644
--- a/src/couch_views/test/exunit/couch_views_reduce_test.exs
+++ b/src/couch_views/test/exunit/couch_views_reduce_test.exs
@@ -40,29 +40,37 @@ defmodule CouchViewsReduceTest do
     }
   end
 
-  test "group=true count reduce", context do
-    args = %{
-      :reduce => true,
-      :group => true
-      #            :limit => 9
-    }
+#  test "group=true count reduce with limit", context do
+#    args = %{
+#      :reduce => true,
+#      :group => true,
+#      :limit => 3
+#    }
+#
+#    {:ok, res} = run_query(context, args, "dates")
+#    IO.inspect(res, label: "OUT")
+#
+#    assert res == [
+#             {:row, [key: [2017, 3, 1], value: 1]},
+#             {:row, [key: [2017, 4, 1], value: 1]},
+#             {:row, [key: [2017, 4, 15], value: 1]}
+#           ]
+#  end
+
+  test "group_level=1 count reduce", context do
+      args = %{
+          :reduce => true,
+          :group => true
+      }
 
-    {:ok, res} = run_query(context, args, "baz")
-    IO.inspect(res, label: "OUT")
+      {:ok, res} = run_query(context, args, "dates")
+      IO.inspect(res, label: "OUT")
 
-    assert res == [
-             {:row, [key: 1, value: 2]},
-             {:row, [key: 2, value: 2]},
-             {:row, [key: 3, value: 2]},
-             {:row, [key: [1, 1], value: 1]},
-             {:row, [key: [1, 1, 5], value: 1]},
-             {:row, [key: [1, 2, 6], value: 1]},
-             {:row, [key: [2, 1], value: 1]},
-             {:row, [key: [2, 3, 6], value: 1]},
-             {:row, [key: [3, 1], value: 1]},
-             {:row, [key: [3, 1, 5], value: 1]},
-             {:row, [key: [3, 4, 5], value: 1]}
-           ]
+      assert res == [
+                 {:row, [key: [2017], value: 1]},
+                 {:row, [key: [2018], value: 1]},
+                 {:row, [key: [2019], value: 1]}
+             ]
   end
 
   #  test "group=1 count reduce", context do
@@ -173,6 +181,7 @@ defmodule CouchViewsReduceTest do
   end
 
   def default_cb(:complete, acc) do
+    IO.inspect(acc, label: "complete")
     {:ok, Enum.reverse(acc)}
   end
 
@@ -197,7 +206,22 @@ defmodule CouchViewsReduceTest do
   end
 
   defp create_docs() do
-    for i <- 1..1 do
+    dates = [
+      [2017, 3, 1],
+      [2017, 4, 1],
+      # out of order check
+      [2019, 3, 1],
+      [2017, 4, 15],
+      [2018, 4, 1],
+      [2017, 5, 1],
+      [2018, 3, 1],
+      # duplicate check
+      [2018, 4, 1],
+      [2018, 5, 1],
+      [2019, 4, 1]
+    ]
+
+    for i <- 1..4 do
       group =
         if rem(i, 3) == 0 do
           "first"
@@ -205,14 +229,14 @@ defmodule CouchViewsReduceTest do
           "second"
         end
 
-      :couch_doc.from_json_obj(
-        {[
-           {"_id", "doc-id-#{i}"},
-           {"value", i},
-           {"some", "field"},
-           {"group", group}
-         ]}
-      )
+      :couch_doc.from_json_obj({[
+         {"_id", "doc-id-#{i}"},
+         {"value", i},
+         {"some", "field"},
+         {"group", group},
+         {"date", Enum.at(dates, i - 1)}
+         #           {"timestamp", Enum.at(timestamps, i - 1)}
+       ]})
     end
   end
 
@@ -221,6 +245,16 @@ defmodule CouchViewsReduceTest do
        {"_id", "_design/bar"},
        {"views",
         {[
+           {"dates",
+            {[
+               {"map",
+                """
+                function(doc) {
+                  emit(doc.date, doc.value);
+                 }
+                """},
+               {"reduce", "_count"}
+             ]}},
            {"baz",
             {[
                {"map",


[couchdb] 05/08: level 0 _sum working

Posted by ga...@apache.org.

garren pushed a commit to branch prototype/builtin-reduce
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 6f6bedc710df762bf3cc490ea0b8edbc38b4122f
Author: Garren Smith <ga...@gmail.com>
AuthorDate: Thu Oct 31 16:42:36 2019 +0200

    level 0 _sum working
---
 src/couch_views/src/couch_views.erl.orig           | 159 ---------------------
 src/couch_views/src/couch_views_fdb.erl            |   8 +-
 src/couch_views/src/couch_views_indexer.erl        |   7 +-
 src/couch_views/src/couch_views_reduce.erl         |  12 ++
 src/couch_views/src/couch_views_reduce_fdb.erl     |  54 ++++++-
 .../test/exunit/couch_views_reduce_test.exs        | 150 +++++++++++--------
 6 files changed, 160 insertions(+), 230 deletions(-)

diff --git a/src/couch_views/src/couch_views.erl.orig b/src/couch_views/src/couch_views.erl.orig
deleted file mode 100644
index 1830076..0000000
--- a/src/couch_views/src/couch_views.erl.orig
+++ /dev/null
@@ -1,159 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_views).
-
--export([
-    query/6
-]).
-
-
--include_lib("couch_mrview/include/couch_mrview.hrl").
-
-
-query(Db, DDoc, ViewName, Callback, Acc0, Args0) ->
-    case fabric2_db:is_users_db(Db) of
-        true ->
-            fabric2_users_db:after_doc_read(DDoc, Db);
-        false ->
-            ok
-    end,
-
-    DbName = fabric2_db:name(Db),
-    {ok, Mrst} = couch_views_util:ddoc_to_mrst(DbName, DDoc),
-
-    #mrst{
-        views = Views
-    } = Mrst,
-
-    Args1 = to_mrargs(Args0),
-    Args2 = couch_mrview_util:set_view_type(Args1, ViewName, Views),
-    Args3 = couch_mrview_util:validate_args(Args2),
-
-    ok = check_range(Args3),
-
-    try
-<<<<<<< HEAD
-        fabric2_fdb:transactional(Db, fun(TxDb) ->
-            ok = maybe_update_view(TxDb, Mrst, Args3),
-            read_view(TxDb, Mrst, ViewName, Callback, Acc0, Args3)
-        end)
-    catch throw:{build_view, WaitSeq} ->
-        couch_views_jobs:build_view(Db, Mrst, WaitSeq),
-        read_view(Db, Mrst, ViewName, Callback, Acc0, Args3)
-=======
-        case is_reduce_view(Args3) of
-            true ->
-                couch_views_reader:read_reduce(Db, Mrst, ViewName,
-                    Callback, Acc0, Args3);
-            false ->
-                couch_views_reader:read(Db, Mrst, ViewName,
-                    Callback, Acc0, Args3)
-        end
-    after
-        UpdateAfter = Args3#mrargs.update == lazy,
-        if UpdateAfter == false -> ok; true ->
-            couch_views_jobs:build_view_async(Db, Mrst)
-        end
->>>>>>> Initial work
-    end.
-
-
-read_view(Db, Mrst, ViewName, Callback, Acc0, Args) ->
-    fabric2_fdb:transactional(Db, fun(TxDb) ->
-        try
-            couch_views_reader:read(TxDb, Mrst, ViewName, Callback, Acc0, Args)
-        after
-            UpdateAfter = Args#mrargs.update == lazy,
-            if UpdateAfter == false -> ok; true ->
-                couch_views_jobs:build_view_async(TxDb, Mrst)
-            end
-        end
-    end).
-
-
-maybe_update_view(_Db, _Mrst, #mrargs{update = false}) ->
-    ok;
-
-maybe_update_view(_Db, _Mrst, #mrargs{update = lazy}) ->
-    ok;
-
-maybe_update_view(TxDb, Mrst, _Args) ->
-    DbSeq = fabric2_db:get_update_seq(TxDb),
-    ViewSeq = couch_views_fdb:get_update_seq(TxDb, Mrst),
-    case DbSeq == ViewSeq of
-        true -> ok;
-        false -> throw({build_view, DbSeq})
-    end.
-
-
-is_reduce_view(#mrargs{view_type = ViewType}) ->
-    ViewType =:= red;
-is_reduce_view({Reduce, _, _}) ->
-    Reduce =:= red.
-
-
-to_mrargs(#mrargs{} = Args) ->
-    Args;
-
-to_mrargs(#{} = Args) ->
-    Fields = record_info(fields, mrargs),
-    Indexes = lists:seq(2, record_info(size, mrargs)),
-    LU = lists:zip(Fields, Indexes),
-
-    maps:fold(fun(Key, Value, Acc) ->
-        Index = fabric2_util:get_value(couch_util:to_existing_atom(Key), LU),
-        setelement(Index, Acc, Value)
-    end, #mrargs{}, Args).
-
-
-check_range(#mrargs{start_key = undefined}) ->
-    ok;
-
-check_range(#mrargs{end_key = undefined}) ->
-    ok;
-
-check_range(#mrargs{start_key = K, end_key = K}) ->
-    ok;
-
-check_range(Args) ->
-    #mrargs{
-        direction = Dir,
-        start_key = SK,
-        start_key_docid = SKD,
-        end_key = EK,
-        end_key_docid = EKD
-    } = Args,
-
-    case {Dir, view_cmp(SK, SKD, EK, EKD)} of
-        {fwd, false} ->
-            throw(check_range_error(<<"true">>));
-        {rev, true} ->
-            throw(check_range_error(<<"false">>));
-        _ ->
-            ok
-    end.
-
-
-check_range_error(Descending) ->
-    {query_parse_error,
-        <<"No rows can match your key range, reverse your ",
-            "start_key and end_key or set descending=",
-            Descending/binary>>}.
-
-
-view_cmp(SK, SKD, EK, EKD) ->
-    BinSK = couch_views_encoding:encode(SK, key),
-    BinEK = couch_views_encoding:encode(EK, key),
-    PackedSK = erlfdb_tuple:pack({BinSK, SKD}),
-    PackedEK = erlfdb_tuple:pack({BinEK, EKD}),
-    PackedSK =< PackedEK.
diff --git a/src/couch_views/src/couch_views_fdb.erl b/src/couch_views/src/couch_views_fdb.erl
index 8999d76..6c81457 100644
--- a/src/couch_views/src/couch_views_fdb.erl
+++ b/src/couch_views/src/couch_views_fdb.erl
@@ -148,7 +148,7 @@ write_doc(TxDb, Sig, _ViewIds, #{deleted := true} = Doc) ->
         update_kv_size(TxDb, Sig, ViewId, -TotalSize)
     end, ExistingViewKeys);
 
-write_doc(TxDb, Sig, ViewIds, Doc) ->
+write_doc(TxDb, Sig, ViewsIdFun, Doc) ->
     #{
         id := DocId,
         results := Results,
@@ -161,7 +161,7 @@ write_doc(TxDb, Sig, ViewIds, Doc) ->
 
     %% TODO: handle when there is no reduce
     io:format("REDUCE RESULTS ~p ~n", [ReduceResults]),
-    lists:foreach(fun({ViewId, NewRows, ReduceResult}) ->
+    lists:foreach(fun({{ViewId, Reducer}, NewRows, ReduceResult}) ->
         update_id_idx(TxDb, Sig, ViewId, DocId, NewRows),
 
         ExistingKeys = case lists:keyfind(ViewId, 1, ExistingViewKeys) of
@@ -179,9 +179,9 @@ write_doc(TxDb, Sig, ViewIds, Doc) ->
                 []
         end,
         update_map_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows),
-        couch_views_reduce_fdb:update_reduce_idx(TxDb, Sig, ViewId, DocId,
+        couch_views_reduce_fdb:update_reduce_idx(TxDb, Sig, ViewId, Reducer, DocId,
             ExistingKeys, ReduceResult)
-    end, lists:zip3(ViewIds, Results, ReduceResults)).
+    end, lists:zip3(ViewsIdFun, Results, ReduceResults)).
 
 
 % For each row in a map view there are two rows stored in
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index cff15b0..51fae9b 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -303,6 +303,11 @@ write_docs(TxDb, Mrst, Docs, State) ->
     } = State,
 
     ViewIds = [View#mrview.id_num || View <- Views],
+    ViewsIdFuns = lists:foldl(fun (View, Acc) ->
+        Id = View#mrview.id_num,
+        [{_Name, ReduceFun}] = View#mrview.reduce_funs,
+        Acc ++ [{Id, ReduceFun}]
+    end, [], Views),
 
     %%  First build of the view
     if ViewSeq /= <<>> -> ok; true ->
@@ -310,7 +315,7 @@ write_docs(TxDb, Mrst, Docs, State) ->
     end,
 
     lists:foreach(fun(Doc) ->
-        couch_views_fdb:write_doc(TxDb, Sig, ViewIds, Doc)
+        couch_views_fdb:write_doc(TxDb, Sig, ViewsIdFuns, Doc)
     end, Docs),
 
     couch_views_fdb:set_update_seq(TxDb, Sig, LastSeq).
diff --git a/src/couch_views/src/couch_views_reduce.erl b/src/couch_views/src/couch_views_reduce.erl
index b7eb18e..04c5cb8 100644
--- a/src/couch_views/src/couch_views_reduce.erl
+++ b/src/couch_views/src/couch_views_reduce.erl
@@ -191,6 +191,18 @@ reduce(<<"_count">>, Results) ->
     end, #{}, Results),
     maps:to_list(ReduceResults);
 
+reduce(<<"_sum">>, Results) ->
+    ReduceResults = lists:foldl(fun ({Key, Val}, Acc) ->
+        case maps:is_key(Key, Acc) of
+            true ->
+                #{Key := Sum} = Acc,
+                Acc#{Key := Val + Sum};
+            false ->
+                Acc#{Key => Val}
+        end
+    end, #{}, Results),
+    maps:to_list(ReduceResults);
+
 % this isn't a real supported reduce function in CouchDB
 % But I want a basic reduce function that when we need to update the index
 % we would need to re-read multiple rows instead of being able to do an
diff --git a/src/couch_views/src/couch_views_reduce_fdb.erl b/src/couch_views/src/couch_views_reduce_fdb.erl
index 5759c42..7a7e120 100644
--- a/src/couch_views/src/couch_views_reduce_fdb.erl
+++ b/src/couch_views/src/couch_views_reduce_fdb.erl
@@ -17,7 +17,7 @@
 -export([
     fold_level0/8,
     create_skip_list/3,
-    update_reduce_idx/6
+    update_reduce_idx/7
 ]).
 
 
@@ -188,6 +188,14 @@ rereduce(_Reducer, Rows, GroupLevel) when length(Rows) == 1 ->
     GroupKey = group_level_key(Key, GroupLevel),
     {GroupKey, Val};
 
+rereduce(<<"_sum">>, Rows, GroupLevel) ->
+    Sum = lists:foldl(fun ({_, Val}, Acc) ->
+       Val + Acc
+    end, 0, Rows),
+    {Key, _} = hd(Rows),
+    GroupKey = group_level_key(Key, GroupLevel),
+    {GroupKey, Sum};
+
 rereduce(<<"_count">>, Rows, GroupLevel) ->
     Val = length(Rows),
     {Key, _} = hd(Rows),
@@ -210,6 +218,9 @@ group_level_equal(One, Two, GroupLevel) ->
 group_level_key(_Key, 0) ->
     null;
 
+group_level_key(Key, group_true) ->
+    Key;
+
 group_level_key(Key, GroupLevel) when is_list(Key) ->
     lists:sublist(Key, GroupLevel);
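group_level_key decides how much of a key survives grouping: group_level=0 collapses everything into a single null row, the group_true atom (group=true) keeps the exact key, and a numeric group_level truncates array keys. A Python sketch, with None standing in for null and the string "group_true" for the atom (illustrative only):

```python
def group_level_key(key, group_level):
    if group_level == 0:
        return None          # everything reduces into one null-keyed row
    if group_level == "group_true":
        return key           # group=true keeps the exact key
    if isinstance(key, list):
        return key[:group_level]  # truncate array keys to the group level
    return key               # scalar keys pass through unchanged
```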
 
@@ -230,7 +241,7 @@ unpack_key_value(EncodedValue) ->
 
 
 %% Inserting
-update_reduce_idx(TxDb, Sig, ViewId, _DocId, _ExistingKeys, ReduceResult) ->
+update_reduce_idx(TxDb, Sig, ViewId, Reducer, _DocId, _ExistingKeys, ReduceResult) ->
     #{
         db_prefix := DbPrefix
     } = TxDb,
@@ -238,7 +249,8 @@ update_reduce_idx(TxDb, Sig, ViewId, _DocId, _ExistingKeys, ReduceResult) ->
     ViewOpts = #{
         db_prefix => DbPrefix,
         sig => Sig,
-        view_id => ViewId
+        view_id => ViewId,
+        reducer => Reducer
     },
 
     lists:foreach(fun ({Key, Val}) ->
@@ -269,29 +281,57 @@ add_kv_to_skip_list(Db, MaxLevel, #{} = ViewOpts, Key, Val) ->
     #{
         db_prefix := DbPrefix,
         sig := Sig,
-        view_id := ViewId
+        view_id := ViewId,
+        reducer := Reducer
     } = ViewOpts,
 
-    Levels = lists:seq(0, MaxLevel),
+    Levels = lists:seq(1, MaxLevel),
     ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
     KeyHash = hash_key(Key),
 
     fabric2_fdb:transactional(Db, fun(TxDb) ->
+        Val1 = case get_value(TxDb, ReduceIdxPrefix, 0, Key) of
+            not_found ->
+                Val;
+            ExistingVal ->
+                {_, ReducedVal} = rereduce(Reducer, [{Key, ExistingVal}, {Key, Val}], group_true),
+                ReducedVal
+        end,
+        io:format("VAL1 ~p ~n", [Val1]),
+        add_kv(TxDb, ReduceIdxPrefix, 0, Key, Val1),
+
         lists:foreach(fun(Level) ->
             {PrevKey, PrevVal} = get_previous_key(TxDb, ReduceIdxPrefix, Level, Key),
             io:format("Level ~p K/V ~p ~p PREV KV ~p ~p ~n", [Level, Key, Val, PrevKey, PrevVal]),
             case should_add_key_to_level(Level, KeyHash) of
                 true ->
                     io:format("Adding at ~p ~p ~n", [Level, Key]),
-                    add_kv(Db, ReduceIdxPrefix, Level, Key, Val);
+                    add_kv(TxDb, ReduceIdxPrefix, Level, Key, Val);
                 false ->
 %%                    {PrevKey, NewVal} = rereduce(<<"_stats">>, {PrevKey, PrevVal}, {Key, Val}),
 %%                    io:format("RE_REDUCE ~p ~p ~p ~p ~n", [Level, Key, PrevKey, NewVal]),
-                    add_kv(Db, ReduceIdxPrefix, Level, PrevKey, PrevVal)
+                    add_kv(TxDb, ReduceIdxPrefix, Level, PrevKey, PrevVal)
             end
         end, Levels)
     end).
 
+get_value(TxDb, ReduceIdxPrefix, Level, Key) ->
+    #{
+        tx := Tx
+    } = TxDb,
+    EK = create_key(ReduceIdxPrefix, Level, Key),
+    io:format("FF ~p ~n", [Key]),
+    Out = case erlfdb:wait(erlfdb:get(Tx, EK)) of
+        not_found ->
+            not_found;
+        PackedValue ->
+            io:format("HERE ~p ~n", [PackedValue]),
+            {_, Value} = get_key_value(PackedValue),
+            Value
+    end,
+    io:format("GETTING ~p ~p ~n", [Key, Out]),
+    Out.
+
 
 get_previous_key(TxDb, ReduceIdxPrefix, Level, Key) ->
     #{
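The new level-0 path in this diff is a read-modify-write: get_value fetches any existing reduced value for the exact key, rereduce merges it with the incoming value, and add_kv writes the result back inside the same transaction. A minimal sketch with a plain dict standing in for the FDB subspace (names illustrative):

```python
def upsert_level0(store, key, val, rereduce):
    existing = store.get((0, key))
    if existing is not None:
        # Same shape as rereduce(Reducer, [{Key, Existing}, {Key, Val}], group_true)
        val = rereduce(existing, val)
    store[(0, key)] = val
    return val
```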
diff --git a/src/couch_views/test/exunit/couch_views_reduce_test.exs b/src/couch_views/test/exunit/couch_views_reduce_test.exs
index 488f3ee..0812f1f 100644
--- a/src/couch_views/test/exunit/couch_views_reduce_test.exs
+++ b/src/couch_views/test/exunit/couch_views_reduce_test.exs
@@ -40,37 +40,53 @@ defmodule CouchViewsReduceTest do
     }
   end
 
-#  test "group=true count reduce with limit", context do
+  #  test "group=true count reduce with limit", context do
+  #    args = %{
+  #      :reduce => true,
+  #      :group => true,
+  #      :limit => 3
+  #    }
+  #
+  #    {:ok, res} = run_query(context, args, "dates")
+  #    IO.inspect(res, label: "OUT")
+  #
+  #    assert res == [
+  #             {:row, [key: [2017, 3, 1], value: 1]},
+  #             {:row, [key: [2017, 4, 1], value: 1]},
+  #             {:row, [key: [2017, 4, 15], value: 1]}
+  #           ]
+  #  end
+
+#  test "group_level=1 count reduce", context do
 #    args = %{
 #      :reduce => true,
-#      :group => true,
-#      :limit => 3
+#      :group_level => 1
 #    }
 #
-#    {:ok, res} = run_query(context, args, "dates")
+#    {:ok, res} = run_query(context, args, "dates_count")
 #    IO.inspect(res, label: "OUT")
 #
 #    assert res == [
-#             {:row, [key: [2017, 3, 1], value: 1]},
-#             {:row, [key: [2017, 4, 1], value: 1]},
-#             {:row, [key: [2017, 4, 15], value: 1]}
+#             {:row, [key: [2017], value: 4]},
+#             {:row, [key: [2018], value: 3]},
+#             {:row, [key: [2019], value: 2]}
 #           ]
 #  end
 
-  test "group_level=1 count reduce", context do
-      args = %{
-          :reduce => true,
-          :group_level => 1,
-      }
+  test "group_level=1 sum reduce", context do
+    args = %{
+      :reduce => true,
+      :group_level => 1
+    }
 
-      {:ok, res} = run_query(context, args, "dates")
-      IO.inspect(res, label: "OUT")
+    {:ok, res} = run_query(context, args, "dates_sum")
+    IO.inspect(res, label: "OUT")
 
-      assert res == [
-                 {:row, [key: [2017], value: 4]},
-                 {:row, [key: [2018], value: 3]},
-                 {:row, [key: [2019], value: 2]}
-             ]
+    assert res == [
+             {:row, [key: [2017], value: 31]},
+             {:row, [key: [2018], value: 20]},
+             {:row, [key: [2019], value: 17]}
+           ]
   end
 
   #  test "group=1 count reduce", context do
@@ -207,21 +223,22 @@ defmodule CouchViewsReduceTest do
 
   defp create_docs() do
     dates = [
-      [2017, 3, 1],
-      [2017, 4, 1],
+      {[2017, 3, 1], 9},
+      {[2017, 4, 1], 7},
       # out of order check
-      [2019, 3, 1],
-      [2017, 4, 15],
-      [2018, 4, 1],
-      [2017, 5, 1],
-      [2018, 3, 1],
+      {[2019, 3, 1], 4},
+      {[2017, 4, 15], 6},
+      {[2018, 4, 1], 3},
+      {[2017, 5, 1], 9},
+      {[2018, 3, 1], 6},
       # duplicate check
-      [2018, 4, 1],
-      [2018, 5, 1],
-      [2019, 4, 1]
+      {[2018, 4, 1], 4},
+      {[2018, 5, 1], 7},
+      {[2019, 4, 1], 6},
+      {[2019, 5, 1], 7}
     ]
 
-    for i <- 1..10 do
+    for i <- 1..11 do
       group =
         if rem(i, 3) == 0 do
           "first"
@@ -229,13 +246,18 @@ defmodule CouchViewsReduceTest do
           "second"
         end
 
-      :couch_doc.from_json_obj({[
-         {"_id", "doc-id-#{i}"},
-         {"value", i},
-         {"some", "field"},
-         {"group", group},
-         {"date", Enum.at(dates, i - 1)}
-       ]})
+      {date_key, date_val} = Enum.at(dates, i - 1)
+
+      :couch_doc.from_json_obj(
+        {[
+           {"_id", "doc-id-#{i}"},
+           {"value", i},
+           {"some", "field"},
+           {"group", group},
+           {"date", date_key},
+           {"date_val", date_val}
+         ]}
+      )
     end
   end
 
@@ -244,34 +266,44 @@ defmodule CouchViewsReduceTest do
        {"_id", "_design/bar"},
        {"views",
         {[
-           {"dates",
+           #           {"dates_count",
+           #            {[
+           #               {"map",
+           #                """
+           #                function(doc) {
+           #                  emit(doc.date, doc.value);
+           #                 }
+           #                """},
+           #               {"reduce", "_count"}
+           #             ]}}
+           {"dates_sum",
             {[
                {"map",
                 """
                 function(doc) {
-                  emit(doc.date, doc.value);
-                 }
+                    emit(doc.date, doc.date_val);
+                }
                 """},
-               {"reduce", "_count"}
+               {"reduce", "_sum"}
              ]}}
-#           {"baz",
-#            {[
-#               {"map",
-#                """
-#                function(doc) {
-#                  emit(doc.value, doc.value);
-#                  emit(doc.value, doc.value);
-#                  emit([doc.value, 1], doc.value);
-#                  emit([doc.value, doc.value + 1, doc.group.length], doc.value);
-#
-#                  if (doc.value === 3) {
-#                    emit([1, 1, 5], 1);
-#                    emit([doc.value, 1, 5], 1);
-#                  }
-#                 }
-#                """},
-#               {"reduce", "_count"}
-#             ]}}
+           #           {"baz",
+           #            {[
+           #               {"map",
+           #                """
+           #                function(doc) {
+           #                  emit(doc.value, doc.value);
+           #                  emit(doc.value, doc.value);
+           #                  emit([doc.value, 1], doc.value);
+           #                  emit([doc.value, doc.value + 1, doc.group.length], doc.value);
+           #
+           #                  if (doc.value === 3) {
+           #                    emit([1, 1, 5], 1);
+           #                    emit([doc.value, 1, 5], 1);
+           #                  }
+           #                 }
+           #                """},
+           #               {"reduce", "_count"}
+           #             ]}}
            #             {"boom",
            #              {[
            #                 {"map",


[couchdb] 07/08: basic skiplist query working

Posted by ga...@apache.org.

garren pushed a commit to branch prototype/builtin-reduce
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit f01b653e1f8fc0f8be8837e5217ad0f5b9e8fdad
Author: Garren Smith <ga...@gmail.com>
AuthorDate: Thu Dec 5 15:42:52 2019 +0200

    basic skiplist query working
---
 src/couch_views/src/couch_views_indexer.erl        |   2 +-
 src/couch_views/src/couch_views_reduce.erl         | 117 +++----
 src/couch_views/src/couch_views_reduce_fdb.erl     | 355 ++++++++++++++++-----
 src/couch_views/src/couch_views_reducer.erl        | 119 +++++++
 .../test/exunit/couch_views_reduce_test.exs        |  49 ++-
 5 files changed, 447 insertions(+), 195 deletions(-)

diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index 51fae9b..096d838 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -311,7 +311,7 @@ write_docs(TxDb, Mrst, Docs, State) ->
 
     %%  First build of the view
     if ViewSeq /= <<>> -> ok; true ->
-        couch_views_reduce:setup_reduce_indexes(TxDb, Sig, ViewIds)
+        couch_views_reduce_fdb:create_reduce_indexes(TxDb, Sig, ViewIds)
     end,
 
     lists:foreach(fun(Doc) ->
diff --git a/src/couch_views/src/couch_views_reduce.erl b/src/couch_views/src/couch_views_reduce.erl
index a2e3a93..0e837e3 100644
--- a/src/couch_views/src/couch_views_reduce.erl
+++ b/src/couch_views/src/couch_views_reduce.erl
@@ -15,8 +15,7 @@
 
 -export([
     run_reduce/2,
-    read_reduce/7,
-    setup_reduce_indexes/3
+    read_reduce/7
 ]).
 
 
@@ -36,7 +35,7 @@ read_reduce(Db, Sig, ViewId, Reducer, UserCallback, UserAcc0, Args) ->
     } = Db,
 
 %%    Levels = lists:seq(0, ?MAX_SKIP_LIST_LEVELS),
-    ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+    ReduceIdxPrefix = couch_views_reduce_fdb:reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
     #mrargs{
         limit = Limit,
         group = Group,
@@ -48,7 +47,7 @@ read_reduce(Db, Sig, ViewId, Reducer, UserCallback, UserAcc0, Args) ->
         _ -> GroupLevel
     end,
 
-    Opts = args_to_fdb_opts(Args, ReduceIdxPrefix),
+%%    Opts = args_to_fdb_opts(Args, ReduceIdxPrefix),
 
     try
         fabric2_fdb:transactional(Db, fun(TxDb) ->
@@ -66,7 +65,10 @@ read_reduce(Db, Sig, ViewId, Reducer, UserCallback, UserAcc0, Args) ->
             },
 
             Fun = fun handle_row/3,
-            Acc1 = couch_views_reduce_fdb:fold_level0(TxDb, Sig, ViewId, Reducer, GroupLevel1, Opts, Fun, Acc0),
+%%            Acc1 = couch_views_reduce_fdb:fold_level0(TxDb, Sig, ViewId, Reducer, GroupLevel1, Opts, Fun, Acc0),
+
+            SkipListOpts = args_to_skiplist_opts(Args),
+            Acc1 = couch_views_reduce_fdb:fold_skip_list(TxDb, Sig, ViewId, Reducer, GroupLevel1, SkipListOpts, Fun, Acc0),
             #{
                 user_acc := UserAcc1
             } = Acc1,
@@ -76,6 +78,30 @@ read_reduce(Db, Sig, ViewId, Reducer, UserCallback, UserAcc0, Args) ->
         {ok, Out}
     end.
 
+args_to_skiplist_opts(#mrargs{} = Args) ->
+    #mrargs{
+        start_key = StartKey,
+        end_key = EndKey
+    } = Args,
+
+    StartKey1 = case StartKey of
+        undefined ->
+            [0];
+        StartKey ->
+            StartKey
+    end,
+
+    EndKey1 = case EndKey of
+        undefined ->
+            throw(no_end_key_not_working_yet_error);
+        EndKey ->
+            EndKey
+    end,
+    #{
+        startkey => StartKey1,
+        endkey => EndKey1
+    }.
+
 
 args_to_fdb_opts(#mrargs{} = Args, ReduceIdxPrefix) ->
     #mrargs{
@@ -85,10 +111,10 @@ args_to_fdb_opts(#mrargs{} = Args, ReduceIdxPrefix) ->
 
     StartKey1 = case StartKey of
         undefined ->
-            StartKey0 = encode_key(0, 0, ReduceIdxPrefix),
+            StartKey0 = couch_views_reduce_fdb:create_key(0, 0, ReduceIdxPrefix),
             erlfdb_key:first_greater_than(StartKey0);
         StartKey ->
-            StartKey0 = encode_key(StartKey, 0, ReduceIdxPrefix),
+            StartKey0 = couch_views_reduce_fdb:create_key(StartKey, 0, ReduceIdxPrefix),
             erlfdb_key:first_greater_or_equal(StartKey0)
     end,
 
@@ -99,17 +125,12 @@ args_to_fdb_opts(#mrargs{} = Args, ReduceIdxPrefix) ->
             EndKey0;
         EndKey ->
             io:format("ENDKEY ~n"),
-            EndKey0 = encode_key(EndKey, 0, ReduceIdxPrefix),
+            EndKey0 = couch_views_reduce_fdb:create_key(EndKey, 0, ReduceIdxPrefix),
             erlfdb_key:first_greater_than(EndKey0)
     end,
     [{streaming_mode, want_all}, {startkey, StartKey1}, {endkey, EndKey1}].
 
 
-encode_key(Key, Level, ReduceIdxPrefix) ->
-    EK = {Level, couch_views_encoding:encode(Key, key)},
-    erlfdb_tuple:pack(EK, ReduceIdxPrefix).
-
-
 handle_row(Key, Value, Acc) ->
     #{
         callback := UserCallback,
@@ -118,6 +139,7 @@ handle_row(Key, Value, Acc) ->
         limit := Limit
     } = Acc,
 
+    io:format("WOO ROW ~p ~p ~n", [Key, Value]),
     Row = [
         {key, Key},
         {value, Value}
@@ -140,23 +162,6 @@ handle_row(Key, Value, Acc) ->
 maybe_stop({ok, Acc}) -> Acc;
 maybe_stop({stop, Acc}) -> throw({done, Acc}).
 
-setup_reduce_indexes(Db, Sig, ViewIds) ->
-    #{
-        db_prefix := DbPrefix
-    } = Db,
-
-    fabric2_fdb:transactional(Db, fun(TxDb) ->
-        lists:foreach(fun (ViewId) ->
-            ViewOpts = #{
-                db_prefix => DbPrefix,
-                sig => Sig,
-                view_id => ViewId
-            },
-            couch_views_reduce_fdb:create_skip_list(TxDb,
-                ?MAX_SKIP_LIST_LEVELS, ViewOpts)
-        end, ViewIds)
-    end).
-
 
 run_reduce(#mrst{views = Views } = Mrst, MappedResults) ->
     ReduceFuns = lists:map(fun(View) ->
@@ -175,7 +180,7 @@ run_reduce(#mrst{views = Views } = Mrst, MappedResults) ->
         } = MappedResult,
 
         ReduceResults = lists:map(fun ({ReduceFun, Result}) ->
-            reduce(ReduceFun, Result)
+            couch_views_reducer:reduce(ReduceFun, Result)
         end, lists:zip(ReduceFuns, Results)),
 
         MappedResult#{
@@ -184,59 +189,9 @@ run_reduce(#mrst{views = Views } = Mrst, MappedResults) ->
     end, MappedResults).
 
 
-reduce(<<"_count">>, Results) ->
-    ReduceResults = lists:foldl(fun ({Key, _}, Acc) ->
-        case maps:is_key(Key, Acc) of
-            true ->
-                #{Key := Val} = Acc,
-                Acc#{Key := Val + 1};
-            false ->
-                Acc#{Key => 1}
-        end
-    end, #{}, Results),
-    maps:to_list(ReduceResults);
-
-reduce(<<"_sum">>, Results) ->
-    ReduceResults = lists:foldl(fun ({Key, Val}, Acc) ->
-        case maps:is_key(Key, Acc) of
-            true ->
-                #{Key := Sum} = Acc,
-                Acc#{Key := Val + Sum};
-            false ->
-                Acc#{Key => Val}
-        end
-    end, #{}, Results),
-    maps:to_list(ReduceResults);
-
-% this isn't a real supported reduce function in CouchDB
-% But I want a basic reduce function that when we need to update the index
-% we would need to re-read multiple rows instead of being able to do an
-% atomic update
-reduce(<<"_stats">>, Results) ->
-    ReduceResults = lists:foldl(fun ({Key, Val}, Acc) ->
-        io:format("MAX ~p ~p ~n", [Key, Val]),
-        case maps:is_key(Key, Acc) of
-            true ->
-                #{Key := Max} = Acc,
-                case Max >= Val of
-                    true ->
-                        Acc;
-                    false ->
-                        Acc#{Key := Val}
-                end;
-            false ->
-                Acc#{Key => Val}
-        end
-    end, #{}, Results),
-    maps:to_list(ReduceResults).
-
-
 is_builtin(<<"_", _/binary>>) ->
     true;
 
 is_builtin(_) ->
     false.
 
-reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId) ->
-    Key = {?DB_VIEWS, Sig, ?VIEW_REDUCE_RANGE, ViewId},
-    erlfdb_tuple:pack(Key, DbPrefix).
diff --git a/src/couch_views/src/couch_views_reduce_fdb.erl b/src/couch_views/src/couch_views_reduce_fdb.erl
index 77514ed..72e05b8 100644
--- a/src/couch_views/src/couch_views_reduce_fdb.erl
+++ b/src/couch_views/src/couch_views_reduce_fdb.erl
@@ -15,9 +15,13 @@
 
 
 -export([
+    fold_skip_list/8,
     fold_level0/8,
+    create_reduce_indexes/3,
     create_skip_list/3,
-    update_reduce_idx/7
+    update_reduce_idx/7,
+    reduce_skip_list_idx_prefix/3,
+    create_key/3
 ]).
 
 
@@ -26,9 +30,11 @@
 -include_lib("couch_mrview/include/couch_mrview.hrl").
 -include_lib("fabric/include/fabric2.hrl").
 
--define(MAX_SKIP_LIST_LEVELS, 1).
+
+-define(MAX_SKIP_LIST_LEVELS, 6).
 -define(LEVEL_FAN_POW, 1).
 
+
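The two defines above control the skip list's shape: `MAX_SKIP_LIST_LEVELS` caps the number of levels and `LEVEL_FAN_POW` sets the fan-out between them. The promotion rule itself (`hash_key`/level membership) is not shown in this hunk, so the following Python sketch only illustrates the usual hash-based scheme under that assumption: a key appears at level L when the low `L * LEVEL_FAN_POW` bits of its hash are zero, so each level keeps roughly half the keys of the level below.

```python
import hashlib

MAX_SKIP_LIST_LEVELS = 6
LEVEL_FAN_POW = 1  # each level keeps ~1/2^LEVEL_FAN_POW of the level below


def key_levels(key: bytes) -> list:
    """Levels 1..MAX that a key is promoted to, derived from its hash.

    Hypothetical stand-in for the (unshown) hash_key-based promotion:
    level 0 holds every key; a key reaches level L when the low
    L * LEVEL_FAN_POW bits of its hash are zero, so the resulting
    level list is always a consecutive run starting at 1.
    """
    h = int.from_bytes(hashlib.sha256(key).digest()[:8], "big")
    levels = []
    for level in range(1, MAX_SKIP_LIST_LEVELS + 1):
        if h % (2 ** (level * LEVEL_FAN_POW)) == 0:
            levels.append(level)
        else:
            break
    return levels
```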
 log_levels(Db, Sig, ViewId) ->
     #{
         db_prefix := DbPrefix
@@ -55,8 +61,8 @@ log_levels(Db, Sig, ViewId) ->
                 false ->
                     Total = sum_rows(Rows),
                     if Total == Level0Total -> Level0Total; true ->
-                        io:format("~n ~n LEVEL ~p NOT EQUAL ~p ~p ~n", [Level, Level0Total, Total])
-%%                        throw(level_total_error)
+                        io:format("~n ~n LEVEL ~p NOT EQUAL ~p ~p ~n", [Level, Level0Total, Total]),
+                        throw(level_total_error)
                     end
             end
 
@@ -69,18 +75,234 @@ sum_rows(Rows) ->
     end, 0, Rows).
 
 
+fold_skip_list(Db, Sig, ViewId, Reducer, GroupLevel, Opts, UserCallback, UserAcc0) ->
+    #{
+        db_prefix := DbPrefix
+    } = Db,
+
+    ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+    Acc = #{
+        sig => Sig,
+        view_id => ViewId,
+        user_acc => UserAcc0,
+        callback => UserCallback,
+        reduce_idx_prefix => ReduceIdxPrefix,
+        reducer => Reducer,
+        group_level => GroupLevel,
+        rows => []
+    },
+    #{
+        startkey := StartKey,
+        endkey := EndKey
+    } = Opts,
+
+    fabric2_fdb:transactional(Db, fun(TxDb) ->
+        log_levels(TxDb, Sig, ViewId),
+        Acc1 = traverse_skip_list(TxDb, 0, StartKey, EndKey, Acc),
+        #{
+            user_acc := UserAcc1,
+            rows := Rows1
+        } = Acc1,
+        rereduce_and_reply(Reducer, Rows1, GroupLevel, UserCallback, UserAcc1)
+    end).
+
+traverse_skip_list(_TxDb, Level, _CurrentKey, _EndKey, _Acc) when Level < 0 ->
+    throw(skip_list_gone_too_low);
+
+traverse_skip_list(TxDb, _Level, CurrentKey, EndKey, Acc) ->
+    #{
+        user_acc := UserAcc,
+        callback := UserCallback,
+        reduce_idx_prefix := ReduceIdxPrefix,
+        reducer := Reducer,
+        group_level := GroupLevel,
+        rows := Rows
+    } = Acc,
+
+    {RangeLevel, RangeStart, RangeEnd} = get_next_range_and_level(TxDb,
+        ReduceIdxPrefix, GroupLevel, CurrentKey, EndKey),
+    io:format("TRAVERSE Level ~p RangeStart ~p RangeEnd ~p ~n", [RangeLevel, RangeStart, RangeEnd]),
+    Results = get_range_inclusive(TxDb, RangeStart, RangeEnd, RangeLevel, ReduceIdxPrefix),
+    io:format("RESULTS ~p ~n", [Results]),
+
+    NextStart = if Results == [] -> null; true ->
+        {LastKey, _} = lists:last(Results),
+        LastKey
+    end,
+    KeyAfterStart = get_key_after(TxDb, NextStart, EndKey, RangeLevel, ReduceIdxPrefix),
+    io:format("NextStart ~p KeyAfterStart ~p RangeLevel ~p ~n", [NextStart, KeyAfterStart, RangeLevel]),
+
+    {NextStart1, Acc1} = case couch_views_reducer:group_level_equal(NextStart, KeyAfterStart, GroupLevel) of
+        true ->
+            AccNext = Acc#{rows := Rows ++ Results},
+            {NextStart, AccNext};
+        false when RangeLevel == 0 ->
+            AllResults = Rows ++ Results,
+            UserAcc1 = rereduce_and_reply(Reducer, AllResults, GroupLevel,
+                UserCallback, UserAcc),
+            AccNext = Acc#{
+                user_acc := UserAcc1,
+                rows := []
+            },
+            {KeyAfterStart, AccNext};
+        % Need to traverse at level 0 to make sure we have all keys for
+        % the current group_level keys
+        false ->
+            UsableResults = lists:sublist(Results, length(Results) - 1),
+            io:format("USABLE ~p LEVEL ~p ~n", [UsableResults, RangeLevel]),
+            AccNext = Acc#{rows := Rows ++ UsableResults},
+            {NextStart, AccNext}
+    end,
+
+    case RangeEnd == EndKey orelse NextStart1 == null of
+        true when RangeLevel == 0 ->
+            io:format("FINISHED ~n"),
+            Acc1;
+        _ ->
+            traverse_skip_list(TxDb, 0, NextStart1, EndKey, Acc1)
+    end.
+
+
+get_next_range_and_level(TxDb, ReduceIdxPrefix, GroupLevel, StartKey, EndKey) ->
+    GroupEndKey = get_group_level_endkey(TxDb, GroupLevel, 0, StartKey, ReduceIdxPrefix),
+    % Do not exceed the set endkey
+    GroupEndKey1 = if GroupEndKey < EndKey -> GroupEndKey; true -> EndKey end,
+
+    LevelRanges = [{0, StartKey, GroupEndKey1}],
+    io:format("Get Range StartKey ~p GroupEndKey ~p EndKey ~p ~n", [StartKey, GroupEndKey1, EndKey]),
+    LevelRanges1 = scan_for_level_ranges(TxDb, 0, GroupLevel, StartKey, GroupEndKey1, ReduceIdxPrefix, LevelRanges),
+    lists:last(LevelRanges1).
+
+
+% At the end of this specific group_level, so we have to do a final scan at level 0
+scan_for_level_ranges(_TxDb, _Level, _GroupLevel, StartKey, StartKey, _ReduceIdxPrefix, _Acc) ->
+    [{0, StartKey, StartKey}];
+
+scan_for_level_ranges(_TxDb, ?MAX_SKIP_LIST_LEVELS, _GroupLevel, StartKey, StartKey, _ReduceIdxPrefix, Acc) ->
+    Acc;
+
+scan_for_level_ranges(TxDb, Level, GroupLevel, StartKey, EndKey, ReduceIdxPrefix, Acc) ->
+    NextLevel = Level + 1,
+    NearestKey = get_key_or_nearest(TxDb, NextLevel, StartKey, EndKey, ReduceIdxPrefix),
+    io:format("SCAN startkey ~p nearest ~p ~n", [StartKey, NearestKey]),
+    case StartKey =:= NearestKey of
+        true ->
+            GroupLevelEndKey = get_group_level_endkey(TxDb, GroupLevel,
+                NextLevel, StartKey, ReduceIdxPrefix),
+
+            TooFar = GroupLevelEndKey > EndKey,
+            EndOfLevel = GroupLevelEndKey == NearestKey,
+
+            case TooFar orelse EndOfLevel of
+                true ->
+                    Acc;
+                false ->
+                    Acc1 = Acc ++ [{NextLevel, StartKey, GroupLevelEndKey}],
+                    scan_for_level_ranges(TxDb, NextLevel, GroupLevel, StartKey,
+                        EndKey, ReduceIdxPrefix, Acc1)
+            end;
+        false ->
+            case couch_views_reducer:group_level_equal(StartKey, NearestKey,
+                GroupLevel) of
+                true ->
+                    [{Level, StartKey, NearestKey}];
+                false ->
+                    Acc
+            end
+    end.
+
+
+get_key_or_nearest(TxDb, Level, StartKey, EndKey, ReduceIdxPrefix) ->
+    #{
+        tx := Tx
+    } = TxDb,
+
+    StartKey1 = create_key(ReduceIdxPrefix, Level, StartKey),
+    StartKey2 = erlfdb_key:first_greater_or_equal(StartKey1),
+
+    EndKey1 = create_endkey(ReduceIdxPrefix, Level, EndKey),
+    EndKey2 = erlfdb_key:first_greater_than(EndKey1),
+
+    Future = erlfdb:get_range(Tx, StartKey2, EndKey2, [{limit, 1}]),
+    wait_and_get_key(Future).
+
+
+get_group_level_endkey(TxDb, GroupLevel, Level, StartKey, ReduceIdxPrefix) ->
+    #{
+        tx := Tx
+    } = TxDb,
+    GroupLevelKey = couch_views_reducer:group_level_key(StartKey, GroupLevel),
+    StartKey1 = create_key(ReduceIdxPrefix, Level, GroupLevelKey),
+    StartKey2 = erlfdb_key:first_greater_than(StartKey1),
+    EndKey = create_endkey(ReduceIdxPrefix, Level, GroupLevelKey),
+    EndKey1 = erlfdb_key:first_greater_or_equal(EndKey),
+    Future = erlfdb:get_range(Tx, StartKey2, EndKey1, [{reverse, true}, {limit, 1}]),
+    wait_and_get_key(Future).
+
+
+get_key_after(TxDb, StartKey, EndKey, Level, ReduceIdxPrefix) ->
+    #{
+        tx := Tx
+    } = TxDb,
+
+    StartKey1 = create_key(ReduceIdxPrefix, Level, StartKey),
+    StartKey2 = erlfdb_key:first_greater_than(StartKey1),
+
+    EndKey1 = create_endkey(ReduceIdxPrefix, Level, EndKey),
+    EndKey2 = erlfdb_key:first_greater_than(EndKey1),
+
+    Future = erlfdb:get_range(Tx, StartKey2, EndKey2, [{limit, 1}]),
+    wait_and_get_key(Future).
+
+
+wait_and_get_key(Future) ->
+    case erlfdb:wait(Future) of
+        [] ->
+            null;
+        [{_FullEncodedKey, PackedValue}] ->
+            {Key, _} = get_key_value(PackedValue),
+            Key
+    end.
+
+
+get_range_inclusive(TxDb, StartKey, EndKey, Level, ReduceIdxPrefix) ->
+    #{
+        tx := Tx
+    } = TxDb,
+    StartKey1 = create_key(ReduceIdxPrefix, Level, StartKey),
+    StartKey2 = erlfdb_key:first_greater_or_equal(StartKey1),
+
+    EndKey1 = create_key(ReduceIdxPrefix, Level, EndKey),
+    EndKey2 = erlfdb_key:first_greater_than(EndKey1),
+
+    Fun = fun ({_FullEncodedKey, PackedValue}, Acc0) ->
+        KV = get_key_value(PackedValue),
+        Acc0 ++ [KV]
+    end,
+
+    erlfdb:fold_range(Tx, StartKey2, EndKey2, Fun, [], []).
+
+
+% TODO: This needs a better name
+create_endkey(ReduceIdxPrefix, Level, Key) ->
+    Key1 = if Key /= null -> Key; true -> [] end,
+    EK = couch_views_encoding:encode(Key1 ++ [16#FF], key),
+    LevelKey = {Level, EK},
+    erlfdb_tuple:pack(LevelKey, ReduceIdxPrefix).
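`create_endkey` appends a `16#FF` element to the key before encoding so a range read ends strictly after every real key sharing the group-level prefix. The encoding details live in `couch_views_encoding`, so this Python sketch demonstrates only the ordering idea, assuming element-wise comparison (as plain Python lists provide) mirrors the encoded key order:

```python
def group_end_sentinel(group_key):
    """Key list that sorts after every extension of group_key.

    Illustrative mirror of create_endkey: append a 0xFF element so any
    real key extending group_key (e.g. [2018, 5, 1] under [2018]) sorts
    strictly before the sentinel, while the next prefix ([2019]) sorts
    after it. Assumes the element encoding orders 0xFF last.
    """
    base = [] if group_key is None else list(group_key)
    return base + [0xFF]
```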
+
+
 fold_level0(Db, Sig, ViewId, Reducer, GroupLevel, Opts, UserCallback, UserAcc0) ->
     #{
         db_prefix := DbPrefix
     } = Db,
 
-    Level = 0,
     ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
     Acc = #{
         sig => Sig,
         view_id => ViewId,
         user_acc => UserAcc0,
-        %%            args := Args,
         callback => UserCallback,
         reduce_idx_prefix => ReduceIdxPrefix,
         reducer => Reducer,
@@ -88,16 +310,15 @@ fold_level0(Db, Sig, ViewId, Reducer, GroupLevel, Opts, UserCallback, UserAcc0)
         rows => []
     },
 
+    {startkey, StartKey} = lists:keyfind(startkey, 1, Opts),
+    {endkey, EndKey} = lists:keyfind(endkey, 1, Opts),
+
     fabric2_fdb:transactional(Db, fun(TxDb) ->
         log_levels(TxDb, Sig, ViewId),
         #{
             tx := Tx
         } = TxDb,
 
-
-        {startkey, StartKey} = lists:keyfind(startkey, 1, Opts),
-        {endkey, EndKey} = lists:keyfind(endkey, 1, Opts),
-
         Fun = fun fold_fwd_cb/2,
         Acc1 = erlfdb:fold_range(Tx, StartKey, EndKey, Fun, Acc, Opts),
         #{
@@ -126,10 +347,10 @@ fold_fwd_cb({_FullEncodedKey, EV}, Acc) ->
         LastKey0
     end,
 
-    GroupLevelKey = group_level_key(Key, GroupLevel),
+    GroupLevelKey = couch_views_reducer:group_level_key(Key, GroupLevel),
     GroupKV = [{GroupLevelKey, Val}],
 
-    case group_level_equal(Key, LastKey, GroupLevel) of
+    case couch_views_reducer:group_level_equal(Key, LastKey, GroupLevel) of
         true ->
             Acc#{
                 rows := Rows ++ GroupKV
@@ -147,58 +368,10 @@ rereduce_and_reply(_Reducer, [], _GroupLevel, _Callback, Acc) ->
     Acc;
 
 rereduce_and_reply(Reducer, Rows, GroupLevel, Callback, Acc) ->
-    {ReducedKey, ReducedVal} = rereduce(Reducer, Rows, GroupLevel),
+    {ReducedKey, ReducedVal} = couch_views_reducer:rereduce(Reducer, Rows, GroupLevel),
     Callback(ReducedKey, ReducedVal, Acc).
 
 
-rereduce(_Reducer, [], _GroupLevel) ->
-    no_kvs;
-
-rereduce(_Reducer, Rows, GroupLevel) when length(Rows) == 1 ->
-    {Key, Val} = hd(Rows),
-    GroupKey = group_level_key(Key, GroupLevel),
-    {GroupKey, Val};
-
-rereduce(<<"_sum">>, Rows, GroupLevel) ->
-    Sum = lists:foldl(fun ({_, Val}, Acc) ->
-       Val + Acc
-    end, 0, Rows),
-    {Key, _} = hd(Rows),
-    GroupKey = group_level_key(Key, GroupLevel),
-    {GroupKey, Sum};
-
-rereduce(<<"_count">>, Rows, GroupLevel) ->
-    Val = length(Rows),
-    {Key, _} = hd(Rows),
-    GroupKey = group_level_key(Key, GroupLevel),
-    {GroupKey, Val}.
-
-
-group_level_equal(_One, _Two, 0) ->
-    true;
-
-group_level_equal(_One, _Two, group_true) ->
-    false;
-
-group_level_equal(One, Two, GroupLevel) ->
-    GroupOne = group_level_key(One, GroupLevel),
-    GroupTwo = group_level_key(Two, GroupLevel),
-    GroupOne == GroupTwo.
-
-
-group_level_key(_Key, 0) ->
-    null;
-
-group_level_key(Key, group_true) ->
-    Key;
-
-group_level_key(Key, GroupLevel) when is_list(Key) ->
-    lists:sublist(Key, GroupLevel);
-
-group_level_key(Key, _GroupLevel) ->
-    Key.
-
-
 reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId) ->
     Key = {?DB_VIEWS, Sig, ?VIEW_REDUCE_RANGE, ViewId},
     erlfdb_tuple:pack(Key, DbPrefix).
@@ -211,23 +384,21 @@ unpack_key_value(EncodedValue) ->
     {Key, Val}.
 
 
-%% Inserting
-update_reduce_idx(TxDb, Sig, ViewId, Reducer, _DocId, _ExistingKeys, ReduceResult) ->
+create_reduce_indexes(Db, Sig, ViewIds) ->
     #{
         db_prefix := DbPrefix
-    } = TxDb,
-
-    ViewOpts = #{
-        db_prefix => DbPrefix,
-        sig => Sig,
-        view_id => ViewId,
-        reducer => Reducer
-    },
+    } = Db,
 
-    lists:foreach(fun ({Key, Val}) ->
-        io:format("RESULTS KV ~p ~p ~n", [Key, Val]),
-        add_kv_to_skip_list(TxDb, ?MAX_SKIP_LIST_LEVELS, ViewOpts, Key, Val)
-    end, ReduceResult).
+    fabric2_fdb:transactional(Db, fun(TxDb) ->
+        lists:foreach(fun (ViewId) ->
+            ViewOpts = #{
+                db_prefix => DbPrefix,
+                sig => Sig,
+                view_id => ViewId
+            },
+            create_skip_list(TxDb, ?MAX_SKIP_LIST_LEVELS, ViewOpts)
+        end, ViewIds)
+    end).
 
 
 create_skip_list(Db, MaxLevel, #{} = ViewOpts) ->
@@ -248,16 +419,33 @@ create_skip_list(Db, MaxLevel, #{} = ViewOpts) ->
     end).
 
 
-add_kv_to_skip_list(Db, MaxLevel, #{} = ViewOpts, Key, Val) ->
+%% Inserting
+update_reduce_idx(TxDb, Sig, ViewId, Reducer, _DocId, _ExistingKeys, ReduceResult) ->
+    #{
+        db_prefix := DbPrefix
+    } = TxDb,
+
+    ViewOpts = #{
+        db_prefix => DbPrefix,
+        sig => Sig,
+        view_id => ViewId,
+        reducer => Reducer
+    },
+
+    ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
+
+    lists:foreach(fun ({Key, Val}) ->
+        io:format("RESULTS KV ~p ~p ~n", [Key, Val]),
+        add_kv_to_skip_list(TxDb, ReduceIdxPrefix, ?MAX_SKIP_LIST_LEVELS, ViewOpts, Key, Val)
+    end, ReduceResult).
+
+
+add_kv_to_skip_list(Db, ReduceIdxPrefix, MaxLevel, #{} = ViewOpts, Key, Val) ->
     #{
-        db_prefix := DbPrefix,
-        sig := Sig,
-        view_id := ViewId,
         reducer := Reducer
     } = ViewOpts,
 
     Levels = lists:seq(1, MaxLevel),
-    ReduceIdxPrefix = reduce_skip_list_idx_prefix(DbPrefix, Sig, ViewId),
     KeyHash = hash_key(Key),
 
     fabric2_fdb:transactional(Db, fun(TxDb) ->
@@ -265,7 +453,7 @@ add_kv_to_skip_list(Db, MaxLevel, #{} = ViewOpts, Key, Val) ->
             not_found ->
                 Val;
             ExistingVal ->
-                {_, ReducedVal} = rereduce(Reducer, [{Key, ExistingVal}, {Key, Val}], group_true),
+                {_, ReducedVal} = couch_views_reducer:rereduce(Reducer, [{Key, ExistingVal}, {Key, Val}], group_true),
                 ReducedVal
         end,
         io:format("VAL1 ~p ~n", [Val1]),
@@ -279,9 +467,10 @@ add_kv_to_skip_list(Db, MaxLevel, #{} = ViewOpts, Key, Val) ->
                     io:format("Adding at ~p ~p ~n", [Level, Key]),
                     add_kv(TxDb, ReduceIdxPrefix, Level, Key, Val);
                 false ->
+                    {_, NewVal} = couch_views_reducer:rereduce(Reducer, [{PrevKey, PrevVal}, {Key, Val}], 0),
 %%                    {PrevKey, NewVal} = rereduce(<<"_stats">>, {PrevKey, PrevVal}, {Key, Val}),
-%%                    io:format("RE_REDUCE ~p ~p ~p ~p ~n", [Level, Key, PrevKey, NewVal]),
-                    add_kv(TxDb, ReduceIdxPrefix, Level, PrevKey, PrevVal)
+                    io:format("RE_REDUCE ~p ~p ~p ~p ~n", [Level, Key, PrevKey, NewVal]),
+                    add_kv(TxDb, ReduceIdxPrefix, Level, PrevKey, NewVal)
             end
         end, Levels)
     end).
diff --git a/src/couch_views/src/couch_views_reducer.erl b/src/couch_views/src/couch_views_reducer.erl
new file mode 100644
index 0000000..a7ac783
--- /dev/null
+++ b/src/couch_views/src/couch_views_reducer.erl
@@ -0,0 +1,119 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+
+-module(couch_views_reducer).
+
+
+-export([
+    reduce/2,
+    rereduce/3,
+    group_level_equal/3,
+    group_level_key/2
+]).
+
+
+reduce(<<"_count">>, Results) ->
+    ReduceResults = lists:foldl(fun ({Key, _}, Acc) ->
+        case maps:is_key(Key, Acc) of
+            true ->
+                #{Key := Val} = Acc,
+                Acc#{Key := Val + 1};
+            false ->
+                Acc#{Key => 1}
+        end
+    end, #{}, Results),
+    maps:to_list(ReduceResults);
+
+reduce(<<"_sum">>, Results) ->
+    ReduceResults = lists:foldl(fun ({Key, Val}, Acc) ->
+        case maps:is_key(Key, Acc) of
+            true ->
+                #{Key := Sum} = Acc,
+                Acc#{Key := Val + Sum};
+            false ->
+                Acc#{Key => Val}
+        end
+    end, #{}, Results),
+    maps:to_list(ReduceResults);
+
+
+% This isn't a real supported reduce function in CouchDB, but we want a
+% basic reduce function for which updating the index requires re-reading
+% multiple rows rather than performing an atomic update.
+reduce(<<"_stats">>, Results) ->
+    ReduceResults = lists:foldl(fun ({Key, Val}, Acc) ->
+        io:format("MAX ~p ~p ~n", [Key, Val]),
+        case maps:is_key(Key, Acc) of
+            true ->
+                #{Key := Max} = Acc,
+                case Max >= Val of
+                    true ->
+                        Acc;
+                    false ->
+                        Acc#{Key := Val}
+                end;
+            false ->
+                Acc#{Key => Val}
+        end
+    end, #{}, Results),
+    maps:to_list(ReduceResults).
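The builtin reducers above fold the mapped `{Key, Value}` pairs into a per-key accumulator map and return it as a list. A compact Python sketch of the `_count` and `_sum` clauses (keys must be hashable here, unlike in the Erlang map):

```python
from collections import defaultdict


def builtin_reduce(reducer, results):
    """Per-key aggregation mirroring couch_views_reducer:reduce/2
    for the _count and _sum builtins."""
    acc = defaultdict(int)
    for key, value in results:
        if reducer == "_count":
            acc[key] += 1          # one row per mapped pair
        elif reducer == "_sum":
            acc[key] += value      # running total per key
        else:
            raise ValueError("unsupported reducer: %s" % reducer)
    return list(acc.items())
```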
+
+
+rereduce(_Reducer, [], _GroupLevel) ->
+    no_kvs;
+
+rereduce(_Reducer, Rows, GroupLevel) when length(Rows) == 1 ->
+    {Key, Val} = hd(Rows),
+    GroupKey = group_level_key(Key, GroupLevel),
+    {GroupKey, Val};
+
+rereduce(<<"_sum">>, Rows, GroupLevel) ->
+    Sum = lists:foldl(fun ({_, Val}, Acc) ->
+        Val + Acc
+    end, 0, Rows),
+    {Key, _} = hd(Rows),
+    GroupKey = group_level_key(Key, GroupLevel),
+    {GroupKey, Sum};
+
+rereduce(<<"_count">>, Rows, GroupLevel) ->
+    Val = length(Rows),
+    {Key, _} = hd(Rows),
+    GroupKey = group_level_key(Key, GroupLevel),
+    {GroupKey, Val}.
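`rereduce/3` collapses already-reduced rows into one row keyed by the group-level prefix of the first row. A minimal Python sketch of the `_sum` path, assuming a positive integer group level and array keys (the 0/`group_true` cases are handled by `group_level_key`, sketched separately):

```python
def rereduce_sum(rows, group_level):
    """Sketch of rereduce(<<"_sum">>, Rows, GroupLevel): combine
    already-reduced (key, value) rows into one (group_key, total)."""
    if not rows:
        return None                      # mirrors the no_kvs clause
    first_key, first_val = rows[0]
    # single-row clause: pass the value through unchanged
    value = first_val if len(rows) == 1 else sum(v for _, v in rows)
    group_key = (first_key[:group_level]
                 if isinstance(first_key, list) else first_key)
    return (group_key, value)
```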
+
+
+group_level_equal(_One, _Two, 0) ->
+    true;
+
+group_level_equal(_One, _Two, group_true) ->
+    false;
+
+group_level_equal(One, Two, GroupLevel) ->
+    GroupOne = group_level_key(One, GroupLevel),
+    GroupTwo = group_level_key(Two, GroupLevel),
+    GroupOne == GroupTwo.
+
+
+group_level_key(_Key, 0) ->
+    null;
+
+group_level_key(Key, group_true) ->
+    Key;
+
+group_level_key(Key, GroupLevel) when is_list(Key) ->
+    lists:sublist(Key, GroupLevel);
+
+group_level_key(Key, _GroupLevel) ->
+    Key.
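The grouping helpers above implement CouchDB's group-level semantics: level 0 collapses everything into one group, `group_true` (group=true) keeps exact keys, and an integer level truncates array keys to that depth. A direct Python mirror:

```python
def group_level_key(key, group_level):
    """Mirror of couch_views_reducer:group_level_key/2."""
    if group_level == 0:
        return None                      # single global group
    if group_level == "group_true":      # group=true: exact keys
        return key
    if isinstance(key, list):
        return key[:group_level]         # truncate array keys
    return key                           # scalar keys pass through


def group_level_equal(one, two, group_level):
    """Mirror of group_level_equal/3: note group_true is always False,
    forcing every distinct row through as its own group."""
    if group_level == 0:
        return True
    if group_level == "group_true":
        return False
    return group_level_key(one, group_level) == group_level_key(two, group_level)
```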
+
diff --git a/src/couch_views/test/exunit/couch_views_reduce_test.exs b/src/couch_views/test/exunit/couch_views_reduce_test.exs
index d6dcc60..e2b9d3b 100644
--- a/src/couch_views/test/exunit/couch_views_reduce_test.exs
+++ b/src/couch_views/test/exunit/couch_views_reduce_test.exs
@@ -46,7 +46,6 @@ defmodule CouchViewsReduceTest do
   #    }
   #
   #    {:ok, res} = run_query(context, args, "dates")
-  #    IO.inspect(res, label: "OUT")
   #
   #    assert res == [
   #             {:row, [key: [2017, 3, 1], value: 1]},
@@ -62,7 +61,6 @@ defmodule CouchViewsReduceTest do
   #    }
   #
   #    {:ok, res} = run_query(context, args, "dates_count")
-  #    IO.inspect(res, label: "OUT")
   #
   #    assert res == [
   #             {:row, [key: [2017], value: 4]},
@@ -74,11 +72,11 @@ defmodule CouchViewsReduceTest do
   test "group_level=1 reduce", context do
     args = %{
       reduce: true,
-      group_level: 1
+      group_level: 1,
+      end_key: [2019, 5, 1]
     }
 
     {:ok, res} = run_query(context, args, "dates_sum")
-    IO.inspect(res, label: "OUT")
 
     assert res == [
              {:row, [key: [2017], value: 31]},
@@ -97,7 +95,6 @@ defmodule CouchViewsReduceTest do
       }
 
       {:ok, res} = run_query(context, args, "dates_sum")
-      IO.inspect(res, label: "OUT")
 
       assert res == [
                  {:row, [key: [2017], value: 22]},
@@ -114,7 +111,6 @@ defmodule CouchViewsReduceTest do
       }
 
       {:ok, res} = run_query(context, args, "dates_sum")
-      IO.inspect(res, label: "OUT")
 
       assert res == [
                  {:row, [key: [2017], value: 22]},
@@ -132,7 +128,6 @@ defmodule CouchViewsReduceTest do
       }
 
       {:ok, res} = run_query(context, args, "dates_sum")
-      IO.inspect(res, label: "OUT")
 
       assert res == [
                  {:row, [key: [2017], value: 22]},
@@ -141,23 +136,22 @@ defmodule CouchViewsReduceTest do
              ]
   end
 
-  test "group=true reduce with startkey/endkey", context do
-      args = %{
-          reduce: true,
-          group: true,
-          start_key: [2018, 5, 1],
-          end_key: [2019, 04, 1],
-      }
-
-      {:ok, res} = run_query(context, args, "dates_sum")
-      IO.inspect(res, label: "OUT")
-
-      assert res == [
-                 {:row, [key: [2018, 5, 1], value: 7]},
-                 {:row, [key: [2019, 3, 1], value: 4]},
-                 {:row, [key: [2019, 4, 1], value: 6]}
-             ]
-  end
+#  test "group=true reduce with startkey/endkey", context do
+#      args = %{
+#          reduce: true,
+#          group: true,
+#          start_key: [2018, 5, 1],
+#          end_key: [2019, 04, 1],
+#      }
+#
+#      {:ok, res} = run_query(context, args, "dates_sum")
+#
+#      assert res == [
+#                 {:row, [key: [2018, 5, 1], value: 7]},
+#                 {:row, [key: [2019, 3, 1], value: 4]},
+#                 {:row, [key: [2019, 4, 1], value: 6]}
+#             ]
+#  end
 
   #  test "group=1 count reduce", context do
   #    args = %{
@@ -167,7 +161,6 @@ defmodule CouchViewsReduceTest do
   #    }
   #
   #    {:ok, res} = run_query(context, args, "baz")
-  #    IO.inspect(res, label: "OUT")
   #
   #    assert res == [
   #             {:row, [key: 1, value: 2]},
@@ -187,7 +180,6 @@ defmodule CouchViewsReduceTest do
   #    }
   #
   #    {:ok, res} = run_query(context, args, "baz")
-  #    IO.inspect(res, label: "OUT")
   #
   #    assert res == [
   #             {:row, [key: 1, value: 2]},
@@ -210,7 +202,6 @@ defmodule CouchViewsReduceTest do
   #    }
   #
   #    {:ok, res} = run_query(context, args, "baz")
-  #    IO.inspect(res, label: "OUT")
   #
   #    assert res == [
   #             {:row, [key: 1, value: 2]},
@@ -236,7 +227,6 @@ defmodule CouchViewsReduceTest do
   #    }
   #
   #    {:ok, res} = run_query(context, args, "boom")
-  #    IO.inspect(res, label: "OUT")
   #
   #    assert res == [
   #             {:row, [key: [2019, 1], value: 1]},
@@ -252,7 +242,6 @@ defmodule CouchViewsReduceTest do
   #    }
   #
   #    {:ok, res} = run_query(context, args, "max")
-  #    IO.inspect(res, label: "OUT")
   #
   #    assert res == [
   #             {:row, [key: :null, value: 3]}
@@ -267,7 +256,7 @@ defmodule CouchViewsReduceTest do
   end
 
   def default_cb(:complete, acc) do
-    IO.inspect(acc, label: "complete")
+    IO.inspect(Enum.reverse(acc), label: "complete")
     {:ok, Enum.reverse(acc)}
   end