You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2022/09/01 16:05:56 UTC

[couchdb] 01/09: Integrate raft algorithm (WIP)

This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch raft_storemodule
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 9ce27998e78e2361886c2ac3e028946e3bb3b74f
Author: Robert Newson <rn...@apache.org>
AuthorDate: Sat May 14 20:28:05 2022 +0100

    Integrate raft algorithm (WIP)
    
    couch_raft.erl is a complete implementation of the raft algorithm but
    currently only manages an in-memory state machine and log.
    
    Preliminary work is also here to add a new btree inside the `.couch`
    files, which will be the real raft log. The intent is that log entries
    can be removed from this log and applied to by_id and by_seq trees
    atomically.
    
    raft log is not preserved over compaction yet because reading the
    compactor code hurts my eyes.
    
    Anyway, it's progress and hopefully we're going somewhere cool.
---
 src/couch/src/couch_bt_engine.erl         |  95 +++++++-
 src/couch/src/couch_bt_engine.hrl         |   3 +-
 src/couch/src/couch_bt_engine_header.erl  |   7 +-
 src/couch/src/couch_db.erl                |  19 ++
 src/couch/src/couch_db_engine.erl         |  27 +++
 src/couch/src/couch_db_updater.erl        |  10 +
 src/couch/src/couch_raft.erl              | 350 ++++++++++++++++++++++++++++++
 src/couch/src/couch_raft_log.erl          |  52 +++++
 src/couch/test/eunit/couch_raft_SUITE.erl |  67 ++++++
 9 files changed, 621 insertions(+), 9 deletions(-)

diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl
index 0549de566..8c1a2756d 100644
--- a/src/couch/src/couch_bt_engine.erl
+++ b/src/couch/src/couch_bt_engine.erl
@@ -66,6 +66,11 @@
     purge_docs/3,
     copy_purge_infos/2,
 
+    raft_lookup/2,
+    raft_insert/2,
+    raft_discard/2,
+    raft_last/1,
+
     commit_data/1,
 
     open_write_stream/2,
@@ -102,7 +107,11 @@
     purge_tree_join/2,
     purge_tree_reduce/2,
     purge_seq_tree_split/1,
-    purge_seq_tree_join/2
+    purge_seq_tree_join/2,
+
+    raft_tree_split/1,
+    raft_tree_join/2,
+    raft_tree_reduce/2
 ]).
 
 % Used by the compactor
@@ -631,6 +640,44 @@ count_changes_since(St, SinceSeq) ->
     {ok, Changes} = couch_btree:fold_reduce(BTree, FoldFun, 0, Opts),
     Changes.
 
+% Add Entries to the raft btree and flag the engine state as needing a
+% commit. Returns {ok, NewSt}.
+raft_insert(#st{raft_tree = Tree0} = St, Entries) when is_list(Entries) ->
+    {ok, Tree1} = couch_btree:add_remove(Tree0, Entries, []),
+    NewSt = St#st{raft_tree = Tree1, needs_commit = true},
+    {ok, NewSt}.
+
+% Look up Indexes in the raft btree. Each {ok, Entry} result is unwrapped to
+% Entry; not_found results are passed through unchanged.
+raft_lookup(#st{raft_tree = RaftTree}, Indexes) ->
+    Unwrap = fun
+        ({ok, Entry}) -> Entry;
+        (not_found) -> not_found
+    end,
+    lists:map(Unwrap, couch_btree:lookup(RaftTree, Indexes)).
+
+% Remove every raft log entry whose index is =< UpTo.
+%
+% The range to delete is clamped to the indexes reported by the tree's full
+% reduction, so an UpTo below the first stored index is a no-op (the state is
+% returned unchanged) and an UpTo far beyond the last stored index does not
+% build an oversized key list. Returns {ok, NewSt}.
+raft_discard(#st{raft_tree = RaftTree0} = St, UpTo) when is_integer(UpTo) ->
+    {ok, {{FirstIndex, _FirstTerm}, {LastIndex, _LastTerm}}} =
+        couch_btree:full_reduce(RaftTree0),
+    case min(UpTo, LastIndex) of
+        Upper when Upper < FirstIndex ->
+            % nothing stored at or below UpTo
+            {ok, St};
+        Upper ->
+            Remove = lists:seq(FirstIndex, Upper),
+            {ok, RaftTree1} = couch_btree:add_remove(RaftTree0, [], Remove),
+            {ok, St#st{
+                raft_tree = RaftTree1,
+                needs_commit = true
+            }}
+    end.
+
+
+% Return the second element of the raft tree's full reduction, i.e. the
+% {Index, Term} pair recorded for the highest entry.
+raft_last(#st{raft_tree = RaftTree}) ->
+    {ok, {_, LastEntry}} = couch_btree:full_reduce(RaftTree),
+    LastEntry.
+
 start_compaction(St, DbName, Options, Parent) ->
     Args = [St, DbName, Options, Parent],
     Pid = spawn_link(couch_bt_engine_compactor, start, Args),
@@ -799,6 +846,23 @@ purge_tree_reduce(reduce, IdRevs) ->
 purge_tree_reduce(rereduce, Reds) ->
     lists:sum(Reds).
 
+% btree split callback: the entry's index becomes the key, the term and
+% value are stored together as the btree value.
+raft_tree_split({Idx, Trm, Val}) ->
+    Stored = {Trm, Val},
+    {Idx, Stored}.
+
+% btree join callback: rebuild the {Index, Term, Value} entry from the key
+% and the stored {Term, Value} pair.
+raft_tree_join(Idx, Stored) ->
+    {Trm, Val} = Stored,
+    {Idx, Trm, Val}.
+
+
+% btree reduce callback. The reduction is a pair
+% {{MinIndex, MinTerm}, {MaxIndex, MaxTerm}} describing the lowest and highest
+% entries covered; an empty entry list reduces to {{0, 0}, {0, 0}}.
+raft_tree_reduce(reduce, []) ->
+    Zero = {0, 0},
+    {Zero, Zero};
+raft_tree_reduce(reduce, Entries) ->
+    {LoIdx, LoTerm, _} = lists:min(Entries),
+    {HiIdx, HiTerm, _} = lists:max(Entries),
+    First = {LoIdx, LoTerm},
+    Last = {HiIdx, HiTerm},
+    {First, Last};
+raft_tree_reduce(rereduce, Reds) ->
+    {Firsts, Lasts} = lists:unzip(Reds),
+    First = lists:min(Firsts),
+    Last = lists:max(Lasts),
+    {First, Last}.
+
 set_update_seq(#st{header = Header} = St, UpdateSeq) ->
     {ok, St#st{
         header = couch_bt_engine_header:set(Header, [
@@ -894,6 +958,13 @@ init_state(FilePath, Fd, Header0, Options) ->
         {reduce, fun ?MODULE:purge_tree_reduce/2}
     ]),
 
+    RaftTreeState = couch_bt_engine_header:raft_tree_state(Header),
+    {ok, RaftTree} = couch_btree:open(RaftTreeState, Fd, [
+        {split, fun ?MODULE:raft_tree_split/1},
+        {join, fun ?MODULE:raft_tree_join/2},
+        {reduce, fun ?MODULE:raft_tree_reduce/2}
+    ]),
+
     ok = couch_file:set_db_pid(Fd, self()),
 
     St = #st{
@@ -907,7 +978,8 @@ init_state(FilePath, Fd, Header0, Options) ->
         local_tree = LocalTree,
         compression = Compression,
         purge_tree = PurgeTree,
-        purge_seq_tree = PurgeSeqTree
+        purge_seq_tree = PurgeSeqTree,
+        raft_tree = RaftTree
     },
 
     % If this is a new database we've just created a
@@ -927,7 +999,8 @@ update_header(St, Header) ->
         {id_tree_state, couch_btree:get_state(St#st.id_tree)},
         {local_tree_state, couch_btree:get_state(St#st.local_tree)},
         {purge_tree_state, couch_btree:get_state(St#st.purge_tree)},
-        {purge_seq_tree_state, couch_btree:get_state(St#st.purge_seq_tree)}
+        {purge_seq_tree_state, couch_btree:get_state(St#st.purge_seq_tree)},
+        {raft_tree_state, couch_btree:get_state(St#st.raft_tree)}
     ]).
 
 increment_update_seq(#st{header = Header} = St) ->
@@ -1097,7 +1170,8 @@ active_size(#st{} = St, #size_info{} = SI) ->
         St#st.seq_tree,
         St#st.local_tree,
         St#st.purge_tree,
-        St#st.purge_seq_tree
+        St#st.purge_seq_tree,
+        St#st.raft_tree
     ],
     lists:foldl(
         fun(T, Acc) ->
@@ -1171,12 +1245,14 @@ fold_docs_reduce_to_count(Reds) ->
 finish_compaction_int(#st{} = OldSt, #st{} = NewSt1) ->
     #st{
         filepath = FilePath,
-        local_tree = OldLocal
+        local_tree = OldLocal,
+        raft_tree = OldRaft
     } = OldSt,
     #st{
         filepath = CompactDataPath,
         header = Header,
-        local_tree = NewLocal1
+        local_tree = NewLocal1,
+        raft_tree = NewRaft1
     } = NewSt1,
 
     % suck up all the local docs into memory and write them to the new db
@@ -1186,13 +1262,18 @@ finish_compaction_int(#st{} = OldSt, #st{} = NewSt1) ->
     {ok, _, LocalDocs} = couch_btree:foldl(OldLocal, LoadFun, []),
     {ok, NewLocal2} = couch_btree:add(NewLocal1, LocalDocs),
 
+    % do the same for the raft log
+    {ok, _, RaftLog} = couch_btree:foldl(OldRaft, LoadFun, []),
+    {ok, NewRaft2} = couch_btree:add(NewRaft1, RaftLog),
+
     {ok, NewSt2} = commit_data(NewSt1#st{
         header = couch_bt_engine_header:set(Header, [
             {compacted_seq, get_update_seq(OldSt)},
             {revs_limit, get_revs_limit(OldSt)},
             {purge_infos_limit, get_purge_infos_limit(OldSt)}
         ]),
-        local_tree = NewLocal2
+        local_tree = NewLocal2,
+        raft_tree = NewRaft2
     }),
 
     % Rename our *.compact.data file to *.compact so that if we
diff --git a/src/couch/src/couch_bt_engine.hrl b/src/couch/src/couch_bt_engine.hrl
index e3c1d4983..0d347e99b 100644
--- a/src/couch/src/couch_bt_engine.hrl
+++ b/src/couch/src/couch_bt_engine.hrl
@@ -23,5 +23,6 @@
     local_tree,
     compression,
     purge_tree,
-    purge_seq_tree
+    purge_seq_tree,
+    raft_tree
 }).
diff --git a/src/couch/src/couch_bt_engine_header.erl b/src/couch/src/couch_bt_engine_header.erl
index e28f07723..9e663b096 100644
--- a/src/couch/src/couch_bt_engine_header.erl
+++ b/src/couch/src/couch_bt_engine_header.erl
@@ -34,6 +34,7 @@
     purge_tree_state/1,
     purge_seq_tree_state/1,
     purge_infos_limit/1,
+    raft_tree_state/1,
     security_ptr/1,
     revs_limit/1,
     uuid/1,
@@ -69,7 +70,8 @@
     epochs,
     compacted_seq,
     purge_infos_limit = 1000,
-    props_ptr
+    props_ptr,
+    raft_tree_state = nil
 }).
 
 -define(PARTITION_DISK_VERSION, 8).
@@ -177,6 +179,9 @@ compacted_seq(Header) ->
 purge_infos_limit(Header) ->
     get_field(Header, purge_infos_limit).
 
+% Read the raft_tree_state field of the header; yields undefined when
+% get_field/3 falls back to its default.
+raft_tree_state(Header) ->
+    get_field(Header, raft_tree_state, undefined).
+
 get_field(Header, Field) ->
     get_field(Header, Field, undefined).
 
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index dd7e07517..e197f98a4 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -114,6 +114,11 @@
     fold_purge_infos/4,
     fold_purge_infos/5,
 
+    raft_insert/2,
+    raft_lookup/2,
+    raft_discard/2,
+    raft_last/1,
+
     calculate_start_seq/3,
     owner_of/2,
 
@@ -1822,6 +1827,20 @@ fold_purge_infos(Db, StartPurgeSeq, Fun, Acc) ->
 fold_purge_infos(Db, StartPurgeSeq, UFun, UAcc, Opts) ->
     couch_db_engine:fold_purge_infos(Db, StartPurgeSeq, UFun, UAcc, Opts).
 
+% Insert raft log entries. Runs check_is_admin/1 first, then makes a
+% synchronous {raft_insert, Entries} call (no timeout) to the process stored
+% in main_pid.
+raft_insert(#db{main_pid = MainPid} = Db, Entries) ->
+    check_is_admin(Db),
+    Request = {raft_insert, Entries},
+    gen_server:call(MainPid, Request, infinity).
+
+% Look up raft log entries by index; delegates directly to
+% couch_db_engine:raft_lookup/2.
+raft_lookup(Db, Idxs) ->
+    couch_db_engine:raft_lookup(Db, Idxs).
+
+% Discard raft log entries up to UpTo. Runs check_is_admin/1 first, then
+% makes a synchronous {raft_discard, UpTo} call (no timeout) to the process
+% stored in main_pid.
+raft_discard(#db{main_pid = MainPid} = Db, UpTo) ->
+    check_is_admin(Db),
+    Request = {raft_discard, UpTo},
+    gen_server:call(MainPid, Request, infinity).
+
+% Return the last raft log entry descriptor; delegates directly to
+% couch_db_engine:raft_last/1.
+raft_last(Database) ->
+    couch_db_engine:raft_last(Database).
+
 count_changes_since(Db, SinceSeq) ->
     couch_db_engine:count_changes_since(Db, SinceSeq).
 
diff --git a/src/couch/src/couch_db_engine.erl b/src/couch/src/couch_db_engine.erl
index 052a527e3..63b9d49a3 100644
--- a/src/couch/src/couch_db_engine.erl
+++ b/src/couch/src/couch_db_engine.erl
@@ -704,6 +704,11 @@
     read_doc_body/2,
     load_purge_infos/2,
 
+    raft_lookup/2,
+    raft_insert/2,
+    raft_discard/2,
+    raft_last/1,
+
     serialize_doc/2,
     write_doc_body/2,
     write_doc_infos/3,
@@ -927,6 +932,28 @@ copy_purge_infos(#db{} = Db, Purges) ->
     ),
     {ok, Db#db{engine = {Engine, NewSt}}}.
 
+% Call the storage engine's raft_insert/2 and store the engine state it
+% returns back into the #db{} record.
+raft_insert(#db{} = Db, Entries) ->
+    {Engine, EngineState0} = Db#db.engine,
+    {ok, EngineState1} = Engine:raft_insert(EngineState0, Entries),
+    {ok, Db#db{engine = {Engine, EngineState1}}}.
+
+% Call the storage engine's raft_lookup/2 with the current engine state and
+% return its result unchanged.
+raft_lookup(#db{} = Db, Indexes) ->
+    {Engine, EngineState} = Db#db.engine,
+    Engine:raft_lookup(EngineState, Indexes).
+
+% Call the storage engine's raft_discard/2 and store the engine state it
+% returns back into the #db{} record.
+raft_discard(#db{} = Db, UpTo) ->
+    {Engine, EngineState0} = Db#db.engine,
+    {ok, EngineState1} = Engine:raft_discard(EngineState0, UpTo),
+    {ok, Db#db{engine = {Engine, EngineState1}}}.
+
+% Call the storage engine's raft_last/1 with the current engine state and
+% return its result unchanged.
+raft_last(#db{} = Db) ->
+    {Engine, EngineState} = Db#db.engine,
+    Engine:raft_last(EngineState).
+
 commit_data(#db{} = Db) ->
     #db{engine = {Engine, EngineState}} = Db,
     {ok, NewSt} = Engine:commit_data(EngineState),
diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl
index 0248c21ec..7c1f97804 100644
--- a/src/couch/src/couch_db_updater.erl
+++ b/src/couch/src/couch_db_updater.erl
@@ -118,6 +118,16 @@ handle_call({purge_docs, PurgeReqs0, Options}, _From, Db) ->
         end,
     {ok, NewDb, Replies} = purge_docs(Db, PurgeReqs),
     {reply, {ok, Replies}, NewDb, idle_limit()};
+handle_call({raft_insert, Entries}, _From, Db) ->
+    {ok, Db2} = couch_db_engine:raft_insert(Db, Entries),
+    Db3 = commit_data(Db2),
+    ok = couch_server:db_updated(Db3),
+    {reply, ok, Db3, idle_limit()};
+handle_call({raft_discard, UpTo}, _From, Db) ->
+    {ok, Db2} = couch_db_engine:raft_discard(Db, UpTo),
+    Db3 = commit_data(Db2),
+    ok = couch_server:db_updated(Db3),
+    {reply, ok, Db3, idle_limit()};
 handle_call(Msg, From, Db) ->
     case couch_db_engine:handle_db_updater_call(Msg, From, Db) of
         {reply, Resp, NewDb} ->
diff --git a/src/couch/src/couch_raft.erl b/src/couch/src/couch_raft.erl
new file mode 100644
index 000000000..f398b4f2a
--- /dev/null
+++ b/src/couch/src/couch_raft.erl
@@ -0,0 +1,350 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
+
+-module(couch_raft).
+-behaviour(gen_statem).
+
+-define(ELECTION_DELAY, 150).
+-define(ELECTION_SPLAY, 150).
+-define(LEADER_HEARTBEAT, 75).
+-define(CLIENT_TIMEOUT, 5_000).
+
+% maximum number of entries to send in one go.
+-define(BATCH_SIZE, 10).
+
+% public api
+
+-export([
+    start/2,
+    start_link/2,
+    stop/1,
+    call/2
+]).
+
+% mandatory gen_statem callbacks
+
+-export([
+    init/1,
+    callback_mode/0,
+    handle_event/4
+]).
+
+%% public api
+
+%% Start an unlinked raft server, registered locally as Name, whose initial
+%% data is built by new/2 from Name and Cohort.
+start(Name, Cohort) ->
+    InitialData = new(Name, Cohort),
+    gen_statem:start({local, Name}, ?MODULE, InitialData, []).
+
+%% Same as start/2 but links the raft server to the calling process.
+start_link(Name, Cohort) ->
+    InitialData = new(Name, Cohort),
+    gen_statem:start_link({local, Name}, ?MODULE, InitialData, []).
+
+%% Build the initial server data map.
+%%
+%% Name is the locally registered name shared by every member; Cohort is the
+%% list of member nodes. nextIndex/matchIndex are keyed by peer (the cohort
+%% without the local node). votesGranted starts as an empty list because the
+%% vote-counting code in this module treats it as a list of nodes
+%% (lists:usort/1, length/1) and start_election/1 sets it to [node()].
+new(Name, Cohort) ->
+    Peers = peers(Cohort),
+    #{
+        name => Name,
+        cohort => Cohort,
+        term => 0,
+        votedFor => undefined,
+        votesGranted => [],
+        nextIndex => maps:from_list([{Peer, 1} || Peer <- Peers]),
+        matchIndex => maps:from_list([{Peer, 0} || Peer <- Peers]),
+        log => couch_raft_log:new(),
+        commitIndex => 0,
+        froms => #{},
+        lastApplied => 0,
+        machine => <<0>>
+    }.
+
+%% Stop the raft server identified by Server via gen_statem:stop/1.
+stop(Server) ->
+    gen_statem:stop(Server).
+
+%% Submit Value as a 'ClientRequest' to the raft server and wait up to
+%% ?CLIENT_TIMEOUT milliseconds for the reply.
+call(ServerRef, Value) ->
+    Request = #{type => 'ClientRequest', value => Value},
+    gen_statem:call(ServerRef, Request, ?CLIENT_TIMEOUT).
+
+%% gen_statem callback: every server starts in the follower state with the
+%% data map it was started with.
+init(InitialData) ->
+    {ok, follower, InitialData}.
+
+%% gen_statem callback: all events go through handle_event/4, and state
+%% enter calls are enabled.
+callback_mode() ->
+    Modes = [handle_event_function, state_enter],
+    Modes.
+
+%% gen_statem callback (handle_event_function mode with state enter calls)
+%% implementing the follower, candidate and leader roles. Messages between
+%% members arrive as casts carrying `type` and `term` keys; client requests
+%% arrive as calls. Any event not matched by an earlier clause stops the
+%% server with an unknown_event reason.
+%% erlfmt-ignore
+handle_event(cast, #{term := FutureTerm} = Msg, _State, #{term := CurrentTerm} = Data) when FutureTerm > CurrentTerm ->
+    %% A cast from a later term: adopt that term, become follower and replay the message.
+    couch_log:notice("~p received message from future term ~B, moving to that term, becoming follower and clearing votedFor", [node(), FutureTerm]),
+    {next_state, follower, Data#{term => FutureTerm, votedFor => undefined}, {next_event, cast, Msg}};
+
+%% State enter: follower. Pending client callers are told {error, deposed}.
+handle_event(enter, _OldState, follower, Data) ->
+    #{term := Term, froms := Froms} = Data,
+    couch_log:notice("~p became follower in term ~B", [node(), Term]),
+    Replies = [{reply, From, {error, deposed}} || From <- maps:values(Froms)],
+    {keep_state, Data#{votedFor => undefined, froms => #{}}, [restart_election_timeout() | Replies]};
+
+%% State enter: candidate. Starts an election for the next term.
+handle_event(enter, _OldState, candidate, Data) ->
+    #{term := Term} = Data,
+    couch_log:notice("~p became candidate in term ~B", [node(), Term]),
+    {keep_state, start_election(Data), restart_election_timeout()};
+
+%% State enter: leader. Resets per-peer replication progress and starts the heartbeat timer.
+handle_event(enter, _OldState, leader, Data) ->
+    #{log := Log, cohort := Cohort, term := Term} = Data,
+    couch_log:notice("~p became leader in term ~B", [node(), Term]),
+    Peers = peers(Cohort),
+    {keep_state, Data#{
+        nextIndex => maps:from_list([{Peer, couch_raft_log:index(couch_raft_log:last(Log)) + 1} || Peer <- Peers]),
+        matchIndex => maps:from_list([{Peer, 0} || Peer <- Peers])
+    }, restart_heartbeat_timeout()};
+
+%% Vote request from the current or an earlier term. A reply is always sent.
+handle_event(cast, #{type := 'RequestVoteRequest', term := Term} = Msg, State, #{term := CurrentTerm} = Data)
+  when Term =< CurrentTerm ->
+    #{
+        source := MSource,
+        lastLogIndex := MLastLogIndex,
+        lastLogTerm := MLastLogTerm
+    } = Msg,
+    #{
+        log := Log,
+        votedFor := VotedFor
+    } = Data,
+    LogOk = MLastLogTerm > couch_raft_log:term(couch_raft_log:last(Log)) orelse (MLastLogTerm == couch_raft_log:term(couch_raft_log:last(Log)) andalso MLastLogIndex >= couch_raft_log:index(couch_raft_log:last(Log))),
+    Grant = Term == CurrentTerm andalso LogOk andalso (VotedFor == undefined orelse VotedFor == MSource),
+    couch_log:notice("~p received RequestVoteRequest from ~p in term ~B when in term ~B (Grant:~p, LogOk:~p, VotedFor:~p)", [node(), MSource, Term, CurrentTerm, Grant, LogOk, VotedFor]),
+    Reply = #{
+        type => 'RequestVoteResponse',
+        term => CurrentTerm,
+        voteGranted => Grant,
+        source => node()
+    },
+    cast(MSource, Reply, Data),
+    %% A leader must keep its heartbeat state_timeout: replacing it with the
+    %% election timeout would later deliver new_election in the leader state,
+    %% which only the catch-all clause matches (stopping the server).
+    Actions = if
+        State == leader -> [];
+        true -> [restart_election_timeout()]
+    end,
+    if
+        Grant ->
+            {keep_state, Data#{votedFor => MSource}, Actions};
+        true ->
+            {keep_state_and_data, Actions}
+    end;
+
+%% Vote response from an earlier term: ignored.
+handle_event(cast, #{type := 'RequestVoteResponse', term := PastTerm}, _State, #{term := CurrentTerm}) when PastTerm < CurrentTerm ->
+    couch_log:notice("~p ignored RequestVoteResponse from past term ~B", [node(), PastTerm]),
+    keep_state_and_data;
+
+%% Vote response for the current term: record it and become leader once a
+%% majority of the cohort has granted its vote.
+handle_event(cast, #{type := 'RequestVoteResponse', term := Term} = Msg, _State, #{term := Term} = Data) ->
+    #{source := MSource, voteGranted := MVoteGranted} = Msg,
+    #{cohort := Cohort, votesGranted := VotesGranted0} = Data,
+    VotesGranted1 = if MVoteGranted -> lists:usort([MSource | VotesGranted0]); true -> VotesGranted0 end,
+    couch_log:notice("~p received RequestVoteResponse from ~p in current term ~B (VotesGranted:~p)", [node(), MSource, Term, VotesGranted1]),
+    if
+        length(VotesGranted1) >= length(Cohort) div 2 + 1 ->
+            couch_log:notice("~p has enough votes to be leader in term ~B", [node(), Term]),
+            {next_state, leader, Data#{votesGranted => VotesGranted1}};
+        true ->
+            {keep_state, Data#{votesGranted => VotesGranted1}}
+    end;
+
+
+%% AppendEntries (including heartbeats) from the current or an earlier term.
+%% NOTE(review): no branch of the outer `if` covers State == leader with
+%% Term == CurrentTerm; such a message would raise if_clause.
+handle_event(cast, #{type := 'AppendEntriesRequest', term := Term} = Msg, State, #{term := CurrentTerm} = Data)
+  when Term =< CurrentTerm ->
+    #{
+        source := MSource,
+        prevLogIndex := MPrevLogIndex,
+        prevLogTerm := MPrevLogTerm,
+        entries := MEntries,
+        commitIndex := MCommitIndex
+    } = Msg,
+    #{
+        log := Log
+    } = Data,
+    LogOk = MPrevLogIndex == 0 orelse (MPrevLogIndex > 0 andalso MPrevLogIndex =< couch_raft_log:index(couch_raft_log:last(Log)) andalso MPrevLogTerm == couch_raft_log:term(couch_raft_log:nth(MPrevLogIndex,Log))),
+    if
+        Term < CurrentTerm orelse (Term == CurrentTerm andalso State == follower andalso not LogOk) ->
+            %% stale sender, or our log does not contain the sender's previous entry: reject
+            Reply = #{
+                type => 'AppendEntriesResponse',
+                term => CurrentTerm,
+                success => false,
+                matchIndex => 0,
+                source => node()
+            },
+            cast(MSource, Reply, Data),
+            if
+                State == leader ->
+                    keep_state_and_data;
+                true ->
+                    {keep_state_and_data, restart_election_timeout()}
+            end;
+        Term == CurrentTerm andalso State == candidate ->
+            %% another member is sending entries in this term: step down and replay the message
+            {next_state, follower, Data, {next_event, cast, Msg}};
+        Term == CurrentTerm andalso State == follower andalso LogOk ->
+            if
+                MEntries == [] ->
+                    Reply = #{
+                        type => 'AppendEntriesResponse',
+                        term => CurrentTerm,
+                        success => true,
+                        matchIndex => MPrevLogIndex,
+                        source => node()
+                    },
+                    couch_log:debug("~p received heartbeat and everything matches, sending matchIndex:~p", [node(), MPrevLogIndex]),
+                    cast(MSource, Reply, Data),
+                    {keep_state, update_state_machine(Data#{commitIndex => MCommitIndex}), restart_election_timeout()};
+                true ->
+                    Index = MPrevLogIndex + 1,
+                    LastLogIndex = couch_raft_log:index(couch_raft_log:last(Log)),
+                    if
+                        LastLogIndex >= Index ->
+                            NthLogTerm = couch_raft_log:term(couch_raft_log:nth(Index, Log)),
+                            FirstEntryTerm = couch_raft_log:term(hd(MEntries)),
+                            if
+                                NthLogTerm == FirstEntryTerm ->
+                                    Reply = #{
+                                        type => 'AppendEntriesResponse',
+                                        term => CurrentTerm,
+                                        success => true,
+                                        matchIndex => MPrevLogIndex + length(MEntries),
+                                        source => node()
+                                    },
+                                    couch_log:notice("~p received entry:~p that's already applied, sending matchIndex:~p", [node(), MEntries, MPrevLogIndex + length(MEntries)]),
+                                    cast(MSource, Reply, Data),
+                                    {keep_state, update_state_machine(Data#{commitIndex => MCommitIndex}), restart_election_timeout()};
+                                NthLogTerm /= FirstEntryTerm ->
+                                    couch_log:notice("~p received conflicting entry:~p, deleting it", [node(), MEntries]),
+                                    %% drop the last log entry (through the couch_raft_log API) and replay the message
+                                    {keep_state, Data#{log => couch_raft_log:sublist(Log, 1, LastLogIndex - 1)}, [{next_event, cast, Msg}, restart_election_timeout()]}
+                            end;
+                        LastLogIndex == MPrevLogIndex ->
+                            couch_log:notice("~p received new entries:~p, appending it to log", [node(), MEntries]),
+                            %% append, then replay the message so the "already applied" branch sends the reply
+                            {keep_state, Data#{log => couch_raft_log:append(Log, MEntries)}, [{next_event, cast, Msg}, restart_election_timeout()]}
+                    end
+            end
+    end;
+
+%% AppendEntries response from an earlier term: ignored.
+handle_event(cast, #{type := 'AppendEntriesResponse', term := PastTerm}, _State, #{term := CurrentTerm}) when PastTerm < CurrentTerm ->
+    couch_log:notice("~p ignored AppendEntriesResponse from past term ~B", [node(), PastTerm]),
+    keep_state_and_data;
+
+%% AppendEntries response for the current term: on success record the peer's
+%% progress, on failure step its nextIndex back by one (never below 1).
+handle_event(cast, #{type := 'AppendEntriesResponse', term := Term} = Msg, _State, #{term := Term} = Data) ->
+    #{success := MSuccess, matchIndex := MMatchIndex, source := MSource} = Msg,
+    #{nextIndex := NextIndex, matchIndex := MatchIndex} = Data,
+    couch_log:debug("~p received AppendEntriesResponse from ~p in current term ~B (Success:~p)", [node(), MSource, Term, MSuccess]),
+    SourceNextIndex = maps:get(MSource, NextIndex),
+    if
+        MSuccess ->
+            {keep_state, Data#{
+                nextIndex => NextIndex#{MSource => MMatchIndex + 1},
+                matchIndex => MatchIndex#{MSource => MMatchIndex}
+            }};
+        true ->
+            {keep_state, Data#{
+                nextIndex => NextIndex#{MSource => max(SourceNextIndex - 1, 1)}
+            }}
+    end;
+
+%% Client request on the leader: append to the log and remember the caller;
+%% the reply is sent from update_state_machine/1 when the entry is applied.
+handle_event({call, From}, #{type := 'ClientRequest'} = Msg, leader, Data) ->
+    #{value := Value} = Msg,
+    #{term := Term, log := Log, froms := Froms} = Data,
+    EntryIndex = couch_raft_log:index(couch_raft_log:last(Log)) + 1,
+    Entry = {EntryIndex, Term, Value},
+    {keep_state, Data#{log => couch_raft_log:append(Log, [Entry]), froms => Froms#{EntryIndex => From}}};
+
+%% Client request on a non-leader: refused.
+handle_event({call, From}, #{type := 'ClientRequest'}, _State, _Data) ->
+    {keep_state_and_data, {reply, From, {error, not_leader}}};
+
+%% Election timeout while follower or candidate: start a new election.
+handle_event(state_timeout, new_election, State, Data) when State == follower; State == candidate ->
+    #{term := Term} = Data,
+    couch_log:notice("~p election timeout in state ~p, term ~B", [node(), State, Term]),
+    {next_state, candidate, start_election(Data), restart_election_timeout()};
+
+%% Heartbeat timeout on the leader: send AppendEntries to every peer and try to advance the commit index.
+handle_event(state_timeout, heartbeat, leader, Data) ->
+    #{term := Term} = Data,
+    couch_log:debug("~p leader sending a heartbeat in term ~B", [node(), Term]),
+    ok = send_append_entries(Data),
+    {keep_state, advance_commit_index(Data), restart_heartbeat_timeout()};
+
+%% Anything else is unexpected and stops the server.
+handle_event(EventType, EventContent, State, Data) ->
+    {stop, {unknown_event, EventType, EventContent, State, Data}}.
+
+
+%% Send an AppendEntriesRequest to every peer in the cohort. Returns ok.
+send_append_entries(#{cohort := Cohort} = Data) ->
+    Peers = peers(Cohort),
+    send_append_entries(Peers, Data).
+
+%% Send one AppendEntriesRequest to each peer in the list. For every peer the
+%% request carries up to ?BATCH_SIZE entries starting at that peer's
+%% nextIndex, together with the index and term of the entry preceding them
+%% (term 0 when there is none). Returns ok.
+send_append_entries(Peers, Data) ->
+    SendOne = fun(Peer) ->
+        #{term := Term, nextIndex := NextIndex, log := Log, commitIndex := CommitIndex} = Data,
+        PrevLogIndex = maps:get(Peer, NextIndex) - 1,
+        PrevLogTerm =
+            if
+                PrevLogIndex > 0 -> couch_raft_log:term(couch_raft_log:nth(PrevLogIndex, Log));
+                true -> 0
+            end,
+        LastLogIndex = couch_raft_log:index(couch_raft_log:last(Log)),
+        LastEntry = min(LastLogIndex, PrevLogIndex + 2),
+        Entries = couch_raft_log:sublist(Log, PrevLogIndex + 1, ?BATCH_SIZE),
+        Request = #{
+            type => 'AppendEntriesRequest',
+            term => Term,
+            source => node(),
+            prevLogIndex => PrevLogIndex,
+            prevLogTerm => PrevLogTerm,
+            entries => Entries,
+            commitIndex => min(CommitIndex, LastEntry)
+        },
+        cast(Peer, Request, Data)
+    end,
+    lists:foreach(SendOne, Peers).
+
+%% Leader only: advance commitIndex to the highest log index that is stored
+%% on a majority of the cohort, then apply newly committed entries.
+%%
+%% The leader's own last index and every peer's matchIndex are sorted in
+%% descending order; the element at position Quorum is then the highest index
+%% that at least Quorum members hold (this is correct for even-sized cohorts
+%% too). The index is only adopted when
+%%   * it is greater than the current commitIndex, so commitIndex never moves
+%%     backwards (matchIndex is reset to 0 on becoming leader), and
+%%   * the entry at that index was created in the current term, since entries
+%%     from earlier terms must not be committed by counting replicas.
+%% Otherwise Data is returned unchanged.
+advance_commit_index(Data) ->
+    #{
+        matchIndex := MatchIndex,
+        log := Log,
+        cohort := Cohort,
+        term := Term,
+        commitIndex := CommitIndex
+    } = Data,
+    LastIndex = couch_raft_log:index(couch_raft_log:last(Log)),
+    Descending = lists:reverse(lists:sort([LastIndex | maps:values(MatchIndex)])),
+    Quorum = length(Cohort) div 2 + 1,
+    QuorumIndex = lists:nth(Quorum, Descending),
+    if
+        QuorumIndex > CommitIndex ->
+            case couch_raft_log:term(couch_raft_log:nth(QuorumIndex, Log)) of
+                Term ->
+                    update_state_machine(Data#{commitIndex => QuorumIndex});
+                _OlderTerm ->
+                    Data
+            end;
+        true ->
+            Data
+    end.
+
+%% Apply every log entry between lastApplied + 1 and commitIndex (bounded by
+%% the last index present in the log) to the state machine, in index order.
+%%
+%% The machine state is a binary; applying an entry replaces it with the
+%% sha256 hash of the previous state concatenated with the entry's value. If
+%% a caller is registered in froms for an applied index, it is sent the new
+%% machine state as its reply and removed from froms.
+%%
+%% When commitIndex is not ahead of lastApplied (including a commitIndex that
+%% is lower than lastApplied) there is nothing to apply and Data is returned
+%% unchanged. lastApplied never decreases.
+update_state_machine(#{lastApplied := LastApplied, commitIndex := CommitIndex} = Data) when LastApplied >= CommitIndex ->
+    Data;
+update_state_machine(#{lastApplied := LastApplied, commitIndex := CommitIndex} = Data) ->
+    #{log := Log, froms := Froms0, machine := Machine0} = Data,
+    LastLogIndex = couch_raft_log:index(couch_raft_log:last(Log)),
+    %% never go below LastApplied, so lists:seq/2 always gets a valid range
+    To = max(LastApplied, min(LastLogIndex, CommitIndex)),
+    Apply = fun(Index, {Froms, Machine}) ->
+        Value = couch_raft_log:value(couch_raft_log:nth(Index, Log)),
+        Result = crypto:hash(sha256, <<Machine/binary, Value/binary>>),
+        case maps:take(Index, Froms) of
+            {Caller, Remaining} ->
+                gen_statem:reply(Caller, Result),
+                {Remaining, Result};
+            error ->
+                {Froms, Result}
+        end
+    end,
+    {Froms1, Machine1} = lists:foldl(Apply, {Froms0, Machine0}, lists:seq(LastApplied + 1, To)),
+    Data#{froms => Froms1, machine => Machine1, lastApplied => To}.
+
+%% Begin an election for the next term: send a RequestVoteRequest describing
+%% our last log entry to every peer, then return Data moved to the new term
+%% with a vote for ourselves recorded.
+start_election(Data) ->
+    #{term := Term, cohort := Cohort, log := Log} = Data,
+    NextTerm = Term + 1,
+    couch_log:notice("~p starting election in term ~B", [node(), NextTerm]),
+    LastEntry = couch_raft_log:last(Log),
+    Request = #{
+        type => 'RequestVoteRequest',
+        term => NextTerm,
+        lastLogIndex => couch_raft_log:index(LastEntry),
+        lastLogTerm => couch_raft_log:term(LastEntry),
+        source => node()
+    },
+    [cast(Peer, Request, Data) || Peer <- peers(Cohort)],
+    Self = node(),
+    Data#{term => NextTerm, votedFor => Self, votesGranted => [Self]}.
+
+%% Send Msg as a cast to the process registered under the shared name on Node.
+cast(Node, Msg, #{name := Name}) ->
+    Destination = {Name, Node},
+    gen_statem:cast(Destination, Msg).
+
+%% state_timeout action that fires new_election after ?ELECTION_DELAY plus a
+%% random 1..?ELECTION_SPLAY milliseconds.
+restart_election_timeout() ->
+    Delay = ?ELECTION_DELAY + rand:uniform(?ELECTION_SPLAY),
+    {state_timeout, Delay, new_election}.
+
+%% state_timeout action that fires heartbeat after ?LEADER_HEARTBEAT milliseconds.
+restart_heartbeat_timeout() ->
+    Delay = ?LEADER_HEARTBEAT,
+    {state_timeout, Delay, heartbeat}.
+
+%% The cohort with (the first occurrence of) the local node removed.
+peers(Cohort) ->
+    lists:delete(node(), Cohort).
diff --git a/src/couch/src/couch_raft_log.erl b/src/couch/src/couch_raft_log.erl
new file mode 100644
index 000000000..987212457
--- /dev/null
+++ b/src/couch/src/couch_raft_log.erl
@@ -0,0 +1,52 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
+
+-module(couch_raft_log).
+
+-export([
+    new/0,
+    append/2,
+    sublist/3,
+    nth/2,
+    last/1,
+    index/1,
+    term/1,
+    value/1
+]).
+
+%% An empty log. The log is represented as a list of entries.
+new() ->
+    Empty = [],
+    Empty.
+
+%% Return Log with Items added at the end, in order.
+append(Log, Items) ->
+    Log ++ Items.
+
+%% Up to Len entries of Log beginning at 1-based position Start.
+sublist(Log, Start, Len) ->
+    Slice = lists:sublist(Log, Start, Len),
+    Slice.
+
+%% The entry at 1-based position Position of Log.
+nth(Position, Log) ->
+    lists:nth(Position, Log).
+
+%% The final entry of the log. An empty log yields the placeholder entry
+%% {0, 0, undefined}, i.e. index 0 and term 0.
+last([]) ->
+    Placeholder = {0, 0, undefined},
+    Placeholder;
+last(Entries) ->
+    lists:last(Entries).
+
+%% The index of a log entry (first tuple element).
+index(LogEntry) ->
+    element(1, LogEntry).
+
+%% The term of a log entry (second tuple element).
+term(LogEntry) ->
+    element(2, LogEntry).
+
+%% The value carried by a log entry (third tuple element).
+value(LogEntry) ->
+    element(3, LogEntry).
diff --git a/src/couch/test/eunit/couch_raft_SUITE.erl b/src/couch/test/eunit/couch_raft_SUITE.erl
new file mode 100644
index 000000000..1c3f8ebc2
--- /dev/null
+++ b/src/couch/test/eunit/couch_raft_SUITE.erl
@@ -0,0 +1,67 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
+
+-module(couch_raft_SUITE).
+
+-behaviour(ct_suite).
+
+-export([all/0]).
+-export([three_nodes/1]).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+%% Common Test callback: the test cases in this suite.
+all() ->
+    TestCases = [three_nodes],
+    TestCases.
+
+%% Start three peer nodes, run one couch_raft server (registered as foo) on
+%% each, and check that exactly one leader is elected, that client calls to
+%% the leader return the expected hash chain, and that a new leader takes
+%% over and continues the chain after the first leader is stopped.
+three_nodes(Config) when is_list(Config) ->
+    N = 3,
+    % make the directory holding couch_raft.beam available on the peers
+    Args = ["-pa", filename:dirname(code:which(couch_raft))],
+    Peers = [?CT_PEER(#{wait_boot => {self(), tag}, args => Args}) || _ <- lists:seq(1, N)],
+    Cohort = [receive {tag, {started, Node, Peer}} -> Node end || {ok, Peer} <- Peers],
+
+    Crafts = [erpc:call(Node, couch_raft, start, [foo, Cohort]) || Node <- Cohort],
+
+    % wait for leader election
+    timer:sleep(500),
+
+    % verify only one leader elected
+    [{leader, FirstLeader}] = lists:filter(
+        fun({State, _Pid}) -> State == leader end,
+        [{element(1, sys:get_state(Pid)), Pid} || {ok, Pid} <- Crafts]
+    ),
+
+    % make a series of calls
+    Hash1 = crypto:hash(sha256, <<0, 1>>),
+    ?assertEqual(Hash1, couch_raft:call(FirstLeader, <<1>>)),
+
+    Hash2 = crypto:hash(sha256, <<Hash1/binary, 2>>),
+    ?assertEqual(Hash2, couch_raft:call(FirstLeader, <<2>>)),
+
+    Hash3 = crypto:hash(sha256, <<Hash2/binary, 3>>),
+    ?assertEqual(Hash3, couch_raft:call(FirstLeader, <<3>>)),
+
+    % force a re-election
+    couch_raft:stop(FirstLeader),
+    timer:sleep(500),
+
+    % verify new leader elected
+    [{leader, SecondLeader}] = lists:filter(
+        fun({State, _Pid}) -> State == leader end,
+        [{element(1, sys:get_state(Pid)), Pid} || {ok, Pid} <- Crafts, Pid /= FirstLeader]
+    ),
+    ?assertNotEqual(FirstLeader, SecondLeader),
+
+    % make another call
+    Hash4 = crypto:hash(sha256, <<Hash3/binary, 4>>),
+    ?assertEqual(Hash4, couch_raft:call(SecondLeader, <<4>>)),
+
+    [couch_raft:stop(Pid) || {ok, Pid} <- Crafts, Pid /= FirstLeader],
+    [peer:stop(Peer) || {ok, Peer} <- Peers].