You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ga...@apache.org on 2020/02/19 08:22:34 UTC
[couchdb] 22/23: add multi transaction iterators and reuse
couch_views_server
This is an automated email from the ASF dual-hosted git repository.
garren pushed a commit to branch fdb-mango-indexes
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 8028f371657841b58df29b888c9cd637348c8763
Author: Garren Smith <ga...@gmail.com>
AuthorDate: Mon Feb 17 16:32:02 2020 +0200
add multi transaction iterators and reuse couch_views_server
---
src/couch_views/src/couch_views_server.erl | 18 +++--
src/couch_views/src/couch_views_sup.erl | 3 +-
src/fabric/src/fabric2_fdb.erl | 3 +-
src/mango/src/mango_fdb.erl | 5 +-
src/mango/src/mango_indexer_server.erl | 103 -----------------------------
src/mango/src/mango_jobs_indexer.erl | 3 +-
src/mango/src/mango_sup.erl | 3 +-
src/mango/test/20-no-timeout-test.py | 16 +++--
8 files changed, 33 insertions(+), 121 deletions(-)
diff --git a/src/couch_views/src/couch_views_server.erl b/src/couch_views/src/couch_views_server.erl
index d14216e..6299407 100644
--- a/src/couch_views/src/couch_views_server.erl
+++ b/src/couch_views/src/couch_views_server.erl
@@ -17,7 +17,7 @@
-export([
- start_link/0
+ start_link/1
]).
@@ -34,16 +34,18 @@
-define(MAX_WORKERS, 100).
-start_link() ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+start_link(Opts) ->
+ gen_server:start_link(?MODULE, Opts, []).
-init(_) ->
+init(Opts) ->
+ WorkerModule = couch_util:get_value(worker, Opts, couch_views_indexer),
process_flag(trap_exit, true),
couch_views_jobs:set_timeout(),
St = #{
workers => #{},
- max_workers => max_workers()
+ max_workers => max_workers(),
+ worker_module => WorkerModule
},
{ok, spawn_workers(St)}.
@@ -87,11 +89,13 @@ code_change(_OldVsn, St, _Extra) ->
spawn_workers(St) ->
#{
workers := Workers,
- max_workers := MaxWorkers
+ max_workers := MaxWorkers,
+ worker_module := WorkerModule
} = St,
+ io:format("BOOM COUCH VIEWS SERVER ~p ~n", [WorkerModule]),
case maps:size(Workers) < MaxWorkers of
true ->
- Pid = couch_views_indexer:spawn_link(),
+ Pid = WorkerModule:spawn_link(),
NewSt = St#{workers := Workers#{Pid => true}},
spawn_workers(NewSt);
false ->
diff --git a/src/couch_views/src/couch_views_sup.erl b/src/couch_views/src/couch_views_sup.erl
index 7a72a1f..c3256fd 100644
--- a/src/couch_views/src/couch_views_sup.erl
+++ b/src/couch_views/src/couch_views_sup.erl
@@ -36,10 +36,11 @@ start_link() ->
init(normal) ->
+ Args = [{worker, couch_views_indexer}],
Children = [
#{
id => couch_views_server,
- start => {couch_views_server, start_link, []}
+ start => {couch_views_server, start_link, [Args]}
}
],
{ok, {flags(), Children}};
diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl
index e28d0b4..cf73244 100644
--- a/src/fabric/src/fabric2_fdb.erl
+++ b/src/fabric/src/fabric2_fdb.erl
@@ -811,7 +811,8 @@ write_doc(#{} = Db0, Doc, NewWinner0, OldWinner, ToUpdate, ToRemove) ->
% Update database size
AddSize = sum_add_rev_sizes([NewWinner | ToUpdate]),
RemSize = sum_rem_rev_sizes(ToRemove),
- incr_stat(Db, <<"sizes">>, <<"external">>, AddSize - RemSize),
+%% TODO: causing mango indexes to fail with fdb error 1036
+%% incr_stat(Db, <<"sizes">>, <<"external">>, AddSize - RemSize),
ok.
diff --git a/src/mango/src/mango_fdb.erl b/src/mango/src/mango_fdb.erl
index 769fdc9..edbb27f 100644
--- a/src/mango/src/mango_fdb.erl
+++ b/src/mango/src/mango_fdb.erl
@@ -42,7 +42,7 @@ create_build_vs(TxDb, #idx{} = Idx) ->
Key = build_vs_key(TxDb, Idx#idx.ddoc),
VS = fabric2_fdb:new_versionstamp(Tx),
Value = erlfdb_tuple:pack_vs({VS, ?MANGO_INDEX_BUILDING}),
- erlfdb:set_versionstamped_value(Tx, Key, Value).
+ ok = erlfdb:set_versionstamped_value(Tx, Key, Value).
set_build_vs(TxDb, #idx{} = Idx, VS, State) ->
@@ -176,7 +176,8 @@ args_to_fdb_opts(Args, Idx) ->
[
{skip, Skip},
{dir, Direction},
- {streaming_mode, want_all}
+ {streaming_mode, want_all},
+ {restart_tx, true}
] ++ StartKeyOpts ++ EndKeyOpts.
diff --git a/src/mango/src/mango_indexer_server.erl b/src/mango/src/mango_indexer_server.erl
deleted file mode 100644
index 6942c9f..0000000
--- a/src/mango/src/mango_indexer_server.erl
+++ /dev/null
@@ -1,103 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(mango_indexer_server).
-
-
--behaviour(gen_server).
-
-
--export([
- start_link/0
-]).
-
-
--export([
- init/1,
- terminate/2,
- handle_call/3,
- handle_cast/2,
- handle_info/2,
- code_change/3
-]).
-
-
--define(MAX_WORKERS, 100).
-
-
-start_link() ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-
-
-init(_) ->
- process_flag(trap_exit, true),
- mango_jobs:set_timeout(),
- St = #{
- workers => #{},
- max_workers => max_workers()
- },
- {ok, spawn_workers(St)}.
-
-
-terminate(_, _St) ->
- ok.
-
-
-handle_call(Msg, _From, St) ->
- {stop, {bad_call, Msg}, {bad_call, Msg}, St}.
-
-
-handle_cast(Msg, St) ->
- {stop, {bad_cast, Msg}, St}.
-
-
-handle_info({'EXIT', Pid, Reason}, St) ->
- #{workers := Workers} = St,
- case maps:is_key(Pid, Workers) of
- true ->
- if Reason == normal -> ok; true ->
- LogMsg = "~p : indexer process ~p exited with ~p",
- couch_log:error(LogMsg, [?MODULE, Pid, Reason])
- end,
- NewWorkers = maps:remove(Pid, Workers),
- {noreply, spawn_workers(St#{workers := NewWorkers})};
- false ->
- LogMsg = "~p : unknown process ~p exited with ~p",
- couch_log:error(LogMsg, [?MODULE, Pid, Reason]),
- {stop, {unknown_pid_exit, Pid}, St}
- end;
-
-handle_info(Msg, St) ->
- {stop, {bad_info, Msg}, St}.
-
-
-code_change(_OldVsn, St, _Extra) ->
- {ok, St}.
-
-
-spawn_workers(St) ->
- #{
- workers := Workers,
- max_workers := MaxWorkers
- } = St,
- case maps:size(Workers) < MaxWorkers of
- true ->
- Pid = mango_jobs_indexer:spawn_link(),
- NewSt = St#{workers := Workers#{Pid => true}},
- spawn_workers(NewSt);
- false ->
- St
- end.
-
-
-max_workers() ->
- config:get_integer("mango", "max_workers", ?MAX_WORKERS).
diff --git a/src/mango/src/mango_jobs_indexer.erl b/src/mango/src/mango_jobs_indexer.erl
index d68c80b..c22b62f 100644
--- a/src/mango/src/mango_jobs_indexer.erl
+++ b/src/mango/src/mango_jobs_indexer.erl
@@ -214,7 +214,8 @@ fold_changes(State) ->
} = State,
Fun = fun process_changes/2,
- fabric2_db:fold_changes(TxDb, SinceSeq, Fun, State, [{limit, Limit}]).
+ Opts = [{limit, Limit}, {restart_tx, false}],
+ fabric2_db:fold_changes(TxDb, SinceSeq, Fun, State, Opts).
process_changes(Change, Acc) ->
diff --git a/src/mango/src/mango_sup.erl b/src/mango/src/mango_sup.erl
index fc12dfe..d702d09 100644
--- a/src/mango/src/mango_sup.erl
+++ b/src/mango/src/mango_sup.erl
@@ -27,10 +27,11 @@ init([]) ->
period => 10
},
+ Args = [{worker, mango_jobs_indexer}],
Children = [
#{
id => mango_indexer_server,
- start => {mango_indexer_server, start_link, []}
+ start => {couch_views_server, start_link, [Args]}
}
] ++ couch_epi:register_service(mango_epi, []),
{ok, {Flags, Children}}.
diff --git a/src/mango/test/20-no-timeout-test.py b/src/mango/test/20-no-timeout-test.py
index 900e73e..b54e81c 100644
--- a/src/mango/test/20-no-timeout-test.py
+++ b/src/mango/test/20-no-timeout-test.py
@@ -15,19 +15,25 @@ import copy
import unittest
-@unittest.skip("re-enable with multi-transaction iterators")
class LongRunningMangoTest(mango.DbPerClass):
def setUp(self):
self.db.recreate()
+ self.db.create_index(["value"])
docs = []
for i in range(100000):
- docs.append({"_id": str(i), "another": "field"})
- if i % 20000 == 0:
+ docs.append({"_id": str(i), "another": "field", "value": i})
+ if i % 1000 == 0:
self.db.save_docs(docs)
docs = []
# This test should run to completion and not timeout
def test_query_does_not_time_out(self):
- selector = {"_id": {"$gt": 0}, "another": "wrong"}
- docs = self.db.find(selector)
+ # using _all_docs
+ selector1 = {"_id": {"$gt": 0}, "another": "wrong"}
+ docs = self.db.find(selector1)
+ self.assertEqual(len(docs), 0)
+
+ # using index
+ selector2 = {"value": {"$gt": 0}, "another": "wrong"}
+ docs = self.db.find(selector2)
self.assertEqual(len(docs), 0)