You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2020/09/03 20:03:57 UTC

[couchdb] 02/02: Optimize view indexer batch sizes

This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch prototype/fdb-layer-replace-couch-rate
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 5c477f33e4aad2454dc6fd2f2d66d52eea296844
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Thu Sep 3 12:04:56 2020 -0500

    Optimize view indexer batch sizes
    
    The couch_views_batch module is responsible for sensing the largest
    batch sizes that can be successfully processed for a given indexer
    process. It works by initially searching for the maximum number of
    documents that can be included in a batch. Once this threshold is
    found, it slowly increases the batch size and decreases it again
    whenever the threshold is crossed.
    
    This approach maximises batch sizes while reacting quickly when a
    larger batch would cross the FoundationDB transaction limits. Crossing
    those limits aborts and retries the entire batch, which wastes time
    during view builds.
---
 rel/overlay/etc/default.ini                       |   8 +
 src/couch_views/src/couch_views_batch.erl         | 192 ++++++++++++++++++++++
 src/couch_views/src/couch_views_indexer.erl       |  65 ++++++--
 src/couch_views/test/couch_views_batch_test.erl   |  82 +++++++++
 src/couch_views/test/couch_views_indexer_test.erl |   2 +-
 5 files changed, 332 insertions(+), 17 deletions(-)

diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini
index 6cdbed0..6056556 100644
--- a/rel/overlay/etc/default.ini
+++ b/rel/overlay/etc/default.ini
@@ -309,6 +309,14 @@ iterations = 10 ; iterations for password hashing
 ;
 ; The maximum allowed value size emitted from a view for a document (in bytes)
 ;value_size_limit = 64000
+;
+; Batch size sensing parameters
+; batch_initial_size = 100 ; Initial batch size in number of documents
+; batch_search_increment = 500 ; Size change when searching for the threshold
+; batch_sense_increment = 100 ; Size change increment after hitting a threshold
+; batch_max_tx_size = 9000000 ; Maximum transaction size in bytes
+; batch_max_tx_time = 4500 ; Maximum transaction time in milliseconds
+; batch_threshold_penalty = 0.2 ; Fraction by which to reduce the batch size when crossing a threshold
 
 ; CSP (Content Security Policy) Support for _utils
 [csp]
diff --git a/src/couch_views/src/couch_views_batch.erl b/src/couch_views/src/couch_views_batch.erl
new file mode 100644
index 0000000..cfbe61c
--- /dev/null
+++ b/src/couch_views/src/couch_views_batch.erl
@@ -0,0 +1,238 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_views_batch).
+
+% Adaptive batch sizing for the couch_views indexer.
+%
+% The public API (start/0, success/2, failure/0) stores its state in the
+% calling process's dictionary and delegates to a callback module named by
+% the "batch_module" key of the "couch_views" config section. This module is
+% also the default implementation of that behaviour (start/1, success/3,
+% failure/1):
+%
+%  * It starts in the `search` state and grows the batch by
+%    batch_search_increment after each transaction that stays within limits.
+%  * When a transaction exceeds batch_max_tx_size or batch_max_tx_time the
+%    batch shrinks by the batch_threshold_penalty fraction and the state
+%    switches to `sense`, which grows by batch_sense_increment instead.
+%  * A failed transaction halves the batch and also switches to `sense`.
+
+
+-export([
+    start/0,
+    success/2,
+    failure/0
+]).
+
+-export([
+    start/1,
+    success/3,
+    failure/1
+]).
+
+
+-callback start(State::term()) -> {NewState::term(), BatchSize::pos_integer()}.
+-callback success(
+            TxSize::non_neg_integer(),
+            DocsRead::non_neg_integer(),
+            State::term()
+        ) -> NewState::term().
+-callback failure(State::term()) -> NewState::term().
+
+
+-define(DEFAULT_MOD, atom_to_list(?MODULE)).
+
+
+-record(batch_st, {
+    start_time, % erlang:monotonic_time() at the start of the current batch
+    size, % current batch size in documents
+    state = search, % search | sense
+    search_incr,
+    sense_incr,
+    max_tx_size, % bytes
+    max_tx_time, % milliseconds
+    threshold_penalty % fraction of the size dropped on crossing a limit
+}).
+
+
+% Begin a batch and return how many documents it should contain. The callback
+% module is resolved from config only on the first call in a process;
+% list_to_existing_atom/1 avoids creating new atoms from config values.
+start() ->
+    {Mod, State} = case load_state() of
+        {M, S} ->
+            {M, S};
+        undefined ->
+            ModStr = config:get("couch_views", "batch_module", ?DEFAULT_MOD),
+            ModAtom = list_to_existing_atom(ModStr),
+            {ModAtom, undefined}
+    end,
+    {NewState, BatchSize} = Mod:start(State),
+    save_state(Mod, NewState),
+    BatchSize.
+
+
+% Record that the batch begun by the last start/0 committed, reporting its
+% transaction size and the number of documents read.
+success(TxSize, DocsRead) ->
+    {Mod, State} = load_state(),
+    NewState = Mod:success(TxSize, DocsRead, State),
+    save_state(Mod, NewState),
+    ok.
+
+
+% Record that the batch begun by the last start/0 failed and will be retried.
+failure() ->
+    {Mod, State} = load_state(),
+    NewState = Mod:failure(State),
+    save_state(Mod, NewState),
+    ok.
+
+
+% Callback: on the first call (undefined state) read and validate the tunables
+% from config; on every call stamp the batch start time and return the size.
+-spec start(State::term()) -> {NewState::term(), BatchSize::pos_integer()}.
+start(undefined) ->
+    St = #batch_st{
+        size = get_config("batch_initial_size", "100"),
+        search_incr = get_config("batch_search_increment", "500"),
+        sense_incr = get_config("batch_sense_increment", "100"),
+        max_tx_size = get_config("batch_max_tx_size", "9000000"),
+        max_tx_time = get_config("batch_max_tx_time", "4500"),
+        threshold_penalty = get_config("batch_threshold_penalty", "0.2")
+    },
+    start(validate_opts(St));
+
+start(#batch_st{size = Size} = St) ->
+    NewSt = St#batch_st{
+        start_time = erlang:monotonic_time()
+    },
+    {NewSt, Size}.
+
+
+% Callback: adjust the batch size after a committed transaction. A batch that
+% crossed the size or time limit is shrunk by the threshold penalty and moves
+% the state to `sense`; otherwise the size grows by the increment for the
+% current state. The result is never smaller than 1.
+-spec success(
+        TxSize::non_neg_integer(),
+        DocsRead::non_neg_integer(),
+        State::term()
+    ) -> NewState::term().
+success(TxSize, _DocsRead, #batch_st{} = St) ->
+    #batch_st{
+        start_time = StartTime,
+        size = Size,
+        state = State,
+        search_incr = SearchIncr,
+        sense_incr = SenseIncr,
+        max_tx_size = MaxTxSize,
+        max_tx_time = MaxTxTime,
+        threshold_penalty = ThresholdPenalty
+    } = St,
+
+    TxTimeNative = erlang:monotonic_time() - StartTime,
+    TxTime = erlang:convert_time_unit(TxTimeNative, native, millisecond),
+
+    {NewSize, NewState} = case TxSize > MaxTxSize orelse TxTime > MaxTxTime of
+        true ->
+            {round(Size * (1.0 - ThresholdPenalty)), sense};
+        false when State =:= search ->
+            {Size + SearchIncr, State};
+        false when State =:= sense ->
+            {Size + SenseIncr, State}
+    end,
+
+    St#batch_st{
+        size = erlang:max(1, NewSize),
+        state = NewState
+    }.
+
+
+% Callback: a failed transaction halves the batch (minimum 1) and switches
+% to the `sense` state.
+-spec failure(State::term()) -> NewState::term().
+failure(#batch_st{} = St) ->
+    St#batch_st{
+        size = erlang:max(1, St#batch_st.size div 2),
+        state = sense
+    }.
+
+
+% Convert the raw config strings in St into validated numbers, raising an
+% error that names the offending config key if any value is invalid.
+validate_opts(St) ->
+    #batch_st{
+        size = Size,
+        search_incr = SearchIncr,
+        sense_incr = SenseIncr,
+        max_tx_size = MaxTxSize,
+        max_tx_time = MaxTxTime,
+        threshold_penalty = Penalty
+    } = St,
+    St#batch_st{
+        size = non_neg_integer(Size, batch_initial_size),
+        search_incr = non_neg_integer(SearchIncr, batch_search_increment),
+        sense_incr = non_neg_integer(SenseIncr, batch_sense_increment),
+        max_tx_size = non_neg_integer(MaxTxSize, batch_max_tx_size),
+        max_tx_time = non_neg_integer(MaxTxTime, batch_max_tx_time),
+        threshold_penalty = float_0_to_1(Penalty, batch_threshold_penalty)
+    }.
+
+
+get_config(Key, Default) ->
+    config:get("couch_views", Key, Default).
+
+
+% Parse Str as an integer. Despite the name, zero is rejected as well as
+% negative numbers: every tunable parsed here must be a positive integer.
+non_neg_integer(Str, Name) ->
+    try
+        Val = list_to_integer(Str),
+        true = Val > 0,
+        Val
+    catch error:_ ->
+        erlang:error({invalid_non_neg_integer, {couch_views, Name, Str}})
+    end.
+
+
+% Parse Str as a number in [0.0, 1.0] and return it as a float.
+float_0_to_1(Str, Name) ->
+    Val = try
+        string_to_float(Str)
+    catch error:badarg ->
+        erlang:error({invalid_float, {couch_views, Name, Str}})
+    end,
+    if Val >= 0.0 andalso Val =< 1.0 -> Val; true ->
+        erlang:error({float_out_of_range, {couch_views, Name, Str}})
+    end.
+
+
+% list_to_float/1 rejects strings without a fractional part, so a setting
+% such as "0" or "1" would otherwise be reported as an invalid float. Fall
+% back to integer parsing; raises error:badarg if Str is neither form.
+string_to_float(Str) ->
+    try
+        list_to_float(Str)
+    catch error:badarg ->
+        float(list_to_integer(Str))
+    end.
+
+
+% Per-process state: {CallbackModule, CallbackState}, or undefined.
+load_state() ->
+    get(?MODULE).
+
+
+save_state(Mod, Batch) ->
+    put(?MODULE, {Mod, Batch}).
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index 737b6f8..c2c54e5 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -33,12 +33,20 @@
 -include_lib("couch_mrview/include/couch_mrview.hrl").
 -include_lib("fabric/include/fabric2.hrl").
 
-% TODO:
-%  * Handle timeouts of transaction and other errors
 
 -define(KEY_SIZE_LIMIT, 8000).
 -define(VALUE_SIZE_LIMIT, 64000).
 
+% These are all of the errors that we can fix by using
+% a smaller batch size.
+-define(RETRY_FAILURES, [
+    1004, % timed_out
+    1007, % transaction_too_old
+    1031, % transaction_timed_out
+    2101 % transaction_too_large
+]).
+
+
 spawn_link() ->
     proc_lib:spawn_link(?MODULE, init, []).
 
@@ -80,6 +88,8 @@ init() ->
 
     State = #{
         tx_db => undefined,
+        tx_size => 0,
+        docs_read => 0,
         db_uuid => DbUUID,
         db_seq => undefined,
         view_seq => undefined,
@@ -88,7 +98,6 @@ init() ->
         job => Job,
         job_data => Data,
         count => 0,
-        limit => num_changes(),
         changes_done => 0,
         doc_acc => [],
         design_opts => Mrst#mrst.design_opts
@@ -162,7 +171,47 @@ add_error(Error, Reason, Data) ->
 
 
 update(#{} = Db, Mrst0, State0) ->
-    {Mrst2, State4} = fabric2_fdb:transactional(Db, fun(TxDb) ->
+    % Each pass indexes one batch of changes via do_update/3, with the batch
+    % size chosen by couch_views_batch from the outcome of earlier batches.
+    Limit = couch_views_batch:start(),
+    Result = try
+        do_update(Db, Mrst0, State0#{limit => Limit})
+    catch
+        error:{erlfdb_error, Error} ->
+            case lists:member(Error, ?RETRY_FAILURES) of
+                true ->
+                    % ?RETRY_FAILURES are fixable with a smaller batch, so
+                    % shrink it and retry from the same starting state.
+                    couch_views_batch:failure(),
+                    retry;
+                false ->
+                    erlang:error({erlfdb_error, Error})
+            end
+    end,
+    % Recurse outside of the try so update/3 stays tail recursive; a call
+    % inside the try body would keep a stack frame and exception handler
+    % alive for every batch processed.
+    case Result of
+        retry ->
+            update(Db, Mrst0, State0);
+        {Mrst1, finished} ->
+            couch_eval:release_map_context(Mrst1#mrst.qserver);
+        {Mrst1, State1} ->
+            #{
+                docs_read := DocsRead,
+                tx_size := TxSize
+            } = State1,
+            couch_views_batch:success(TxSize, DocsRead),
+            update(Db, Mrst1, State1)
+    end.
+
+
+do_update(Db, Mrst0, State0) ->
+    fabric2_fdb:transactional(Db, fun(TxDb) ->
+        #{
+            tx := Tx
+        } = TxDb,
+
         State1 = get_update_start_state(TxDb, Mrst0, State0),
 
         {ok, State2} = fold_changes(State1),
@@ -184,6 +224,8 @@ update(#{} = Db, Mrst0, State0) ->
 
         ChangesDone = ChangesDone0 + length(DocAcc),
 
+        TxSize = erlfdb:wait(erlfdb:get_approximate_size(Tx)),
+
         case Count < Limit of
             true ->
                 maybe_set_build_status(TxDb, Mrst1, ViewVS,
@@ -195,20 +237,15 @@ update(#{} = Db, Mrst0, State0) ->
                 State3 = report_progress(State2, update),
                 {Mrst1, State3#{
                     tx_db := undefined,
+                    tx_size := TxSize,
+                    docs_read := length(DocAcc),
                     count := 0,
                     doc_acc := [],
                     changes_done := ChangesDone,
                     view_seq := LastSeq
                 }}
         end
-    end),
-
-    case State4 of
-        finished ->
-            couch_eval:release_map_context(Mrst2#mrst.qserver);
-        _ ->
-        update(Db, Mrst2, State4)
-    end.
+    end).
 
 
 maybe_set_build_status(_TxDb, _Mrst1, not_found, _State) ->
@@ -544,10 +581,6 @@ fail_job(Job, Data, Error, Reason) ->
     exit(normal).
 
 
-num_changes() ->
-    config:get_integer("couch_views", "change_limit", 100).
-
-
 retry_limit() ->
     config:get_integer("couch_views", "retry_limit", 3).
 
diff --git a/src/couch_views/test/couch_views_batch_test.erl b/src/couch_views/test/couch_views_batch_test.erl
new file mode 100644
index 0000000..79a8865
--- /dev/null
+++ b/src/couch_views/test/couch_views_batch_test.erl
@@ -0,0 +1,87 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_views_batch_test).
+
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("fabric/test/fabric2_test.hrl").
+
+
+% These cases assume the default couch_views batch settings: an initial size
+% of 100, a search increment of 500, a sense increment of 100, a 9000000 byte
+% transaction size limit and a 0.2 threshold penalty.
+batch_test_() ->
+    Cases = [
+        ?TDEF(basic),
+        ?TDEF(search_success),
+        ?TDEF(sense_success),
+        ?TDEF(failure),
+        ?TDEF(failure_switches_to_sense)
+    ],
+    {"Test view batch sizing", {setup, fun setup/0, fun cleanup/1, with(Cases)}}.
+
+
+setup() ->
+    test_util:start_couch().
+
+
+cleanup(Ctx) ->
+    test_util:stop_couch(Ctx).
+
+
+% couch_views_batch keeps its state in the process dictionary under its own
+% module name; erasing it makes the next start/0 begin from scratch.
+reset_batch_state() ->
+    erase(couch_views_batch).
+
+
+basic(_Ctx) ->
+    reset_batch_state(),
+    ?assertEqual(100, couch_views_batch:start()).
+
+
+% A batch within limits grows by the search increment: 100 + 500.
+search_success(_Ctx) ->
+    reset_batch_state(),
+    couch_views_batch:start(),
+    couch_views_batch:success(0, 0),
+    ?assertEqual(600, couch_views_batch:start()).
+
+
+% Crossing the size limit applies the penalty (100 * 0.8) and switches to
+% sense, after which growth uses the sense increment (80 + 100).
+sense_success(_Ctx) ->
+    reset_batch_state(),
+    couch_views_batch:start(),
+    couch_views_batch:success(10000000, 5000),
+    ?assertEqual(80, couch_views_batch:start()),
+    couch_views_batch:success(0, 0),
+    ?assertEqual(180, couch_views_batch:start()).
+
+
+% A failure halves the batch: 100 div 2.
+failure(_Ctx) ->
+    reset_batch_state(),
+    couch_views_batch:start(),
+    couch_views_batch:failure(),
+    ?assertEqual(50, couch_views_batch:start()).
+
+
+% After a failure the batch grows by the sense increment: 50 + 100.
+failure_switches_to_sense(_Ctx) ->
+    reset_batch_state(),
+    couch_views_batch:start(),
+    couch_views_batch:failure(),
+    couch_views_batch:start(),
+    couch_views_batch:success(0, 0),
+    ?assertEqual(150, couch_views_batch:start()).
diff --git a/src/couch_views/test/couch_views_indexer_test.erl b/src/couch_views/test/couch_views_indexer_test.erl
index cff3a2e..86c0a81 100644
--- a/src/couch_views/test/couch_views_indexer_test.erl
+++ b/src/couch_views/test/couch_views_indexer_test.erl
@@ -437,7 +437,7 @@ handle_db_recreated_when_running(Db) ->
 
     % To intercept job building while it is running ensure updates happen one
     % row at a time.
-    config:set("couch_view", "change_limit", "1", false),
+    config:set("couch_views", "batch_initial_size", "1", false),
 
     meck_intercept_job_update(self()),