You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2020/09/03 20:03:57 UTC
[couchdb] 02/02: Optimize view indexer batch sizes
This is an automated email from the ASF dual-hosted git repository.
davisp pushed a commit to branch prototype/fdb-layer-replace-couch-rate
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 5c477f33e4aad2454dc6fd2f2d66d52eea296844
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Thu Sep 3 12:04:56 2020 -0500
Optimize view indexer batch sizes
The couch_views_batch module is responsible for sensing the largest
batch sizes that can be successfully processed for a given indexer
process. It starts by searching for the maximum number of documents
that can be included in a batch. Once this threshold is found, it
slowly increases the batch size and decreases it again whenever the
threshold is crossed.
This approach maximises batch sizes while reacting quickly when a larger
batch would cross the FoundationDB transaction limits. Crossing those
limits causes the entire batch to be aborted and retried, which wastes
time during view builds.
---
rel/overlay/etc/default.ini | 8 +
src/couch_views/src/couch_views_batch.erl | 192 ++++++++++++++++++++++
src/couch_views/src/couch_views_indexer.erl | 65 ++++++--
src/couch_views/test/couch_views_batch_test.erl | 82 +++++++++
src/couch_views/test/couch_views_indexer_test.erl | 2 +-
5 files changed, 332 insertions(+), 17 deletions(-)
diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini
index 6cdbed0..6056556 100644
--- a/rel/overlay/etc/default.ini
+++ b/rel/overlay/etc/default.ini
@@ -309,6 +309,14 @@ iterations = 10 ; iterations for password hashing
;
; The maximum allowed value size emitted from a view for a document (in bytes)
;value_size_limit = 64000
+;
+; Batch size sensing parameters
+; batch_initial_size = 100 ; Initial batch size in number of documents
+; batch_search_increment = 500 ; Size change when searching for the threshold
+; batch_sense_increment = 100 ; Size change increment after hitting a threshold
+; batch_max_tx_size = 9000000 ; Maximum transaction size in bytes
+; batch_max_tx_time = 4500 ; Maximum transaction time in milliseconds
+; batch_threshold_penalty = 0.2 ; Fraction (0.0 - 1.0) by which to reduce the batch size when crossing a threshold
; CSP (Content Security Policy) Support for _utils
[csp]
diff --git a/src/couch_views/src/couch_views_batch.erl b/src/couch_views/src/couch_views_batch.erl
new file mode 100644
index 0000000..cfbe61c
--- /dev/null
+++ b/src/couch_views/src/couch_views_batch.erl
@@ -0,0 +1,192 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_views_batch).
+
+
+-export([
+ start/0,
+ success/2,
+ failure/0
+]).
+
+-export([
+ start/1,
+ success/3,
+ failure/1
+]).
+
+
+-callback start(State::term()) -> {NewState::term(), BatchSize::pos_integer()}.
+-callback success(
+ TxSize::non_neg_integer(),
+ DocsRead::non_neg_integer(),
+ State::term()
+ ) -> NewState::term().
+-callback failure(State::term()) -> NewState::term().
+
+
+-define(DEFAULT_MOD, atom_to_list(?MODULE)).
+
+
+% State of the default batch sizing strategy implemented by this module's
+% start/1, success/3 and failure/1 callbacks. When this module is the
+% configured batch module the record is kept in the process dictionary
+% between calls (see save_state/2).
+-record(batch_st, {
+ % erlang:monotonic_time() (native units) stamped by start/1; success/3
+ % subtracts it from the current time to get the batch duration.
+ start_time,
+ % Current batch size in documents; returned by start/1.
+ size,
+ % Sizing phase: `search` grows by search_incr; `sense` (entered after a
+ % threshold is exceeded or a batch fails) grows by sense_incr.
+ state = search,
+ % Growth step in the `search` phase (config batch_search_increment).
+ search_incr,
+ % Growth step in the `sense` phase (config batch_sense_increment).
+ sense_incr,
+ % Transaction size limit in bytes (config batch_max_tx_size).
+ max_tx_size,
+ % Transaction duration limit in milliseconds (config batch_max_tx_time).
+ max_tx_time,
+ % Fraction in [0.0, 1.0] by which the size shrinks after a threshold is
+ % exceeded (config batch_threshold_penalty).
+ threshold_penalty
+}).
+
+
+%% Begin a batch for the calling indexer process and return its size.
+%%
+%% The first call in a process picks the sizing module from the
+%% couch_views/batch_module config key (defaulting to this module) and
+%% hands `undefined` to its start/1 callback. Later calls reuse the module
+%% and state saved in the process dictionary.
+start() ->
+    {Mod, PrevState} = case load_state() of
+        undefined ->
+            ModName = config:get("couch_views", "batch_module", ?DEFAULT_MOD),
+            {list_to_existing_atom(ModName), undefined};
+        {_, _} = Saved ->
+            Saved
+    end,
+    {NextState, Size} = Mod:start(PrevState),
+    save_state(Mod, NextState),
+    Size.
+
+
+%% Report a committed batch (its transaction size and the number of
+%% documents read) to the active sizing module and store the new state.
+success(TxSize, DocsRead) ->
+    {Mod, Prev} = load_state(),
+    save_state(Mod, Mod:success(TxSize, DocsRead, Prev)),
+    ok.
+
+
+%% Report an aborted batch to the active sizing module and store the new
+%% state. The default module halves the batch size in response.
+failure() ->
+    {Mod, Prev} = load_state(),
+    Next = Mod:failure(Prev),
+    save_state(Mod, Next),
+    ok.
+
+
+%% Callback: begin a batch and return {NewState, BatchSize}.
+%%
+%% With `undefined` (the first batch in this process) the sizing parameters
+%% are read as strings from the couch_views config section, validated by
+%% validate_opts/1, and the function recurses with the resulting record.
+%% With an existing record it stamps start_time so success/3 can measure
+%% how long the batch took.
+-spec start(State::term()) -> {NewState::term(), BatchSize::pos_integer()}.
+start(undefined) ->
+    Raw = #batch_st{
+        size = get_config("batch_initial_size", "100"),
+        search_incr = get_config("batch_search_increment", "500"),
+        sense_incr = get_config("batch_sense_increment", "100"),
+        max_tx_size = get_config("batch_max_tx_size", "9000000"),
+        max_tx_time = get_config("batch_max_tx_time", "4500"),
+        threshold_penalty = get_config("batch_threshold_penalty", "0.2")
+    },
+    start(validate_opts(Raw));
+
+start(#batch_st{} = St) ->
+    Stamped = St#batch_st{start_time = erlang:monotonic_time()},
+    {Stamped, St#batch_st.size}.
+
+
+%% Callback: adjust the batch size after a committed batch.
+%%
+%% If the transaction exceeded max_tx_size bytes or ran longer than
+%% max_tx_time milliseconds, the size shrinks by threshold_penalty and the
+%% phase becomes `sense`. Otherwise the size grows by the increment of the
+%% current phase. The stored size is never below 1. DocsRead is accepted
+%% but not used by this strategy.
+-spec success(
+        TxSize::non_neg_integer(),
+        DocsRead::non_neg_integer(),
+        State::term()
+    ) -> NewState::term().
+success(TxSize, _DocsRead, #batch_st{} = St) ->
+    #batch_st{
+        start_time = Started,
+        size = Current,
+        state = Phase,
+        search_incr = SearchStep,
+        sense_incr = SenseStep,
+        max_tx_size = MaxBytes,
+        max_tx_time = MaxMs,
+        threshold_penalty = Penalty
+    } = St,
+
+    ElapsedNative = erlang:monotonic_time() - Started,
+    ElapsedMs = erlang:convert_time_unit(ElapsedNative, native, millisecond),
+    OverLimit = TxSize > MaxBytes orelse ElapsedMs > MaxMs,
+
+    {Next, NextPhase} = case OverLimit of
+        true ->
+            Shrunk = round(Current * (1.0 - Penalty)),
+            {Shrunk, sense};
+        false when Phase =:= search ->
+            {Current + SearchStep, Phase};
+        false when Phase =:= sense ->
+            {Current + SenseStep, Phase}
+    end,
+
+    St#batch_st{state = NextPhase, size = erlang:max(1, Next)}.
+
+
+%% Callback: a batch was aborted. Halve the batch size (never below 1) and
+%% switch to the `sense` phase so later growth is gradual.
+-spec failure(State::term()) -> NewState::term().
+failure(#batch_st{size = Size} = St) ->
+    Halved = Size div 2,
+    St#batch_st{state = sense, size = erlang:max(1, Halved)}.
+
+
+%% Turn the raw config strings held in a freshly built #batch_st{} into
+%% checked numbers. The integer fields must be strictly positive (see
+%% non_neg_integer/2). threshold_penalty must lie in [0.0, 1.0]. Invalid
+%% values raise an error naming the offending config key.
+validate_opts(St) ->
+    #batch_st{
+        size = RawSize,
+        search_incr = RawSearch,
+        sense_incr = RawSense,
+        max_tx_size = RawMaxBytes,
+        max_tx_time = RawMaxMs,
+        threshold_penalty = RawPenalty
+    } = St,
+    St#batch_st{
+        size = non_neg_integer(RawSize, batch_initial_size),
+        search_incr = non_neg_integer(RawSearch, batch_search_increment),
+        sense_incr = non_neg_integer(RawSense, batch_sense_increment),
+        max_tx_size = non_neg_integer(RawMaxBytes, batch_max_tx_size),
+        max_tx_time = non_neg_integer(RawMaxMs, batch_max_tx_time),
+        threshold_penalty = float_0_to_1(RawPenalty, batch_threshold_penalty)
+    }.
+
+
+%% Read Key from the couch_views config section, falling back to Default.
+get_config(Key, Default) ->
+    Section = "couch_views",
+    config:get(Section, Key, Default).
+
+
+%% Parse Str, the raw value of config key Name, as an integer.
+%%
+%% NOTE: despite the name, zero is rejected; the value must be strictly
+%% positive. Unparsable or non-positive input raises
+%% {invalid_non_neg_integer, {couch_views, Name, Str}}.
+non_neg_integer(Str, Name) ->
+    Invalid = {invalid_non_neg_integer, {couch_views, Name, Str}},
+    try list_to_integer(Str) of
+        Val when Val > 0 ->
+            Val;
+        _ ->
+            erlang:error(Invalid)
+    catch _:_ ->
+        erlang:error(Invalid)
+    end.
+
+
+%% Parse Str, the raw value of config key Name, as a number in the closed
+%% range [0.0, 1.0] and return it as a float.
+%%
+%% Integer spellings such as "0" and "1" are accepted as well as floats
+%% like "0.2". list_to_float/1 alone rejects integer spellings, which made
+%% the valid settings `batch_threshold_penalty = 0` and `= 1` fail.
+%%
+%% Raises {invalid_float, {couch_views, Name, Str}} for unparsable input
+%% and {float_out_of_range, {couch_views, Name, Str}} outside [0, 1].
+float_0_to_1(Str, Name) ->
+    Val = try
+        str_to_float(Str)
+    catch error:badarg ->
+        erlang:error({invalid_float, {couch_views, Name, Str}})
+    end,
+    if
+        Val >= 0.0 andalso Val =< 1.0 ->
+            Val;
+        true ->
+            erlang:error({float_out_of_range, {couch_views, Name, Str}})
+    end.
+
+
+%% Convert a string in float or integer notation to a float.
+%% Raises error:badarg when Str is neither.
+str_to_float(Str) ->
+    try
+        list_to_float(Str)
+    catch error:badarg ->
+        float(list_to_integer(Str))
+    end.
+
+
+%% Return the {Module, State} pair saved for this process, or `undefined`
+%% when nothing has been saved yet.
+load_state() ->
+    erlang:get(?MODULE).
+
+
+%% Store the sizing module and its state in the process dictionary.
+%% Returns whatever was stored before (the result of erlang:put/2).
+save_state(Mod, Batch) ->
+    Entry = {Mod, Batch},
+    erlang:put(?MODULE, Entry).
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index 737b6f8..c2c54e5 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -33,12 +33,20 @@
-include_lib("couch_mrview/include/couch_mrview.hrl").
-include_lib("fabric/include/fabric2.hrl").
-% TODO:
-% * Handle timeouts of transaction and other errors
-define(KEY_SIZE_LIMIT, 8000).
-define(VALUE_SIZE_LIMIT, 64000).
+% These are all of the errors that we can fix by using
+% a smaller batch size.
+-define(RETRY_FAILURES, [
+ 1004, % timed_out
+ 1007, % transaction_too_old
+ 1031, % transaction_timed_out
+ 2101 % transaction_too_large
+]).
+
+
spawn_link() ->
proc_lib:spawn_link(?MODULE, init, []).
@@ -80,6 +88,8 @@ init() ->
State = #{
tx_db => undefined,
+ tx_size => 0,
+ docs_read => 0,
db_uuid => DbUUID,
db_seq => undefined,
view_seq => undefined,
@@ -88,7 +98,6 @@ init() ->
job => Job,
job_data => Data,
count => 0,
- limit => num_changes(),
changes_done => 0,
doc_acc => [],
design_opts => Mrst#mrst.design_opts
@@ -162,7 +171,38 @@ add_error(Error, Reason, Data) ->
update(#{} = Db, Mrst0, State0) ->
- {Mrst2, State4} = fabric2_fdb:transactional(Db, fun(TxDb) ->
+    %% Run one batch in its own transaction, then decide what to do next.
+    %% Dispatch and the recursive calls happen outside the try so they are
+    %% real tail calls. Inside the protected body, each successful batch
+    %% would leave a try frame on the stack for the whole view build.
+    Limit = couch_views_batch:start(),
+    Result = try
+        do_update(Db, Mrst0, State0#{limit => Limit})
+    catch
+        error:{erlfdb_error, Error} ->
+            %% Errors in ?RETRY_FAILURES can be fixed by a smaller batch:
+            %% shrink it and retry from the same state. Others re-raise.
+            %% NOTE(review): there is no retry cap, so an error that still
+            %% occurs at a batch size of 1 would be retried indefinitely.
+            case lists:member(Error, ?RETRY_FAILURES) of
+                true ->
+                    couch_views_batch:failure(),
+                    retry;
+                false ->
+                    erlang:error({erlfdb_error, Error})
+            end
+    end,
+    case Result of
+        retry ->
+            update(Db, Mrst0, State0);
+        {Mrst1, finished} ->
+            couch_eval:release_map_context(Mrst1#mrst.qserver);
+        {Mrst1, State1} ->
+            #{
+                docs_read := DocsRead,
+                tx_size := TxSize
+            } = State1,
+            couch_views_batch:success(TxSize, DocsRead),
+            update(Db, Mrst1, State1)
+    end.
+
+
+do_update(Db, Mrst0, State0) ->
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+ #{
+ tx := Tx
+ } = TxDb,
+
State1 = get_update_start_state(TxDb, Mrst0, State0),
{ok, State2} = fold_changes(State1),
@@ -184,6 +224,8 @@ update(#{} = Db, Mrst0, State0) ->
ChangesDone = ChangesDone0 + length(DocAcc),
+ TxSize = erlfdb:wait(erlfdb:get_approximate_size(Tx)),
+
case Count < Limit of
true ->
maybe_set_build_status(TxDb, Mrst1, ViewVS,
@@ -195,20 +237,15 @@ update(#{} = Db, Mrst0, State0) ->
State3 = report_progress(State2, update),
{Mrst1, State3#{
tx_db := undefined,
+ tx_size := TxSize,
+ docs_read := length(DocAcc),
count := 0,
doc_acc := [],
changes_done := ChangesDone,
view_seq := LastSeq
}}
end
- end),
-
- case State4 of
- finished ->
- couch_eval:release_map_context(Mrst2#mrst.qserver);
- _ ->
- update(Db, Mrst2, State4)
- end.
+ end).
maybe_set_build_status(_TxDb, _Mrst1, not_found, _State) ->
@@ -544,10 +581,6 @@ fail_job(Job, Data, Error, Reason) ->
exit(normal).
-num_changes() ->
- config:get_integer("couch_views", "change_limit", 100).
-
-
retry_limit() ->
config:get_integer("couch_views", "retry_limit", 3).
diff --git a/src/couch_views/test/couch_views_batch_test.erl b/src/couch_views/test/couch_views_batch_test.erl
new file mode 100644
index 0000000..79a8865
--- /dev/null
+++ b/src/couch_views/test/couch_views_batch_test.erl
@@ -0,0 +1,82 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_views_batch_test).
+
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("fabric/test/fabric2_test.hrl").
+
+
+%% EUnit generator: runs every batch-sizing case below inside a single
+%% couch instance started by setup/0 and stopped by cleanup/1.
+batch_test_() ->
+    Cases = [
+        ?TDEF(basic),
+        ?TDEF(search_success),
+        ?TDEF(sense_success),
+        ?TDEF(failure),
+        ?TDEF(failure_switches_to_sense)
+    ],
+    Fixture = {setup, fun setup/0, fun cleanup/1, with(Cases)},
+    {"Test view batch sizing", Fixture}.
+
+
+%% Start couch (and with it the config used by couch_views_batch); the
+%% returned context is handed to cleanup/1.
+setup() ->
+    Ctx = test_util:start_couch(),
+    Ctx.
+
+
+%% Stop the couch instance started by setup/0.
+cleanup(StartedCtx) ->
+    test_util:stop_couch(StartedCtx).
+
+
+%% With no saved state, the first batch uses the default initial size.
+basic(_) ->
+    erase(couch_views_batch),
+    FirstSize = couch_views_batch:start(),
+    ?assertEqual(100, FirstSize).
+
+
+%% In the search phase an under-limit success grows the batch by the
+%% default search increment: 100 -> 600.
+search_success(_) ->
+    erase(couch_views_batch),
+    couch_views_batch:start(),
+    couch_views_batch:success(0, 0),
+    NextSize = couch_views_batch:start(),
+    ?assertEqual(600, NextSize).
+
+
+%% Exceeding the transaction size limit applies the default 0.2 penalty
+%% (100 -> 80) and switches to the sense phase, where the next success
+%% grows by the sense increment (80 -> 180).
+sense_success(_) ->
+    erase(couch_views_batch),
+    couch_views_batch:start(),
+    % Exceeding our threshold switches from search to sense
+    couch_views_batch:success(10000000, 5000),
+    Penalized = couch_views_batch:start(),
+    ?assertEqual(80, Penalized),
+    couch_views_batch:success(0, 0),
+    Sensed = couch_views_batch:start(),
+    ?assertEqual(180, Sensed).
+
+
+%% A failed batch halves the batch size: 100 -> 50.
+failure(_) ->
+    erase(couch_views_batch),
+    couch_views_batch:start(),
+    couch_views_batch:failure(),
+    AfterFailure = couch_views_batch:start(),
+    ?assertEqual(50, AfterFailure).
+
+
+%% After a failure, growth uses the sense increment rather than the search
+%% increment: 100 -> 50 (failure) -> 150 (success).
+failure_switches_to_sense(_) ->
+    erase(couch_views_batch),
+    couch_views_batch:start(),
+    couch_views_batch:failure(),
+    couch_views_batch:start(),
+    couch_views_batch:success(0, 0),
+    AfterSense = couch_views_batch:start(),
+    ?assertEqual(150, AfterSense).
+
diff --git a/src/couch_views/test/couch_views_indexer_test.erl b/src/couch_views/test/couch_views_indexer_test.erl
index cff3a2e..86c0a81 100644
--- a/src/couch_views/test/couch_views_indexer_test.erl
+++ b/src/couch_views/test/couch_views_indexer_test.erl
@@ -437,7 +437,7 @@ handle_db_recreated_when_running(Db) ->
% To intercept job building while it is running ensure updates happen one
% row at a time.
- config:set("couch_view", "change_limit", "1", false),
+ config:set("couch_views", "batch_initial_size", "1", false),
meck_intercept_job_update(self()),