You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ga...@apache.org on 2020/04/23 17:24:55 UTC

[couchdb] 02/02: Add fold_docs for DocId list

This is an automated email from the ASF dual-hosted git repository.

garren pushed a commit to branch prototype/fdb-layer
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 5efcbfc3ca114696273f56b1c98876c95e63bda7
Author: Garren Smith <ga...@gmail.com>
AuthorDate: Mon Apr 6 12:04:56 2020 +0200

    Add fold_docs for DocId list
    
    Adds a fold_docs function that will do a parallel fetch for the supplied
    Doc Ids. This is used for _all_docs?keys=["id1", "id2"].
    This uses a queue for fetching the revs and another queue for fetching
    the doc bodies. These queues will be drained if the future queue gets
    too large.
---
 src/chttpd/src/chttpd_db.erl       |  14 ++--
 src/fabric/src/fabric2_db.erl      | 142 +++++++++++++++++++++++++++++++++++++
 test/elixir/test/all_docs_test.exs |  30 +++++++-
 3 files changed, 177 insertions(+), 9 deletions(-)

diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl
index 8cfcfec..0780095 100644
--- a/src/chttpd/src/chttpd_db.erl
+++ b/src/chttpd/src/chttpd_db.erl
@@ -902,9 +902,10 @@ send_all_docs_keys(Db, #mrargs{} = Args, VAcc0) ->
         _ -> Args#mrargs.doc_options
     end,
     IncludeDocs = Args#mrargs.include_docs,
-    lists:foldl(fun(DocId, Acc) ->
-        OpenOpts = [deleted | DocOpts],
-        Row0 = case fabric2_db:open_doc(Db, DocId, OpenOpts) of
+    OpenOpts = [deleted | DocOpts],
+
+    CB = fun(DocId, Doc, Acc) ->
+        Row0 = case Doc of
             {not_found, missing} ->
                 #view_row{key = DocId};
             {ok, #doc{deleted = true, revs = Revs}} ->
@@ -938,9 +939,10 @@ send_all_docs_keys(Db, #mrargs{} = Args, VAcc0) ->
                 }
         end,
         Row1 = fabric_view:transform_row(Row0),
-        {ok, NewAcc} = view_cb(Row1, Acc),
-        NewAcc
-    end, VAcc1, Keys).
+        view_cb(Row1, Acc)
+    end,
+    {ok, VAcc2} = fabric2_db:fold_docs(Db, Keys, CB, VAcc1, OpenOpts),
+    VAcc2.
 
 
 apply_args_to_keylist(Args, Keys0) ->
diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl
index 15694cd..740f9ab 100644
--- a/src/fabric/src/fabric2_db.erl
+++ b/src/fabric/src/fabric2_db.erl
@@ -110,6 +110,7 @@
 
     fold_docs/3,
     fold_docs/4,
+    fold_docs/5,
     fold_design_docs/4,
     fold_local_docs/4,
     fold_changes/4,
@@ -969,6 +970,61 @@ fold_docs(Db, UserFun, UserAcc0, Options) ->
     end).
 
 
+% Fold UserFun over the docs named in DocIds inside fabric2_fdb:transactional/2.
+% Rev lookups are queued as futures and body reads are chained off them; both
+% queues are drained as they fill. A thrown {stop, Acc} ends the fold early.
+fold_docs(Db, DocIds, UserFun, UserAcc0, Options) ->
+    fabric2_fdb:transactional(Db, fun(TxDb) ->
+        try
+            % With any of these options the full revision list is fetched;
+            % otherwise only the winning rev is requested.
+            NeedsTree = lists:any(fun(Opt) ->
+                lists:member(Opt, Options)
+            end, [revs_info, conflicts, deleted_conflicts]),
+
+            FetchRevs = case NeedsTree of
+                true ->
+                    fun(Id) -> fabric2_fdb:get_all_revs_future(TxDb, Id) end;
+                false ->
+                    fun(Id) ->
+                        fabric2_fdb:get_winning_revs_future(TxDb, Id, 1)
+                    end
+            end,
+
+            % Queue one revs future per id, then let the drain helper decide.
+            Enqueue = fun(DocId, #{revs_q := Q, revs_count := N} = Acc) ->
+                Queued = Acc#{
+                    revs_q := queue:in({DocId, FetchRevs(DocId)}, Q),
+                    revs_count := N + 1
+                },
+                drain_fold_docs_revs_futures(TxDb, Queued)
+            end,
+
+            Acc0 = #{
+                revs_q => queue:new(),
+                revs_count => 0,
+                body_q => queue:new(),
+                body_count => 0,
+                doc_opts => Options,
+                user_acc => UserAcc0,
+                user_fun => UserFun
+            },
+            Acc1 = lists:foldl(Enqueue, Acc0, DocIds),
+
+            % Flush whatever is still pending: revs first, then bodies.
+            Acc2 = drain_all_fold_docs_revs_futures(TxDb, Acc1),
+            #{user_acc := FinalUserAcc} =
+                drain_all_fold_docs_body_futures(TxDb, Acc2),
+            {ok, FinalUserAcc}
+
+        catch throw:{stop, StopUserAcc} ->
+            {ok, StopUserAcc}
+        end
+    end).
+
+
+
+
 fold_design_docs(Db, UserFun, UserAcc0, Options1) ->
     Options2 = set_design_doc_keys(Options1),
     fold_docs(Db, UserFun, UserAcc0, Options2).
@@ -1206,6 +1262,92 @@ drain_all_deleted_info_futures(FutureQ, UserFun, Acc) ->
     end.
 
 
+% No-op below 100 pending revs futures; otherwise resolve the oldest one.
+drain_fold_docs_revs_futures(_TxDb, #{revs_count := N} = Acc) when N < 100 -> Acc;
+drain_fold_docs_revs_futures(TxDb, FullAcc) ->
+    drain_one_fold_docs_revs_future(TxDb, FullAcc).
+
+
+% Resolve every revs future still queued, oldest first, until the count hits 0.
+drain_all_fold_docs_revs_futures(_TxDb, #{revs_count := N} = Acc) when N =< 0 -> Acc;
+drain_all_fold_docs_revs_futures(TxDb, #{revs_count := N} = Acc) when N > 0 ->
+    Drained = drain_one_fold_docs_revs_future(TxDb, Acc),
+    drain_all_fold_docs_revs_futures(TxDb, Drained).
+
+
+% Take the oldest revs future off the queue, wait for it, and queue the
+% matching body read (or a not_found marker when the doc has no revs).
+drain_one_fold_docs_revs_future(TxDb, Acc) ->
+    #{
+        revs_q := RevsQ0,
+        revs_count := RevsLeft,
+        body_q := BodyQ0,
+        body_count := BodiesQueued
+    } = Acc,
+    {{value, {DocId, RevsFuture}}, RevsQ1} = queue:out(RevsQ0),
+
+    BodyEntry = case fabric2_fdb:get_revs_wait(TxDb, RevsFuture) of
+        [] ->
+            {DocId, [], not_found};
+        [_ | _] = Revs ->
+            Winner = get_rev_winner(Revs),
+            {DocId, Revs, fabric2_fdb:get_doc_body_future(TxDb, DocId, Winner)}
+    end,
+    % The body queue applies its own limit as soon as the entry is added.
+    drain_fold_docs_body_futures(TxDb, Acc#{
+        revs_q := RevsQ1,
+        revs_count := RevsLeft - 1,
+        body_q := queue:in(BodyEntry, BodyQ0),
+        body_count := BodiesQueued + 1
+    }).
+
+
+% No-op below 100 pending body entries; otherwise resolve the oldest one.
+drain_fold_docs_body_futures(_TxDb, #{body_count := N} = Acc) when N < 100 -> Acc;
+drain_fold_docs_body_futures(TxDb, FullAcc) ->
+    drain_one_fold_docs_body_future(TxDb, FullAcc).
+
+
+% Resolve every body entry still queued, oldest first, until the count hits 0.
+drain_all_fold_docs_body_futures(_TxDb, #{body_count := N} = Acc) when N =< 0 -> Acc;
+drain_all_fold_docs_body_futures(TxDb, #{body_count := N} = Acc) when N > 0 ->
+    Drained = drain_one_fold_docs_body_future(TxDb, Acc),
+    drain_all_fold_docs_body_futures(TxDb, Drained).
+
+
+% Pop the oldest body entry, turn it into either {not_found, missing} or the
+% result of apply_open_doc_opts/3, and feed that to the user callback.
+drain_one_fold_docs_body_future(TxDb, Acc) ->
+    #{body_q := Pending, body_count := PendingCount} = Acc,
+    #{doc_opts := DocOpts, user_fun := UserFun, user_acc := UserAcc0} = Acc,
+    {{value, {DocId, Revs, BodyFuture}}, Remaining} = queue:out(Pending),
+    Result = case BodyFuture of
+        not_found ->
+            {not_found, missing};
+        _ ->
+            Winner = get_rev_winner(Revs),
+            Body = fabric2_fdb:get_doc_body_wait(TxDb, DocId, Winner,
+                BodyFuture),
+            apply_open_doc_opts(Body, Revs, DocOpts)
+    end,
+
+    % maybe_stop/1 is applied to whatever the callback returns before the
+    % accumulator is updated.
+    UserAcc1 = maybe_stop(UserFun(DocId, Result, UserAcc0)),
+    Acc#{
+        body_q := Remaining,
+        body_count := PendingCount - 1,
+        user_acc := UserAcc1
+    }.
+
+
+% Return the one entry in Revs whose winner flag is true (badmatch otherwise).
+get_rev_winner(Revs) ->
+    IsWinner = fun(RevInfo) -> maps:get(winner, RevInfo) end,
+    [Winner] = lists:filter(IsWinner, Revs),
+    Winner.
+
+
 new_revid(Db, Doc) ->
     #doc{
         id = DocId,
diff --git a/test/elixir/test/all_docs_test.exs b/test/elixir/test/all_docs_test.exs
index 16641aa..d41d046 100644
--- a/test/elixir/test/all_docs_test.exs
+++ b/test/elixir/test/all_docs_test.exs
@@ -233,6 +233,26 @@ defmodule AllDocsTest do
   end
 
   @tag :with_db
+  # Looking up a key that matches no document returns one not_found row.
+  test "POST with missing keys", context do
+    db_name = context[:db_name]
+    docs = create_docs(0..3)
+
+    bulk_resp = Couch.post("/#{db_name}/_bulk_docs", body: %{docs: docs})
+    assert bulk_resp.status_code in [201, 202]
+
+    # The request asks for the single integer key 1.
+    body = %{:keys => [1]}
+    resp = Couch.post("/#{db_name}/_all_docs", body: body)
+    assert resp.status_code == 200
+
+    rows = resp.body["rows"]
+    expected_row = %{"error" => "not_found", "key" => 1}
+    assert length(rows) == 1
+    assert hd(rows) == expected_row
+  end
+
+  @tag :with_db
   test "POST with keys and limit", context do
     db_name = context[:db_name]
 
@@ -242,13 +262,17 @@ defmodule AllDocsTest do
     resp = Couch.post(
       "/#{db_name}/_all_docs",
       body: %{
-        :keys => [1, 2],
-        :limit => 1
+        :keys => ["1", "2"],
+        :limit => 1,
+        :include_docs => true
       }
     )
 
     assert resp.status_code == 200
-    assert length(Map.get(resp, :body)["rows"]) == 1
+    rows = resp.body["rows"]
+    assert length(rows) == 1
+    doc = hd(rows)["doc"]
+    assert doc["string"] == "1"
   end
 
   @tag :with_db