You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2022/09/01 16:05:55 UTC

[couchdb] branch raft_storemodule updated (c5569e709 -> 1268bb0bb)

This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a change to branch raft_storemodule
in repository https://gitbox.apache.org/repos/asf/couchdb.git


 discard c5569e709 HACK: demonstrate the shards running elections in a very hackish way
 discard d6fa02d4a include raft name and node in log lines
 discard 88d7f23a8 Implement _bulk_get support for the replicator
 discard eea0293f5 Allow and evaluate nested json claim roles in JWT token
 discard ab5e3daad Fix variable already bound compiler warnings
 discard 4d24d8989 Refactor hash algorithms test
 discard e762b10a2 Upgrade hash algorithm for cookie auth (#4140)
 discard 37e177fd7 fix missing "=" for admin party in #4153
 discard 387f32ae5 config section for require_valid_user is only [chttpd]
 discard af42d364f Address race in cpse_incref_decref test
 discard 97a747496 moved the name property to the correct spot.
 discard a3c09e12a update variable name and readme
 discard 87fe58999 update devcontainer
 discard 811bcf073 Update couch_replicator_use_checkpoints_tests
 discard 362e82d9a Update couch_replicator_small_max_request_size_target
 discard a8d7da8b1 Update couch_replicator_selector_tests
 discard 9a03f29ce Update couch_replicator_retain_stats_between_job_runs
 discard bc8ede12a Update couch_replicator_rate_limiter_tests
 discard f61cabf3c Update couch_replicator_proxy_tests
 discard 690e9b52d Update couch_replicator_missing_stubs_tests
 discard 24547d5d2 Update couch_replicator_many_leaves_tests
 discard 62c761613 Update couch_replicator_large_atts_tests
 discard 7fc302f05 Update couch_replicator_id_too_long_tests
 discard e7de68e58 Update couch_replicator_httpc_pool_tests
 discard fda897a6f Update couch_replicator_filtered_tests
 discard 410ed0bc3 Update couch_replicator_error_reporting_tests
 discard cd04b1454 Update couch_replicator_create_target_with_options_tests
 discard 63c9d89ca Update couch_replicator_connection_tests
 discard 250285e42 Update couch_replicator_compact_tests
 discard 2bc960b73 Update couch_replicator_attachments_too_large to use fabric
 discard 2017162ff Add some utility functions to couch_replicator_test_helper
 discard 52eafa818 use global so raft member names don't have to be atoms (we'll run out)
 discard 09531cfe8 don't perform MPrevLogIndex lookup before MPrevLogIndex =< LastIndex check
 discard 725b2cdc1 separate follower and candidate timeouts
 discard 0b440b86f clear votesGranted when unused for readability
 discard 7e5fc6439 don't track matchIndex/nextIndex in non-leaders, pointless
 discard f2981b8ea introduce store abstraction (WIP)
 discard 5be100df2 Integrate raft algorithm (WIP)
     add 264ad11f3 Add some utility functions to couch_replicator_test_helper
     add 1ec76df18 Update couch_replicator_attachments_too_large to use fabric
     add e1a947531 Update couch_replicator_compact_tests
     add 620bdea36 Update couch_replicator_connection_tests
     add 31185647f Update couch_replicator_create_target_with_options_tests
     add 381fe30d8 Update couch_replicator_error_reporting_tests
     add cad96318f Update couch_replicator_filtered_tests
     add 75e73da04 Update couch_replicator_httpc_pool_tests
     add 065b212e5 Update couch_replicator_id_too_long_tests
     add 9a6875f6b Update couch_replicator_large_atts_tests
     add ca46fa61c Update couch_replicator_many_leaves_tests
     add 033b8d5de Update couch_replicator_missing_stubs_tests
     add 9243298ef Update couch_replicator_proxy_tests
     add d831ec70e Update couch_replicator_rate_limiter_tests
     add 1e69eaac9 Update couch_replicator_retain_stats_between_job_runs
     add 8e6158972 Update couch_replicator_selector_tests
     add 609e7cc79 Update couch_replicator_small_max_request_size_target
     add e6db35b6b Update couch_replicator_use_checkpoints_tests
     add b9afb4590 update devcontainer
     add 7fabe4bfb update variable name and readme
     add 7a53ffcce moved the name property to the correct spot.
     add baef83f6b Address race in cpse_incref_decref test
     add 6f5a021f7 config section for require_valid_user is only [chttpd]
     add 133d6bb40 fix missing "=" for admin party in #4153
     add bc3242bc8 Upgrade hash algorithm for cookie auth (#4140)
     add ea382cf28 Refactor hash algorithms test
     add eee3c4fc2 Fix variable already bound compiler warnings
     add 8af61fbb0 Allow and evaluate nested json claim roles in JWT token
     add d4c7273e7 Merge pull request #4041 from apache/draft_allow_nested_json_claim_roles
     add be93983b9 Implement _bulk_get support for the replicator
     new 9ce27998e Integrate raft algorithm (WIP)
     new db0ab57f7 introduce store abstraction (WIP)
     new eeead2a86 don't track matchIndex/nextIndex in non-leaders, pointless
     new 9e0e337fc clear votesGranted when unused for readability
     new ad6a94f25 separate follower and candidate timeouts
     new 82674116f don't perform MPrevLogIndex lookup before MPrevLogIndex =< LastIndex check
     new 76c0f0dae use global so raft member names don't have to be atoms (we'll run out)
     new f9dd5980a include raft name and node in log lines
     new 1268bb0bb HACK: demonstrate the shards running elections in a very hackish way

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version. This situation occurs
when a user force-pushes (git push --force) a change and generates a
repository containing something like this:

 * -- * -- B -- O -- O -- O   (c5569e709)
            \
             N -- N -- N   refs/heads/raft_storemodule (1268bb0bb)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 9 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes: (not preserved in this copy; see the per-commit diffstats below)


[couchdb] 05/09: separate follower and candidate timeouts

Posted by rn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch raft_storemodule
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit ad6a94f25ec11014af766a8c4e59345d2e4d33bb
Author: Robert Newson <rn...@apache.org>
AuthorDate: Wed Aug 17 21:57:53 2022 +0100

    separate follower and candidate timeouts
    
    From:
    
    ARC: Analysis of Raft Consensus - 4.2
    
    "As the authors use the same timer range for candidates and followers,
    in Figure 4.1 we are waiting a minimum of 150ms (and up to twice that)
    before restarting an election, despite the fact that, on average, a
    node receives all of its responses within 15ms"
    
    We separate the timeouts and set the candidate timeout smaller than
    the follower timeout. In a contested election (where multiple
    candidates each gain a minority of votes) we should elect a leader
    faster than otherwise.
---
 src/couch/src/couch_raft.erl | 41 +++++++++++++++++++++--------------------
 1 file changed, 21 insertions(+), 20 deletions(-)

diff --git a/src/couch/src/couch_raft.erl b/src/couch/src/couch_raft.erl
index 06025784e..98fb6f926 100644
--- a/src/couch/src/couch_raft.erl
+++ b/src/couch/src/couch_raft.erl
@@ -15,9 +15,6 @@
 -module(couch_raft).
 -behaviour(gen_statem).
 
--define(ELECTION_DELAY, 150).
--define(ELECTION_SPLAY, 150).
--define(LEADER_HEARTBEAT, 75).
 -define(CLIENT_TIMEOUT, 5_000).
 
 % maximum number of entries to send in one go.
@@ -78,12 +75,12 @@ handle_event(enter, _OldState, follower, Data) ->
     couch_log:notice("~p became follower in term ~B", [node(), Term]),
     Replies = [{reply, From, {error, deposed}} || From <- maps:values(Froms)],
     persist({keep_state, maps:without([nextIndex, matchIndex], Data#{votedFor => undefined, votesGranted => undefined, froms => #{}}),
-        [restart_election_timeout() | Replies]});
+        [state_timeout(follower) | Replies]});
 
 handle_event(enter, _OldState, candidate, Data) ->
     #{term := Term} = Data,
     couch_log:notice("~p became candidate in term ~B", [node(), Term]),
-    persist({keep_state, start_election(Data), restart_election_timeout()});
+    persist({keep_state, start_election(Data), state_timeout(candidate)});
 
 handle_event(enter, _OldState, leader, Data) ->
     #{store_module := StoreModule, cohort := Cohort, term := Term} = Data,
@@ -93,9 +90,9 @@ handle_event(enter, _OldState, leader, Data) ->
     {keep_state, Data#{
         nextIndex => maps:from_list([{Peer, LastIndex + 1} || Peer <- Peers]),
         matchIndex => maps:from_list([{Peer, 0} || Peer <- Peers])
-    }, restart_heartbeat_timeout()};
+    }, state_timeout(leader)};
 
-handle_event(cast, #{type := 'RequestVoteRequest', term := Term} = Msg, _State, #{term := CurrentTerm} = Data)
+handle_event(cast, #{type := 'RequestVoteRequest', term := Term} = Msg, State, #{term := CurrentTerm} = Data)
   when Term =< CurrentTerm ->
     #{
         source := MSource,
@@ -119,9 +116,9 @@ handle_event(cast, #{type := 'RequestVoteRequest', term := Term} = Msg, _State,
     cast(MSource, Reply, Data),
     if
         Grant ->
-            persist({keep_state, Data#{votedFor => MSource}, restart_election_timeout()});
+            persist({keep_state, Data#{votedFor => MSource}, state_timeout(State)});
         true ->
-            {keep_state_and_data, restart_election_timeout()}
+            {keep_state_and_data, state_timeout(State)}
     end;
 
 handle_event(cast, #{type := 'RequestVoteResponse', term := PastTerm}, _State, #{term := CurrentTerm}) when PastTerm < CurrentTerm ->
@@ -171,7 +168,7 @@ handle_event(cast, #{type := 'AppendEntriesRequest', term := Term} = Msg, State,
                 State == leader ->
                     keep_state_and_data;
                 true ->
-                    {keep_state_and_data, restart_election_timeout()}
+                    {keep_state_and_data, state_timeout(State)}
             end;
         Term == CurrentTerm andalso State == candidate ->
             {next_state, follower, Data, {next_event, cast, Msg}};
@@ -187,7 +184,7 @@ handle_event(cast, #{type := 'AppendEntriesRequest', term := Term} = Msg, State,
                     },
                     couch_log:debug("~p received heartbeat and everything matches, sending matchIndex:~p", [node(), MPrevLogIndex]),
                     cast(MSource, Reply, Data),
-                    {keep_state, update_state_machine(Data#{commitIndex => MCommitIndex}), restart_election_timeout()};
+                    {keep_state, update_state_machine(Data#{commitIndex => MCommitIndex}), state_timeout(State)};
                 true ->
                     Index = MPrevLogIndex + 1,
                     if
@@ -205,12 +202,12 @@ handle_event(cast, #{type := 'AppendEntriesRequest', term := Term} = Msg, State,
                                     },
                                     couch_log:notice("~p received entry:~p that's already applied, sending matchIndex:~p", [node(), MEntries, MPrevLogIndex + length(MEntries)]),
                                     cast(MSource, Reply, Data),
-                                    {keep_state, update_state_machine(Data#{commitIndex => MCommitIndex}), restart_election_timeout()};
+                                    {keep_state, update_state_machine(Data#{commitIndex => MCommitIndex}), state_timeout(State)};
                                 NthLogTerm /= FirstEntryTerm ->
                                     couch_log:notice("~p received conflicting entry:~p, deleting it", [node(), MEntries]),
                                     case StoreModule:truncate(LastIndex - 1, Data) of
                                         {ok, NewData} ->
-                                            {keep_state, NewData, [{next_event, cast, Msg}, restart_election_timeout()]};
+                                            {keep_state, NewData, [{next_event, cast, Msg}, state_timeout(State)]};
                                         {error, Reason} ->
                                             {stop, Reason}
                                     end
@@ -219,7 +216,7 @@ handle_event(cast, #{type := 'AppendEntriesRequest', term := Term} = Msg, State,
                             couch_log:notice("~p received new entries:~p, appending it to log", [node(), MEntries]),
                             case StoreModule:append(MEntries, Data) of
                                 {ok, _EntryIndex, NewData} ->
-                                    {keep_state, NewData, [{next_event, cast, Msg}, restart_election_timeout()]};
+                                    {keep_state, NewData, [{next_event, cast, Msg}, state_timeout(State)]};
                                 {error, Reason} ->
                                     {stop, Reason}
                             end
@@ -268,13 +265,13 @@ handle_event({call, From}, #{type := 'ClientRequest'}, _State, _Data) ->
 handle_event(state_timeout, new_election, State, Data) when State == follower; State == candidate ->
     #{term := Term} = Data,
     couch_log:notice("~p election timeout in state ~p, term ~B", [node(), State, Term]),
-    persist({next_state, candidate, start_election(Data), restart_election_timeout()});
+    persist({next_state, candidate, start_election(Data), state_timeout(State)});
 
 handle_event(state_timeout, heartbeat, leader, Data) ->
     #{term := Term} = Data,
     couch_log:debug("~p leader sending a heartbeat in term ~B", [node(), Term]),
     ok = send_append_entries(Data),
-    {keep_state, advance_commit_index(Data), restart_heartbeat_timeout()};
+    {keep_state, advance_commit_index(Data), state_timeout(leader)};
 
 handle_event(EventType, EventContent, State, Data) ->
     {stop, {unknown_event, EventType, EventContent, State, Data}}.
@@ -360,11 +357,15 @@ start_election(Data) ->
 cast(Node, Msg, #{name := Name}) ->
     gen_statem:cast({Name, Node}, Msg).
 
-restart_election_timeout() ->
-    {state_timeout, ?ELECTION_DELAY + rand:uniform(?ELECTION_SPLAY), new_election}.
 
-restart_heartbeat_timeout() ->
-    {state_timeout, ?LEADER_HEARTBEAT, heartbeat}.
+state_timeout(follower) ->
+    {state_timeout, 150 + rand:uniform(150), new_election};
+
+state_timeout(candidate) ->
+    {state_timeout, 15 + rand:uniform(15), new_election};
+
+state_timeout(leader) ->
+    {state_timeout, 75, heartbeat}.
 
 peers(Cohort) ->
     Cohort -- [node()].


[couchdb] 08/09: include raft name and node in log lines

Posted by rn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch raft_storemodule
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit f9dd5980a7364ebfe0672d76e09843b86e82ccb7
Author: Robert Newson <rn...@apache.org>
AuthorDate: Thu Sep 1 16:55:38 2022 +0100

    include raft name and node in log lines
---
 src/couch/src/couch_raft.erl | 41 ++++++++++++++++++++++-------------------
 1 file changed, 22 insertions(+), 19 deletions(-)

diff --git a/src/couch/src/couch_raft.erl b/src/couch/src/couch_raft.erl
index f20207a21..b847a1b50 100644
--- a/src/couch/src/couch_raft.erl
+++ b/src/couch/src/couch_raft.erl
@@ -67,24 +67,24 @@ callback_mode() ->
 
 %% erlfmt-ignore
 handle_event(cast, #{term := FutureTerm} = Msg, _State, #{term := CurrentTerm} = Data) when FutureTerm > CurrentTerm ->
-    couch_log:notice("~p received message from future term ~B, moving to that term, becoming follower and clearing votedFor", [node(), FutureTerm]),
+    couch_log:notice("~p received message from future term ~B, moving to that term, becoming follower and clearing votedFor", [id(Data), FutureTerm]),
     persist({next_state, follower, Data#{term => FutureTerm, votedFor => undefined, votesGranted => undefined}, {next_event, cast, Msg}});
 
 handle_event(enter, _OldState, follower, Data) ->
     #{term := Term, froms := Froms} = Data,
-    couch_log:notice("~p became follower in term ~B", [node(), Term]),
+    couch_log:notice("~p became follower in term ~B", [id(Data), Term]),
     Replies = [{reply, From, {error, deposed}} || From <- maps:values(Froms)],
     persist({keep_state, maps:without([nextIndex, matchIndex], Data#{votedFor => undefined, votesGranted => undefined, froms => #{}}),
         [state_timeout(follower) | Replies]});
 
 handle_event(enter, _OldState, candidate, Data) ->
     #{term := Term} = Data,
-    couch_log:notice("~p became candidate in term ~B", [node(), Term]),
+    couch_log:notice("~p became candidate in term ~B", [id(Data), Term]),
     persist({keep_state, start_election(Data), state_timeout(candidate)});
 
 handle_event(enter, _OldState, leader, Data) ->
     #{store_module := StoreModule, cohort := Cohort, term := Term} = Data,
-    couch_log:notice("~p became leader in term ~B", [node(), Term]),
+    couch_log:notice("~p became leader in term ~B", [id(Data), Term]),
     Peers = peers(Cohort),
     {LastIndex, _} = StoreModule:last(Data),
     {keep_state, Data#{
@@ -106,7 +106,7 @@ handle_event(cast, #{type := 'RequestVoteRequest', term := Term} = Msg, State, #
     {LastIndex, LastTerm} = StoreModule:last(Data),
     LogOk = MLastLogTerm > LastTerm orelse (MLastLogTerm == LastTerm andalso MLastLogIndex >= LastIndex),
     Grant = Term == CurrentTerm andalso LogOk andalso (VotedFor == undefined orelse VotedFor == MSource),
-    couch_log:notice("~p received RequestVoteRequest from ~p in term ~B when in term ~B (Grant:~p, LogOk:~p, VotedFor:~p)", [node(), MSource, Term, CurrentTerm, Grant, LogOk, VotedFor]),
+    couch_log:notice("~p received RequestVoteRequest from ~p in term ~B when in term ~B (Grant:~p, LogOk:~p, VotedFor:~p)", [id(Data), MSource, Term, CurrentTerm, Grant, LogOk, VotedFor]),
     Reply = #{
         type => 'RequestVoteResponse',
         term => CurrentTerm,
@@ -121,18 +121,18 @@ handle_event(cast, #{type := 'RequestVoteRequest', term := Term} = Msg, State, #
             {keep_state_and_data, state_timeout(State)}
     end;
 
-handle_event(cast, #{type := 'RequestVoteResponse', term := PastTerm}, _State, #{term := CurrentTerm}) when PastTerm < CurrentTerm ->
-    couch_log:notice("~p ignored RequestVoteResponse from past term ~B", [node(), PastTerm]),
+handle_event(cast, #{type := 'RequestVoteResponse', term := PastTerm}, _State, #{term := CurrentTerm} = Data) when PastTerm < CurrentTerm ->
+    couch_log:notice("~p ignored RequestVoteResponse from past term ~B", [id(Data), PastTerm]),
     keep_state_and_data;
 
 handle_event(cast, #{type := 'RequestVoteResponse', term := Term} = Msg, _State, #{term := Term} = Data) ->
     #{source := MSource, voteGranted := MVoteGranted} = Msg,
     #{cohort := Cohort, votesGranted := VotesGranted0} = Data,
     VotesGranted1 = if MVoteGranted -> lists:usort([MSource | VotesGranted0]); true -> VotesGranted0 end,
-    couch_log:notice("~p received RequestVoteResponse from ~p in current term ~B (VotesGranted:~p)", [node(), MSource, Term, VotesGranted1]),
+    couch_log:notice("~p received RequestVoteResponse from ~p in current term ~B (VotesGranted:~p)", [id(Data), MSource, Term, VotesGranted1]),
     if
         length(VotesGranted1) >= length(Cohort) div 2 + 1 ->
-            couch_log:notice("~p has enough votes to be leader in term ~B", [node(), Term]),
+            couch_log:notice("~p has enough votes to be leader in term ~B", [id(Data), Term]),
             {next_state, leader, Data#{votesGranted => VotesGranted1}};
         true ->
             {keep_state, Data#{votesGranted => VotesGranted1}}
@@ -181,7 +181,7 @@ handle_event(cast, #{type := 'AppendEntriesRequest', term := Term} = Msg, State,
                         matchIndex => MPrevLogIndex,
                         source => node()
                     },
-                    couch_log:debug("~p received heartbeat and everything matches, sending matchIndex:~p", [node(), MPrevLogIndex]),
+                    couch_log:debug("~p received heartbeat and everything matches, sending matchIndex:~p", [id(Data), MPrevLogIndex]),
                     cast(MSource, Reply, Data),
                     {keep_state, update_state_machine(Data#{commitIndex => MCommitIndex}), state_timeout(State)};
                 true ->
@@ -199,11 +199,11 @@ handle_event(cast, #{type := 'AppendEntriesRequest', term := Term} = Msg, State,
                                         matchIndex => MPrevLogIndex + length(MEntries),
                                         source => node()
                                     },
-                                    couch_log:notice("~p received entry:~p that's already applied, sending matchIndex:~p", [node(), MEntries, MPrevLogIndex + length(MEntries)]),
+                                    couch_log:notice("~p received entry:~p that's already applied, sending matchIndex:~p", [id(Data), MEntries, MPrevLogIndex + length(MEntries)]),
                                     cast(MSource, Reply, Data),
                                     {keep_state, update_state_machine(Data#{commitIndex => MCommitIndex}), state_timeout(State)};
                                 NthLogTerm /= FirstEntryTerm ->
-                                    couch_log:notice("~p received conflicting entry:~p, deleting it", [node(), MEntries]),
+                                    couch_log:notice("~p received conflicting entry:~p, deleting it", [id(Data), MEntries]),
                                     case StoreModule:truncate(LastIndex - 1, Data) of
                                         {ok, NewData} ->
                                             {keep_state, NewData, [{next_event, cast, Msg}, state_timeout(State)]};
@@ -212,7 +212,7 @@ handle_event(cast, #{type := 'AppendEntriesRequest', term := Term} = Msg, State,
                                     end
                             end;
                         LastIndex == MPrevLogIndex ->
-                            couch_log:notice("~p received new entries:~p, appending it to log", [node(), MEntries]),
+                            couch_log:notice("~p received new entries:~p, appending it to log", [id(Data), MEntries]),
                             case StoreModule:append(MEntries, Data) of
                                 {ok, _EntryIndex, NewData} ->
                                     {keep_state, NewData, [{next_event, cast, Msg}, state_timeout(State)]};
@@ -223,14 +223,14 @@ handle_event(cast, #{type := 'AppendEntriesRequest', term := Term} = Msg, State,
             end
     end;
 
-handle_event(cast, #{type := 'AppendEntriesResponse', term := PastTerm}, _State, #{term := CurrentTerm}) when PastTerm < CurrentTerm ->
-    couch_log:notice("~p ignored AppendEntriesResponse from past term ~B", [node(), PastTerm]),
+handle_event(cast, #{type := 'AppendEntriesResponse', term := PastTerm}, _State, #{term := CurrentTerm} = Data) when PastTerm < CurrentTerm ->
+    couch_log:notice("~p ignored AppendEntriesResponse from past term ~B", [id(Data), PastTerm]),
     keep_state_and_data;
 
 handle_event(cast, #{type := 'AppendEntriesResponse', term := Term} = Msg, leader, #{term := Term} = Data) ->
     #{success := MSuccess, matchIndex := MMatchIndex, source := MSource} = Msg,
     #{nextIndex := NextIndex, matchIndex := MatchIndex} = Data,
-    couch_log:debug("~p received AppendEntriesResponse from ~p in current term ~B (Success:~p)", [node(), MSource, Term, MSuccess]),
+    couch_log:debug("~p received AppendEntriesResponse from ~p in current term ~B (Success:~p)", [id(Data), MSource, Term, MSuccess]),
     SourceNextIndex = maps:get(MSource, NextIndex),
     if
         MSuccess ->
@@ -263,12 +263,12 @@ handle_event({call, From}, #{type := 'ClientRequest'}, _State, _Data) ->
 
 handle_event(state_timeout, new_election, State, Data) when State == follower; State == candidate ->
     #{term := Term} = Data,
-    couch_log:notice("~p election timeout in state ~p, term ~B", [node(), State, Term]),
+    couch_log:notice("~p election timeout in state ~p, term ~B", [id(Data), State, Term]),
     persist({next_state, candidate, start_election(Data), state_timeout(State)});
 
 handle_event(state_timeout, heartbeat, leader, Data) ->
     #{term := Term} = Data,
-    couch_log:debug("~p leader sending a heartbeat in term ~B", [node(), Term]),
+    couch_log:debug("~p leader sending a heartbeat in term ~B", [id(Data), Term]),
     ok = send_append_entries(Data),
     {keep_state, advance_commit_index(Data), state_timeout(leader)};
 
@@ -341,7 +341,7 @@ update_state_machine(#{lastApplied := LastApplied, commitIndex := CommitIndex} =
 start_election(Data) ->
     #{term := Term, cohort := Cohort, store_module := StoreModule} = Data,
     ElectionTerm = Term + 1,
-    couch_log:notice("~p starting election in term ~B", [node(), ElectionTerm]),
+    couch_log:notice("~p starting election in term ~B", [id(Data), ElectionTerm]),
     {LastLogIndex, LastLogTerm} = StoreModule:last(Data),
     RequestVote = #{
         type => 'RequestVoteRequest',
@@ -393,3 +393,6 @@ persist(Data, HandleEventResult) ->
         {error, Reason} ->
             {stop, Reason}
     end.
+
+id(#{name := Name}) ->
+    [Name, node()].


[couchdb] 06/09: don't perform MPrevLogIndex lookup before MPrevLogIndex =< LastIndex check

Posted by rn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch raft_storemodule
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 82674116fa7da011c07cd65def214108b60d5997
Author: Robert Newson <rn...@apache.org>
AuthorDate: Tue Aug 30 13:24:24 2022 +0100

    don't perform MPrevLogIndex lookup before MPrevLogIndex =< LastIndex check
    
    This caused a slow follower to crash, as MPrevLogIndex would not exist in
    its log.
---
 src/couch/src/couch_raft.erl | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)

diff --git a/src/couch/src/couch_raft.erl b/src/couch/src/couch_raft.erl
index 98fb6f926..b862d055b 100644
--- a/src/couch/src/couch_raft.erl
+++ b/src/couch/src/couch_raft.erl
@@ -152,8 +152,7 @@ handle_event(cast, #{type := 'AppendEntriesRequest', term := Term} = Msg, State,
         store_module := StoreModule
     } = Data,
     {LastIndex, _LastTerm} = StoreModule:last(Data),
-    {NthTerm, _} = StoreModule:lookup(MPrevLogIndex, Data),
-    LogOk = MPrevLogIndex == 0 orelse (MPrevLogIndex > 0 andalso MPrevLogIndex =< LastIndex andalso MPrevLogTerm == NthTerm),
+    LogOk = MPrevLogIndex == 0 orelse (MPrevLogIndex > 0 andalso MPrevLogIndex =< LastIndex andalso MPrevLogTerm == nthterm(MPrevLogIndex, Data)),
     if
         Term < CurrentTerm orelse (Term == CurrentTerm andalso State == follower andalso not LogOk) ->
             Reply = #{
@@ -370,6 +369,18 @@ state_timeout(leader) ->
 peers(Cohort) ->
     Cohort -- [node()].
 
+
+nthterm(N, Data) ->
+    #{
+        store_module := StoreModule
+    } = Data,
+    case StoreModule:lookup(N, Data) of
+        not_found ->
+            not_found;
+        {Term, _Value} ->
+            Term
+        end.
+
 persist({next_state, _NextState, NewData, _Actions} = HandleEventResult) ->
     persist(NewData, HandleEventResult);
 persist({keep_state, NewData, _Actions} = HandleEventResult) ->


[couchdb] 02/09: introduce store abstraction (WIP)

Posted by rn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch raft_storemodule
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit db0ab57f7385a5d5fd133d991108c530626bb058
Author: Robert Newson <rn...@apache.org>
AuthorDate: Sat Jun 18 10:27:41 2022 +0100

    introduce store abstraction (WIP)
---
 Makefile                                        |  10 ++
 src/couch/src/couch_bt_engine.erl               |   2 -
 src/couch/src/couch_raft.erl                    | 154 +++++++++++++++---------
 src/couch/src/couch_raft_log.erl                |  52 --------
 src/couch/src/couch_raft_store.erl              |  35 ++++++
 src/couch/src/couch_raft_store_sha256.erl       |  80 ++++++++++++
 src/couch/test/{eunit => }/couch_raft_SUITE.erl |  40 ++++--
 7 files changed, 246 insertions(+), 127 deletions(-)

diff --git a/Makefile b/Makefile
index 82c2b335b..e7a389469 100644
--- a/Makefile
+++ b/Makefile
@@ -176,6 +176,16 @@ eunit: couch
             COUCHDB_VERSION=$(COUCHDB_VERSION) COUCHDB_GIT_SHA=$(COUCHDB_GIT_SHA) $(REBAR) -r eunit $(EUNIT_OPTS) apps=$$dir || exit 1; \
         done
 
+.PHONY: ct
+ct: export BUILDDIR = $(shell pwd)
+ct: export ERL_AFLAGS = -config $(shell pwd)/rel/files/eunit.config
+ct: export COUCHDB_QUERY_SERVER_JAVASCRIPT = $(shell pwd)/bin/couchjs $(shell pwd)/share/server/main.js
+ct: export COUCHDB_TEST_ADMIN_PARTY_OVERRIDE=1
+ct: couch
+	@COUCHDB_VERSION=$(COUCHDB_VERSION) COUCHDB_GIT_SHA=$(COUCHDB_GIT_SHA) $(REBAR) setup_eunit 2> /dev/null
+	@for dir in $(subdirs); do \
+            COUCHDB_VERSION=$(COUCHDB_VERSION) COUCHDB_GIT_SHA=$(COUCHDB_GIT_SHA) $(REBAR) -r ct $(EUNIT_OPTS) apps=$$dir || exit 1; \
+        done
 
 .PHONY: exunit
 # target: exunit - Run ExUnit tests
diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl
index 8c1a2756d..d93071c1e 100644
--- a/src/couch/src/couch_bt_engine.erl
+++ b/src/couch/src/couch_bt_engine.erl
@@ -673,7 +673,6 @@ raft_discard(#st{} = St, UpTo) ->
         needs_commit = true
     }}.
 
-
 raft_last(#st{} = St) ->
     {ok, {_First, Last}} = couch_btree:full_reduce(St#st.raft_tree),
     Last.
@@ -852,7 +851,6 @@ raft_tree_split({Index, Term, Value}) ->
 raft_tree_join(Index, {Term, Value}) ->
     {Index, Term, Value}.
 
-
 raft_tree_reduce(reduce, []) ->
     {{0, 0}, {0, 0}};
 raft_tree_reduce(reduce, Entries) ->
diff --git a/src/couch/src/couch_raft.erl b/src/couch/src/couch_raft.erl
index f398b4f2a..fda19cc22 100644
--- a/src/couch/src/couch_raft.erl
+++ b/src/couch/src/couch_raft.erl
@@ -26,8 +26,8 @@
 % public api
 
 -export([
-    start/2,
-    start_link/2,
+    start/3,
+    start_link/3,
     stop/1,
     call/2
 ]).
@@ -42,28 +42,23 @@
 
 %% public api
 
-start(Name, Cohort) ->
-    gen_statem:start({local, Name}, ?MODULE, new(Name, Cohort), []).
+start(Name, StoreModule, StoreState) ->
+    gen_statem:start({local, Name}, ?MODULE, new(Name, StoreModule, StoreState), []).
 
-start_link(Name, Cohort) ->
-    gen_statem:start_link({local, Name}, ?MODULE, new(Name, Cohort), []).
+start_link(Name, StoreModule, StoreState) ->
+    gen_statem:start_link({local, Name}, ?MODULE, new(Name, StoreModule, StoreState), []).
 
-new(Name, Cohort) ->
+new(Name, StoreModule, StoreState) ->
+    #{cohort := Cohort} = StoreState,
     Peers = peers(Cohort),
-    #{
+    maps:merge(#{
         name => Name,
-        cohort => Cohort,
-        term => 0,
-        votedFor => undefined,
+        store_module => StoreModule,
         votesGranted => #{},
         nextIndex => maps:from_list([{Peer, 1} || Peer <- Peers]),
         matchIndex => maps:from_list([{Peer, 0} || Peer <- Peers]),
-        log => couch_raft_log:new(),
-        commitIndex => 0,
-        froms => #{},
-        lastApplied => 0,
-        machine => <<0>>
-    }.
+        froms => #{}
+    }, StoreState).
 
 stop(ServerRef) ->
     gen_statem:stop(ServerRef).
@@ -80,25 +75,26 @@ callback_mode() ->
 %% erlfmt-ignore
 handle_event(cast, #{term := FutureTerm} = Msg, _State, #{term := CurrentTerm} = Data) when FutureTerm > CurrentTerm ->
     couch_log:notice("~p received message from future term ~B, moving to that term, becoming follower and clearing votedFor", [node(), FutureTerm]),
-    {next_state, follower, Data#{term => FutureTerm, votedFor => undefined}, {next_event, cast, Msg}};
+    persist({next_state, follower, Data#{term => FutureTerm, votedFor => undefined}, {next_event, cast, Msg}});
 
 handle_event(enter, _OldState, follower, Data) ->
     #{term := Term, froms := Froms} = Data,
     couch_log:notice("~p became follower in term ~B", [node(), Term]),
     Replies = [{reply, From, {error, deposed}} || From <- maps:values(Froms)],
-    {keep_state, Data#{votedFor => undefined, froms => #{}}, [restart_election_timeout() | Replies]};
+    persist({keep_state, Data#{votedFor => undefined, froms => #{}}, [restart_election_timeout() | Replies]});
 
 handle_event(enter, _OldState, candidate, Data) ->
     #{term := Term} = Data,
     couch_log:notice("~p became candidate in term ~B", [node(), Term]),
-    {keep_state, start_election(Data), restart_election_timeout()};
+    persist({keep_state, start_election(Data), restart_election_timeout()});
 
 handle_event(enter, _OldState, leader, Data) ->
-    #{log := Log, cohort := Cohort, term := Term} = Data,
+    #{store_module := StoreModule, cohort := Cohort, term := Term} = Data,
     couch_log:notice("~p became leader in term ~B", [node(), Term]),
     Peers = peers(Cohort),
+    {LastIndex, _} = StoreModule:last(Data),
     {keep_state, Data#{
-        nextIndex => maps:from_list([{Peer, couch_raft_log:index(couch_raft_log:last(Log)) + 1} || Peer <- Peers]),
+        nextIndex => maps:from_list([{Peer, LastIndex + 1} || Peer <- Peers]),
         matchIndex => maps:from_list([{Peer, 0} || Peer <- Peers])
     }, restart_heartbeat_timeout()};
 
@@ -110,10 +106,11 @@ handle_event(cast, #{type := 'RequestVoteRequest', term := Term} = Msg, _State,
         lastLogTerm := MLastLogTerm
     } = Msg,
     #{
-        log := Log,
+        store_module := StoreModule,
         votedFor := VotedFor
     } = Data,
-    LogOk = MLastLogTerm > couch_raft_log:term(couch_raft_log:last(Log)) orelse (MLastLogTerm == couch_raft_log:term(couch_raft_log:last(Log)) andalso MLastLogIndex >= couch_raft_log:index(couch_raft_log:last(Log))),
+    {LastIndex, LastTerm} = StoreModule:last(Data),
+    LogOk = MLastLogTerm > LastTerm orelse (MLastLogTerm == LastTerm andalso MLastLogIndex >= LastIndex),
     Grant = Term == CurrentTerm andalso LogOk andalso (VotedFor == undefined orelse VotedFor == MSource),
     couch_log:notice("~p received RequestVoteRequest from ~p in term ~B when in term ~B (Grant:~p, LogOk:~p, VotedFor:~p)", [node(), MSource, Term, CurrentTerm, Grant, LogOk, VotedFor]),
     Reply = #{
@@ -125,7 +122,7 @@ handle_event(cast, #{type := 'RequestVoteRequest', term := Term} = Msg, _State,
     cast(MSource, Reply, Data),
     if
         Grant ->
-            {keep_state, Data#{votedFor => MSource}, restart_election_timeout()};
+            persist({keep_state, Data#{votedFor => MSource}, restart_election_timeout()});
         true ->
             {keep_state_and_data, restart_election_timeout()}
     end;
@@ -158,9 +155,11 @@ handle_event(cast, #{type := 'AppendEntriesRequest', term := Term} = Msg, State,
         commitIndex := MCommitIndex
     } = Msg,
     #{
-        log := Log
+        store_module := StoreModule
     } = Data,
-    LogOk = MPrevLogIndex == 0 orelse (MPrevLogIndex > 0 andalso MPrevLogIndex =< couch_raft_log:index(couch_raft_log:last(Log)) andalso MPrevLogTerm == couch_raft_log:term(couch_raft_log:nth(MPrevLogIndex,Log))),
+    {LastIndex, _LastTerm} = StoreModule:last(Data),
+    {NthTerm, _} = StoreModule:lookup(MPrevLogIndex, Data),
+    LogOk = MPrevLogIndex == 0 orelse (MPrevLogIndex > 0 andalso MPrevLogIndex =< LastIndex andalso MPrevLogTerm == NthTerm),
     if
         Term < CurrentTerm orelse (Term == CurrentTerm andalso State == follower andalso not LogOk) ->
             Reply = #{
@@ -194,11 +193,10 @@ handle_event(cast, #{type := 'AppendEntriesRequest', term := Term} = Msg, State,
                     {keep_state, update_state_machine(Data#{commitIndex => MCommitIndex}), restart_election_timeout()};
                 true ->
                     Index = MPrevLogIndex + 1,
-                    LastLogIndex = couch_raft_log:index(couch_raft_log:last(Log)),
                     if
-                        LastLogIndex >= Index ->
-                            NthLogTerm = couch_raft_log:term(couch_raft_log:nth(Index, Log)),
-                            FirstEntryTerm = couch_raft_log:term(hd(MEntries)),
+                        LastIndex >= Index ->
+                            {NthLogTerm, _} = StoreModule:lookup(Index, Data),
+                            {FirstEntryTerm, _} = hd(MEntries),
                             if
                                 NthLogTerm == FirstEntryTerm ->
                                     Reply = #{
@@ -213,11 +211,21 @@ handle_event(cast, #{type := 'AppendEntriesRequest', term := Term} = Msg, State,
                                     {keep_state, update_state_machine(Data#{commitIndex => MCommitIndex}), restart_election_timeout()};
                                 NthLogTerm /= FirstEntryTerm ->
                                     couch_log:notice("~p received conflicting entry:~p, deleting it", [node(), MEntries]),
-                                    {keep_state, Data#{log => lists:sublist(Log, LastLogIndex - 1)}, [{next_event, cast, Msg}, restart_election_timeout()]}
+                                    case StoreModule:truncate(LastIndex - 1, Data) of
+                                        {ok, NewData} ->
+                                            {keep_state, NewData, [{next_event, cast, Msg}, restart_election_timeout()]};
+                                        {error, Reason} ->
+                                            {stop, Reason}
+                                    end
                             end;
-                        LastLogIndex == MPrevLogIndex ->
+                        LastIndex == MPrevLogIndex ->
                             couch_log:notice("~p received new entries:~p, appending it to log", [node(), MEntries]),
-                            {keep_state, Data#{log => couch_raft_log:append(Log, MEntries)}, [{next_event, cast, Msg}, restart_election_timeout()]}
+                            case StoreModule:append(MEntries, Data) of
+                                {ok, _EntryIndex, NewData} ->
+                                    {keep_state, NewData, [{next_event, cast, Msg}, restart_election_timeout()]};
+                                {error, Reason} ->
+                                    {stop, Reason}
+                            end
                     end
             end
     end;
@@ -245,10 +253,14 @@ handle_event(cast, #{type := 'AppendEntriesResponse', term := Term} = Msg, _Stat
 
 handle_event({call, From}, #{type := 'ClientRequest'} = Msg, leader, Data) ->
     #{value := Value} = Msg,
-    #{term := Term, log := Log, froms := Froms} = Data,
-    EntryIndex = couch_raft_log:index(couch_raft_log:last(Log)) + 1,
-    Entry = {EntryIndex, Term, Value},
-    {keep_state, Data#{log => couch_raft_log:append(Log, [Entry]), froms => Froms#{EntryIndex => From}}};
+    #{term := Term, store_module := StoreModule, froms := Froms} = Data,
+    Entry = {Term, Value},
+    case StoreModule:append([Entry], Data) of
+        {ok, EntryIndex, NewData} ->
+            {keep_state, NewData#{froms => Froms#{EntryIndex => From}}};
+        {error, Reason} ->
+            {stop_and_reply, Reason, {reply, From, {error, Reason}}}
+    end;
 
 handle_event({call, From}, #{type := 'ClientRequest'}, _State, _Data) ->
     {keep_state_and_data, {reply, From, {error, not_leader}}};
@@ -256,7 +268,7 @@ handle_event({call, From}, #{type := 'ClientRequest'}, _State, _Data) ->
 handle_event(state_timeout, new_election, State, Data) when State == follower; State == candidate ->
     #{term := Term} = Data,
     couch_log:notice("~p election timeout in state ~p, term ~B", [node(), State, Term]),
-    {next_state, candidate, start_election(Data), restart_election_timeout()};
+    persist({next_state, candidate, start_election(Data), restart_election_timeout()});
 
 handle_event(state_timeout, heartbeat, leader, Data) ->
     #{term := Term} = Data,
@@ -267,18 +279,22 @@ handle_event(state_timeout, heartbeat, leader, Data) ->
 handle_event(EventType, EventContent, State, Data) ->
     {stop, {unknown_event, EventType, EventContent, State, Data}}.
 
-
 send_append_entries(#{cohort := Cohort} = Data) ->
     send_append_entries(peers(Cohort), Data).
 
 send_append_entries([], _Data) ->
     ok;
 send_append_entries([Peer | Rest], Data) ->
-    #{term := Term, nextIndex := NextIndex, log := Log, commitIndex := CommitIndex} = Data,
+    #{term := Term, nextIndex := NextIndex, store_module := StoreModule, commitIndex := CommitIndex} = Data,
     PrevLogIndex = maps:get(Peer, NextIndex) - 1,
-    PrevLogTerm = if PrevLogIndex > 0 -> couch_raft_log:term(couch_raft_log:nth(PrevLogIndex, Log)); true -> 0 end,
-    LastEntry = min(couch_raft_log:index(couch_raft_log:last(Log)), PrevLogIndex + 2),
-    Entries = couch_raft_log:sublist(Log, PrevLogIndex + 1, ?BATCH_SIZE),
+    PrevLogTerm =
+        if
+            PrevLogIndex > 0 -> {NthTerm, _} = StoreModule:lookup(PrevLogIndex, Data), NthTerm;
+            true -> 0
+        end,
+    {LastIndex, _} = StoreModule:last(Data),
+    LastEntry = min(LastIndex, PrevLogIndex + 2),
+    Entries = StoreModule:range(PrevLogIndex + 1, ?BATCH_SIZE, Data),
     Msg = #{
         type => 'AppendEntriesRequest',
         term => Term,
@@ -292,9 +308,9 @@ send_append_entries([Peer | Rest], Data) ->
     send_append_entries(Rest, Data).
 
 advance_commit_index(Data) ->
-    #{matchIndex := MatchIndex, log := Log, cohort := Cohort, term := Term} = Data,
-    LastTerm = couch_raft_log:term(couch_raft_log:last(Log)),
-    LastIndexes = lists:sort([couch_raft_log:index(couch_raft_log:last(Log)) | maps:values(MatchIndex)]),
+    #{matchIndex := MatchIndex, store_module := StoreModule, cohort := Cohort, term := Term} = Data,
+    {LastIndex, LastTerm} = StoreModule:last(Data),
+    LastIndexes = lists:sort([LastIndex | maps:values(MatchIndex)]),
     NewCommitIndex = lists:nth(length(Cohort) div 2 + 1, LastIndexes),
     if
         LastTerm == Term ->
@@ -305,33 +321,37 @@ advance_commit_index(Data) ->
 
 update_state_machine(#{lastApplied := Same, commitIndex := Same} = Data) ->
     Data;
-update_state_machine(#{lastApplied := LastApplied, commitIndex := CommitIndex} = Data) when LastApplied < CommitIndex ->
-    #{log := Log, froms := Froms0, machine := Machine0} = Data,
+update_state_machine(#{lastApplied := LastApplied, commitIndex := CommitIndex} = Data0) when
+    LastApplied < CommitIndex
+->
+    #{store_module := StoreModule, froms := Froms0} = Data0,
     From = LastApplied + 1,
-    To = min(couch_raft_log:index(couch_raft_log:last(Log)), CommitIndex),
-    Fun = fun(Index, {Froms, Machine}) ->
-        Value = couch_raft_log:value(couch_raft_log:nth(Index, Log)),
-        Result = crypto:hash(sha256, <<Machine/binary, Value/binary>>),
+    {LastIndex, _} = StoreModule:last(Data0),
+    To = min(LastIndex, CommitIndex),
+    Fun = fun(Index, {Froms, Data}) ->
+        {_, Value} = StoreModule:lookup(Index, Data),
+        {Result, NewData} = StoreModule:apply(Value, Data),
         case maps:is_key(Index, Froms) of
             true ->
                 gen_statem:reply(maps:get(Index, Froms), Result),
-                {maps:remove(Index, Froms), Result};
+                {maps:remove(Index, Froms), NewData};
             false ->
-                {Froms, Result}
+                {Froms, NewData}
         end
     end,
-    {Froms1, Machine1} = lists:foldl(Fun, {Froms0, Machine0}, lists:seq(From, To)),
-    Data#{froms => Froms1, machine => Machine1, lastApplied => To}.
+    {Froms1, Data1} = lists:foldl(Fun, {Froms0, Data0}, lists:seq(From, To)),
+    Data1#{froms => Froms1, lastApplied => To}.
 
 start_election(Data) ->
-    #{term := Term, cohort := Cohort, log := Log} = Data,
+    #{term := Term, cohort := Cohort, store_module := StoreModule} = Data,
     ElectionTerm = Term + 1,
     couch_log:notice("~p starting election in term ~B", [node(), ElectionTerm]),
+    {LastLogIndex, LastLogTerm} = StoreModule:last(Data),
     RequestVote = #{
         type => 'RequestVoteRequest',
         term => ElectionTerm,
-        lastLogIndex => couch_raft_log:index(couch_raft_log:last(Log)),
-        lastLogTerm => couch_raft_log:term(couch_raft_log:last(Log)),
+        lastLogIndex => LastLogIndex,
+        lastLogTerm => LastLogTerm,
         source => node()
     },
     lists:foreach(fun(Peer) -> cast(Peer, RequestVote, Data) end, peers(Cohort)),
@@ -348,3 +368,17 @@ restart_heartbeat_timeout() ->
 
 peers(Cohort) ->
     Cohort -- [node()].
+
+persist({next_state, _NextState, NewData, _Actions} = HandleEventResult) ->
+    persist(NewData, HandleEventResult);
+persist({keep_state, NewData, _Actions} = HandleEventResult) ->
+    persist(NewData, HandleEventResult).
+
+persist(Data, HandleEventResult) ->
+    #{store_module := StoreModule} = Data,
+    case StoreModule:save_state(Data) of
+        ok ->
+            HandleEventResult;
+        {error, Reason} ->
+            {stop, Reason}
+    end.
diff --git a/src/couch/src/couch_raft_log.erl b/src/couch/src/couch_raft_log.erl
deleted file mode 100644
index 987212457..000000000
--- a/src/couch/src/couch_raft_log.erl
+++ /dev/null
@@ -1,52 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
-%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
-
--module(couch_raft_log).
-
--export([
-    new/0,
-    append/2,
-    sublist/3,
-    nth/2,
-    last/1,
-    index/1,
-    term/1,
-    value/1
-]).
-
-new() ->
-    [].
-
-append(Log, Items) ->
-    lists:append(Log, Items).
-
-sublist(Log, Start, Len) ->
-    lists:sublist(Log, Start, Len).
-
-nth(N, Log) ->
-    lists:nth(N, Log).
-
-last([]) ->
-    {0, 0, undefined};
-last(Log) ->
-    lists:last(Log).
-
-index(Entry) ->
-    element(1, Entry).
-
-term(Entry) ->
-    element(2, Entry).
-
-value(Entry) ->
-    element(3, Entry).
diff --git a/src/couch/src/couch_raft_store.erl b/src/couch/src/couch_raft_store.erl
new file mode 100644
index 000000000..81ebe684e
--- /dev/null
+++ b/src/couch/src/couch_raft_store.erl
@@ -0,0 +1,35 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
+
+-module(couch_raft_store).
+
+-callback init(Args :: term()) -> {ok, State :: #{}} | {stop, Reason :: term()}.
+
+% raft state callbacks
+
+-callback save_state(State :: #{}) -> ok | {error, Reason :: term()}.
+
+%% log callbacks
+-type log_entry() :: {Term :: non_neg_integer(), Value :: term()}.
+-callback last(State :: #{}) -> {Index :: non_neg_integer(), Term :: non_neg_integer()}.
+-callback lookup(N :: non_neg_integer(), State :: #{}) -> log_entry() | not_found.
+-callback range(Start :: non_neg_integer(), Len :: non_neg_integer(), State :: #{}) -> [log_entry() | not_found].
+-callback append(Entries :: [log_entry()], State :: #{}) ->
+    {ok, Index :: non_neg_integer(), NewState :: #{}} | {error, Reason :: term()}.
+-callback truncate(To :: non_neg_integer(), State :: #{}) -> {ok, NewState :: #{}} | {error, Reason :: term()}.
+-callback discard(UpTo :: non_neg_integer(), State :: #{}) ->
+    {ok, NewState :: #{}} | {error, Reason :: term()}.
+
+%% state machine callbacks
+-callback apply(Args :: term(), State :: #{}) -> {Result :: term(), NewState :: #{}}.
diff --git a/src/couch/src/couch_raft_store_sha256.erl b/src/couch/src/couch_raft_store_sha256.erl
new file mode 100644
index 000000000..e313da3e2
--- /dev/null
+++ b/src/couch/src/couch_raft_store_sha256.erl
@@ -0,0 +1,80 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
+
+%% a non-persistent implementation of the couch_raft_store behaviour for testing purposes.
+
+-module(couch_raft_store_sha256).
+-behaviour(couch_raft_store).
+
+-export([
+    init/1,
+    save_state/1,
+    %% log
+    last/1,
+    lookup/2,
+    range/3,
+    append/2,
+    truncate/2,
+    discard/2,
+    %% state machine
+    apply/2
+]).
+
+init(Cohort) ->
+    {ok, #{
+        cohort => Cohort,
+        commitIndex => 0,
+        lastApplied => 0,
+        log => [],
+        machine => <<0>>,
+        term => 0,
+        votedFor => undefined
+    }}.
+
+% raft state callbacks
+
+save_state(#{} = State) ->
+    _WouldPersist = maps:with([cohort, term, votedFor, lastApplied, machine], State),
+    ok.
+
+%% log callbacks
+last(#{log := []}) ->
+    {0, 0};
+last(#{log := Log}) ->
+    {LastTerm, _} = lists:last(Log),
+    {length(Log), LastTerm}.
+
+lookup(0, #{}) ->
+    {0, 0};
+lookup(N, #{log := Log}) when N > 0 ->
+    lists:nth(N, Log).
+
+range(Start, Len, #{log := Log}) when Start > 0, Len > 0 ->
+    lists:sublist(Log, Start, Len).
+
+append(Entries, #{log := Log} = State) when is_list(Entries) ->
+    NewLog = lists:append(Log, Entries),
+    {ok, length(NewLog), State#{log => NewLog}}.
+
+truncate(To, #{log := Log} = State) ->
+    {ok, State#{log => lists:sublist(Log, To)}}.
+
+discard(_UpTo, #{}) ->
+    {error, not_implemented}.
+
+%% state machine callbacks
+
+apply(Bin, #{machine := Machine0} = State) when is_binary(Bin), is_binary(Machine0) ->
+    Machine1 = crypto:hash(sha256, <<Machine0/binary, Bin/binary>>),
+    {Machine1, State#{machine => Machine1}}.
diff --git a/src/couch/test/eunit/couch_raft_SUITE.erl b/src/couch/test/couch_raft_SUITE.erl
similarity index 57%
rename from src/couch/test/eunit/couch_raft_SUITE.erl
rename to src/couch/test/couch_raft_SUITE.erl
index 1c3f8ebc2..42e1f4ab3 100644
--- a/src/couch/test/eunit/couch_raft_SUITE.erl
+++ b/src/couch/test/couch_raft_SUITE.erl
@@ -27,41 +27,55 @@ all() ->
 
 three_nodes(Config) when is_list(Config) ->
     N = 3,
-    Args = ["-pa", filename:dirname(code:which(craft))],
+    Args = [
+        "-pa", filename:dirname(code:which(couch_raft)),
+        "-pa", filename:dirname(code:which(couch_log)),
+        "-pa", filename:dirname(code:which(couch_stats))
+    ],
     Peers = [?CT_PEER(#{wait_boot => {self(), tag}, args => Args}) || _ <- lists:seq(1, N)],
-    Cohort = [receive {tag, {started, Node, Peer}} -> Node end || {ok, Peer} <- Peers],
+    Cohort = [
+        receive
+            {tag, {started, Node, Peer}} -> Node
+        end
+     || {ok, Peer} <- Peers
+    ],
 
-    Crafts = [erpc:call(Node, craft3, start, [foo, Cohort]) || Node <- Cohort],
+    {ok, InitialState} = couch_raft_store_sha256:init(Cohort),
+    Crafts = [erpc:call(Node, couch_raft, start, [foo, couch_raft_store_sha256, InitialState]) || Node <- Cohort],
 
     % wait for leader election
     timer:sleep(500),
 
     % verify only one leader elected
-    [{leader, FirstLeader}] = lists:filter(fun({State, _Pid}) -> State == leader end,
-    [{element(1, sys:get_state(Pid)), Pid} || {ok, Pid} <- Crafts]),
+    [{leader, FirstLeader}] = lists:filter(
+        fun({State, _Pid}) -> State == leader end,
+        [{element(1, sys:get_state(Pid)), Pid} || {ok, Pid} <- Crafts]
+    ),
 
     % make a series of calls
     Hash1 = crypto:hash(sha256, <<0, 1>>),
-    ?assertEqual(Hash1, craft3:call(FirstLeader, <<1>>)),
+    ?assertEqual(Hash1, couch_raft:call(FirstLeader, <<1>>)),
 
     Hash2 = crypto:hash(sha256, <<Hash1/binary, 2>>),
-    ?assertEqual(Hash2, craft3:call(FirstLeader, <<2>>)),
+    ?assertEqual(Hash2, couch_raft:call(FirstLeader, <<2>>)),
 
     Hash3 = crypto:hash(sha256, <<Hash2/binary, 3>>),
-    ?assertEqual(Hash3, craft3:call(FirstLeader, <<3>>)),
+    ?assertEqual(Hash3, couch_raft:call(FirstLeader, <<3>>)),
 
     % force a re-election
-    craft3:stop(FirstLeader),
+    couch_raft:stop(FirstLeader),
     timer:sleep(500),
 
     % verify new leader elected
-    [{leader, SecondLeader}] = lists:filter(fun({State, _Pid}) -> State == leader end,
-        [{element(1, sys:get_state(Pid)), Pid} || {ok, Pid} <- Crafts, Pid /= FirstLeader]),
+    [{leader, SecondLeader}] = lists:filter(
+        fun({State, _Pid}) -> State == leader end,
+        [{element(1, sys:get_state(Pid)), Pid} || {ok, Pid} <- Crafts, Pid /= FirstLeader]
+    ),
     ?assertNotEqual(FirstLeader, SecondLeader),
 
     % make another call
     Hash4 = crypto:hash(sha256, <<Hash3/binary, 4>>),
-    ?assertEqual(Hash4, craft3:call(SecondLeader, <<4>>)),
+    ?assertEqual(Hash4, couch_raft:call(SecondLeader, <<4>>)),
 
-    [craft3:stop(Pid) || {ok, Pid} <- Crafts, Pid /= FirstLeader],
+    [couch_raft:stop(Pid) || {ok, Pid} <- Crafts, Pid /= FirstLeader],
     [peer:stop(Peer) || {ok, Peer} <- Peers].


[couchdb] 03/09: don't track matchIndex/nextIndex in non-leaders, pointless

Posted by rn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch raft_storemodule
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit eeead2a86521372346516e6df2e720fb559903ef
Author: Robert Newson <rn...@apache.org>
AuthorDate: Mon Jul 4 23:23:36 2022 +0100

    don't track matchIndex/nextIndex in non-leaders, pointless
---
 src/couch/src/couch_raft.erl | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/src/couch/src/couch_raft.erl b/src/couch/src/couch_raft.erl
index fda19cc22..c580346fc 100644
--- a/src/couch/src/couch_raft.erl
+++ b/src/couch/src/couch_raft.erl
@@ -49,14 +49,10 @@ start_link(Name, StoreModule, StoreState) ->
     gen_statem:start_link({local, Name}, ?MODULE, new(Name, StoreModule, StoreState), []).
 
 new(Name, StoreModule, StoreState) ->
-    #{cohort := Cohort} = StoreState,
-    Peers = peers(Cohort),
     maps:merge(#{
         name => Name,
         store_module => StoreModule,
         votesGranted => #{},
-        nextIndex => maps:from_list([{Peer, 1} || Peer <- Peers]),
-        matchIndex => maps:from_list([{Peer, 0} || Peer <- Peers]),
         froms => #{}
     }, StoreState).
 
@@ -81,7 +77,8 @@ handle_event(enter, _OldState, follower, Data) ->
     #{term := Term, froms := Froms} = Data,
     couch_log:notice("~p became follower in term ~B", [node(), Term]),
     Replies = [{reply, From, {error, deposed}} || From <- maps:values(Froms)],
-    persist({keep_state, Data#{votedFor => undefined, froms => #{}}, [restart_election_timeout() | Replies]});
+    persist({keep_state, maps:without([nextIndex, matchIndex], Data#{votedFor => undefined, froms => #{}}),
+        [restart_election_timeout() | Replies]});
 
 handle_event(enter, _OldState, candidate, Data) ->
     #{term := Term} = Data,
@@ -234,7 +231,7 @@ handle_event(cast, #{type := 'AppendEntriesResponse', term := PastTerm}, _State,
     couch_log:notice("~p ignored AppendEntriesResponse from past term ~B", [node(), PastTerm]),
     keep_state_and_data;
 
-handle_event(cast, #{type := 'AppendEntriesResponse', term := Term} = Msg, _State, #{term := Term} = Data) ->
+handle_event(cast, #{type := 'AppendEntriesResponse', term := Term} = Msg, leader, #{term := Term} = Data) ->
     #{success := MSuccess, matchIndex := MMatchIndex, source := MSource} = Msg,
     #{nextIndex := NextIndex, matchIndex := MatchIndex} = Data,
     couch_log:debug("~p received AppendEntriesResponse from ~p in current term ~B (Success:~p)", [node(), MSource, Term, MSuccess]),
@@ -251,6 +248,9 @@ handle_event(cast, #{type := 'AppendEntriesResponse', term := Term} = Msg, _Stat
             }}
     end;
 
+handle_event(cast, #{type := 'AppendEntriesResponse'}, _State, _Data) ->
+    keep_state_and_data;
+
 handle_event({call, From}, #{type := 'ClientRequest'} = Msg, leader, Data) ->
     #{value := Value} = Msg,
     #{term := Term, store_module := StoreModule, froms := Froms} = Data,


[couchdb] 09/09: HACK: demonstrate the shards running elections in a very hackish way

Posted by rn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch raft_storemodule
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 1268bb0bb62275baf5533824fd063b9156158090
Author: Robert Newson <rn...@apache.org>
AuthorDate: Thu Sep 1 16:56:10 2022 +0100

    HACK: demonstrate the shards running elections in a very hackish way
---
 src/couch/src/couch_bt_engine.erl | 14 +++++++++++++-
 src/couch/src/couch_bt_engine.hrl |  3 ++-
 2 files changed, 15 insertions(+), 2 deletions(-)

diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl
index d93071c1e..b41f4d331 100644
--- a/src/couch/src/couch_bt_engine.erl
+++ b/src/couch/src/couch_bt_engine.erl
@@ -963,6 +963,17 @@ init_state(FilePath, Fd, Header0, Options) ->
         {reduce, fun ?MODULE:raft_tree_reduce/2}
     ]),
 
+    %% ugly hack just to see the elections
+    RaftPid = case re:run(FilePath, "shards/[0-9a-f]+-[0-9a-f]+/[^.]+", [{capture, all, list}]) of
+        {match, [ShardName]} ->
+            Cohort = mem3:nodes(), %% hack
+            {ok, InitialRaftState} = couch_raft_store_sha256:init(Cohort),
+            {ok, P} = couch_raft:start(ShardName, couch_raft_store_sha256, InitialRaftState),
+            P;
+        _ ->
+            undefined
+    end,
+
     ok = couch_file:set_db_pid(Fd, self()),
 
     St = #st{
@@ -977,7 +988,8 @@ init_state(FilePath, Fd, Header0, Options) ->
         compression = Compression,
         purge_tree = PurgeTree,
         purge_seq_tree = PurgeSeqTree,
-        raft_tree = RaftTree
+        raft_tree = RaftTree,
+        raft_pid = RaftPid
     },
 
     % If this is a new database we've just created a
diff --git a/src/couch/src/couch_bt_engine.hrl b/src/couch/src/couch_bt_engine.hrl
index 0d347e99b..8d1dbca2a 100644
--- a/src/couch/src/couch_bt_engine.hrl
+++ b/src/couch/src/couch_bt_engine.hrl
@@ -24,5 +24,6 @@
     compression,
     purge_tree,
     purge_seq_tree,
-    raft_tree
+    raft_tree,
+    raft_pid
 }).


[couchdb] 04/09: clear votesGranted when unused for readability

Posted by rn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch raft_storemodule
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 9e0e337fc26bef0beded3a4e6ff3fd6b644463c8
Author: Robert Newson <rn...@apache.org>
AuthorDate: Wed Aug 17 21:53:01 2022 +0100

    clear votesGranted when unused for readability
---
 src/couch/src/couch_raft.erl | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/couch/src/couch_raft.erl b/src/couch/src/couch_raft.erl
index c580346fc..06025784e 100644
--- a/src/couch/src/couch_raft.erl
+++ b/src/couch/src/couch_raft.erl
@@ -52,7 +52,7 @@ new(Name, StoreModule, StoreState) ->
     maps:merge(#{
         name => Name,
         store_module => StoreModule,
-        votesGranted => #{},
+        votesGranted => undefined,
         froms => #{}
     }, StoreState).
 
@@ -71,13 +71,13 @@ callback_mode() ->
 %% erlfmt-ignore
 handle_event(cast, #{term := FutureTerm} = Msg, _State, #{term := CurrentTerm} = Data) when FutureTerm > CurrentTerm ->
     couch_log:notice("~p received message from future term ~B, moving to that term, becoming follower and clearing votedFor", [node(), FutureTerm]),
-    persist({next_state, follower, Data#{term => FutureTerm, votedFor => undefined}, {next_event, cast, Msg}});
+    persist({next_state, follower, Data#{term => FutureTerm, votedFor => undefined, votesGranted => undefined}, {next_event, cast, Msg}});
 
 handle_event(enter, _OldState, follower, Data) ->
     #{term := Term, froms := Froms} = Data,
     couch_log:notice("~p became follower in term ~B", [node(), Term]),
     Replies = [{reply, From, {error, deposed}} || From <- maps:values(Froms)],
-    persist({keep_state, maps:without([nextIndex, matchIndex], Data#{votedFor => undefined, froms => #{}}),
+    persist({keep_state, maps:without([nextIndex, matchIndex], Data#{votedFor => undefined, votesGranted => undefined, froms => #{}}),
         [restart_election_timeout() | Replies]});
 
 handle_event(enter, _OldState, candidate, Data) ->


[couchdb] 01/09: Integrate raft algorithm (WIP)

Posted by rn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch raft_storemodule
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 9ce27998e78e2361886c2ac3e028946e3bb3b74f
Author: Robert Newson <rn...@apache.org>
AuthorDate: Sat May 14 20:28:05 2022 +0100

    Integrate raft algorithm (WIP)
    
    couch_raft.erl is a complete implementation of the Raft algorithm, but
    it currently only manages an in-memory state machine and log.
    
    This also includes preliminary work to add a new btree inside `.couch`
    files, which will become the real raft log. The intent is that log
    entries can be removed from this log and applied to the by_id and
    by_seq trees atomically.
    
    The raft log is not yet preserved over compaction, because reading
    the compactor code hurts my eyes.
    
    Anyway, it's progress and hopefully we're going somewhere cool.
---
 src/couch/src/couch_bt_engine.erl         |  95 +++++++-
 src/couch/src/couch_bt_engine.hrl         |   3 +-
 src/couch/src/couch_bt_engine_header.erl  |   7 +-
 src/couch/src/couch_db.erl                |  19 ++
 src/couch/src/couch_db_engine.erl         |  27 +++
 src/couch/src/couch_db_updater.erl        |  10 +
 src/couch/src/couch_raft.erl              | 350 ++++++++++++++++++++++++++++++
 src/couch/src/couch_raft_log.erl          |  52 +++++
 src/couch/test/eunit/couch_raft_SUITE.erl |  67 ++++++
 9 files changed, 621 insertions(+), 9 deletions(-)

diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl
index 0549de566..8c1a2756d 100644
--- a/src/couch/src/couch_bt_engine.erl
+++ b/src/couch/src/couch_bt_engine.erl
@@ -66,6 +66,11 @@
     purge_docs/3,
     copy_purge_infos/2,
 
+    raft_lookup/2,
+    raft_insert/2,
+    raft_discard/2,
+    raft_last/1,
+
     commit_data/1,
 
     open_write_stream/2,
@@ -102,7 +107,11 @@
     purge_tree_join/2,
     purge_tree_reduce/2,
     purge_seq_tree_split/1,
-    purge_seq_tree_join/2
+    purge_seq_tree_join/2,
+
+    raft_tree_split/1,
+    raft_tree_join/2,
+    raft_tree_reduce/2
 ]).
 
 % Used by the compactor
@@ -631,6 +640,44 @@ count_changes_since(St, SinceSeq) ->
     {ok, Changes} = couch_btree:fold_reduce(BTree, FoldFun, 0, Opts),
     Changes.
 
+raft_insert(#st{} = St, Entries) when is_list(Entries) ->
+    #st{
+        raft_tree = RaftTree0
+    } = St,
+    {ok, RaftTree1} = couch_btree:add_remove(RaftTree0, Entries, []),
+    {ok, St#st{
+        raft_tree = RaftTree1,
+        needs_commit = true
+    }}.
+
+raft_lookup(#st{} = St, Indexes) ->
+    Results = couch_btree:lookup(St#st.raft_tree, Indexes),
+    lists:map(
+        fun
+            ({ok, Entry}) -> Entry;
+            (not_found) -> not_found
+        end,
+        Results
+    ).
+
+raft_discard(#st{} = St, UpTo) ->
+    #st{
+        raft_tree = RaftTree0
+    } = St,
+    {ok, {First, _Last}} = couch_btree:full_reduce(RaftTree0),
+    {FirstIndex, _FirstTerm} = First,
+    Remove = lists:seq(FirstIndex, UpTo),
+    {ok, RaftTree1} = couch_btree:add_remove(RaftTree0, [], Remove),
+    {ok, St#st{
+        raft_tree = RaftTree1,
+        needs_commit = true
+    }}.
+
+
+raft_last(#st{} = St) ->
+    {ok, {_First, Last}} = couch_btree:full_reduce(St#st.raft_tree),
+    Last.
+
 start_compaction(St, DbName, Options, Parent) ->
     Args = [St, DbName, Options, Parent],
     Pid = spawn_link(couch_bt_engine_compactor, start, Args),
@@ -799,6 +846,23 @@ purge_tree_reduce(reduce, IdRevs) ->
 purge_tree_reduce(rereduce, Reds) ->
     lists:sum(Reds).
 
+raft_tree_split({Index, Term, Value}) ->
+    {Index, {Term, Value}}.
+
+raft_tree_join(Index, {Term, Value}) ->
+    {Index, Term, Value}.
+
+
+raft_tree_reduce(reduce, []) ->
+    {{0, 0}, {0, 0}};
+raft_tree_reduce(reduce, Entries) ->
+    {MinIndex, MinTerm, _} = lists:min(Entries),
+    {MaxIndex, MaxTerm, _} = lists:max(Entries),
+    {{MinIndex, MinTerm}, {MaxIndex, MaxTerm}};
+raft_tree_reduce(rereduce, Reds) ->
+    {Mins, Maxs} = lists:unzip(Reds),
+    {lists:min(Mins), lists:max(Maxs)}.
+
 set_update_seq(#st{header = Header} = St, UpdateSeq) ->
     {ok, St#st{
         header = couch_bt_engine_header:set(Header, [
@@ -894,6 +958,13 @@ init_state(FilePath, Fd, Header0, Options) ->
         {reduce, fun ?MODULE:purge_tree_reduce/2}
     ]),
 
+    RaftTreeState = couch_bt_engine_header:raft_tree_state(Header),
+    {ok, RaftTree} = couch_btree:open(RaftTreeState, Fd, [
+        {split, fun ?MODULE:raft_tree_split/1},
+        {join, fun ?MODULE:raft_tree_join/2},
+        {reduce, fun ?MODULE:raft_tree_reduce/2}
+    ]),
+
     ok = couch_file:set_db_pid(Fd, self()),
 
     St = #st{
@@ -907,7 +978,8 @@ init_state(FilePath, Fd, Header0, Options) ->
         local_tree = LocalTree,
         compression = Compression,
         purge_tree = PurgeTree,
-        purge_seq_tree = PurgeSeqTree
+        purge_seq_tree = PurgeSeqTree,
+        raft_tree = RaftTree
     },
 
     % If this is a new database we've just created a
@@ -927,7 +999,8 @@ update_header(St, Header) ->
         {id_tree_state, couch_btree:get_state(St#st.id_tree)},
         {local_tree_state, couch_btree:get_state(St#st.local_tree)},
         {purge_tree_state, couch_btree:get_state(St#st.purge_tree)},
-        {purge_seq_tree_state, couch_btree:get_state(St#st.purge_seq_tree)}
+        {purge_seq_tree_state, couch_btree:get_state(St#st.purge_seq_tree)},
+        {raft_tree_state, couch_btree:get_state(St#st.raft_tree)}
     ]).
 
 increment_update_seq(#st{header = Header} = St) ->
@@ -1097,7 +1170,8 @@ active_size(#st{} = St, #size_info{} = SI) ->
         St#st.seq_tree,
         St#st.local_tree,
         St#st.purge_tree,
-        St#st.purge_seq_tree
+        St#st.purge_seq_tree,
+        St#st.raft_tree
     ],
     lists:foldl(
         fun(T, Acc) ->
@@ -1171,12 +1245,14 @@ fold_docs_reduce_to_count(Reds) ->
 finish_compaction_int(#st{} = OldSt, #st{} = NewSt1) ->
     #st{
         filepath = FilePath,
-        local_tree = OldLocal
+        local_tree = OldLocal,
+        raft_tree = OldRaft
     } = OldSt,
     #st{
         filepath = CompactDataPath,
         header = Header,
-        local_tree = NewLocal1
+        local_tree = NewLocal1,
+        raft_tree = NewRaft1
     } = NewSt1,
 
     % suck up all the local docs into memory and write them to the new db
@@ -1186,13 +1262,18 @@ finish_compaction_int(#st{} = OldSt, #st{} = NewSt1) ->
     {ok, _, LocalDocs} = couch_btree:foldl(OldLocal, LoadFun, []),
     {ok, NewLocal2} = couch_btree:add(NewLocal1, LocalDocs),
 
+    % do the same for the raft log
+    {ok, _, RaftLog} = couch_btree:foldl(OldRaft, LoadFun, []),
+    {ok, NewRaft2} = couch_btree:add(NewRaft1, RaftLog),
+
     {ok, NewSt2} = commit_data(NewSt1#st{
         header = couch_bt_engine_header:set(Header, [
             {compacted_seq, get_update_seq(OldSt)},
             {revs_limit, get_revs_limit(OldSt)},
             {purge_infos_limit, get_purge_infos_limit(OldSt)}
         ]),
-        local_tree = NewLocal2
+        local_tree = NewLocal2,
+        raft_tree = NewRaft2
     }),
 
     % Rename our *.compact.data file to *.compact so that if we
diff --git a/src/couch/src/couch_bt_engine.hrl b/src/couch/src/couch_bt_engine.hrl
index e3c1d4983..0d347e99b 100644
--- a/src/couch/src/couch_bt_engine.hrl
+++ b/src/couch/src/couch_bt_engine.hrl
@@ -23,5 +23,6 @@
     local_tree,
     compression,
     purge_tree,
-    purge_seq_tree
+    purge_seq_tree,
+    raft_tree
 }).
diff --git a/src/couch/src/couch_bt_engine_header.erl b/src/couch/src/couch_bt_engine_header.erl
index e28f07723..9e663b096 100644
--- a/src/couch/src/couch_bt_engine_header.erl
+++ b/src/couch/src/couch_bt_engine_header.erl
@@ -34,6 +34,7 @@
     purge_tree_state/1,
     purge_seq_tree_state/1,
     purge_infos_limit/1,
+    raft_tree_state/1,
     security_ptr/1,
     revs_limit/1,
     uuid/1,
@@ -69,7 +70,8 @@
     epochs,
     compacted_seq,
     purge_infos_limit = 1000,
-    props_ptr
+    props_ptr,
+    raft_tree_state = nil
 }).
 
 -define(PARTITION_DISK_VERSION, 8).
@@ -177,6 +179,9 @@ compacted_seq(Header) ->
 purge_infos_limit(Header) ->
     get_field(Header, purge_infos_limit).
 
+raft_tree_state(Header) ->
+    get_field(Header, raft_tree_state).
+
 get_field(Header, Field) ->
     get_field(Header, Field, undefined).
 
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index dd7e07517..e197f98a4 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -114,6 +114,11 @@
     fold_purge_infos/4,
     fold_purge_infos/5,
 
+    raft_insert/2,
+    raft_lookup/2,
+    raft_discard/2,
+    raft_last/1,
+
     calculate_start_seq/3,
     owner_of/2,
 
@@ -1822,6 +1827,20 @@ fold_purge_infos(Db, StartPurgeSeq, Fun, Acc) ->
 fold_purge_infos(Db, StartPurgeSeq, UFun, UAcc, Opts) ->
     couch_db_engine:fold_purge_infos(Db, StartPurgeSeq, UFun, UAcc, Opts).
 
+raft_insert(#db{main_pid = Pid} = Db, Entries) ->
+    check_is_admin(Db),
+    gen_server:call(Pid, {raft_insert, Entries}, infinity).
+
+raft_lookup(Db, Indexes) ->
+    couch_db_engine:raft_lookup(Db, Indexes).
+
+raft_discard(#db{main_pid = Pid} = Db, UpTo) ->
+    check_is_admin(Db),
+    gen_server:call(Pid, {raft_discard, UpTo}, infinity).
+
+raft_last(Db) ->
+    couch_db_engine:raft_last(Db).
+
 count_changes_since(Db, SinceSeq) ->
     couch_db_engine:count_changes_since(Db, SinceSeq).
 
diff --git a/src/couch/src/couch_db_engine.erl b/src/couch/src/couch_db_engine.erl
index 052a527e3..63b9d49a3 100644
--- a/src/couch/src/couch_db_engine.erl
+++ b/src/couch/src/couch_db_engine.erl
@@ -704,6 +704,11 @@
     read_doc_body/2,
     load_purge_infos/2,
 
+    raft_lookup/2,
+    raft_insert/2,
+    raft_discard/2,
+    raft_last/1,
+
     serialize_doc/2,
     write_doc_body/2,
     write_doc_infos/3,
@@ -927,6 +932,28 @@ copy_purge_infos(#db{} = Db, Purges) ->
     ),
     {ok, Db#db{engine = {Engine, NewSt}}}.
 
+raft_insert(#db{} = Db, Entries) ->
+    #db{engine = {Engine, EngineState}} = Db,
+    {ok, NewSt} = Engine:raft_insert(
+        EngineState, Entries
+    ),
+    {ok, Db#db{engine = {Engine, NewSt}}}.
+
+raft_lookup(#db{} = Db, Indexes) ->
+    #db{engine = {Engine, EngineState}} = Db,
+    Engine:raft_lookup(EngineState, Indexes).
+
+raft_discard(#db{} = Db, UpTo) ->
+    #db{engine = {Engine, EngineState}} = Db,
+    {ok, NewSt} = Engine:raft_discard(
+        EngineState, UpTo
+    ),
+    {ok, Db#db{engine = {Engine, NewSt}}}.
+
+raft_last(#db{} = Db) ->
+    #db{engine = {Engine, EngineState}} = Db,
+    Engine:raft_last(EngineState).
+
 commit_data(#db{} = Db) ->
     #db{engine = {Engine, EngineState}} = Db,
     {ok, NewSt} = Engine:commit_data(EngineState),
diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl
index 0248c21ec..7c1f97804 100644
--- a/src/couch/src/couch_db_updater.erl
+++ b/src/couch/src/couch_db_updater.erl
@@ -118,6 +118,16 @@ handle_call({purge_docs, PurgeReqs0, Options}, _From, Db) ->
         end,
     {ok, NewDb, Replies} = purge_docs(Db, PurgeReqs),
     {reply, {ok, Replies}, NewDb, idle_limit()};
+handle_call({raft_insert, Entries}, _From, Db) ->
+    {ok, Db2} = couch_db_engine:raft_insert(Db, Entries),
+    Db3 = commit_data(Db2),
+    ok = couch_server:db_updated(Db3),
+    {reply, ok, Db3, idle_limit()};
+handle_call({raft_discard, UpTo}, _From, Db) ->
+    {ok, Db2} = couch_db_engine:raft_discard(Db, UpTo),
+    Db3 = commit_data(Db2),
+    ok = couch_server:db_updated(Db3),
+    {reply, ok, Db3, idle_limit()};
 handle_call(Msg, From, Db) ->
     case couch_db_engine:handle_db_updater_call(Msg, From, Db) of
         {reply, Resp, NewDb} ->
diff --git a/src/couch/src/couch_raft.erl b/src/couch/src/couch_raft.erl
new file mode 100644
index 000000000..f398b4f2a
--- /dev/null
+++ b/src/couch/src/couch_raft.erl
@@ -0,0 +1,350 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
+
+-module(couch_raft).
+-behaviour(gen_statem).
+
+-define(ELECTION_DELAY, 150).
+-define(ELECTION_SPLAY, 150).
+-define(LEADER_HEARTBEAT, 75).
+-define(CLIENT_TIMEOUT, 5_000).
+
+% maximum number of entries to send in one go.
+-define(BATCH_SIZE, 10).
+
+% public api
+
+-export([
+    start/2,
+    start_link/2,
+    stop/1,
+    call/2
+]).
+
+% mandatory gen_statem callbacks
+
+-export([
+    init/1,
+    callback_mode/0,
+    handle_event/4
+]).
+
+%% public api
+
+start(Name, Cohort) ->
+    gen_statem:start({local, Name}, ?MODULE, new(Name, Cohort), []).
+
+start_link(Name, Cohort) ->
+    gen_statem:start_link({local, Name}, ?MODULE, new(Name, Cohort), []).
+
+new(Name, Cohort) ->
+    Peers = peers(Cohort),
+    #{
+        name => Name,
+        cohort => Cohort,
+        term => 0,
+        votedFor => undefined,
+        votesGranted => #{},
+        nextIndex => maps:from_list([{Peer, 1} || Peer <- Peers]),
+        matchIndex => maps:from_list([{Peer, 0} || Peer <- Peers]),
+        log => couch_raft_log:new(),
+        commitIndex => 0,
+        froms => #{},
+        lastApplied => 0,
+        machine => <<0>>
+    }.
+
+stop(ServerRef) ->
+    gen_statem:stop(ServerRef).
+
+call(ServerRef, Value) ->
+    gen_statem:call(ServerRef, #{type => 'ClientRequest', value => Value}, ?CLIENT_TIMEOUT).
+
+init(Data) ->
+    {ok, follower, Data}.
+
+callback_mode() ->
+    [handle_event_function, state_enter].
+
+%% erlfmt-ignore
+handle_event(cast, #{term := FutureTerm} = Msg, _State, #{term := CurrentTerm} = Data) when FutureTerm > CurrentTerm ->
+    couch_log:notice("~p received message from future term ~B, moving to that term, becoming follower and clearing votedFor", [node(), FutureTerm]),
+    {next_state, follower, Data#{term => FutureTerm, votedFor => undefined}, {next_event, cast, Msg}};
+
+handle_event(enter, _OldState, follower, Data) ->
+    #{term := Term, froms := Froms} = Data,
+    couch_log:notice("~p became follower in term ~B", [node(), Term]),
+    Replies = [{reply, From, {error, deposed}} || From <- maps:values(Froms)],
+    {keep_state, Data#{votedFor => undefined, froms => #{}}, [restart_election_timeout() | Replies]};
+
+handle_event(enter, _OldState, candidate, Data) ->
+    #{term := Term} = Data,
+    couch_log:notice("~p became candidate in term ~B", [node(), Term]),
+    {keep_state, start_election(Data), restart_election_timeout()};
+
+handle_event(enter, _OldState, leader, Data) ->
+    #{log := Log, cohort := Cohort, term := Term} = Data,
+    couch_log:notice("~p became leader in term ~B", [node(), Term]),
+    Peers = peers(Cohort),
+    {keep_state, Data#{
+        nextIndex => maps:from_list([{Peer, couch_raft_log:index(couch_raft_log:last(Log)) + 1} || Peer <- Peers]),
+        matchIndex => maps:from_list([{Peer, 0} || Peer <- Peers])
+    }, restart_heartbeat_timeout()};
+
+handle_event(cast, #{type := 'RequestVoteRequest', term := Term} = Msg, _State, #{term := CurrentTerm} = Data)
+  when Term =< CurrentTerm ->
+    #{
+        source := MSource,
+        lastLogIndex := MLastLogIndex,
+        lastLogTerm := MLastLogTerm
+    } = Msg,
+    #{
+        log := Log,
+        votedFor := VotedFor
+    } = Data,
+    LogOk = MLastLogTerm > couch_raft_log:term(couch_raft_log:last(Log)) orelse (MLastLogTerm == couch_raft_log:term(couch_raft_log:last(Log)) andalso MLastLogIndex >= couch_raft_log:index(couch_raft_log:last(Log))),
+    Grant = Term == CurrentTerm andalso LogOk andalso (VotedFor == undefined orelse VotedFor == MSource),
+    couch_log:notice("~p received RequestVoteRequest from ~p in term ~B when in term ~B (Grant:~p, LogOk:~p, VotedFor:~p)", [node(), MSource, Term, CurrentTerm, Grant, LogOk, VotedFor]),
+    Reply = #{
+        type => 'RequestVoteResponse',
+        term => CurrentTerm,
+        voteGranted => Grant,
+        source => node()
+    },
+    cast(MSource, Reply, Data),
+    if
+        Grant ->
+            {keep_state, Data#{votedFor => MSource}, restart_election_timeout()};
+        true ->
+            {keep_state_and_data, restart_election_timeout()}
+    end;
+
+handle_event(cast, #{type := 'RequestVoteResponse', term := PastTerm}, _State, #{term := CurrentTerm}) when PastTerm < CurrentTerm ->
+    couch_log:notice("~p ignored RequestVoteResponse from past term ~B", [node(), PastTerm]),
+    keep_state_and_data;
+
+handle_event(cast, #{type := 'RequestVoteResponse', term := Term} = Msg, _State, #{term := Term} = Data) ->
+    #{source := MSource, voteGranted := MVoteGranted} = Msg,
+    #{cohort := Cohort, votesGranted := VotesGranted0} = Data,
+    VotesGranted1 = if MVoteGranted -> lists:usort([MSource | VotesGranted0]); true -> VotesGranted0 end,
+    couch_log:notice("~p received RequestVoteResponse from ~p in current term ~B (VotesGranted:~p)", [node(), MSource, Term, VotesGranted1]),
+    if
+        length(VotesGranted1) >= length(Cohort) div 2 + 1 ->
+            couch_log:notice("~p has enough votes to be leader in term ~B", [node(), Term]),
+            {next_state, leader, Data#{votesGranted => VotesGranted1}};
+        true ->
+            {keep_state, Data#{votesGranted => VotesGranted1}}
+    end;
+
+
+handle_event(cast, #{type := 'AppendEntriesRequest', term := Term} = Msg, State, #{term := CurrentTerm} = Data)
+  when Term =< CurrentTerm ->
+    #{
+        source := MSource,
+        prevLogIndex := MPrevLogIndex,
+        prevLogTerm := MPrevLogTerm,
+        entries := MEntries,
+        commitIndex := MCommitIndex
+    } = Msg,
+    #{
+        log := Log
+    } = Data,
+    LogOk = MPrevLogIndex == 0 orelse (MPrevLogIndex > 0 andalso MPrevLogIndex =< couch_raft_log:index(couch_raft_log:last(Log)) andalso MPrevLogTerm == couch_raft_log:term(couch_raft_log:nth(MPrevLogIndex,Log))),
+    if
+        Term < CurrentTerm orelse (Term == CurrentTerm andalso State == follower andalso not LogOk) ->
+            Reply = #{
+                type => 'AppendEntriesResponse',
+                term => CurrentTerm,
+                success => false,
+                matchIndex => 0,
+                source => node()
+            },
+            cast(MSource, Reply, Data),
+            if
+                State == leader ->
+                    keep_state_and_data;
+                true ->
+                    {keep_state_and_data, restart_election_timeout()}
+            end;
+        Term == CurrentTerm andalso State == candidate ->
+            {next_state, follower, Data, {next_event, cast, Msg}};
+        Term == CurrentTerm andalso State == follower andalso LogOk ->
+            if
+                MEntries == [] ->
+                    Reply = #{
+                        type => 'AppendEntriesResponse',
+                        term => CurrentTerm,
+                        success => true,
+                        matchIndex => MPrevLogIndex,
+                        source => node()
+                    },
+                    couch_log:debug("~p received heartbeat and everything matches, sending matchIndex:~p", [node(), MPrevLogIndex]),
+                    cast(MSource, Reply, Data),
+                    {keep_state, update_state_machine(Data#{commitIndex => MCommitIndex}), restart_election_timeout()};
+                true ->
+                    Index = MPrevLogIndex + 1,
+                    LastLogIndex = couch_raft_log:index(couch_raft_log:last(Log)),
+                    if
+                        LastLogIndex >= Index ->
+                            NthLogTerm = couch_raft_log:term(couch_raft_log:nth(Index, Log)),
+                            FirstEntryTerm = couch_raft_log:term(hd(MEntries)),
+                            if
+                                NthLogTerm == FirstEntryTerm ->
+                                    Reply = #{
+                                        type => 'AppendEntriesResponse',
+                                        term => CurrentTerm,
+                                        success => true,
+                                        matchIndex => MPrevLogIndex + length(MEntries),
+                                        source => node()
+                                    },
+                                    couch_log:notice("~p received entry:~p that's already applied, sending matchIndex:~p", [node(), MEntries, MPrevLogIndex + length(MEntries)]),
+                                    cast(MSource, Reply, Data),
+                                    {keep_state, update_state_machine(Data#{commitIndex => MCommitIndex}), restart_election_timeout()};
+                                NthLogTerm /= FirstEntryTerm ->
+                                    couch_log:notice("~p received conflicting entry:~p, deleting it", [node(), MEntries]),
+                                    {keep_state, Data#{log => lists:sublist(Log, LastLogIndex - 1)}, [{next_event, cast, Msg}, restart_election_timeout()]}
+                            end;
+                        LastLogIndex == MPrevLogIndex ->
+                            couch_log:notice("~p received new entries:~p, appending it to log", [node(), MEntries]),
+                            {keep_state, Data#{log => couch_raft_log:append(Log, MEntries)}, [{next_event, cast, Msg}, restart_election_timeout()]}
+                    end
+            end
+    end;
+
+handle_event(cast, #{type := 'AppendEntriesResponse', term := PastTerm}, _State, #{term := CurrentTerm}) when PastTerm < CurrentTerm ->
+    couch_log:notice("~p ignored AppendEntriesResponse from past term ~B", [node(), PastTerm]),
+    keep_state_and_data;
+
+handle_event(cast, #{type := 'AppendEntriesResponse', term := Term} = Msg, _State, #{term := Term} = Data) ->
+    #{success := MSuccess, matchIndex := MMatchIndex, source := MSource} = Msg,
+    #{nextIndex := NextIndex, matchIndex := MatchIndex} = Data,
+    couch_log:debug("~p received AppendEntriesResponse from ~p in current term ~B (Success:~p)", [node(), MSource, Term, MSuccess]),
+    SourceNextIndex = maps:get(MSource, NextIndex),
+    if
+        MSuccess ->
+            {keep_state, Data#{
+                nextIndex => NextIndex#{MSource => MMatchIndex + 1},
+                matchIndex => MatchIndex#{MSource => MMatchIndex}
+            }};
+        true ->
+            {keep_state, Data#{
+                nextIndex => NextIndex#{MSource => max(SourceNextIndex - 1, 1)}
+            }}
+    end;
+
+handle_event({call, From}, #{type := 'ClientRequest'} = Msg, leader, Data) ->
+    #{value := Value} = Msg,
+    #{term := Term, log := Log, froms := Froms} = Data,
+    EntryIndex = couch_raft_log:index(couch_raft_log:last(Log)) + 1,
+    Entry = {EntryIndex, Term, Value},
+    {keep_state, Data#{log => couch_raft_log:append(Log, [Entry]), froms => Froms#{EntryIndex => From}}};
+
+handle_event({call, From}, #{type := 'ClientRequest'}, _State, _Data) ->
+    {keep_state_and_data, {reply, From, {error, not_leader}}};
+
+handle_event(state_timeout, new_election, State, Data) when State == follower; State == candidate ->
+    #{term := Term} = Data,
+    couch_log:notice("~p election timeout in state ~p, term ~B", [node(), State, Term]),
+    {next_state, candidate, start_election(Data), restart_election_timeout()};
+
+handle_event(state_timeout, heartbeat, leader, Data) ->
+    #{term := Term} = Data,
+    couch_log:debug("~p leader sending a heartbeat in term ~B", [node(), Term]),
+    ok = send_append_entries(Data),
+    {keep_state, advance_commit_index(Data), restart_heartbeat_timeout()};
+
+handle_event(EventType, EventContent, State, Data) ->
+    {stop, {unknown_event, EventType, EventContent, State, Data}}.
+
+
+send_append_entries(#{cohort := Cohort} = Data) ->
+    send_append_entries(peers(Cohort), Data).
+
+send_append_entries([], _Data) ->
+    ok;
+send_append_entries([Peer | Rest], Data) ->
+    #{term := Term, nextIndex := NextIndex, log := Log, commitIndex := CommitIndex} = Data,
+    PrevLogIndex = maps:get(Peer, NextIndex) - 1,
+    PrevLogTerm = if PrevLogIndex > 0 -> couch_raft_log:term(couch_raft_log:nth(PrevLogIndex, Log)); true -> 0 end,
+    LastEntry = min(couch_raft_log:index(couch_raft_log:last(Log)), PrevLogIndex + 2),
+    Entries = couch_raft_log:sublist(Log, PrevLogIndex + 1, ?BATCH_SIZE),
+    Msg = #{
+        type => 'AppendEntriesRequest',
+        term => Term,
+        source => node(),
+        prevLogIndex => PrevLogIndex,
+        prevLogTerm => PrevLogTerm,
+        entries => Entries,
+        commitIndex => min(CommitIndex, LastEntry)
+    },
+    cast(Peer, Msg, Data),
+    send_append_entries(Rest, Data).
+
+advance_commit_index(Data) ->
+    #{matchIndex := MatchIndex, log := Log, cohort := Cohort, term := Term} = Data,
+    LastTerm = couch_raft_log:term(couch_raft_log:last(Log)),
+    LastIndexes = lists:sort([couch_raft_log:index(couch_raft_log:last(Log)) | maps:values(MatchIndex)]),
+    NewCommitIndex = lists:nth(length(Cohort) div 2 + 1, LastIndexes),
+    if
+        LastTerm == Term ->
+            update_state_machine(Data#{commitIndex => NewCommitIndex});
+        true ->
+            Data
+    end.
+
+update_state_machine(#{lastApplied := Same, commitIndex := Same} = Data) ->
+    Data;
+update_state_machine(#{lastApplied := LastApplied, commitIndex := CommitIndex} = Data) when LastApplied < CommitIndex ->
+    #{log := Log, froms := Froms0, machine := Machine0} = Data,
+    From = LastApplied + 1,
+    To = min(couch_raft_log:index(couch_raft_log:last(Log)), CommitIndex),
+    Fun = fun(Index, {Froms, Machine}) ->
+        Value = couch_raft_log:value(couch_raft_log:nth(Index, Log)),
+        Result = crypto:hash(sha256, <<Machine/binary, Value/binary>>),
+        case maps:is_key(Index, Froms) of
+            true ->
+                gen_statem:reply(maps:get(Index, Froms), Result),
+                {maps:remove(Index, Froms), Result};
+            false ->
+                {Froms, Result}
+        end
+    end,
+    {Froms1, Machine1} = lists:foldl(Fun, {Froms0, Machine0}, lists:seq(From, To)),
+    Data#{froms => Froms1, machine => Machine1, lastApplied => To}.
+
+start_election(Data) ->
+    #{term := Term, cohort := Cohort, log := Log} = Data,
+    ElectionTerm = Term + 1,
+    couch_log:notice("~p starting election in term ~B", [node(), ElectionTerm]),
+    RequestVote = #{
+        type => 'RequestVoteRequest',
+        term => ElectionTerm,
+        lastLogIndex => couch_raft_log:index(couch_raft_log:last(Log)),
+        lastLogTerm => couch_raft_log:term(couch_raft_log:last(Log)),
+        source => node()
+    },
+    lists:foreach(fun(Peer) -> cast(Peer, RequestVote, Data) end, peers(Cohort)),
+    Data#{term => ElectionTerm, votedFor => node(), votesGranted => [node()]}.
+
+cast(Node, Msg, #{name := Name}) ->
+    gen_statem:cast({Name, Node}, Msg).
+
+restart_election_timeout() ->
+    {state_timeout, ?ELECTION_DELAY + rand:uniform(?ELECTION_SPLAY), new_election}.
+
+restart_heartbeat_timeout() ->
+    {state_timeout, ?LEADER_HEARTBEAT, heartbeat}.
+
+peers(Cohort) ->
+    Cohort -- [node()].
diff --git a/src/couch/src/couch_raft_log.erl b/src/couch/src/couch_raft_log.erl
new file mode 100644
index 000000000..987212457
--- /dev/null
+++ b/src/couch/src/couch_raft_log.erl
@@ -0,0 +1,52 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
+
+-module(couch_raft_log).
+
+-export([
+    new/0,
+    append/2,
+    sublist/3,
+    nth/2,
+    last/1,
+    index/1,
+    term/1,
+    value/1
+]).
+
+new() ->
+    [].
+
+append(Log, Items) ->
+    lists:append(Log, Items).
+
+sublist(Log, Start, Len) ->
+    lists:sublist(Log, Start, Len).
+
+nth(N, Log) ->
+    lists:nth(N, Log).
+
+last([]) ->
+    {0, 0, undefined};
+last(Log) ->
+    lists:last(Log).
+
+index(Entry) ->
+    element(1, Entry).
+
+term(Entry) ->
+    element(2, Entry).
+
+value(Entry) ->
+    element(3, Entry).
diff --git a/src/couch/test/eunit/couch_raft_SUITE.erl b/src/couch/test/eunit/couch_raft_SUITE.erl
new file mode 100644
index 000000000..1c3f8ebc2
--- /dev/null
+++ b/src/couch/test/eunit/couch_raft_SUITE.erl
@@ -0,0 +1,67 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
+
+-module(couch_raft_SUITE).
+
+-behaviour(ct_suite).
+
+-export([all/0]).
+-export([three_nodes/1]).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+all() ->
+    [three_nodes].
+
+three_nodes(Config) when is_list(Config) ->
+    N = 3,
+    Args = ["-pa", filename:dirname(code:which(craft))],
+    Peers = [?CT_PEER(#{wait_boot => {self(), tag}, args => Args}) || _ <- lists:seq(1, N)],
+    Cohort = [receive {tag, {started, Node, Peer}} -> Node end || {ok, Peer} <- Peers],
+
+    Crafts = [erpc:call(Node, craft3, start, [foo, Cohort]) || Node <- Cohort],
+
+    % wait for leader election
+    timer:sleep(500),
+
+    % verify only one leader elected
+    [{leader, FirstLeader}] = lists:filter(fun({State, _Pid}) -> State == leader end,
+    [{element(1, sys:get_state(Pid)), Pid} || {ok, Pid} <- Crafts]),
+
+    % make a series of calls
+    Hash1 = crypto:hash(sha256, <<0, 1>>),
+    ?assertEqual(Hash1, craft3:call(FirstLeader, <<1>>)),
+
+    Hash2 = crypto:hash(sha256, <<Hash1/binary, 2>>),
+    ?assertEqual(Hash2, craft3:call(FirstLeader, <<2>>)),
+
+    Hash3 = crypto:hash(sha256, <<Hash2/binary, 3>>),
+    ?assertEqual(Hash3, craft3:call(FirstLeader, <<3>>)),
+
+    % force a re-election
+    craft3:stop(FirstLeader),
+    timer:sleep(500),
+
+    % verify new leader elected
+    [{leader, SecondLeader}] = lists:filter(fun({State, _Pid}) -> State == leader end,
+        [{element(1, sys:get_state(Pid)), Pid} || {ok, Pid} <- Crafts, Pid /= FirstLeader]),
+    ?assertNotEqual(FirstLeader, SecondLeader),
+
+    % make another call
+    Hash4 = crypto:hash(sha256, <<Hash3/binary, 4>>),
+    ?assertEqual(Hash4, craft3:call(SecondLeader, <<4>>)),
+
+    [craft3:stop(Pid) || {ok, Pid} <- Crafts, Pid /= FirstLeader],
+    [peer:stop(Peer) || {ok, Peer} <- Peers].


[couchdb] 07/09: use global so raft member names don't have to be atoms (we'll run out)

Posted by rn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch raft_storemodule
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 76c0f0dae262b94558df1895b48b9fc7e82aa4ec
Author: Robert Newson <rn...@apache.org>
AuthorDate: Thu Sep 1 15:20:07 2022 +0100

    use global so raft member names don't have to be atoms (we'll run out)
---
 src/couch/src/couch_raft.erl | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/src/couch/src/couch_raft.erl b/src/couch/src/couch_raft.erl
index b862d055b..f20207a21 100644
--- a/src/couch/src/couch_raft.erl
+++ b/src/couch/src/couch_raft.erl
@@ -40,10 +40,10 @@
 %% public api
 
 start(Name, StoreModule, StoreState) ->
-    gen_statem:start({local, Name}, ?MODULE, new(Name, StoreModule, StoreState), []).
+    gen_statem:start({global, [Name, node()]}, ?MODULE, new(Name, StoreModule, StoreState), []).
 
 start_link(Name, StoreModule, StoreState) ->
-    gen_statem:start_link({local, Name}, ?MODULE, new(Name, StoreModule, StoreState), []).
+    gen_statem:start_link({global, [Name, node()]}, ?MODULE, new(Name, StoreModule, StoreState), []).
 
 new(Name, StoreModule, StoreState) ->
     maps:merge(#{
@@ -354,8 +354,7 @@ start_election(Data) ->
     Data#{term => ElectionTerm, votedFor => node(), votesGranted => [node()]}.
 
 cast(Node, Msg, #{name := Name}) ->
-    gen_statem:cast({Name, Node}, Msg).
-
+    gen_statem:cast({global, [Name, Node]}, Msg).
 
 state_timeout(follower) ->
     {state_timeout, 150 + rand:uniform(150), new_election};