You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ga...@apache.org on 2020/02/19 08:22:34 UTC

[couchdb] 22/23: add multi transaction iterators and reuse couch_views_server

This is an automated email from the ASF dual-hosted git repository.

garren pushed a commit to branch fdb-mango-indexes
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 8028f371657841b58df29b888c9cd637348c8763
Author: Garren Smith <ga...@gmail.com>
AuthorDate: Mon Feb 17 16:32:02 2020 +0200

    add multi transaction iterators and reuse couch_views_server
---
 src/couch_views/src/couch_views_server.erl |  18 +++--
 src/couch_views/src/couch_views_sup.erl    |   3 +-
 src/fabric/src/fabric2_fdb.erl             |   3 +-
 src/mango/src/mango_fdb.erl                |   5 +-
 src/mango/src/mango_indexer_server.erl     | 103 -----------------------------
 src/mango/src/mango_jobs_indexer.erl       |   3 +-
 src/mango/src/mango_sup.erl                |   3 +-
 src/mango/test/20-no-timeout-test.py       |  16 +++--
 8 files changed, 33 insertions(+), 121 deletions(-)

diff --git a/src/couch_views/src/couch_views_server.erl b/src/couch_views/src/couch_views_server.erl
index d14216e..6299407 100644
--- a/src/couch_views/src/couch_views_server.erl
+++ b/src/couch_views/src/couch_views_server.erl
@@ -17,7 +17,7 @@
 
 
 -export([
-    start_link/0
+    start_link/1
 ]).
 
 
@@ -34,16 +34,18 @@
 -define(MAX_WORKERS, 100).
 
 
-start_link() ->
-    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+start_link(Opts) ->
+    gen_server:start_link(?MODULE, Opts, []).
 
 
-init(_) ->
+init(Opts) ->
+    WorkerModule = couch_util:get_value(worker, Opts, couch_views_indexer),
     process_flag(trap_exit, true),
     couch_views_jobs:set_timeout(),
     St = #{
         workers => #{},
-        max_workers => max_workers()
+        max_workers => max_workers(),
+        worker_module => WorkerModule
     },
     {ok, spawn_workers(St)}.
 
@@ -87,11 +89,13 @@ code_change(_OldVsn, St, _Extra) ->
 spawn_workers(St) ->
     #{
         workers := Workers,
-        max_workers := MaxWorkers
+        max_workers := MaxWorkers,
+        worker_module := WorkerModule
     } = St,
+    io:format("BOOM COUCH VIEWS SERVER ~p ~n", [WorkerModule]),
     case maps:size(Workers) < MaxWorkers of
         true ->
-            Pid = couch_views_indexer:spawn_link(),
+            Pid = WorkerModule:spawn_link(),
             NewSt = St#{workers := Workers#{Pid => true}},
             spawn_workers(NewSt);
         false ->
diff --git a/src/couch_views/src/couch_views_sup.erl b/src/couch_views/src/couch_views_sup.erl
index 7a72a1f..c3256fd 100644
--- a/src/couch_views/src/couch_views_sup.erl
+++ b/src/couch_views/src/couch_views_sup.erl
@@ -36,10 +36,11 @@ start_link() ->
 
 
 init(normal) ->
+    Args = [{worker, couch_views_indexer}],
     Children = [
         #{
             id => couch_views_server,
-            start => {couch_views_server, start_link, []}
+            start => {couch_views_server, start_link, [Args]}
         }
     ],
     {ok, {flags(), Children}};
diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl
index e28d0b4..cf73244 100644
--- a/src/fabric/src/fabric2_fdb.erl
+++ b/src/fabric/src/fabric2_fdb.erl
@@ -811,7 +811,8 @@ write_doc(#{} = Db0, Doc, NewWinner0, OldWinner, ToUpdate, ToRemove) ->
     % Update database size
     AddSize = sum_add_rev_sizes([NewWinner | ToUpdate]),
     RemSize = sum_rem_rev_sizes(ToRemove),
-    incr_stat(Db, <<"sizes">>, <<"external">>, AddSize - RemSize),
+%%    TODO: causing mango indexes to fail with fdb error 1036
+%%    incr_stat(Db, <<"sizes">>, <<"external">>, AddSize - RemSize),
 
     ok.
 
diff --git a/src/mango/src/mango_fdb.erl b/src/mango/src/mango_fdb.erl
index 769fdc9..edbb27f 100644
--- a/src/mango/src/mango_fdb.erl
+++ b/src/mango/src/mango_fdb.erl
@@ -42,7 +42,7 @@ create_build_vs(TxDb, #idx{} = Idx) ->
     Key = build_vs_key(TxDb, Idx#idx.ddoc),
     VS = fabric2_fdb:new_versionstamp(Tx),
     Value = erlfdb_tuple:pack_vs({VS, ?MANGO_INDEX_BUILDING}),
-    erlfdb:set_versionstamped_value(Tx, Key, Value).
+    ok = erlfdb:set_versionstamped_value(Tx, Key, Value).
 
 
 set_build_vs(TxDb, #idx{} = Idx, VS, State) ->
@@ -176,7 +176,8 @@ args_to_fdb_opts(Args, Idx) ->
     [
         {skip, Skip},
         {dir, Direction},
-        {streaming_mode, want_all}
+        {streaming_mode, want_all},
+        {restart_tx, true}
     ] ++ StartKeyOpts ++ EndKeyOpts.
 
 
diff --git a/src/mango/src/mango_indexer_server.erl b/src/mango/src/mango_indexer_server.erl
deleted file mode 100644
index 6942c9f..0000000
--- a/src/mango/src/mango_indexer_server.erl
+++ /dev/null
@@ -1,103 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(mango_indexer_server).
-
-
--behaviour(gen_server).
-
-
--export([
-    start_link/0
-]).
-
-
--export([
-    init/1,
-    terminate/2,
-    handle_call/3,
-    handle_cast/2,
-    handle_info/2,
-    code_change/3
-]).
-
-
--define(MAX_WORKERS, 100).
-
-
-start_link() ->
-    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-
-
-init(_) ->
-    process_flag(trap_exit, true),
-    mango_jobs:set_timeout(),
-    St = #{
-        workers => #{},
-        max_workers => max_workers()
-    },
-    {ok, spawn_workers(St)}.
-
-
-terminate(_, _St) ->
-    ok.
-
-
-handle_call(Msg, _From, St) ->
-    {stop, {bad_call, Msg}, {bad_call, Msg}, St}.
-
-
-handle_cast(Msg, St) ->
-    {stop, {bad_cast, Msg}, St}.
-
-
-handle_info({'EXIT', Pid, Reason}, St) ->
-    #{workers := Workers} = St,
-    case maps:is_key(Pid, Workers) of
-        true ->
-            if Reason == normal -> ok; true ->
-                LogMsg = "~p : indexer process ~p exited with ~p",
-                couch_log:error(LogMsg, [?MODULE, Pid, Reason])
-            end,
-            NewWorkers = maps:remove(Pid, Workers),
-            {noreply, spawn_workers(St#{workers := NewWorkers})};
-        false ->
-            LogMsg = "~p : unknown process ~p exited with ~p",
-            couch_log:error(LogMsg, [?MODULE, Pid, Reason]),
-            {stop, {unknown_pid_exit, Pid}, St}
-    end;
-
-handle_info(Msg, St) ->
-    {stop, {bad_info, Msg}, St}.
-
-
-code_change(_OldVsn, St, _Extra) ->
-    {ok, St}.
-
-
-spawn_workers(St) ->
-    #{
-        workers := Workers,
-        max_workers := MaxWorkers
-    } = St,
-    case maps:size(Workers) < MaxWorkers of
-        true ->
-            Pid = mango_jobs_indexer:spawn_link(),
-            NewSt = St#{workers := Workers#{Pid => true}},
-            spawn_workers(NewSt);
-        false ->
-            St
-    end.
-
-
-max_workers() ->
-    config:get_integer("mango", "max_workers", ?MAX_WORKERS).
diff --git a/src/mango/src/mango_jobs_indexer.erl b/src/mango/src/mango_jobs_indexer.erl
index d68c80b..c22b62f 100644
--- a/src/mango/src/mango_jobs_indexer.erl
+++ b/src/mango/src/mango_jobs_indexer.erl
@@ -214,7 +214,8 @@ fold_changes(State) ->
     } = State,
 
     Fun = fun process_changes/2,
-    fabric2_db:fold_changes(TxDb, SinceSeq, Fun, State, [{limit, Limit}]).
+    Opts = [{limit, Limit}, {restart_tx, false}],
+    fabric2_db:fold_changes(TxDb, SinceSeq, Fun, State, Opts).
 
 
 process_changes(Change, Acc) ->
diff --git a/src/mango/src/mango_sup.erl b/src/mango/src/mango_sup.erl
index fc12dfe..d702d09 100644
--- a/src/mango/src/mango_sup.erl
+++ b/src/mango/src/mango_sup.erl
@@ -27,10 +27,11 @@ init([]) ->
         period => 10
     },
 
+    Args = [{worker, mango_jobs_indexer}],
     Children = [
         #{
             id => mango_indexer_server,
-            start => {mango_indexer_server, start_link, []}
+            start => {couch_views_server, start_link, [Args]}
         }
     ] ++ couch_epi:register_service(mango_epi, []),
     {ok, {Flags, Children}}.
diff --git a/src/mango/test/20-no-timeout-test.py b/src/mango/test/20-no-timeout-test.py
index 900e73e..b54e81c 100644
--- a/src/mango/test/20-no-timeout-test.py
+++ b/src/mango/test/20-no-timeout-test.py
@@ -15,19 +15,25 @@ import copy
 import unittest
 
 
-@unittest.skip("re-enable with multi-transaction iterators")
 class LongRunningMangoTest(mango.DbPerClass):
     def setUp(self):
         self.db.recreate()
+        self.db.create_index(["value"])
         docs = []
         for i in range(100000):
-            docs.append({"_id": str(i), "another": "field"})
-            if i % 20000 == 0:
+            docs.append({"_id": str(i), "another": "field", "value": i})
+            if i % 1000 == 0:
                 self.db.save_docs(docs)
                 docs = []
 
     # This test should run to completion and not timeout
     def test_query_does_not_time_out(self):
-        selector = {"_id": {"$gt": 0}, "another": "wrong"}
-        docs = self.db.find(selector)
+        # using _all_docs
+        selector1 = {"_id": {"$gt": 0}, "another": "wrong"}
+        docs = self.db.find(selector1)
+        self.assertEqual(len(docs), 0)
+
+        # using index
+        selector2 = {"value": {"$gt": 0}, "another": "wrong"}
+        docs = self.db.find(selector2)
         self.assertEqual(len(docs), 0)