You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2022/08/17 21:05:05 UTC

[couchdb] branch raft_storemodule updated (c8b859569 -> 725b2cdc1)

This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a change to branch raft_storemodule
in repository https://gitbox.apache.org/repos/asf/couchdb.git


 discard c8b859569 separate follower and candidate timeouts
 discard eec4a4c0d clear votesGranted when unused for readability
 discard cb64903b5 don't track matchIndex/nextIndex in non-leaders, pointless
 discard dfda5e000 introduce store abstraction (WIP)
 discard b0b12b6d2 Integrate raft algorithm (WIP)
     add 0156029fe Display main build status in README
     add 17d359b04 Merge pull request #4063 from apache/big-r81-patch-1
     add 592837abc Add Erlang 25 support
     add 6f85bac20 Merge pull request #4060 from apache/big-r81-add-erlang-25-support
     add 687b4e023 Enable replicating purge requests between nodes
     add 63e09832a Ignore repeated interactive purges
     add 428d280e4 Update CI to remove Erlang < 23
     add b4aadedcb Update rebar config to remove Erlang < 23 support
     add eb9884e95 Skip Erlang version != 20 in configure
     add 11eea624b Set low erlang version in Jenkins.pr to 23 instead of 20
     add 0a229d8fb Remove Erlang < 23 ifdefs and other macros
     add 1d2ba3e5e Update erlfmt formatting for smoosh_server
     add cb80c634a Lowest CI image version Erlang 23 doesn't have spidermonkey 1.8.5
     add 6da9405ff Erlang 25 compatibility - http_uri:parse -> uri_string:parse
     add 1bb3da87c Fix deprecated function warning
     add 09737dfd6 Delete unused include_lib
     add 507932ca3 Replace include with include_lib
     add 19d2f9b69 Fix "variable bound multiple times" warning
     add 52ef641f1 Fix full CI build
     add 211509e58 Bypass macos builder for now while we sort the rebar issue
     add 116c4e956 Fix couch_debug:opened_files* functions
     add c417303ef Prevent error:function_clause in check_security/3 if roles claim is malformed (#4070)
     add 47866dea3 - rename master to main - fixing links
     add f164b2dd9 Adding commit message conventions and update links
     add ab00be437 Use https as default protocol for links
     add 0bfbb63e4 Backport commits from fdbmain into main (old 3.x)
     add adf17140e Merge pull request #4079 from apache/backport-from-fdbmain
     add 23b352d76 Small url fixes
     add b424ad12a Merge pull request #4080 from apache/change-irc-url
     add c605e0458 Fix Elixir 13 compatibility
     add 2c351d62c Update vm.args for Erlang 23+
     add ea5df65c5 Bring back POWER full builds
     add eb2f8d998 Add Erlang 25 to PR CI pipeline and Ubuntu Jammy to full CI
     add e41465ec8 Add an option to let custodian always use [cluster] n value
     add 29ac7853f Optimize couch_util:to_hex/1
     add 6a455c74b Implement winning_revs_only option for the replicator
     add eb0b28a70 Fix flaky "validate doc update" elixir test
     add 74017fd5d Skip uploading build logs for now
     add 4fab0509d Skip nightly package uploads since nothing seems to be using them
     add 5eef3fff5 Improve error handling in smoosh_utils:write_to_file/3
     add 22f0b44ef Merge pull request #4093 from noahshaw11/fix-error-handling-smoosh
     add b749b219b Add filepath to is_compacting
     add 330703cae Remove some left-over local endpoint clauses in replicator
     add 02c0c75c2 Clean up unused code and invalid spec from replicator
     add 76dd66f40 Remove view compaction jobs recovery
     add 005843a43 Fix not calling is_compacting test
     add d0fd91529 Fix not_found error smoosh
     add 7fb96d265 Add toggle for smoosh queue persistence
     add daff65d8c Replace SHA-1 with SHA-256 for cookie authentication (#4094)
     add 42be159c7 Trim X-Auth-CouchDB-Roles header after reading
     add 9965289f2 Update elixir to 1.13
     add c71239bf0 Update application description and dependencies
     add ebbcc7ec2 Fix the flaky tests for `create_doc()`
     add b3586f1f5 Fix stats endpoint
     add 8c99dc530 make haproxy config valid again
     add f4ff8aa12 Merge pull request #4123 from apache/dev-run-fix-haproxy-cfg
     add a431b930f Turn document update mode atoms into defines
     add 35b30385a Return a 400 response for a single new_edits=false doc update without revisions
     add 419447cd1 Remove `couch_tests`
     add 02ca8c62c Merge pull request #4125 from jiahuili430/couch-tests
     add 3527d3047 Revert "Replace SHA-1 with SHA-256 for cookie authentication (#4094)"
     add fff03ef8e Merge pull request #4128 from apache/revert-4094-for-now
     add 963daf6ca Implement view_report function
     add a45e82aa1 Merge pull request #4033 from noahshaw11/implement-view_report-function
     add 2be1da823 Add io_priority classes
     add c09cd8968 Add ioq io_priority functions and system class
     add 74f12c74d Merge pull request #4106 from apache/4101-add-io-priority
     add deef12eff Add ioq:call_search
     add 7f1a33169 Merge pull request #4135 from apache/dedicated-ioq-search-function
     add 1f1c56d5d Fix elixir :logger warnings
     add 90f20c849 Add editors magic lines
     add cfed4bb07 Merge pull request #4133 from noahshaw11/add-editors-magic-lines
     add f8dad2fe6 Fix proxyauth_test and remove it from skipping tests (#4129)
     add 96214780e Fix purge request timeouts
     add 6c57f58dd Update couchdb-config to 2.2.0
     add 9a5d0de97 Update couchdb-mochiweb to v3.1.0
     new 5be100df2 Integrate raft algorithm (WIP)
     new f2981b8ea introduce store abstraction (WIP)
     new 7e5fc6439 don't track matchIndex/nextIndex in non-leaders, pointless
     new 0b440b86f clear votesGranted when unused for readability
     new 725b2cdc1 separate follower and candidate timeouts

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (c8b859569)
            \
             N -- N -- N   refs/heads/raft_storemodule (725b2cdc1)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .github/ISSUE_TEMPLATE/rfc.md                      |   2 +-
 .gitignore                                         |   3 +
 CONTRIBUTING.md                                    | 132 ++++---
 Makefile                                           |   2 +-
 README.rst                                         |  18 +-
 build-aux/Jenkinsfile.full                         |  78 ++--
 build-aux/Jenkinsfile.pr                           |  44 +--
 build-aux/logfile-uploader.py                      |   1 +
 config/config.exs                                  |   4 +-
 config/dev.exs                                     |   2 +-
 config/integration.exs                             |   6 +-
 config/prod.exs                                    |   2 +-
 config/test.exs                                    |   6 +-
 configure                                          |   4 +-
 dev/run                                            |   3 +-
 mix.exs                                            |  23 +-
 mix.lock                                           |  19 -
 rebar.config.script                                |  21 +-
 rel/overlay/etc/default.ini                        |  23 +-
 rel/overlay/etc/local.ini                          |   2 +-
 rel/overlay/etc/vm.args                            |  11 +-
 src/chttpd/rebar.config                            |   2 +
 src/chttpd/src/chttpd.app.src                      |   7 +-
 src/chttpd/src/chttpd.erl                          |  52 +--
 src/chttpd/src/chttpd_auth_cache.erl               |   6 +-
 src/chttpd/src/chttpd_db.erl                       |  14 +-
 src/chttpd/src/chttpd_node.erl                     |  21 +-
 src/chttpd/src/chttpd_stats.erl                    |   7 +-
 src/chttpd/src/chttpd_sup.erl                      |   2 +-
 src/chttpd/test/eunit/chttpd_db_test.erl           |  56 +++
 .../test/eunit/chttpd_open_revs_error_test.erl     |   2 +-
 src/couch/include/couch_db.hrl                     |  26 +-
 src/couch/rebar.config.script                      |   2 +
 src/couch/src/couch.app.src                        |   7 +-
 src/couch/src/couch_app.erl                        |   2 -
 src/couch/src/couch_bt_engine.erl                  |   4 +-
 src/couch/src/couch_db.erl                         |  83 +++--
 src/couch/src/couch_db_engine.erl                  |   4 +-
 src/couch/src/couch_db_updater.erl                 |   2 +-
 src/couch/src/couch_debug.erl                      | 138 +++++---
 src/couch/src/couch_doc.erl                        |   2 +-
 src/couch/src/couch_httpd.erl                      | 239 +++++++------
 src/couch/src/couch_httpd_auth.erl                 |   8 +-
 src/couch/src/couch_httpd_db.erl                   |   8 +-
 src/couch/src/couch_httpd_vhost.erl                |   2 +-
 src/couch/src/couch_key_tree.erl                   |   4 +-
 src/couch/src/couch_passwords.erl                  |   4 +-
 src/couch/src/couch_proc_manager.erl               |  10 +-
 src/couch/src/couch_query_servers.erl              |   9 +-
 src/couch/src/couch_server.erl                     |   6 +-
 src/couch/src/couch_util.erl                       | 112 +++---
 src/couch/src/couch_uuids.erl                      |   2 +-
 src/couch/src/test_util.erl                        |   4 +-
 src/couch/test/eunit/couch_auth_cache_tests.erl    |   2 +-
 .../test/eunit/couch_bt_engine_compactor_tests.erl |  35 +-
 src/couch/test/eunit/couch_db_plugin_tests.erl     |  16 +-
 src/couch/test/eunit/couch_js_tests.erl            |   2 +-
 src/couch/test/eunit/couch_key_tree_tests.erl      |   2 +-
 src/couch/test/eunit/couch_util_tests.erl          |  43 +++
 src/couch/test/eunit/couch_uuids_tests.erl         |   4 +-
 .../test/eunit/couchdb_update_conflicts_tests.erl  |   4 +-
 src/couch_dist/rebar.config                        |   2 +
 src/couch_epi/rebar.config                         |   2 +
 src/couch_epi/src/couch_epi.app.src.script         |  24 +-
 src/couch_epi/test/eunit/couch_epi_tests.erl       |   2 +-
 src/couch_event/rebar.config                       |   2 +
 src/couch_index/rebar.config                       |   2 +
 src/couch_index/src/couch_index.app.src            |   2 +-
 src/couch_index/src/couch_index.erl                |   1 +
 src/couch_index/src/couch_index_util.erl           |   2 +-
 src/couch_log/rebar.config                         |   2 +
 src/couch_log/src/couch_log_monitor.erl            |  12 -
 src/couch_log/src/couch_log_trunc_io_fmt.erl       |   2 +-
 .../test/eunit/couch_log_formatter_test.erl        |   4 +-
 src/couch_mrview/rebar.config                      |   2 +
 src/couch_mrview/src/couch_mrview_debug.erl        | 391 ++++++++++++++++++++-
 src/couch_mrview/src/couch_mrview_http.erl         |   2 +-
 .../test/eunit/couch_mrview_purge_docs_tests.erl   |   6 +-
 src/couch_peruser/src/couch_peruser.app.src        |   2 +-
 src/couch_peruser/src/couch_peruser.erl            |   2 +
 src/couch_prometheus/src/couch_prometheus_util.erl |   2 +-
 src/couch_pse_tests/src/cpse_test_purge_docs.erl   |  21 +-
 src/couch_pse_tests/src/cpse_test_purge_seqs.erl   |   2 +-
 src/couch_pse_tests/src/cpse_util.erl              |   6 +-
 src/couch_replicator/src/couch_replicator.app.src  |   3 +-
 .../src/couch_replicator_api_wrap.erl              |  55 +--
 src/couch_replicator/src/couch_replicator_auth.erl |   2 +-
 .../src/couch_replicator_auth_session.erl          |  15 +-
 .../src/couch_replicator_changes_reader.erl        |  14 +-
 .../src/couch_replicator_doc_processor_worker.erl  |   2 +-
 src/couch_replicator/src/couch_replicator_docs.erl |  18 +-
 .../src/couch_replicator_httpc.erl                 |   3 +-
 src/couch_replicator/src/couch_replicator_ids.erl  |  48 ++-
 .../src/couch_replicator_js_functions.hrl          |   6 +
 .../src/couch_replicator_scheduler.erl             |   2 +-
 .../src/couch_replicator_scheduler_job.erl         |  93 +++--
 .../src/couch_replicator_utils.erl                 |   6 +-
 .../src/couch_replicator_worker.erl                |   7 +-
 src/couch_replicator/src/json_stream_parse.erl     |   2 +-
 .../couch_replicator_error_reporting_tests.erl     |   2 +-
 .../eunit/couch_replicator_many_leaves_tests.erl   | 134 ++++---
 src/couch_tests/rebar.config                       |   2 +
 src/custodian/rebar.config.script                  |   2 +
 src/custodian/src/custodian_util.erl               |   9 +-
 src/ddoc_cache/src/ddoc_cache_entry.erl            |   5 +-
 src/dreyfus/src/clouseau_rpc.erl                   |   2 +-
 src/dreyfus/test/elixir/test/test_helper.exs       |   2 +-
 src/fabric/rebar.config                            |   2 +
 src/fabric/src/fabric_doc_open.erl                 |   2 +-
 src/fabric/src/fabric_doc_open_revs.erl            |   2 +-
 src/fabric/src/fabric_doc_purge.erl                |   2 +-
 src/fabric/src/fabric_doc_update.erl               |   4 +-
 src/fabric/src/fabric_rpc.erl                      |  42 ++-
 src/fabric/src/fabric_util.erl                     |  14 -
 src/fabric/src/fabric_view_all_docs.erl            |  10 +-
 src/fabric/test/eunit/fabric_db_create_tests.erl   |   4 +-
 src/global_changes/src/global_changes.app.src      |   3 +-
 src/global_changes/src/global_changes_server.erl   |   5 +-
 src/ioq/src/ioq.erl                                |  24 +-
 src/jwtf/rebar.config                              |   2 +
 src/jwtf/src/jwtf.erl                              |  23 --
 src/ken/rebar.config.script                        |   2 +
 src/ken/src/ken.app.src.script                     |  17 +-
 src/mango/rebar.config.script                      |   2 +
 src/mango/src/mango_cursor_view.erl                |   2 +-
 src/mango/src/mango_httpd.erl                      |  10 +-
 src/mango/src/mango_selector.erl                   |   4 +-
 src/mango/src/mango_selector_text.erl              |   2 +-
 src/mango/src/mango_util.erl                       |   6 +-
 src/mem3/rebar.config                              |   2 +
 src/mem3/rebar.config.script                       |   2 +
 src/mem3/src/mem3.app.src                          |   3 +-
 src/mem3/src/mem3.erl                              |   4 +-
 src/mem3/src/mem3_bdu.erl                          |   2 +-
 src/mem3/src/mem3_nodes.erl                        |   7 +-
 src/mem3/src/mem3_rep.erl                          |   8 +-
 src/mem3/src/mem3_reshard_httpd.erl                |   4 +-
 src/mem3/src/mem3_shards.erl                       |   6 +-
 src/mem3/src/mem3_util.erl                         |  11 +-
 src/rexi/rebar.config                              |   2 +
 src/rexi/src/rexi_server.erl                       |  45 ++-
 src/setup/src/setup.app.src                        |  29 +-
 src/smoosh/rebar.config                            |   2 +
 src/smoosh/src/smoosh_channel.erl                  |  87 +++--
 src/smoosh/src/smoosh_priority_queue.erl           |   2 +-
 src/smoosh/src/smoosh_server.erl                   |  41 ++-
 src/smoosh/src/smoosh_utils.erl                    |  61 +++-
 src/smoosh/test/smoosh_tests.erl                   |  75 ++--
 src/weatherreport/rebar.config                     |   2 +
 src/weatherreport/src/weatherreport.app.src        |   5 +-
 test/elixir/README.md                              |   2 +-
 test/elixir/config/config.exs                      |   2 +-
 test/elixir/config/test.exs                        |   4 +-
 test/elixir/lib/couch/{db_test.ex => dbtest.ex}    |   0
 test/elixir/lib/step/start.ex                      |   4 +-
 test/elixir/lib/suite.ex                           |   2 +-
 test/elixir/test/bulk_docs_test.exs                |   4 +-
 test/elixir/test/changes_test.exs                  |   4 +-
 test/elixir/test/config/skip.elixir                |   4 -
 test/elixir/test/config/suite.elixir               |   8 +-
 test/elixir/test/cookie_auth_test.exs              |   2 +-
 test/elixir/test/design_docs_test.exs              |  16 +-
 test/elixir/test/proxyauth_test.exs                |  95 ++---
 163 files changed, 1918 insertions(+), 1103 deletions(-)
 delete mode 100644 mix.lock
 rename test/elixir/lib/couch/{db_test.ex => dbtest.ex} (100%)


[couchdb] 01/05: Integrate raft algorithm (WIP)

Posted by rn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch raft_storemodule
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 5be100df24446dff44cb7eeeaa31f89eb0a9db75
Author: Robert Newson <rn...@apache.org>
AuthorDate: Sat May 14 20:28:05 2022 +0100

    Integrate raft algorithm (WIP)
    
    couch_raft.erl is a complete implementation of the raft algorithm but
    currently only manages an in-memory state machine and log.
    
    Preliminary work is also here to add a new btree inside the `.couch`
    files, which will be the real raft log. The intent is that log entries
    can be removed from this log and applied to by_id and by_seq trees
    atomically.
    
    Raft log is not preserved over compaction yet because reading the
    compactor code hurts my eyes.
    
    Anyway, it's progress and hopefully we're going somewhere cool.
---
 src/couch/src/couch_bt_engine.erl         |  95 +++++++-
 src/couch/src/couch_bt_engine.hrl         |   3 +-
 src/couch/src/couch_bt_engine_header.erl  |   7 +-
 src/couch/src/couch_db.erl                |  19 ++
 src/couch/src/couch_db_engine.erl         |  27 +++
 src/couch/src/couch_db_updater.erl        |  10 +
 src/couch/src/couch_raft.erl              | 350 ++++++++++++++++++++++++++++++
 src/couch/src/couch_raft_log.erl          |  52 +++++
 src/couch/test/eunit/couch_raft_SUITE.erl |  67 ++++++
 9 files changed, 621 insertions(+), 9 deletions(-)

diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl
index 0549de566..8c1a2756d 100644
--- a/src/couch/src/couch_bt_engine.erl
+++ b/src/couch/src/couch_bt_engine.erl
@@ -66,6 +66,11 @@
     purge_docs/3,
     copy_purge_infos/2,
 
+    raft_lookup/2,
+    raft_insert/2,
+    raft_discard/2,
+    raft_last/1,
+
     commit_data/1,
 
     open_write_stream/2,
@@ -102,7 +107,11 @@
     purge_tree_join/2,
     purge_tree_reduce/2,
     purge_seq_tree_split/1,
-    purge_seq_tree_join/2
+    purge_seq_tree_join/2,
+
+    raft_tree_split/1,
+    raft_tree_join/2,
+    raft_tree_reduce/2
 ]).
 
 % Used by the compactor
@@ -631,6 +640,44 @@ count_changes_since(St, SinceSeq) ->
     {ok, Changes} = couch_btree:fold_reduce(BTree, FoldFun, 0, Opts),
     Changes.
 
+raft_insert(#st{raft_tree = Tree0} = St, Entries) when is_list(Entries) ->
+    % Entries are {Index, Term, Value} tuples (see raft_tree_split/1).
+    {ok, Tree1} = couch_btree:add_remove(Tree0, Entries, []),
+    % mark the state as carrying uncommitted changes
+    NewSt = St#st{
+        raft_tree = Tree1,
+        needs_commit = true
+    },
+    {ok, NewSt}.
+
+%% Look up raft log entries {Index, Term, Value} for the given Indexes.
+raft_lookup(#st{raft_tree = Tree}, Indexes) ->
+    % couch_btree:lookup/2 yields {ok, Entry} or not_found per key; unwrap the
+    % hits and keep the not_found markers in place.
+    Unwrap = fun
+        ({ok, Entry}) -> Entry;
+        (not_found) -> not_found
+    end,
+    [Unwrap(Result) || Result <- couch_btree:lookup(Tree, Indexes)].
+
+raft_discard(#st{} = St, UpTo) when is_integer(UpTo) ->
+    #st{raft_tree = RaftTree0} = St,
+    % The reduction is {{FirstIndex, FirstTerm}, {LastIndex, LastTerm}}.
+    {ok, {{FirstIndex, _}, {LastIndex, _}}} = couch_btree:full_reduce(RaftTree0),
+    % Clamp UpTo to the live log: lists:seq/2 crashes when UpTo < FirstIndex - 1
+    % (already discarded), and indexes beyond LastIndex hold nothing to remove.
+    Remove = lists:seq(FirstIndex, max(FirstIndex - 1, min(UpTo, LastIndex))),
+    {ok, RaftTree1} = couch_btree:add_remove(RaftTree0, [], Remove),
+    {ok, St#st{
+        raft_tree = RaftTree1,
+        needs_commit = true
+    }}.
+
+%% {Index, Term} of the newest raft log entry; likely {0, 0} for an empty log.
+raft_last(#st{raft_tree = Tree}) ->
+    {ok, {_FirstEntry, LastEntry}} = couch_btree:full_reduce(Tree),
+    LastEntry.
+
 start_compaction(St, DbName, Options, Parent) ->
     Args = [St, DbName, Options, Parent],
     Pid = spawn_link(couch_bt_engine_compactor, start, Args),
@@ -799,6 +846,23 @@ purge_tree_reduce(reduce, IdRevs) ->
 purge_tree_reduce(rereduce, Reds) ->
     lists:sum(Reds).
 
+%% btree split: Index becomes the key, {Term, Value} the stored value.
+raft_tree_split({Index, Term, Value}) -> {Index, {Term, Value}}.
+
+%% btree join: the inverse of raft_tree_split/1.
+raft_tree_join(Index, {Term, Value}) -> {Index, Term, Value}.
+
+%% Reduces to {{FirstIndex, FirstTerm}, {LastIndex, LastTerm}}; {{0, 0}, {0, 0}} if empty.
+raft_tree_reduce(reduce, []) ->
+    {{0, 0}, {0, 0}};
+raft_tree_reduce(reduce, Entries) ->
+    {LoIndex, LoTerm, _LoValue} = lists:min(Entries),
+    {HiIndex, HiTerm, _HiValue} = lists:max(Entries),
+    {{LoIndex, LoTerm}, {HiIndex, HiTerm}};
+raft_tree_reduce(rereduce, Reductions) ->
+    {Firsts, Lasts} = lists:unzip(Reductions),
+    {lists:min(Firsts), lists:max(Lasts)}.
+
 set_update_seq(#st{header = Header} = St, UpdateSeq) ->
     {ok, St#st{
         header = couch_bt_engine_header:set(Header, [
@@ -894,6 +958,13 @@ init_state(FilePath, Fd, Header0, Options) ->
         {reduce, fun ?MODULE:purge_tree_reduce/2}
     ]),
 
+    RaftTreeState = couch_bt_engine_header:raft_tree_state(Header),
+    {ok, RaftTree} = couch_btree:open(RaftTreeState, Fd, [
+        {split, fun ?MODULE:raft_tree_split/1},
+        {join, fun ?MODULE:raft_tree_join/2},
+        {reduce, fun ?MODULE:raft_tree_reduce/2}
+    ]),
+
     ok = couch_file:set_db_pid(Fd, self()),
 
     St = #st{
@@ -907,7 +978,8 @@ init_state(FilePath, Fd, Header0, Options) ->
         local_tree = LocalTree,
         compression = Compression,
         purge_tree = PurgeTree,
-        purge_seq_tree = PurgeSeqTree
+        purge_seq_tree = PurgeSeqTree,
+        raft_tree = RaftTree
     },
 
     % If this is a new database we've just created a
@@ -927,7 +999,8 @@ update_header(St, Header) ->
         {id_tree_state, couch_btree:get_state(St#st.id_tree)},
         {local_tree_state, couch_btree:get_state(St#st.local_tree)},
         {purge_tree_state, couch_btree:get_state(St#st.purge_tree)},
-        {purge_seq_tree_state, couch_btree:get_state(St#st.purge_seq_tree)}
+        {purge_seq_tree_state, couch_btree:get_state(St#st.purge_seq_tree)},
+        {raft_tree_state, couch_btree:get_state(St#st.raft_tree)}
     ]).
 
 increment_update_seq(#st{header = Header} = St) ->
@@ -1097,7 +1170,8 @@ active_size(#st{} = St, #size_info{} = SI) ->
         St#st.seq_tree,
         St#st.local_tree,
         St#st.purge_tree,
-        St#st.purge_seq_tree
+        St#st.purge_seq_tree,
+        St#st.raft_tree
     ],
     lists:foldl(
         fun(T, Acc) ->
@@ -1171,12 +1245,14 @@ fold_docs_reduce_to_count(Reds) ->
 finish_compaction_int(#st{} = OldSt, #st{} = NewSt1) ->
     #st{
         filepath = FilePath,
-        local_tree = OldLocal
+        local_tree = OldLocal,
+        raft_tree = OldRaft
     } = OldSt,
     #st{
         filepath = CompactDataPath,
         header = Header,
-        local_tree = NewLocal1
+        local_tree = NewLocal1,
+        raft_tree = NewRaft1
     } = NewSt1,
 
     % suck up all the local docs into memory and write them to the new db
@@ -1186,13 +1262,18 @@ finish_compaction_int(#st{} = OldSt, #st{} = NewSt1) ->
     {ok, _, LocalDocs} = couch_btree:foldl(OldLocal, LoadFun, []),
     {ok, NewLocal2} = couch_btree:add(NewLocal1, LocalDocs),
 
+    % do the same for the raft log
+    {ok, _, RaftLog} = couch_btree:foldl(OldRaft, LoadFun, []),
+    {ok, NewRaft2} = couch_btree:add(NewRaft1, RaftLog),
+
     {ok, NewSt2} = commit_data(NewSt1#st{
         header = couch_bt_engine_header:set(Header, [
             {compacted_seq, get_update_seq(OldSt)},
             {revs_limit, get_revs_limit(OldSt)},
             {purge_infos_limit, get_purge_infos_limit(OldSt)}
         ]),
-        local_tree = NewLocal2
+        local_tree = NewLocal2,
+        raft_tree = NewRaft2
     }),
 
     % Rename our *.compact.data file to *.compact so that if we
diff --git a/src/couch/src/couch_bt_engine.hrl b/src/couch/src/couch_bt_engine.hrl
index e3c1d4983..0d347e99b 100644
--- a/src/couch/src/couch_bt_engine.hrl
+++ b/src/couch/src/couch_bt_engine.hrl
@@ -23,5 +23,6 @@
     local_tree,
     compression,
     purge_tree,
-    purge_seq_tree
+    purge_seq_tree,
+    raft_tree
 }).
diff --git a/src/couch/src/couch_bt_engine_header.erl b/src/couch/src/couch_bt_engine_header.erl
index e28f07723..9e663b096 100644
--- a/src/couch/src/couch_bt_engine_header.erl
+++ b/src/couch/src/couch_bt_engine_header.erl
@@ -34,6 +34,7 @@
     purge_tree_state/1,
     purge_seq_tree_state/1,
     purge_infos_limit/1,
+    raft_tree_state/1,
     security_ptr/1,
     revs_limit/1,
     uuid/1,
@@ -69,7 +70,8 @@
     epochs,
     compacted_seq,
     purge_infos_limit = 1000,
-    props_ptr
+    props_ptr,
+    raft_tree_state = nil
 }).
 
 -define(PARTITION_DISK_VERSION, 8).
@@ -177,6 +179,9 @@ compacted_seq(Header) ->
 purge_infos_limit(Header) ->
     get_field(Header, purge_infos_limit).
 
+%% Stored state of the raft log btree (the record default is nil).
+raft_tree_state(Header) -> get_field(Header, raft_tree_state).
+
 get_field(Header, Field) ->
     get_field(Header, Field, undefined).
 
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index dd7e07517..e197f98a4 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -114,6 +114,11 @@
     fold_purge_infos/4,
     fold_purge_infos/5,
 
+    raft_insert/2,
+    raft_lookup/2,
+    raft_discard/2,
+    raft_last/1,
+
     calculate_start_seq/3,
     owner_of/2,
 
@@ -1822,6 +1827,20 @@ fold_purge_infos(Db, StartPurgeSeq, Fun, Acc) ->
 fold_purge_infos(Db, StartPurgeSeq, UFun, UAcc, Opts) ->
     couch_db_engine:fold_purge_infos(Db, StartPurgeSeq, UFun, UAcc, Opts).
 
+raft_insert(#db{} = Db, Entries) ->
+    check_is_admin(Db), % admin-only; handled by couch_db_updater's raft_insert clause
+    gen_server:call(Db#db.main_pid, {raft_insert, Entries}, infinity).
+
+%% Read raft log entries straight from the engine (no admin check).
+raft_lookup(Db, Indexes) -> couch_db_engine:raft_lookup(Db, Indexes).
+
+raft_discard(#db{} = Db, UpTo) ->
+    check_is_admin(Db), % admin-only; handled by couch_db_updater's raft_discard clause
+    gen_server:call(Db#db.main_pid, {raft_discard, UpTo}, infinity).
+
+%% Position of the newest raft log entry, read straight from the engine.
+raft_last(Db) -> couch_db_engine:raft_last(Db).
+
 count_changes_since(Db, SinceSeq) ->
     couch_db_engine:count_changes_since(Db, SinceSeq).
 
diff --git a/src/couch/src/couch_db_engine.erl b/src/couch/src/couch_db_engine.erl
index 052a527e3..63b9d49a3 100644
--- a/src/couch/src/couch_db_engine.erl
+++ b/src/couch/src/couch_db_engine.erl
@@ -704,6 +704,11 @@
     read_doc_body/2,
     load_purge_infos/2,
 
+    raft_lookup/2,
+    raft_insert/2,
+    raft_discard/2,
+    raft_last/1,
+
     serialize_doc/2,
     write_doc_body/2,
     write_doc_infos/3,
@@ -927,6 +932,28 @@ copy_purge_infos(#db{} = Db, Purges) ->
     ),
     {ok, Db#db{engine = {Engine, NewSt}}}.
 
+%% Append raft log Entries via the storage engine, returning the updated Db.
+raft_insert(#db{} = Db, Entries) ->
+    #db{engine = {Engine, EngineState}} = Db,
+    {ok, EngineState1} = Engine:raft_insert(EngineState, Entries),
+    NewDb = Db#db{engine = {Engine, EngineState1}},
+    {ok, NewDb}.
+
+raft_lookup(#db{} = Db, Indexes) ->
+    {Engine, EngineState} = Db#db.engine, % read-only: Db itself is not updated
+    Engine:raft_lookup(EngineState, Indexes).
+
+%% Drop raft log entries up to UpTo via the storage engine, returning the updated Db.
+raft_discard(#db{} = Db, UpTo) ->
+    #db{engine = {Engine, EngineState}} = Db,
+    {ok, EngineState1} = Engine:raft_discard(EngineState, UpTo),
+    NewDb = Db#db{engine = {Engine, EngineState1}},
+    {ok, NewDb}.
+
+raft_last(#db{} = Db) ->
+    {Engine, EngineState} = Db#db.engine, % read-only: Db itself is not updated
+    Engine:raft_last(EngineState).
+
 commit_data(#db{} = Db) ->
     #db{engine = {Engine, EngineState}} = Db,
     {ok, NewSt} = Engine:commit_data(EngineState),
diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl
index 0248c21ec..7c1f97804 100644
--- a/src/couch/src/couch_db_updater.erl
+++ b/src/couch/src/couch_db_updater.erl
@@ -118,6 +118,16 @@ handle_call({purge_docs, PurgeReqs0, Options}, _From, Db) ->
         end,
     {ok, NewDb, Replies} = purge_docs(Db, PurgeReqs),
     {reply, {ok, Replies}, NewDb, idle_limit()};
+handle_call({raft_insert, Entries}, _From, Db) ->
+    {ok, Db2} = couch_db_engine:raft_insert(Db, Entries),
+    Db3 = commit_data(Db2),
+    ok = couch_server:db_updated(Db3),
+    {reply, ok, Db3, idle_limit()};
+handle_call({raft_discard, UpTo}, _From, Db) ->
+    {ok, Db2} = couch_db_engine:raft_discard(Db, UpTo),
+    Db3 = commit_data(Db2),
+    ok = couch_server:db_updated(Db3),
+    {reply, ok, Db3, idle_limit()};
 handle_call(Msg, From, Db) ->
     case couch_db_engine:handle_db_updater_call(Msg, From, Db) of
         {reply, Resp, NewDb} ->
diff --git a/src/couch/src/couch_raft.erl b/src/couch/src/couch_raft.erl
new file mode 100644
index 000000000..f398b4f2a
--- /dev/null
+++ b/src/couch/src/couch_raft.erl
@@ -0,0 +1,350 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
+
+-module(couch_raft).
+-behaviour(gen_statem).
+
+-define(ELECTION_DELAY, 150).
+-define(ELECTION_SPLAY, 150).
+-define(LEADER_HEARTBEAT, 75).
+-define(CLIENT_TIMEOUT, 5_000).
+
+% maximum number of entries to send in one go.
+-define(BATCH_SIZE, 10).
+
+% public api
+
+-export([
+    start/2,
+    start_link/2,
+    stop/1,
+    call/2
+]).
+
+% mandatory gen_statem callbacks
+
+-export([
+    init/1,
+    callback_mode/0,
+    handle_event/4
+]).
+
+%% public api
+
+%% Start an unlinked couch_raft server registered locally as Name for the
+%% given Cohort (a list of node names; presumably including this node).
+start(Name, Cohort) ->
+    gen_statem:start({local, Name}, ?MODULE, new(Name, Cohort), []).
+
+%% As start/2, but linked to the calling process.
+start_link(Name, Cohort) ->
+    gen_statem:start_link({local, Name}, ?MODULE, new(Name, Cohort), []).
+
+%% Initial server data: term 0, no vote cast, an empty log, nothing
+%% committed or applied, and the state machine seeded with <<0>>.
+%% votesGranted is a list of node names (start_election/1 resets it to
+%% [node()] and the RequestVoteResponse handler prepends to it and takes
+%% its length), so it must start as [] rather than an empty map.
+new(Name, Cohort) ->
+    Peers = peers(Cohort),
+    #{
+        name => Name,
+        cohort => Cohort,
+        term => 0,
+        votedFor => undefined,
+        votesGranted => [],
+        nextIndex => maps:from_list([{Peer, 1} || Peer <- Peers]),
+        matchIndex => maps:from_list([{Peer, 0} || Peer <- Peers]),
+        log => couch_raft_log:new(),
+        commitIndex => 0,
+        % pending client calls, keyed by the log index they wait on
+        froms => #{},
+        lastApplied => 0,
+        machine => <<0>>
+    }.
+
+%% Synchronously stop the server.
+stop(ServerRef) ->
+    gen_statem:stop(ServerRef).
+
+%% Submit Value to the replicated state machine. On the leader the reply is
+%% the machine's new hash once the entry is applied; other servers reply
+%% {error, not_leader}, and a leader that steps down replies {error, deposed}.
+call(ServerRef, Value) ->
+    Request = #{type => 'ClientRequest', value => Value},
+    gen_statem:call(ServerRef, Request, ?CLIENT_TIMEOUT).
+
+%% Every server starts out as a follower.
+init(Data) ->
+    {ok, follower, Data}.
+
+%% One handle_event/4 callback for all states, with state enter calls on.
+callback_mode() ->
+    [handle_event_function, state_enter].
+
+%% gen_statem callback (handle_event_function mode) implementing Raft.
+%% The first clause catches a cast from a newer term: it adopts that term,
+%% clears votedFor, becomes (or stays) follower and re-dispatches the same
+%% message, which the remaining clauses then handle in the new term.
+%% Unrecognised events stop the server.
+%% erlfmt-ignore
+handle_event(cast, #{term := FutureTerm} = Msg, _State, #{term := CurrentTerm} = Data) when FutureTerm > CurrentTerm ->
+    couch_log:notice("~p received message from future term ~B, moving to that term, becoming follower and clearing votedFor", [node(), FutureTerm]),
+    {next_state, follower, Data#{term => FutureTerm, votedFor => undefined}, {next_event, cast, Msg}};
+
+handle_event(enter, _OldState, follower, Data) ->
+    #{term := Term, froms := Froms} = Data,
+    couch_log:notice("~p became follower in term ~B", [node(), Term]),
+    Replies = [{reply, From, {error, deposed}} || From <- maps:values(Froms)],
+    % votedFor is deliberately kept: it may only be cleared when the term
+    % advances (first clause). Clearing it here would let a candidate that
+    % steps down within its own term cast a second vote in that term.
+    {keep_state, Data#{froms => #{}}, [restart_election_timeout() | Replies]};
+
+handle_event(enter, _OldState, candidate, Data) ->
+    #{term := Term} = Data,
+    couch_log:notice("~p became candidate in term ~B", [node(), Term]),
+    {keep_state, start_election(Data), restart_election_timeout()};
+
+handle_event(enter, _OldState, leader, Data) ->
+    #{log := Log, cohort := Cohort, term := Term} = Data,
+    couch_log:notice("~p became leader in term ~B", [node(), Term]),
+    Peers = peers(Cohort),
+    {keep_state, Data#{
+        nextIndex => maps:from_list([{Peer, couch_raft_log:index(couch_raft_log:last(Log)) + 1} || Peer <- Peers]),
+        matchIndex => maps:from_list([{Peer, 0} || Peer <- Peers])
+    }, restart_heartbeat_timeout()};
+
+handle_event(cast, #{type := 'RequestVoteRequest', term := Term} = Msg, _State, #{term := CurrentTerm} = Data)
+  when Term =< CurrentTerm ->
+    #{
+        source := MSource,
+        lastLogIndex := MLastLogIndex,
+        lastLogTerm := MLastLogTerm
+    } = Msg,
+    #{
+        log := Log,
+        votedFor := VotedFor
+    } = Data,
+    LogOk = MLastLogTerm > couch_raft_log:term(couch_raft_log:last(Log)) orelse (MLastLogTerm == couch_raft_log:term(couch_raft_log:last(Log)) andalso MLastLogIndex >= couch_raft_log:index(couch_raft_log:last(Log))),
+    Grant = Term == CurrentTerm andalso LogOk andalso (VotedFor == undefined orelse VotedFor == MSource),
+    couch_log:notice("~p received RequestVoteRequest from ~p in term ~B when in term ~B (Grant:~p, LogOk:~p, VotedFor:~p)", [node(), MSource, Term, CurrentTerm, Grant, LogOk, VotedFor]),
+    Reply = #{
+        type => 'RequestVoteResponse',
+        term => CurrentTerm,
+        voteGranted => Grant,
+        source => node()
+    },
+    cast(MSource, Reply, Data),
+    if
+        Grant ->
+            {keep_state, Data#{votedFor => MSource}, restart_election_timeout()};
+        true ->
+            % A denied vote must not touch the timer: resetting it lets
+            % stale candidates postpone elections, and in the leader it
+            % would replace the heartbeat timeout with new_election, for
+            % which the leader has no clause.
+            keep_state_and_data
+    end;
+
+handle_event(cast, #{type := 'RequestVoteResponse', term := PastTerm}, _State, #{term := CurrentTerm}) when PastTerm < CurrentTerm ->
+    couch_log:notice("~p ignored RequestVoteResponse from past term ~B", [node(), PastTerm]),
+    keep_state_and_data;
+
+handle_event(cast, #{type := 'RequestVoteResponse', term := Term, source := MSource}, State, #{term := Term}) when State /= candidate ->
+    % only a candidate counts votes (and may become leader from them)
+    couch_log:notice("~p ignored RequestVoteResponse from ~p in term ~B while ~p", [node(), MSource, Term, State]),
+    keep_state_and_data;
+
+handle_event(cast, #{type := 'RequestVoteResponse', term := Term} = Msg, candidate, #{term := Term} = Data) ->
+    #{source := MSource, voteGranted := MVoteGranted} = Msg,
+    #{cohort := Cohort, votesGranted := VotesGranted0} = Data,
+    VotesGranted1 = if MVoteGranted -> lists:usort([MSource | VotesGranted0]); true -> VotesGranted0 end,
+    couch_log:notice("~p received RequestVoteResponse from ~p in current term ~B (VotesGranted:~p)", [node(), MSource, Term, VotesGranted1]),
+    if
+        length(VotesGranted1) >= length(Cohort) div 2 + 1 ->
+            couch_log:notice("~p has enough votes to be leader in term ~B", [node(), Term]),
+            {next_state, leader, Data#{votesGranted => VotesGranted1}};
+        true ->
+            {keep_state, Data#{votesGranted => VotesGranted1}}
+    end;
+
+handle_event(cast, #{type := 'AppendEntriesRequest', term := Term} = Msg, State, #{term := CurrentTerm} = Data)
+  when Term =< CurrentTerm ->
+    #{
+        source := MSource,
+        prevLogIndex := MPrevLogIndex,
+        prevLogTerm := MPrevLogTerm,
+        entries := MEntries,
+        commitIndex := MCommitIndex
+    } = Msg,
+    #{
+        log := Log
+    } = Data,
+    LastLogIndex = couch_raft_log:index(couch_raft_log:last(Log)),
+    LogOk = MPrevLogIndex == 0 orelse (MPrevLogIndex > 0 andalso MPrevLogIndex =< LastLogIndex andalso MPrevLogTerm == couch_raft_log:term(couch_raft_log:nth(MPrevLogIndex, Log))),
+    if
+        Term < CurrentTerm orelse (Term == CurrentTerm andalso State == follower andalso not LogOk) ->
+            Reply = #{
+                type => 'AppendEntriesResponse',
+                term => CurrentTerm,
+                success => false,
+                matchIndex => 0,
+                source => node()
+            },
+            cast(MSource, Reply, Data),
+            if
+                State == leader ->
+                    keep_state_and_data;
+                true ->
+                    {keep_state_and_data, restart_election_timeout()}
+            end;
+        Term == CurrentTerm andalso State == candidate ->
+            {next_state, follower, Data, {next_event, cast, Msg}};
+        Term == CurrentTerm andalso State == follower andalso LogOk ->
+            Index = MPrevLogIndex + 1,
+            % Every entry in the batch is checked, not just the first, so
+            % the matchIndex we report is never beyond what our log holds.
+            case first_divergence(MEntries, Index, Log, LastLogIndex) of
+                none ->
+                    MatchIndex = MPrevLogIndex + length(MEntries),
+                    Reply = #{
+                        type => 'AppendEntriesResponse',
+                        term => CurrentTerm,
+                        success => true,
+                        matchIndex => MatchIndex,
+                        source => node()
+                    },
+                    case MEntries of
+                        [] ->
+                            couch_log:debug("~p received heartbeat and everything matches, sending matchIndex:~p", [node(), MatchIndex]);
+                        _ ->
+                            couch_log:notice("~p received entry:~p that's already applied, sending matchIndex:~p", [node(), MEntries, MatchIndex])
+                    end,
+                    cast(MSource, Reply, Data),
+                    {keep_state, follower_commit(MCommitIndex, MatchIndex, Data), restart_election_timeout()};
+                DivergeIndex when DivergeIndex =< LastLogIndex ->
+                    % Raft: delete the conflicting entry and all that follow
+                    % it, take the leader's entries, then re-run this request.
+                    couch_log:notice("~p received conflicting entry:~p, deleting it", [node(), MEntries]),
+                    {keep_state, Data#{log => splice_log(Log, DivergeIndex, Index, MEntries)}, [{next_event, cast, Msg}, restart_election_timeout()]};
+                DivergeIndex ->
+                    couch_log:notice("~p received new entries:~p, appending it to log", [node(), MEntries]),
+                    {keep_state, Data#{log => splice_log(Log, DivergeIndex, Index, MEntries)}, [{next_event, cast, Msg}, restart_election_timeout()]}
+            end
+    end;
+
+handle_event(cast, #{type := 'AppendEntriesResponse', term := PastTerm}, _State, #{term := CurrentTerm}) when PastTerm < CurrentTerm ->
+    couch_log:notice("~p ignored AppendEntriesResponse from past term ~B", [node(), PastTerm]),
+    keep_state_and_data;
+
+handle_event(cast, #{type := 'AppendEntriesResponse', term := Term} = Msg, _State, #{term := Term} = Data) ->
+    #{success := MSuccess, matchIndex := MMatchIndex, source := MSource} = Msg,
+    #{nextIndex := NextIndex, matchIndex := MatchIndex} = Data,
+    couch_log:debug("~p received AppendEntriesResponse from ~p in current term ~B (Success:~p)", [node(), MSource, Term, MSuccess]),
+    SourceNextIndex = maps:get(MSource, NextIndex),
+    if
+        MSuccess ->
+            {keep_state, Data#{
+                nextIndex => NextIndex#{MSource => MMatchIndex + 1},
+                matchIndex => MatchIndex#{MSource => MMatchIndex}
+            }};
+        true ->
+            {keep_state, Data#{
+                nextIndex => NextIndex#{MSource => max(SourceNextIndex - 1, 1)}
+            }}
+    end;
+
+handle_event({call, From}, #{type := 'ClientRequest'} = Msg, leader, Data) ->
+    #{value := Value} = Msg,
+    #{term := Term, log := Log, froms := Froms} = Data,
+    EntryIndex = couch_raft_log:index(couch_raft_log:last(Log)) + 1,
+    Entry = {EntryIndex, Term, Value},
+    {keep_state, Data#{log => couch_raft_log:append(Log, [Entry]), froms => Froms#{EntryIndex => From}}};
+
+handle_event({call, From}, #{type := 'ClientRequest'}, _State, _Data) ->
+    {keep_state_and_data, {reply, From, {error, not_leader}}};
+
+handle_event(state_timeout, new_election, State, Data) when State == follower; State == candidate ->
+    #{term := Term} = Data,
+    couch_log:notice("~p election timeout in state ~p, term ~B", [node(), State, Term]),
+    % The candidate enter clause starts the election and arms the next
+    % timeout. Calling start_election/1 here as well would bump the term
+    % twice on follower -> candidate; repeat_state re-runs the enter clause
+    % when we are already a candidate.
+    case State of
+        follower -> {next_state, candidate, Data};
+        candidate -> {repeat_state, Data}
+    end;
+
+handle_event(state_timeout, heartbeat, leader, Data) ->
+    #{term := Term} = Data,
+    couch_log:debug("~p leader sending a heartbeat in term ~B", [node(), Term]),
+    ok = send_append_entries(Data),
+    {keep_state, advance_commit_index(Data), restart_heartbeat_timeout()};
+
+handle_event(EventType, EventContent, State, Data) ->
+    {stop, {unknown_event, EventType, EventContent, State, Data}}.
+
+%% Return the log index of the first element of Entries (whose head belongs
+%% at log index Index) that Log either lacks or holds with a different
+%% term, or `none` when Log already contains every one of Entries.
+first_divergence([], _Index, _Log, _LastLogIndex) ->
+    none;
+first_divergence(_Entries, Index, _Log, LastLogIndex) when Index > LastLogIndex ->
+    Index;
+first_divergence([Entry | Rest], Index, Log, LastLogIndex) ->
+    case couch_raft_log:term(couch_raft_log:nth(Index, Log)) == couch_raft_log:term(Entry) of
+        true ->
+            first_divergence(Rest, Index + 1, Log, LastLogIndex);
+        false ->
+            Index
+    end.
+
+%% Keep Log before DivergeIndex and append the elements of Entries from
+%% DivergeIndex onwards; Entries' head belongs at log index FirstIndex.
+splice_log(Log, DivergeIndex, FirstIndex, Entries) ->
+    Kept = couch_raft_log:sublist(Log, 1, DivergeIndex - 1),
+    couch_raft_log:append(Kept, lists:nthtail(DivergeIndex - FirstIndex, Entries)).
+
+%% Raft receiver rule: commit up to min(leaderCommit, last entry verified
+%% by this request), but never move commitIndex backwards, because
+%% update_state_machine/1 has no clause for lastApplied > commitIndex.
+follower_commit(LeaderCommitIndex, MatchIndex, #{commitIndex := CommitIndex} = Data) ->
+    NewCommitIndex = max(CommitIndex, min(LeaderCommitIndex, MatchIndex)),
+    update_state_machine(Data#{commitIndex => NewCommitIndex}).
+
+
+%% Send an AppendEntriesRequest to every peer in the cohort. Each request
+%% carries up to ?BATCH_SIZE entries starting at that peer's nextIndex.
+send_append_entries(#{cohort := Cohort} = Data) ->
+    send_append_entries(peers(Cohort), Data).
+
+send_append_entries([], _Data) ->
+    ok;
+send_append_entries([Peer | Rest], Data) ->
+    #{term := Term, nextIndex := NextIndex, log := Log, commitIndex := CommitIndex} = Data,
+    PrevLogIndex = maps:get(Peer, NextIndex) - 1,
+    PrevLogTerm = if PrevLogIndex > 0 -> couch_raft_log:term(couch_raft_log:nth(PrevLogIndex, Log)); true -> 0 end,
+    Entries = couch_raft_log:sublist(Log, PrevLogIndex + 1, ?BATCH_SIZE),
+    % Index of the last entry this request actually carries. The commit
+    % index we advertise must not exceed it: the peer is only shown to agree
+    % with us up to here, and anything it holds beyond may be stale entries
+    % from an older term.
+    LastEntry = PrevLogIndex + length(Entries),
+    Msg = #{
+        type => 'AppendEntriesRequest',
+        term => Term,
+        source => node(),
+        prevLogIndex => PrevLogIndex,
+        prevLogTerm => PrevLogTerm,
+        entries => Entries,
+        commitIndex => min(CommitIndex, LastEntry)
+    },
+    cast(Peer, Msg, Data),
+    send_append_entries(Rest, Data).
+
+%% Leader only: advance commitIndex to the highest log index stored on a
+%% majority of the cohort, but only if that entry was created in the
+%% current term (Raft 5.4.2: entries from earlier terms are never committed
+%% by counting replicas), then apply the newly committed entries. The
+%% commit index never moves backwards.
+advance_commit_index(Data) ->
+    #{matchIndex := MatchIndex, log := Log, cohort := Cohort, term := Term, commitIndex := CommitIndex} = Data,
+    LastIndex = couch_raft_log:index(couch_raft_log:last(Log)),
+    % Our own last index counts as one replica. Sorted in descending order,
+    % the element at position (majority size) is held by at least a
+    % majority, for odd and even cohort sizes alike.
+    Replicated = lists:sort(fun erlang:'>='/2, [LastIndex | maps:values(MatchIndex)]),
+    Candidate = lists:nth(length(Cohort) div 2 + 1, Replicated),
+    case Candidate > CommitIndex andalso couch_raft_log:term(couch_raft_log:nth(Candidate, Log)) == Term of
+        true ->
+            update_state_machine(Data#{commitIndex => Candidate});
+        false ->
+            Data
+    end.
+
+%% Apply committed but not yet applied entries, from lastApplied + 1 up to
+%% min(last log index, commitIndex), in order. The machine is a running
+%% SHA-256 hash chain over entry values. A client waiting on an applied
+%% index (tracked in froms) is replied to with the resulting hash.
+update_state_machine(#{lastApplied := Same, commitIndex := Same} = Data) ->
+    Data;
+update_state_machine(#{lastApplied := LastApplied, commitIndex := CommitIndex} = Data) when LastApplied < CommitIndex ->
+    #{log := Log, froms := Froms0, machine := Machine0} = Data,
+    FirstIndex = LastApplied + 1,
+    LastIndex = min(couch_raft_log:index(couch_raft_log:last(Log)), CommitIndex),
+    Apply = fun(Index, {Waiting, Machine}) ->
+        Value = couch_raft_log:value(couch_raft_log:nth(Index, Log)),
+        NextMachine = crypto:hash(sha256, <<Machine/binary, Value/binary>>),
+        case maps:take(Index, Waiting) of
+            {Caller, Remaining} ->
+                gen_statem:reply(Caller, NextMachine),
+                {Remaining, NextMachine};
+            error ->
+                {Waiting, NextMachine}
+        end
+    end,
+    {Froms1, Machine1} = lists:foldl(Apply, {Froms0, Machine0}, lists:seq(FirstIndex, LastIndex)),
+    Data#{froms => Froms1, machine => Machine1, lastApplied => LastIndex}.
+
+%% Begin an election for the next term: ask every peer for its vote and
+%% return Data moved to that term, with our vote cast for (and counted
+%% from) ourselves.
+start_election(Data) ->
+    #{term := Term, cohort := Cohort, log := Log} = Data,
+    ElectionTerm = Term + 1,
+    couch_log:notice("~p starting election in term ~B", [node(), ElectionTerm]),
+    LastEntry = couch_raft_log:last(Log),
+    RequestVote = #{
+        type => 'RequestVoteRequest',
+        term => ElectionTerm,
+        lastLogIndex => couch_raft_log:index(LastEntry),
+        lastLogTerm => couch_raft_log:term(LastEntry),
+        source => node()
+    },
+    SendTo = fun(Peer) -> cast(Peer, RequestVote, Data) end,
+    lists:foreach(SendTo, peers(Cohort)),
+    Data#{term => ElectionTerm, votedFor => node(), votesGranted => [node()]}.
+
+%% Cast Msg to the server registered under our Name on Node.
+cast(Node, Msg, #{name := Name}) ->
+    ServerRef = {Name, Node},
+    gen_statem:cast(ServerRef, Msg).
+
+%% state_timeout action that fires new_election after ELECTION_DELAY plus
+%% a random 1..ELECTION_SPLAY ms, so peers rarely time out together.
+restart_election_timeout() ->
+    Delay = ?ELECTION_DELAY + rand:uniform(?ELECTION_SPLAY),
+    {state_timeout, Delay, new_election}.
+
+%% state_timeout action that fires heartbeat after LEADER_HEARTBEAT ms.
+restart_heartbeat_timeout() ->
+    {state_timeout, ?LEADER_HEARTBEAT, heartbeat}.
+
+%% The cohort without this node.
+peers(Cohort) ->
+    lists:delete(node(), Cohort).
diff --git a/src/couch/src/couch_raft_log.erl b/src/couch/src/couch_raft_log.erl
new file mode 100644
index 000000000..987212457
--- /dev/null
+++ b/src/couch/src/couch_raft_log.erl
@@ -0,0 +1,52 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
+
+-module(couch_raft_log).
+
+-export([
+    new/0,
+    append/2,
+    sublist/3,
+    nth/2,
+    last/1,
+    index/1,
+    term/1,
+    value/1
+]).
+
+%% In-memory Raft log: a plain list of {Index, Term, Value} entries.
+%% couch_raft addresses entries by list position, so entries are expected
+%% to be stored in index order starting at 1.
+
+%% An empty log.
+new() ->
+    [].
+
+%% Log with Items added after its existing entries.
+append(Log, Items) ->
+    Log ++ Items.
+
+%% Up to Len entries starting at 1-based position Start.
+sublist(Log, Start, Len) ->
+    lists:sublist(Log, Start, Len).
+
+%% The entry at 1-based position N.
+nth(N, Log) ->
+    lists:nth(N, Log).
+
+%% The final entry, or the sentinel {0, 0, undefined} for an empty log so
+%% callers can read index 0 / term 0 without special-casing emptiness.
+last(Log) ->
+    case Log of
+        [] -> {0, 0, undefined};
+        _ -> lists:last(Log)
+    end.
+
+%% Field accessors for an {Index, Term, Value} entry.
+index(Entry) -> element(1, Entry).
+
+term(Entry) -> element(2, Entry).
+
+value(Entry) -> element(3, Entry).
diff --git a/src/couch/test/eunit/couch_raft_SUITE.erl b/src/couch/test/eunit/couch_raft_SUITE.erl
new file mode 100644
index 000000000..1c3f8ebc2
--- /dev/null
+++ b/src/couch/test/eunit/couch_raft_SUITE.erl
@@ -0,0 +1,67 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
+
+-module(couch_raft_SUITE).
+
+-behaviour(ct_suite).
+
+-export([all/0]).
+-export([three_nodes/1]).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+all() ->
+    [three_nodes].
+
+%% Start couch_raft on three peer nodes and check that exactly one leader
+%% is elected and applies client requests as a SHA-256 hash chain. Then stop
+%% that leader and check that a different leader is elected and extends the
+%% same chain.
+three_nodes(Config) when is_list(Config) ->
+    N = 3,
+    % Peers load couch_raft from the same code directory as this node.
+    % NOTE(review): couch_raft also calls couch_log; confirm it is loadable
+    % and configured on the peer nodes.
+    Args = ["-pa", filename:dirname(code:which(couch_raft))],
+    Peers = [?CT_PEER(#{wait_boot => {self(), tag}, args => Args}) || _ <- lists:seq(1, N)],
+    Cohort = [receive {tag, {started, Node, Peer}} -> Node end || {ok, Peer} <- Peers],
+
+    Crafts = [erpc:call(Node, couch_raft, start, [foo, Cohort]) || Node <- Cohort],
+
+    % wait for leader election
+    timer:sleep(500),
+
+    % verify only one leader elected
+    [{leader, FirstLeader}] = lists:filter(fun({State, _Pid}) -> State == leader end,
+        [{element(1, sys:get_state(Pid)), Pid} || {ok, Pid} <- Crafts]),
+
+    % make a series of calls
+    Hash1 = crypto:hash(sha256, <<0, 1>>),
+    ?assertEqual(Hash1, couch_raft:call(FirstLeader, <<1>>)),
+
+    Hash2 = crypto:hash(sha256, <<Hash1/binary, 2>>),
+    ?assertEqual(Hash2, couch_raft:call(FirstLeader, <<2>>)),
+
+    Hash3 = crypto:hash(sha256, <<Hash2/binary, 3>>),
+    ?assertEqual(Hash3, couch_raft:call(FirstLeader, <<3>>)),
+
+    % force a re-election
+    couch_raft:stop(FirstLeader),
+    timer:sleep(500),
+
+    % verify new leader elected
+    [{leader, SecondLeader}] = lists:filter(fun({State, _Pid}) -> State == leader end,
+        [{element(1, sys:get_state(Pid)), Pid} || {ok, Pid} <- Crafts, Pid /= FirstLeader]),
+    ?assertNotEqual(FirstLeader, SecondLeader),
+
+    % make another call
+    Hash4 = crypto:hash(sha256, <<Hash3/binary, 4>>),
+    ?assertEqual(Hash4, couch_raft:call(SecondLeader, <<4>>)),
+
+    [couch_raft:stop(Pid) || {ok, Pid} <- Crafts, Pid /= FirstLeader],
+    [peer:stop(Peer) || {ok, Peer} <- Peers].


[couchdb] 05/05: separate follower and candidate timeouts

Posted by rn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch raft_storemodule
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 725b2cdc1bff165ec4cfb4512440edaee575a470
Author: Robert Newson <rn...@apache.org>
AuthorDate: Wed Aug 17 21:57:53 2022 +0100

    separate follower and candidate timeouts
    
    From:
    
    ARC: Analysis of Raft Consensus - 4.2
    
    "As the authors use the same timer range for candidates and followers,
    in Figure 4.1 we are waiting a minimum of 150ms (and up to twice that)
    before restarting an election, despite the fact that, on average, a
    node receives all of its responses within 15ms"
    
    We separate the timeouts and set the candidate timeout smaller than
    the follower timeout. In a contested election (where multiple
    candidates each gain a minority of votes) we should elect a leader
    faster than otherwise.
---
 src/couch/src/couch_raft.erl | 41 +++++++++++++++++++++--------------------
 1 file changed, 21 insertions(+), 20 deletions(-)

diff --git a/src/couch/src/couch_raft.erl b/src/couch/src/couch_raft.erl
index 06025784e..98fb6f926 100644
--- a/src/couch/src/couch_raft.erl
+++ b/src/couch/src/couch_raft.erl
@@ -15,9 +15,6 @@
 -module(couch_raft).
 -behaviour(gen_statem).
 
--define(ELECTION_DELAY, 150).
--define(ELECTION_SPLAY, 150).
--define(LEADER_HEARTBEAT, 75).
 -define(CLIENT_TIMEOUT, 5_000).
 
 % maximum number of entries to send in one go.
@@ -78,12 +75,12 @@ handle_event(enter, _OldState, follower, Data) ->
     couch_log:notice("~p became follower in term ~B", [node(), Term]),
     Replies = [{reply, From, {error, deposed}} || From <- maps:values(Froms)],
     persist({keep_state, maps:without([nextIndex, matchIndex], Data#{votedFor => undefined, votesGranted => undefined, froms => #{}}),
-        [restart_election_timeout() | Replies]});
+        [state_timeout(follower) | Replies]});
 
 handle_event(enter, _OldState, candidate, Data) ->
     #{term := Term} = Data,
     couch_log:notice("~p became candidate in term ~B", [node(), Term]),
-    persist({keep_state, start_election(Data), restart_election_timeout()});
+    persist({keep_state, start_election(Data), state_timeout(candidate)});
 
 handle_event(enter, _OldState, leader, Data) ->
     #{store_module := StoreModule, cohort := Cohort, term := Term} = Data,
@@ -93,9 +90,9 @@ handle_event(enter, _OldState, leader, Data) ->
     {keep_state, Data#{
         nextIndex => maps:from_list([{Peer, LastIndex + 1} || Peer <- Peers]),
         matchIndex => maps:from_list([{Peer, 0} || Peer <- Peers])
-    }, restart_heartbeat_timeout()};
+    }, state_timeout(leader)};
 
-handle_event(cast, #{type := 'RequestVoteRequest', term := Term} = Msg, _State, #{term := CurrentTerm} = Data)
+handle_event(cast, #{type := 'RequestVoteRequest', term := Term} = Msg, State, #{term := CurrentTerm} = Data)
   when Term =< CurrentTerm ->
     #{
         source := MSource,
@@ -119,9 +116,9 @@ handle_event(cast, #{type := 'RequestVoteRequest', term := Term} = Msg, _State,
     cast(MSource, Reply, Data),
     if
         Grant ->
-            persist({keep_state, Data#{votedFor => MSource}, restart_election_timeout()});
+            persist({keep_state, Data#{votedFor => MSource}, state_timeout(State)});
         true ->
-            {keep_state_and_data, restart_election_timeout()}
+            {keep_state_and_data, state_timeout(State)}
     end;
 
 handle_event(cast, #{type := 'RequestVoteResponse', term := PastTerm}, _State, #{term := CurrentTerm}) when PastTerm < CurrentTerm ->
@@ -171,7 +168,7 @@ handle_event(cast, #{type := 'AppendEntriesRequest', term := Term} = Msg, State,
                 State == leader ->
                     keep_state_and_data;
                 true ->
-                    {keep_state_and_data, restart_election_timeout()}
+                    {keep_state_and_data, state_timeout(State)}
             end;
         Term == CurrentTerm andalso State == candidate ->
             {next_state, follower, Data, {next_event, cast, Msg}};
@@ -187,7 +184,7 @@ handle_event(cast, #{type := 'AppendEntriesRequest', term := Term} = Msg, State,
                     },
                     couch_log:debug("~p received heartbeat and everything matches, sending matchIndex:~p", [node(), MPrevLogIndex]),
                     cast(MSource, Reply, Data),
-                    {keep_state, update_state_machine(Data#{commitIndex => MCommitIndex}), restart_election_timeout()};
+                    {keep_state, update_state_machine(Data#{commitIndex => MCommitIndex}), state_timeout(State)};
                 true ->
                     Index = MPrevLogIndex + 1,
                     if
@@ -205,12 +202,12 @@ handle_event(cast, #{type := 'AppendEntriesRequest', term := Term} = Msg, State,
                                     },
                                     couch_log:notice("~p received entry:~p that's already applied, sending matchIndex:~p", [node(), MEntries, MPrevLogIndex + length(MEntries)]),
                                     cast(MSource, Reply, Data),
-                                    {keep_state, update_state_machine(Data#{commitIndex => MCommitIndex}), restart_election_timeout()};
+                                    {keep_state, update_state_machine(Data#{commitIndex => MCommitIndex}), state_timeout(State)};
                                 NthLogTerm /= FirstEntryTerm ->
                                     couch_log:notice("~p received conflicting entry:~p, deleting it", [node(), MEntries]),
                                     case StoreModule:truncate(LastIndex - 1, Data) of
                                         {ok, NewData} ->
-                                            {keep_state, NewData, [{next_event, cast, Msg}, restart_election_timeout()]};
+                                            {keep_state, NewData, [{next_event, cast, Msg}, state_timeout(State)]};
                                         {error, Reason} ->
                                             {stop, Reason}
                                     end
@@ -219,7 +216,7 @@ handle_event(cast, #{type := 'AppendEntriesRequest', term := Term} = Msg, State,
                             couch_log:notice("~p received new entries:~p, appending it to log", [node(), MEntries]),
                             case StoreModule:append(MEntries, Data) of
                                 {ok, _EntryIndex, NewData} ->
-                                    {keep_state, NewData, [{next_event, cast, Msg}, restart_election_timeout()]};
+                                    {keep_state, NewData, [{next_event, cast, Msg}, state_timeout(State)]};
                                 {error, Reason} ->
                                     {stop, Reason}
                             end
@@ -268,13 +265,13 @@ handle_event({call, From}, #{type := 'ClientRequest'}, _State, _Data) ->
 handle_event(state_timeout, new_election, State, Data) when State == follower; State == candidate ->
     #{term := Term} = Data,
     couch_log:notice("~p election timeout in state ~p, term ~B", [node(), State, Term]),
-    persist({next_state, candidate, start_election(Data), restart_election_timeout()});
+    persist({next_state, candidate, start_election(Data), state_timeout(State)});
 
 handle_event(state_timeout, heartbeat, leader, Data) ->
     #{term := Term} = Data,
     couch_log:debug("~p leader sending a heartbeat in term ~B", [node(), Term]),
     ok = send_append_entries(Data),
-    {keep_state, advance_commit_index(Data), restart_heartbeat_timeout()};
+    {keep_state, advance_commit_index(Data), state_timeout(leader)};
 
 handle_event(EventType, EventContent, State, Data) ->
     {stop, {unknown_event, EventType, EventContent, State, Data}}.
@@ -360,11 +357,15 @@ start_election(Data) ->
 cast(Node, Msg, #{name := Name}) ->
     gen_statem:cast({Name, Node}, Msg).
 
-restart_election_timeout() ->
-    {state_timeout, ?ELECTION_DELAY + rand:uniform(?ELECTION_SPLAY), new_election}.
 
-restart_heartbeat_timeout() ->
-    {state_timeout, ?LEADER_HEARTBEAT, heartbeat}.
+%% The state_timeout action to arm for State. A leader heartbeats every
+%% 75 ms. A candidate retries a contested election after 16-30 ms. A
+%% follower waits 151-300 ms before starting an election.
+state_timeout(leader) ->
+    {state_timeout, 75, heartbeat};
+
+state_timeout(candidate) ->
+    Delay = 15 + rand:uniform(15),
+    {state_timeout, Delay, new_election};
+
+state_timeout(follower) ->
+    Delay = 150 + rand:uniform(150),
+    {state_timeout, Delay, new_election}.
 
 peers(Cohort) ->
     Cohort -- [node()].


[couchdb] 02/05: introduce store abstraction (WIP)

Posted by rn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch raft_storemodule
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit f2981b8ea032a7a565205dc97f68c85f9533a387
Author: Robert Newson <rn...@apache.org>
AuthorDate: Sat Jun 18 10:27:41 2022 +0100

    introduce store abstraction (WIP)
---
 Makefile                                        |  10 ++
 src/couch/src/couch_bt_engine.erl               |   2 -
 src/couch/src/couch_raft.erl                    | 154 +++++++++++++++---------
 src/couch/src/couch_raft_log.erl                |  52 --------
 src/couch/src/couch_raft_store.erl              |  35 ++++++
 src/couch/src/couch_raft_store_sha256.erl       |  80 ++++++++++++
 src/couch/test/{eunit => }/couch_raft_SUITE.erl |  40 ++++--
 7 files changed, 246 insertions(+), 127 deletions(-)

diff --git a/Makefile b/Makefile
index 82c2b335b..e7a389469 100644
--- a/Makefile
+++ b/Makefile
@@ -176,6 +176,16 @@ eunit: couch
             COUCHDB_VERSION=$(COUCHDB_VERSION) COUCHDB_GIT_SHA=$(COUCHDB_GIT_SHA) $(REBAR) -r eunit $(EUNIT_OPTS) apps=$$dir || exit 1; \
         done
 
+.PHONY: ct
+# target: ct - Run Common Test suites, one rebar ct run per app in $(subdirs)
+# NOTE(review): passes $(EUNIT_OPTS) through to rebar ct; confirm intended
+ct: export BUILDDIR = $(shell pwd)
+ct: export ERL_AFLAGS = -config $(shell pwd)/rel/files/eunit.config
+ct: export COUCHDB_QUERY_SERVER_JAVASCRIPT = $(shell pwd)/bin/couchjs $(shell pwd)/share/server/main.js
+ct: export COUCHDB_TEST_ADMIN_PARTY_OVERRIDE=1
+ct: couch
+	@COUCHDB_VERSION=$(COUCHDB_VERSION) COUCHDB_GIT_SHA=$(COUCHDB_GIT_SHA) $(REBAR) setup_eunit 2> /dev/null
+	@for dir in $(subdirs); do \
+            COUCHDB_VERSION=$(COUCHDB_VERSION) COUCHDB_GIT_SHA=$(COUCHDB_GIT_SHA) $(REBAR) -r ct $(EUNIT_OPTS) apps=$$dir || exit 1; \
+        done
 
 .PHONY: exunit
 # target: exunit - Run ExUnit tests
diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl
index 8c1a2756d..d93071c1e 100644
--- a/src/couch/src/couch_bt_engine.erl
+++ b/src/couch/src/couch_bt_engine.erl
@@ -673,7 +673,6 @@ raft_discard(#st{} = St, UpTo) ->
         needs_commit = true
     }}.
 
-
 raft_last(#st{} = St) ->
     {ok, {_First, Last}} = couch_btree:full_reduce(St#st.raft_tree),
     Last.
@@ -852,7 +851,6 @@ raft_tree_split({Index, Term, Value}) ->
 raft_tree_join(Index, {Term, Value}) ->
     {Index, Term, Value}.
 
-
 raft_tree_reduce(reduce, []) ->
     {{0, 0}, {0, 0}};
 raft_tree_reduce(reduce, Entries) ->
diff --git a/src/couch/src/couch_raft.erl b/src/couch/src/couch_raft.erl
index f398b4f2a..fda19cc22 100644
--- a/src/couch/src/couch_raft.erl
+++ b/src/couch/src/couch_raft.erl
@@ -26,8 +26,8 @@
 % public api
 
 -export([
-    start/2,
-    start_link/2,
+    start/3,
+    start_link/3,
     stop/1,
     call/2
 ]).
@@ -42,28 +42,23 @@
 
 %% public api
 
-start(Name, Cohort) ->
-    gen_statem:start({local, Name}, ?MODULE, new(Name, Cohort), []).
+start(Name, StoreModule, StoreState) ->
+    gen_statem:start({local, Name}, ?MODULE, new(Name, StoreModule, StoreState), []).
 
-start_link(Name, Cohort) ->
-    gen_statem:start_link({local, Name}, ?MODULE, new(Name, Cohort), []).
+start_link(Name, StoreModule, StoreState) ->
+    gen_statem:start_link({local, Name}, ?MODULE, new(Name, StoreModule, StoreState), []).
 
-new(Name, Cohort) ->
+new(Name, StoreModule, StoreState) ->
+    #{cohort := Cohort} = StoreState,
     Peers = peers(Cohort),
-    #{
+    maps:merge(#{
         name => Name,
-        cohort => Cohort,
-        term => 0,
-        votedFor => undefined,
+        store_module => StoreModule,
         votesGranted => #{},
         nextIndex => maps:from_list([{Peer, 1} || Peer <- Peers]),
         matchIndex => maps:from_list([{Peer, 0} || Peer <- Peers]),
-        log => couch_raft_log:new(),
-        commitIndex => 0,
-        froms => #{},
-        lastApplied => 0,
-        machine => <<0>>
-    }.
+        froms => #{}
+    }, StoreState).
 
 stop(ServerRef) ->
     gen_statem:stop(ServerRef).
@@ -80,25 +75,26 @@ callback_mode() ->
 %% erlfmt-ignore
 handle_event(cast, #{term := FutureTerm} = Msg, _State, #{term := CurrentTerm} = Data) when FutureTerm > CurrentTerm ->
     couch_log:notice("~p received message from future term ~B, moving to that term, becoming follower and clearing votedFor", [node(), FutureTerm]),
-    {next_state, follower, Data#{term => FutureTerm, votedFor => undefined}, {next_event, cast, Msg}};
+    persist({next_state, follower, Data#{term => FutureTerm, votedFor => undefined}, {next_event, cast, Msg}});
 
 handle_event(enter, _OldState, follower, Data) ->
     #{term := Term, froms := Froms} = Data,
     couch_log:notice("~p became follower in term ~B", [node(), Term]),
     Replies = [{reply, From, {error, deposed}} || From <- maps:values(Froms)],
-    {keep_state, Data#{votedFor => undefined, froms => #{}}, [restart_election_timeout() | Replies]};
+    persist({keep_state, Data#{votedFor => undefined, froms => #{}}, [restart_election_timeout() | Replies]});
 
 handle_event(enter, _OldState, candidate, Data) ->
     #{term := Term} = Data,
     couch_log:notice("~p became candidate in term ~B", [node(), Term]),
-    {keep_state, start_election(Data), restart_election_timeout()};
+    persist({keep_state, start_election(Data), restart_election_timeout()});
 
 handle_event(enter, _OldState, leader, Data) ->
-    #{log := Log, cohort := Cohort, term := Term} = Data,
+    #{store_module := StoreModule, cohort := Cohort, term := Term} = Data,
     couch_log:notice("~p became leader in term ~B", [node(), Term]),
     Peers = peers(Cohort),
+    {LastIndex, _} = StoreModule:last(Data),
     {keep_state, Data#{
-        nextIndex => maps:from_list([{Peer, couch_raft_log:index(couch_raft_log:last(Log)) + 1} || Peer <- Peers]),
+        nextIndex => maps:from_list([{Peer, LastIndex + 1} || Peer <- Peers]),
         matchIndex => maps:from_list([{Peer, 0} || Peer <- Peers])
     }, restart_heartbeat_timeout()};
 
@@ -110,10 +106,11 @@ handle_event(cast, #{type := 'RequestVoteRequest', term := Term} = Msg, _State,
         lastLogTerm := MLastLogTerm
     } = Msg,
     #{
-        log := Log,
+        store_module := StoreModule,
         votedFor := VotedFor
     } = Data,
-    LogOk = MLastLogTerm > couch_raft_log:term(couch_raft_log:last(Log)) orelse (MLastLogTerm == couch_raft_log:term(couch_raft_log:last(Log)) andalso MLastLogIndex >= couch_raft_log:index(couch_raft_log:last(Log))),
+    {LastIndex, LastTerm} = StoreModule:last(Data),
+    LogOk = MLastLogTerm > LastTerm orelse (MLastLogTerm == LastTerm andalso MLastLogIndex >= LastIndex),
     Grant = Term == CurrentTerm andalso LogOk andalso (VotedFor == undefined orelse VotedFor == MSource),
     couch_log:notice("~p received RequestVoteRequest from ~p in term ~B when in term ~B (Grant:~p, LogOk:~p, VotedFor:~p)", [node(), MSource, Term, CurrentTerm, Grant, LogOk, VotedFor]),
     Reply = #{
@@ -125,7 +122,7 @@ handle_event(cast, #{type := 'RequestVoteRequest', term := Term} = Msg, _State,
     cast(MSource, Reply, Data),
     if
         Grant ->
-            {keep_state, Data#{votedFor => MSource}, restart_election_timeout()};
+            persist({keep_state, Data#{votedFor => MSource}, restart_election_timeout()});
         true ->
             {keep_state_and_data, restart_election_timeout()}
     end;
@@ -158,9 +155,11 @@ handle_event(cast, #{type := 'AppendEntriesRequest', term := Term} = Msg, State,
         commitIndex := MCommitIndex
     } = Msg,
     #{
-        log := Log
+        store_module := StoreModule
     } = Data,
-    LogOk = MPrevLogIndex == 0 orelse (MPrevLogIndex > 0 andalso MPrevLogIndex =< couch_raft_log:index(couch_raft_log:last(Log)) andalso MPrevLogTerm == couch_raft_log:term(couch_raft_log:nth(MPrevLogIndex,Log))),
+    {LastIndex, _LastTerm} = StoreModule:last(Data),
+    {NthTerm, _} = StoreModule:lookup(MPrevLogIndex, Data),
+    LogOk = MPrevLogIndex == 0 orelse (MPrevLogIndex > 0 andalso MPrevLogIndex =< LastIndex andalso MPrevLogTerm == NthTerm),
     if
         Term < CurrentTerm orelse (Term == CurrentTerm andalso State == follower andalso not LogOk) ->
             Reply = #{
@@ -194,11 +193,10 @@ handle_event(cast, #{type := 'AppendEntriesRequest', term := Term} = Msg, State,
                     {keep_state, update_state_machine(Data#{commitIndex => MCommitIndex}), restart_election_timeout()};
                 true ->
                     Index = MPrevLogIndex + 1,
-                    LastLogIndex = couch_raft_log:index(couch_raft_log:last(Log)),
                     if
-                        LastLogIndex >= Index ->
-                            NthLogTerm = couch_raft_log:term(couch_raft_log:nth(Index, Log)),
-                            FirstEntryTerm = couch_raft_log:term(hd(MEntries)),
+                        LastIndex >= Index ->
+                            {NthLogTerm, _} = StoreModule:lookup(Index, Data),
+                            {FirstEntryTerm, _} = hd(MEntries),
                             if
                                 NthLogTerm == FirstEntryTerm ->
                                     Reply = #{
@@ -213,11 +211,21 @@ handle_event(cast, #{type := 'AppendEntriesRequest', term := Term} = Msg, State,
                                     {keep_state, update_state_machine(Data#{commitIndex => MCommitIndex}), restart_election_timeout()};
                                 NthLogTerm /= FirstEntryTerm ->
                                     couch_log:notice("~p received conflicting entry:~p, deleting it", [node(), MEntries]),
-                                    {keep_state, Data#{log => lists:sublist(Log, LastLogIndex - 1)}, [{next_event, cast, Msg}, restart_election_timeout()]}
+                                    case StoreModule:truncate(LastIndex - 1, Data) of
+                                        {ok, NewData} ->
+                                            {keep_state, NewData, [{next_event, cast, Msg}, restart_election_timeout()]};
+                                        {error, Reason} ->
+                                            {stop, Reason}
+                                    end
                             end;
-                        LastLogIndex == MPrevLogIndex ->
+                        LastIndex == MPrevLogIndex ->
                             couch_log:notice("~p received new entries:~p, appending it to log", [node(), MEntries]),
-                            {keep_state, Data#{log => couch_raft_log:append(Log, MEntries)}, [{next_event, cast, Msg}, restart_election_timeout()]}
+                            case StoreModule:append(MEntries, Data) of
+                                {ok, _EntryIndex, NewData} ->
+                                    {keep_state, NewData, [{next_event, cast, Msg}, restart_election_timeout()]};
+                                {error, Reason} ->
+                                    {stop, Reason}
+                            end
                     end
             end
     end;
@@ -245,10 +253,14 @@ handle_event(cast, #{type := 'AppendEntriesResponse', term := Term} = Msg, _Stat
 
 handle_event({call, From}, #{type := 'ClientRequest'} = Msg, leader, Data) ->
     #{value := Value} = Msg,
-    #{term := Term, log := Log, froms := Froms} = Data,
-    EntryIndex = couch_raft_log:index(couch_raft_log:last(Log)) + 1,
-    Entry = {EntryIndex, Term, Value},
-    {keep_state, Data#{log => couch_raft_log:append(Log, [Entry]), froms => Froms#{EntryIndex => From}}};
+    #{term := Term, store_module := StoreModule, froms := Froms} = Data,
+    Entry = {Term, Value},
+    case StoreModule:append([Entry], Data) of
+        {ok, EntryIndex, NewData} ->
+            {keep_state, NewData#{froms => Froms#{EntryIndex => From}}};
+        {error, Reason} ->
+            {stop_and_reply, Reason, {reply, From, {error, Reason}}}
+    end;
 
 handle_event({call, From}, #{type := 'ClientRequest'}, _State, _Data) ->
     {keep_state_and_data, {reply, From, {error, not_leader}}};
@@ -256,7 +268,7 @@ handle_event({call, From}, #{type := 'ClientRequest'}, _State, _Data) ->
 handle_event(state_timeout, new_election, State, Data) when State == follower; State == candidate ->
     #{term := Term} = Data,
     couch_log:notice("~p election timeout in state ~p, term ~B", [node(), State, Term]),
-    {next_state, candidate, start_election(Data), restart_election_timeout()};
+    persist({next_state, candidate, start_election(Data), restart_election_timeout()});
 
 handle_event(state_timeout, heartbeat, leader, Data) ->
     #{term := Term} = Data,
@@ -267,18 +279,22 @@ handle_event(state_timeout, heartbeat, leader, Data) ->
 handle_event(EventType, EventContent, State, Data) ->
     {stop, {unknown_event, EventType, EventContent, State, Data}}.
 
-
 send_append_entries(#{cohort := Cohort} = Data) ->
     send_append_entries(peers(Cohort), Data).
 
 send_append_entries([], _Data) ->
     ok;
 send_append_entries([Peer | Rest], Data) ->
-    #{term := Term, nextIndex := NextIndex, log := Log, commitIndex := CommitIndex} = Data,
+    #{term := Term, nextIndex := NextIndex, store_module := StoreModule, commitIndex := CommitIndex} = Data,
     PrevLogIndex = maps:get(Peer, NextIndex) - 1,
-    PrevLogTerm = if PrevLogIndex > 0 -> couch_raft_log:term(couch_raft_log:nth(PrevLogIndex, Log)); true -> 0 end,
-    LastEntry = min(couch_raft_log:index(couch_raft_log:last(Log)), PrevLogIndex + 2),
-    Entries = couch_raft_log:sublist(Log, PrevLogIndex + 1, ?BATCH_SIZE),
+    PrevLogTerm =
+        if
+            PrevLogIndex > 0 -> {NthTerm, _} = StoreModule:lookup(PrevLogIndex, Data), NthTerm;
+            true -> 0
+        end,
+    {LastIndex, _} = StoreModule:last(Data),
+    LastEntry = min(LastIndex, PrevLogIndex + 2),
+    Entries = StoreModule:range(PrevLogIndex + 1, ?BATCH_SIZE, Data),
     Msg = #{
         type => 'AppendEntriesRequest',
         term => Term,
@@ -292,9 +308,9 @@ send_append_entries([Peer | Rest], Data) ->
     send_append_entries(Rest, Data).
 
 advance_commit_index(Data) ->
-    #{matchIndex := MatchIndex, log := Log, cohort := Cohort, term := Term} = Data,
-    LastTerm = couch_raft_log:term(couch_raft_log:last(Log)),
-    LastIndexes = lists:sort([couch_raft_log:index(couch_raft_log:last(Log)) | maps:values(MatchIndex)]),
+    #{matchIndex := MatchIndex, store_module := StoreModule, cohort := Cohort, term := Term} = Data,
+    {LastIndex, LastTerm} = StoreModule:last(Data),
+    LastIndexes = lists:sort([LastIndex | maps:values(MatchIndex)]),
     NewCommitIndex = lists:nth(length(Cohort) div 2 + 1, LastIndexes),
     if
         LastTerm == Term ->
@@ -305,33 +321,37 @@ advance_commit_index(Data) ->
 
 update_state_machine(#{lastApplied := Same, commitIndex := Same} = Data) ->
     Data;
-update_state_machine(#{lastApplied := LastApplied, commitIndex := CommitIndex} = Data) when LastApplied < CommitIndex ->
-    #{log := Log, froms := Froms0, machine := Machine0} = Data,
+update_state_machine(#{lastApplied := LastApplied, commitIndex := CommitIndex} = Data0) when
+    LastApplied < CommitIndex
+->
+    #{store_module := StoreModule, froms := Froms0} = Data0,
     From = LastApplied + 1,
-    To = min(couch_raft_log:index(couch_raft_log:last(Log)), CommitIndex),
-    Fun = fun(Index, {Froms, Machine}) ->
-        Value = couch_raft_log:value(couch_raft_log:nth(Index, Log)),
-        Result = crypto:hash(sha256, <<Machine/binary, Value/binary>>),
+    {LastIndex, _} = StoreModule:last(Data0),
+    To = min(LastIndex, CommitIndex),
+    Fun = fun(Index, {Froms, Data}) ->
+        {_, Value} = StoreModule:lookup(Index, Data),
+        {Result, NewData} = StoreModule:apply(Value, Data),
         case maps:is_key(Index, Froms) of
             true ->
                 gen_statem:reply(maps:get(Index, Froms), Result),
-                {maps:remove(Index, Froms), Result};
+                {maps:remove(Index, Froms), NewData};
             false ->
-                {Froms, Result}
+                {Froms, NewData}
         end
     end,
-    {Froms1, Machine1} = lists:foldl(Fun, {Froms0, Machine0}, lists:seq(From, To)),
-    Data#{froms => Froms1, machine => Machine1, lastApplied => To}.
+    {Froms1, Data1} = lists:foldl(Fun, {Froms0, Data0}, lists:seq(From, To)),
+    Data1#{froms => Froms1, lastApplied => To}.
 
 start_election(Data) ->
-    #{term := Term, cohort := Cohort, log := Log} = Data,
+    #{term := Term, cohort := Cohort, store_module := StoreModule} = Data,
     ElectionTerm = Term + 1,
     couch_log:notice("~p starting election in term ~B", [node(), ElectionTerm]),
+    {LastLogIndex, LastLogTerm} = StoreModule:last(Data),
     RequestVote = #{
         type => 'RequestVoteRequest',
         term => ElectionTerm,
-        lastLogIndex => couch_raft_log:index(couch_raft_log:last(Log)),
-        lastLogTerm => couch_raft_log:term(couch_raft_log:last(Log)),
+        lastLogIndex => LastLogIndex,
+        lastLogTerm => LastLogTerm,
         source => node()
     },
     lists:foreach(fun(Peer) -> cast(Peer, RequestVote, Data) end, peers(Cohort)),
@@ -348,3 +368,17 @@ restart_heartbeat_timeout() ->
 
 peers(Cohort) ->
     Cohort -- [node()].
+
+persist({next_state, _NextState, NewData, _Actions} = HandleEventResult) ->
+    persist(NewData, HandleEventResult);
+persist({keep_state, NewData, _Actions} = HandleEventResult) ->
+    persist(NewData, HandleEventResult).
+
+persist(Data, HandleEventResult) ->
+    #{store_module := StoreModule} = Data,
+    case StoreModule:save_state(Data) of
+        ok ->
+            HandleEventResult;
+        {error, Reason} ->
+            {stop, Reason}
+    end.
diff --git a/src/couch/src/couch_raft_log.erl b/src/couch/src/couch_raft_log.erl
deleted file mode 100644
index 987212457..000000000
--- a/src/couch/src/couch_raft_log.erl
+++ /dev/null
@@ -1,52 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
-%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
-
--module(couch_raft_log).
-
--export([
-    new/0,
-    append/2,
-    sublist/3,
-    nth/2,
-    last/1,
-    index/1,
-    term/1,
-    value/1
-]).
-
-new() ->
-    [].
-
-append(Log, Items) ->
-    lists:append(Log, Items).
-
-sublist(Log, Start, Len) ->
-    lists:sublist(Log, Start, Len).
-
-nth(N, Log) ->
-    lists:nth(N, Log).
-
-last([]) ->
-    {0, 0, undefined};
-last(Log) ->
-    lists:last(Log).
-
-index(Entry) ->
-    element(1, Entry).
-
-term(Entry) ->
-    element(2, Entry).
-
-value(Entry) ->
-    element(3, Entry).
diff --git a/src/couch/src/couch_raft_store.erl b/src/couch/src/couch_raft_store.erl
new file mode 100644
index 000000000..81ebe684e
--- /dev/null
+++ b/src/couch/src/couch_raft_store.erl
@@ -0,0 +1,35 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
+
+-module(couch_raft_store).
+
+-callback init(Args :: term()) -> {ok, State :: #{}} | {stop, Reason :: term()}.
+
+% raft state callbacks
+
+-callback save_state(State :: #{}) -> ok | {error, Reason :: term()}.
+
+%% log callbacks
+-type log_entry() :: {Term :: non_neg_integer(), Value :: term()}.
+-callback last(State :: #{}) -> {Index :: non_neg_integer(), Term :: non_neg_integer()}.
+-callback lookup(N :: non_neg_integer(), State :: #{}) -> log_entry() | not_found.
+-callback range(Start :: non_neg_integer(), Len :: non_neg_integer(), State :: #{}) -> [log_entry() | not_found].
+-callback append(Entries :: [log_entry()], State :: #{}) ->
+    {ok, Index :: non_neg_integer(), NewState :: #{}} | {error, Reason :: term()}.
+-callback truncate(To :: non_neg_integer(), State :: #{}) -> {ok, NewState :: #{}} | {error, Reason :: term()}.
+-callback discard(UpTo :: non_neg_integer(), State :: #{}) ->
+    {ok, NewState :: #{}} | {error, Reason :: term()}.
+
+%% state machine callbacks
+-callback apply(Args :: term(), State :: #{}) -> {Result :: term(), NewState :: #{}}.
diff --git a/src/couch/src/couch_raft_store_sha256.erl b/src/couch/src/couch_raft_store_sha256.erl
new file mode 100644
index 000000000..e313da3e2
--- /dev/null
+++ b/src/couch/src/couch_raft_store_sha256.erl
@@ -0,0 +1,80 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
+
+%% A non-persistent implementation of the couch_raft_store behaviour, for testing purposes.
+
+-module(couch_raft_store_sha256).
+-behaviour(couch_raft_store).
+
+-export([
+    init/1,
+    save_state/1,
+    %% log
+    last/1,
+    lookup/2,
+    range/3,
+    append/2,
+    truncate/2,
+    discard/2,
+    %% state machine
+    apply/2
+]).
+
+init(Cohort) ->
+    {ok, #{
+        cohort => Cohort,
+        commitIndex => 0,
+        lastApplied => 0,
+        log => [],
+        machine => <<0>>,
+        term => 0,
+        votedFor => undefined
+    }}.
+
+% raft state callbacks
+
+save_state(#{} = State) ->
+    _WouldPersist = maps:with([cohort, term, votedFor, lastApplied, machine], State),
+    ok.
+
+%% log callbacks
+last(#{log := []}) ->
+    {0, 0};
+last(#{log := Log}) ->
+    {LastTerm, _} = lists:last(Log),
+    {length(Log), LastTerm}.
+
+lookup(0, #{}) ->
+    {0, 0};
+lookup(N, #{log := Log}) when N > 0 ->
+    lists:nth(N, Log).
+
+range(Start, Len, #{log := Log}) when Start > 0, Len > 0 ->
+    lists:sublist(Log, Start, Len).
+
+append(Entries, #{log := Log} = State) when is_list(Entries) ->
+    NewLog = lists:append(Log, Entries),
+    {ok, length(NewLog), State#{log => NewLog}}.
+
+truncate(To, #{log := Log} = State) ->
+    {ok, State#{log => lists:sublist(Log, To)}}.
+
+discard(_UpTo, #{}) ->
+    {error, not_implemented}.
+
+%% state machine callbacks
+
+apply(Bin, #{machine := Machine0} = State) when is_binary(Bin), is_binary(Machine0) ->
+    Machine1 = crypto:hash(sha256, <<Machine0/binary, Bin/binary>>),
+    {Machine1, State#{machine => Machine1}}.
diff --git a/src/couch/test/eunit/couch_raft_SUITE.erl b/src/couch/test/couch_raft_SUITE.erl
similarity index 57%
rename from src/couch/test/eunit/couch_raft_SUITE.erl
rename to src/couch/test/couch_raft_SUITE.erl
index 1c3f8ebc2..42e1f4ab3 100644
--- a/src/couch/test/eunit/couch_raft_SUITE.erl
+++ b/src/couch/test/couch_raft_SUITE.erl
@@ -27,41 +27,55 @@ all() ->
 
 three_nodes(Config) when is_list(Config) ->
     N = 3,
-    Args = ["-pa", filename:dirname(code:which(craft))],
+    Args = [
+        "-pa", filename:dirname(code:which(couch_raft)),
+        "-pa", filename:dirname(code:which(couch_log)),
+        "-pa", filename:dirname(code:which(couch_stats))
+    ],
     Peers = [?CT_PEER(#{wait_boot => {self(), tag}, args => Args}) || _ <- lists:seq(1, N)],
-    Cohort = [receive {tag, {started, Node, Peer}} -> Node end || {ok, Peer} <- Peers],
+    Cohort = [
+        receive
+            {tag, {started, Node, Peer}} -> Node
+        end
+     || {ok, Peer} <- Peers
+    ],
 
-    Crafts = [erpc:call(Node, craft3, start, [foo, Cohort]) || Node <- Cohort],
+    {ok, InitialState} = couch_raft_store_sha256:init(Cohort),
+    Crafts = [erpc:call(Node, couch_raft, start, [foo, couch_raft_store_sha256, InitialState]) || Node <- Cohort],
 
     % wait for leader election
     timer:sleep(500),
 
     % verify only one leader elected
-    [{leader, FirstLeader}] = lists:filter(fun({State, _Pid}) -> State == leader end,
-    [{element(1, sys:get_state(Pid)), Pid} || {ok, Pid} <- Crafts]),
+    [{leader, FirstLeader}] = lists:filter(
+        fun({State, _Pid}) -> State == leader end,
+        [{element(1, sys:get_state(Pid)), Pid} || {ok, Pid} <- Crafts]
+    ),
 
     % make a series of calls
     Hash1 = crypto:hash(sha256, <<0, 1>>),
-    ?assertEqual(Hash1, craft3:call(FirstLeader, <<1>>)),
+    ?assertEqual(Hash1, couch_raft:call(FirstLeader, <<1>>)),
 
     Hash2 = crypto:hash(sha256, <<Hash1/binary, 2>>),
-    ?assertEqual(Hash2, craft3:call(FirstLeader, <<2>>)),
+    ?assertEqual(Hash2, couch_raft:call(FirstLeader, <<2>>)),
 
     Hash3 = crypto:hash(sha256, <<Hash2/binary, 3>>),
-    ?assertEqual(Hash3, craft3:call(FirstLeader, <<3>>)),
+    ?assertEqual(Hash3, couch_raft:call(FirstLeader, <<3>>)),
 
     % force a re-election
-    craft3:stop(FirstLeader),
+    couch_raft:stop(FirstLeader),
     timer:sleep(500),
 
     % verify new leader elected
-    [{leader, SecondLeader}] = lists:filter(fun({State, _Pid}) -> State == leader end,
-        [{element(1, sys:get_state(Pid)), Pid} || {ok, Pid} <- Crafts, Pid /= FirstLeader]),
+    [{leader, SecondLeader}] = lists:filter(
+        fun({State, _Pid}) -> State == leader end,
+        [{element(1, sys:get_state(Pid)), Pid} || {ok, Pid} <- Crafts, Pid /= FirstLeader]
+    ),
     ?assertNotEqual(FirstLeader, SecondLeader),
 
     % make another call
     Hash4 = crypto:hash(sha256, <<Hash3/binary, 4>>),
-    ?assertEqual(Hash4, craft3:call(SecondLeader, <<4>>)),
+    ?assertEqual(Hash4, couch_raft:call(SecondLeader, <<4>>)),
 
-    [craft3:stop(Pid) || {ok, Pid} <- Crafts, Pid /= FirstLeader],
+    [couch_raft:stop(Pid) || {ok, Pid} <- Crafts, Pid /= FirstLeader],
     [peer:stop(Peer) || {ok, Peer} <- Peers].


[couchdb] 03/05: don't track matchIndex/nextIndex in non-leaders, pointless

Posted by rn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch raft_storemodule
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 7e5fc643913ccd7616fc1029473193a343d182e2
Author: Robert Newson <rn...@apache.org>
AuthorDate: Mon Jul 4 23:23:36 2022 +0100

    don't track matchIndex/nextIndex in non-leaders, pointless
---
 src/couch/src/couch_raft.erl | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/src/couch/src/couch_raft.erl b/src/couch/src/couch_raft.erl
index fda19cc22..c580346fc 100644
--- a/src/couch/src/couch_raft.erl
+++ b/src/couch/src/couch_raft.erl
@@ -49,14 +49,10 @@ start_link(Name, StoreModule, StoreState) ->
     gen_statem:start_link({local, Name}, ?MODULE, new(Name, StoreModule, StoreState), []).
 
 new(Name, StoreModule, StoreState) ->
-    #{cohort := Cohort} = StoreState,
-    Peers = peers(Cohort),
     maps:merge(#{
         name => Name,
         store_module => StoreModule,
         votesGranted => #{},
-        nextIndex => maps:from_list([{Peer, 1} || Peer <- Peers]),
-        matchIndex => maps:from_list([{Peer, 0} || Peer <- Peers]),
         froms => #{}
     }, StoreState).
 
@@ -81,7 +77,8 @@ handle_event(enter, _OldState, follower, Data) ->
     #{term := Term, froms := Froms} = Data,
     couch_log:notice("~p became follower in term ~B", [node(), Term]),
     Replies = [{reply, From, {error, deposed}} || From <- maps:values(Froms)],
-    persist({keep_state, Data#{votedFor => undefined, froms => #{}}, [restart_election_timeout() | Replies]});
+    persist({keep_state, maps:without([nextIndex, matchIndex], Data#{votedFor => undefined, froms => #{}}),
+        [restart_election_timeout() | Replies]});
 
 handle_event(enter, _OldState, candidate, Data) ->
     #{term := Term} = Data,
@@ -234,7 +231,7 @@ handle_event(cast, #{type := 'AppendEntriesResponse', term := PastTerm}, _State,
     couch_log:notice("~p ignored AppendEntriesResponse from past term ~B", [node(), PastTerm]),
     keep_state_and_data;
 
-handle_event(cast, #{type := 'AppendEntriesResponse', term := Term} = Msg, _State, #{term := Term} = Data) ->
+handle_event(cast, #{type := 'AppendEntriesResponse', term := Term} = Msg, leader, #{term := Term} = Data) ->
     #{success := MSuccess, matchIndex := MMatchIndex, source := MSource} = Msg,
     #{nextIndex := NextIndex, matchIndex := MatchIndex} = Data,
     couch_log:debug("~p received AppendEntriesResponse from ~p in current term ~B (Success:~p)", [node(), MSource, Term, MSuccess]),
@@ -251,6 +248,9 @@ handle_event(cast, #{type := 'AppendEntriesResponse', term := Term} = Msg, _Stat
             }}
     end;
 
+handle_event(cast, #{type := 'AppendEntriesResponse'}, _State, _Data) ->
+    keep_state_and_data;
+
 handle_event({call, From}, #{type := 'ClientRequest'} = Msg, leader, Data) ->
     #{value := Value} = Msg,
     #{term := Term, store_module := StoreModule, froms := Froms} = Data,


[couchdb] 04/05: clear votesGranted when unused for readability

Posted by rn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch raft_storemodule
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 0b440b86f5d570ebdc7e7fa0677ae85481143044
Author: Robert Newson <rn...@apache.org>
AuthorDate: Wed Aug 17 21:53:01 2022 +0100

    clear votesGranted when unused for readability
---
 src/couch/src/couch_raft.erl | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/couch/src/couch_raft.erl b/src/couch/src/couch_raft.erl
index c580346fc..06025784e 100644
--- a/src/couch/src/couch_raft.erl
+++ b/src/couch/src/couch_raft.erl
@@ -52,7 +52,7 @@ new(Name, StoreModule, StoreState) ->
     maps:merge(#{
         name => Name,
         store_module => StoreModule,
-        votesGranted => #{},
+        votesGranted => undefined,
         froms => #{}
     }, StoreState).
 
@@ -71,13 +71,13 @@ callback_mode() ->
 %% erlfmt-ignore
 handle_event(cast, #{term := FutureTerm} = Msg, _State, #{term := CurrentTerm} = Data) when FutureTerm > CurrentTerm ->
     couch_log:notice("~p received message from future term ~B, moving to that term, becoming follower and clearing votedFor", [node(), FutureTerm]),
-    persist({next_state, follower, Data#{term => FutureTerm, votedFor => undefined}, {next_event, cast, Msg}});
+    persist({next_state, follower, Data#{term => FutureTerm, votedFor => undefined, votesGranted => undefined}, {next_event, cast, Msg}});
 
 handle_event(enter, _OldState, follower, Data) ->
     #{term := Term, froms := Froms} = Data,
     couch_log:notice("~p became follower in term ~B", [node(), Term]),
     Replies = [{reply, From, {error, deposed}} || From <- maps:values(Froms)],
-    persist({keep_state, maps:without([nextIndex, matchIndex], Data#{votedFor => undefined, froms => #{}}),
+    persist({keep_state, maps:without([nextIndex, matchIndex], Data#{votedFor => undefined, votesGranted => undefined, froms => #{}}),
         [restart_election_timeout() | Replies]});
 
 handle_event(enter, _OldState, candidate, Data) ->