You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2022/09/01 16:05:56 UTC
[couchdb] 01/09: Integrate raft algorithm (WIP)
This is an automated email from the ASF dual-hosted git repository.
rnewson pushed a commit to branch raft_storemodule
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 9ce27998e78e2361886c2ac3e028946e3bb3b74f
Author: Robert Newson <rn...@apache.org>
AuthorDate: Sat May 14 20:28:05 2022 +0100
Integrate raft algorithm (WIP)
couch_raft.erl is a complete implementation of the raft algorithm but
currently only manages an in-memory state machine and log.
Preliminary work is also here to add a new btree inside the `.couch`
files, which will be the real raft log. The intent is that log entries
can be removed from this log and applied to by_id and by_seq trees
atomically.
The raft log is not preserved over compaction yet, because reading the
compactor code hurts my eyes.
Anyway, it's progress and hopefully we're going somewhere cool.
---
src/couch/src/couch_bt_engine.erl | 95 +++++++-
src/couch/src/couch_bt_engine.hrl | 3 +-
src/couch/src/couch_bt_engine_header.erl | 7 +-
src/couch/src/couch_db.erl | 19 ++
src/couch/src/couch_db_engine.erl | 27 +++
src/couch/src/couch_db_updater.erl | 10 +
src/couch/src/couch_raft.erl | 350 ++++++++++++++++++++++++++++++
src/couch/src/couch_raft_log.erl | 52 +++++
src/couch/test/eunit/couch_raft_SUITE.erl | 67 ++++++
9 files changed, 621 insertions(+), 9 deletions(-)
diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl
index 0549de566..8c1a2756d 100644
--- a/src/couch/src/couch_bt_engine.erl
+++ b/src/couch/src/couch_bt_engine.erl
@@ -66,6 +66,11 @@
purge_docs/3,
copy_purge_infos/2,
+ raft_lookup/2,
+ raft_insert/2,
+ raft_discard/2,
+ raft_last/1,
+
commit_data/1,
open_write_stream/2,
@@ -102,7 +107,11 @@
purge_tree_join/2,
purge_tree_reduce/2,
purge_seq_tree_split/1,
- purge_seq_tree_join/2
+ purge_seq_tree_join/2,
+
+ raft_tree_split/1,
+ raft_tree_join/2,
+ raft_tree_reduce/2
]).
% Used by the compactor
@@ -631,6 +640,44 @@ count_changes_since(St, SinceSeq) ->
{ok, Changes} = couch_btree:fold_reduce(BTree, FoldFun, 0, Opts),
Changes.
+% Add raft log entries ({Index, Term, Value} tuples, split by
+% raft_tree_split/1) to the raft btree and flag the state for commit.
+raft_insert(#st{raft_tree = Tree} = St, Entries) when is_list(Entries) ->
+    {ok, UpdatedTree} = couch_btree:add_remove(Tree, Entries, []),
+    NewSt = St#st{raft_tree = UpdatedTree, needs_commit = true},
+    {ok, NewSt}.
+
+% Look up raft log entries by index. Returns one element per requested
+% index, in order: the stored entry, or the atom not_found.
+raft_lookup(#st{raft_tree = Tree}, Indexes) ->
+    [
+        case LookupResult of
+            {ok, Entry} -> Entry;
+            not_found -> not_found
+        end
+     || LookupResult <- couch_btree:lookup(Tree, Indexes)
+    ].
+
+% Remove raft log entries from the oldest stored entry up to and including
+% index UpTo.
+%
+% The removal range is clamped to the entries the tree actually holds, for
+% two reasons. lists:seq/2 raises when its end is more than one below its
+% start (an UpTo below the first stored index). An UpTo far beyond the last
+% entry would also build a needlessly long key list. When nothing falls in
+% range, St is returned unchanged.
+raft_discard(#st{} = St, UpTo) ->
+    #st{raft_tree = RaftTree0} = St,
+    % raft_tree_reduce/2 yields {{MinIndex, MinTerm}, {MaxIndex, MaxTerm}}
+    {ok, {{FirstIndex, _FirstTerm}, {LastIndex, _LastTerm}}} =
+        couch_btree:full_reduce(RaftTree0),
+    StopIndex = min(UpTo, LastIndex),
+    if
+        StopIndex < FirstIndex ->
+            {ok, St};
+        true ->
+            Remove = lists:seq(FirstIndex, StopIndex),
+            {ok, RaftTree1} = couch_btree:add_remove(RaftTree0, [], Remove),
+            {ok, St#st{
+                raft_tree = RaftTree1,
+                needs_commit = true
+            }}
+    end.
+
+
+% The {Index, Term} of the newest raft log entry, taken from the tree's full
+% reduction ({0, 0} for an empty tree, per raft_tree_reduce/2).
+raft_last(#st{raft_tree = Tree}) ->
+    {ok, {_Oldest, Newest}} = couch_btree:full_reduce(Tree),
+    Newest.
+
start_compaction(St, DbName, Options, Parent) ->
Args = [St, DbName, Options, Parent],
Pid = spawn_link(couch_bt_engine_compactor, start, Args),
@@ -799,6 +846,23 @@ purge_tree_reduce(reduce, IdRevs) ->
purge_tree_reduce(rereduce, Reds) ->
lists:sum(Reds).
+% btree split: key the entry by its log index, keep {Term, Value} as value.
+raft_tree_split({LogIndex, LogTerm, Payload}) ->
+    Key = LogIndex,
+    {Key, {LogTerm, Payload}}.
+
+% btree join: inverse of raft_tree_split/1.
+raft_tree_join(Key, {LogTerm, Payload}) ->
+    {Key, LogTerm, Payload}.
+
+
+% btree reduce for the raft log. The reduction is
+% {{MinIndex, MinTerm}, {MaxIndex, MaxTerm}} over the joined
+% {Index, Term, Value} entries. An empty tree reduces to {{0, 0}, {0, 0}}.
+raft_tree_reduce(reduce, []) ->
+    {{0, 0}, {0, 0}};
+raft_tree_reduce(reduce, Entries) ->
+    {LoIndex, LoTerm, _LoValue} = lists:min(Entries),
+    {HiIndex, HiTerm, _HiValue} = lists:max(Entries),
+    Lowest = {LoIndex, LoTerm},
+    Highest = {HiIndex, HiTerm},
+    {Lowest, Highest};
+raft_tree_reduce(rereduce, Reductions) ->
+    {Lows, Highs} = lists:unzip(Reductions),
+    {lists:min(Lows), lists:max(Highs)}.
+
set_update_seq(#st{header = Header} = St, UpdateSeq) ->
{ok, St#st{
header = couch_bt_engine_header:set(Header, [
@@ -894,6 +958,13 @@ init_state(FilePath, Fd, Header0, Options) ->
{reduce, fun ?MODULE:purge_tree_reduce/2}
]),
+ RaftTreeState = couch_bt_engine_header:raft_tree_state(Header),
+ {ok, RaftTree} = couch_btree:open(RaftTreeState, Fd, [
+ {split, fun ?MODULE:raft_tree_split/1},
+ {join, fun ?MODULE:raft_tree_join/2},
+ {reduce, fun ?MODULE:raft_tree_reduce/2}
+ ]),
+
ok = couch_file:set_db_pid(Fd, self()),
St = #st{
@@ -907,7 +978,8 @@ init_state(FilePath, Fd, Header0, Options) ->
local_tree = LocalTree,
compression = Compression,
purge_tree = PurgeTree,
- purge_seq_tree = PurgeSeqTree
+ purge_seq_tree = PurgeSeqTree,
+ raft_tree = RaftTree
},
% If this is a new database we've just created a
@@ -927,7 +999,8 @@ update_header(St, Header) ->
{id_tree_state, couch_btree:get_state(St#st.id_tree)},
{local_tree_state, couch_btree:get_state(St#st.local_tree)},
{purge_tree_state, couch_btree:get_state(St#st.purge_tree)},
- {purge_seq_tree_state, couch_btree:get_state(St#st.purge_seq_tree)}
+ {purge_seq_tree_state, couch_btree:get_state(St#st.purge_seq_tree)},
+ {raft_tree_state, couch_btree:get_state(St#st.raft_tree)}
]).
increment_update_seq(#st{header = Header} = St) ->
@@ -1097,7 +1170,8 @@ active_size(#st{} = St, #size_info{} = SI) ->
St#st.seq_tree,
St#st.local_tree,
St#st.purge_tree,
- St#st.purge_seq_tree
+ St#st.purge_seq_tree,
+ St#st.raft_tree
],
lists:foldl(
fun(T, Acc) ->
@@ -1171,12 +1245,14 @@ fold_docs_reduce_to_count(Reds) ->
finish_compaction_int(#st{} = OldSt, #st{} = NewSt1) ->
#st{
filepath = FilePath,
- local_tree = OldLocal
+ local_tree = OldLocal,
+ raft_tree = OldRaft
} = OldSt,
#st{
filepath = CompactDataPath,
header = Header,
- local_tree = NewLocal1
+ local_tree = NewLocal1,
+ raft_tree = NewRaft1
} = NewSt1,
% suck up all the local docs into memory and write them to the new db
@@ -1186,13 +1262,18 @@ finish_compaction_int(#st{} = OldSt, #st{} = NewSt1) ->
{ok, _, LocalDocs} = couch_btree:foldl(OldLocal, LoadFun, []),
{ok, NewLocal2} = couch_btree:add(NewLocal1, LocalDocs),
+ % do the same for the raft log
+ {ok, _, RaftLog} = couch_btree:foldl(OldRaft, LoadFun, []),
+ {ok, NewRaft2} = couch_btree:add(NewRaft1, RaftLog),
+
{ok, NewSt2} = commit_data(NewSt1#st{
header = couch_bt_engine_header:set(Header, [
{compacted_seq, get_update_seq(OldSt)},
{revs_limit, get_revs_limit(OldSt)},
{purge_infos_limit, get_purge_infos_limit(OldSt)}
]),
- local_tree = NewLocal2
+ local_tree = NewLocal2,
+ raft_tree = NewRaft2
}),
% Rename our *.compact.data file to *.compact so that if we
diff --git a/src/couch/src/couch_bt_engine.hrl b/src/couch/src/couch_bt_engine.hrl
index e3c1d4983..0d347e99b 100644
--- a/src/couch/src/couch_bt_engine.hrl
+++ b/src/couch/src/couch_bt_engine.hrl
@@ -23,5 +23,6 @@
local_tree,
compression,
purge_tree,
- purge_seq_tree
+ purge_seq_tree,
+ raft_tree
}).
diff --git a/src/couch/src/couch_bt_engine_header.erl b/src/couch/src/couch_bt_engine_header.erl
index e28f07723..9e663b096 100644
--- a/src/couch/src/couch_bt_engine_header.erl
+++ b/src/couch/src/couch_bt_engine_header.erl
@@ -34,6 +34,7 @@
purge_tree_state/1,
purge_seq_tree_state/1,
purge_infos_limit/1,
+ raft_tree_state/1,
security_ptr/1,
revs_limit/1,
uuid/1,
@@ -69,7 +70,8 @@
epochs,
compacted_seq,
purge_infos_limit = 1000,
- props_ptr
+ props_ptr,
+ raft_tree_state = nil
}).
-define(PARTITION_DISK_VERSION, 8).
@@ -177,6 +179,9 @@ compacted_seq(Header) ->
purge_infos_limit(Header) ->
get_field(Header, purge_infos_limit).
+% Stored btree state of the raft log tree (record default: nil). Uses the
+% same undefined fallback as get_field/2.
+raft_tree_state(Header) ->
+    get_field(Header, raft_tree_state, undefined).
+
get_field(Header, Field) ->
get_field(Header, Field, undefined).
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index dd7e07517..e197f98a4 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -114,6 +114,11 @@
fold_purge_infos/4,
fold_purge_infos/5,
+ raft_insert/2,
+ raft_lookup/2,
+ raft_discard/2,
+ raft_last/1,
+
calculate_start_seq/3,
owner_of/2,
@@ -1822,6 +1827,20 @@ fold_purge_infos(Db, StartPurgeSeq, Fun, Acc) ->
fold_purge_infos(Db, StartPurgeSeq, UFun, UAcc, Opts) ->
couch_db_engine:fold_purge_infos(Db, StartPurgeSeq, UFun, UAcc, Opts).
+% Append Entries to this database's raft log. Admin only; the write goes
+% through the process at main_pid, which handles {raft_insert, Entries}.
+raft_insert(#db{main_pid = UpdaterPid} = Db, Entries) ->
+    check_is_admin(Db),
+    Request = {raft_insert, Entries},
+    gen_server:call(UpdaterPid, Request, infinity).
+
+% Read raft log entries by index directly from the engine state in Db.
+raft_lookup(Db, Indexes) ->
+    couch_db_engine:raft_lookup(Db, Indexes).
+
+% Drop raft log entries up to and including UpTo. Admin only; routed
+% through the process at main_pid like raft_insert/2.
+raft_discard(#db{main_pid = UpdaterPid} = Db, UpTo) ->
+    check_is_admin(Db),
+    Request = {raft_discard, UpTo},
+    gen_server:call(UpdaterPid, Request, infinity).
+
+% Newest {Index, Term} in the raft log, read from the engine state in Db.
+raft_last(Db) ->
+    couch_db_engine:raft_last(Db).
+
count_changes_since(Db, SinceSeq) ->
couch_db_engine:count_changes_since(Db, SinceSeq).
diff --git a/src/couch/src/couch_db_engine.erl b/src/couch/src/couch_db_engine.erl
index 052a527e3..63b9d49a3 100644
--- a/src/couch/src/couch_db_engine.erl
+++ b/src/couch/src/couch_db_engine.erl
@@ -704,6 +704,11 @@
read_doc_body/2,
load_purge_infos/2,
+ raft_lookup/2,
+ raft_insert/2,
+ raft_discard/2,
+ raft_last/1,
+
serialize_doc/2,
write_doc_body/2,
write_doc_infos/3,
@@ -927,6 +932,28 @@ copy_purge_infos(#db{} = Db, Purges) ->
),
{ok, Db#db{engine = {Engine, NewSt}}}.
+% Delegate a raft log insert to the storage engine and return Db carrying
+% the engine's updated state.
+raft_insert(#db{} = Db, Entries) ->
+    #db{engine = {Mod, ModState0}} = Db,
+    {ok, ModState1} = Mod:raft_insert(ModState0, Entries),
+    {ok, Db#db{engine = {Mod, ModState1}}}.
+
+% Delegate a raft log lookup to the storage engine (read only).
+raft_lookup(#db{} = Db, Indexes) ->
+    #db{engine = {Mod, ModState}} = Db,
+    Mod:raft_lookup(ModState, Indexes).
+
+% Delegate a raft log discard to the storage engine and return Db carrying
+% the engine's updated state.
+raft_discard(#db{} = Db, UpTo) ->
+    #db{engine = {Mod, ModState0}} = Db,
+    {ok, ModState1} = Mod:raft_discard(ModState0, UpTo),
+    {ok, Db#db{engine = {Mod, ModState1}}}.
+
+% Delegate a "newest raft entry" query to the storage engine (read only).
+raft_last(#db{} = Db) ->
+    #db{engine = {Mod, ModState}} = Db,
+    Mod:raft_last(ModState).
+
commit_data(#db{} = Db) ->
#db{engine = {Engine, EngineState}} = Db,
{ok, NewSt} = Engine:commit_data(EngineState),
diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl
index 0248c21ec..7c1f97804 100644
--- a/src/couch/src/couch_db_updater.erl
+++ b/src/couch/src/couch_db_updater.erl
@@ -118,6 +118,16 @@ handle_call({purge_docs, PurgeReqs0, Options}, _From, Db) ->
end,
{ok, NewDb, Replies} = purge_docs(Db, PurgeReqs),
{reply, {ok, Replies}, NewDb, idle_limit()};
+handle_call({raft_insert, Entries}, _From, Db) ->
+ {ok, Db2} = couch_db_engine:raft_insert(Db, Entries),
+ Db3 = commit_data(Db2),
+ ok = couch_server:db_updated(Db3),
+ {reply, ok, Db3, idle_limit()};
+handle_call({raft_discard, UpTo}, _From, Db) ->
+ {ok, Db2} = couch_db_engine:raft_discard(Db, UpTo),
+ Db3 = commit_data(Db2),
+ ok = couch_server:db_updated(Db3),
+ {reply, ok, Db3, idle_limit()};
handle_call(Msg, From, Db) ->
case couch_db_engine:handle_db_updater_call(Msg, From, Db) of
{reply, Resp, NewDb} ->
diff --git a/src/couch/src/couch_raft.erl b/src/couch/src/couch_raft.erl
new file mode 100644
index 000000000..f398b4f2a
--- /dev/null
+++ b/src/couch/src/couch_raft.erl
@@ -0,0 +1,350 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
+
+-module(couch_raft).
+-behaviour(gen_statem).
+
+-define(ELECTION_DELAY, 150).
+-define(ELECTION_SPLAY, 150).
+-define(LEADER_HEARTBEAT, 75).
+-define(CLIENT_TIMEOUT, 5_000).
+
+% maximum number of entries to send in one go.
+-define(BATCH_SIZE, 10).
+
+% public api
+
+-export([
+ start/2,
+ start_link/2,
+ stop/1,
+ call/2
+]).
+
+% mandatory gen_statem callbacks
+
+-export([
+ init/1,
+ callback_mode/0,
+ handle_event/4
+]).
+
+%% public api
+
+% Start a raft member registered locally as Name, not linked to the caller.
+% Cohort is the list of member nodes; the test suite passes every member,
+% including the local node.
+start(Name, Cohort) ->
+    gen_statem:start({local, Name}, ?MODULE, new(Name, Cohort), []).
+
+% As start/2, but linked to the calling process.
+start_link(Name, Cohort) ->
+    gen_statem:start_link({local, Name}, ?MODULE, new(Name, Cohort), []).
+
+% Initial state data: term 0, no vote cast, empty log, nothing applied.
+new(Name, Cohort) ->
+    Peers = peers(Cohort),
+    #{
+        name => Name,
+        cohort => Cohort,
+        term => 0,
+        votedFor => undefined,
+        % nodes that granted this member a vote in the current term. It is
+        % handled as a list everywhere (lists:usort/1, length/1,
+        % start_election/1), so it must start as a list, not a map.
+        votesGranted => [],
+        nextIndex => maps:from_list([{Peer, 1} || Peer <- Peers]),
+        matchIndex => maps:from_list([{Peer, 0} || Peer <- Peers]),
+        log => couch_raft_log:new(),
+        commitIndex => 0,
+        % pending client calls, keyed by the log index they wait on
+        froms => #{},
+        lastApplied => 0,
+        % state machine value: a sha256 hash chain over applied entry values
+        machine => <<0>>
+    }.
+
+% Stop the member.
+stop(ServerRef) ->
+    gen_statem:stop(ServerRef).
+
+% Submit Value to the member. Only a leader accepts it; the reply is the new
+% state machine value once the entry is applied, or {error, not_leader} /
+% {error, deposed}.
+call(ServerRef, Value) ->
+    gen_statem:call(ServerRef, #{type => 'ClientRequest', value => Value}, ?CLIENT_TIMEOUT).
+
+% Every member starts as a follower.
+init(Data) ->
+    {ok, follower, Data}.
+
+callback_mode() ->
+    [handle_event_function, state_enter].
+
+% gen_statem callback (handle_event_function mode with state_enter). Clause
+% order matters: the first clause intercepts any cast carrying a newer term,
+% so the message-specific clauses below only see the current or an older term.
+%% erlfmt-ignore
+handle_event(cast, #{term := FutureTerm} = Msg, _State, #{term := CurrentTerm} = Data) when FutureTerm > CurrentTerm ->
+    couch_log:notice("~p received message from future term ~B, moving to that term, becoming follower and clearing votedFor", [node(), FutureTerm]),
+    {next_state, follower, Data#{term => FutureTerm, votedFor => undefined}, {next_event, cast, Msg}};
+
+handle_event(enter, _OldState, follower, Data) ->
+    #{term := Term, froms := Froms} = Data,
+    couch_log:notice("~p became follower in term ~B", [node(), Term]),
+    % pending client calls are failed rather than left waiting
+    Replies = [{reply, From, {error, deposed}} || From <- maps:values(Froms)],
+    % votedFor is deliberately kept: a candidate that steps down within its
+    % own term must not be able to vote a second time in that term. It is
+    % only cleared when the term changes (first clause).
+    {keep_state, Data#{froms => #{}}, [restart_election_timeout() | Replies]};
+
+handle_event(enter, _OldState, candidate, Data) ->
+    #{term := Term} = Data,
+    couch_log:notice("~p became candidate in term ~B", [node(), Term]),
+    {keep_state, start_election(Data), restart_election_timeout()};
+
+handle_event(enter, _OldState, leader, Data) ->
+    #{log := Log, cohort := Cohort, term := Term} = Data,
+    couch_log:notice("~p became leader in term ~B", [node(), Term]),
+    Peers = peers(Cohort),
+    NextIndex = couch_raft_log:index(couch_raft_log:last(Log)) + 1,
+    {keep_state, Data#{
+        nextIndex => maps:from_list([{Peer, NextIndex} || Peer <- Peers]),
+        matchIndex => maps:from_list([{Peer, 0} || Peer <- Peers])
+    }, restart_heartbeat_timeout()};
+
+handle_event(cast, #{type := 'RequestVoteRequest', term := Term} = Msg, _State, #{term := CurrentTerm} = Data)
+  when Term =< CurrentTerm ->
+    #{
+        source := MSource,
+        lastLogIndex := MLastLogIndex,
+        lastLogTerm := MLastLogTerm
+    } = Msg,
+    #{
+        log := Log,
+        votedFor := VotedFor
+    } = Data,
+    LastEntry = couch_raft_log:last(Log),
+    LastTerm = couch_raft_log:term(LastEntry),
+    LastIndex = couch_raft_log:index(LastEntry),
+    % the candidate's log must be at least as up-to-date as ours
+    LogOk = MLastLogTerm > LastTerm orelse (MLastLogTerm == LastTerm andalso MLastLogIndex >= LastIndex),
+    Grant = Term == CurrentTerm andalso LogOk andalso (VotedFor == undefined orelse VotedFor == MSource),
+    couch_log:notice("~p received RequestVoteRequest from ~p in term ~B when in term ~B (Grant:~p, LogOk:~p, VotedFor:~p)", [node(), MSource, Term, CurrentTerm, Grant, LogOk, VotedFor]),
+    Reply = #{
+        type => 'RequestVoteResponse',
+        term => CurrentTerm,
+        voteGranted => Grant,
+        source => node()
+    },
+    cast(MSource, Reply, Data),
+    if
+        Grant ->
+            % Only a granted vote resets the election timer. Resetting it on
+            % a refusal lets a stale candidate delay elections. On a leader
+            % it would also replace the heartbeat state_timeout with a
+            % new_election timeout that no leader clause handles.
+            {keep_state, Data#{votedFor => MSource}, restart_election_timeout()};
+        true ->
+            keep_state_and_data
+    end;
+
+handle_event(cast, #{type := 'RequestVoteResponse', term := PastTerm}, _State, #{term := CurrentTerm}) when PastTerm < CurrentTerm ->
+    couch_log:notice("~p ignored RequestVoteResponse from past term ~B", [node(), PastTerm]),
+    keep_state_and_data;
+
+handle_event(cast, #{type := 'RequestVoteResponse', term := Term} = Msg, candidate, #{term := Term} = Data) ->
+    #{source := MSource, voteGranted := MVoteGranted} = Msg,
+    #{cohort := Cohort, votesGranted := VotesGranted0} = Data,
+    VotesGranted1 = if MVoteGranted -> lists:usort([MSource | VotesGranted0]); true -> VotesGranted0 end,
+    couch_log:notice("~p received RequestVoteResponse from ~p in current term ~B (VotesGranted:~p)", [node(), MSource, Term, VotesGranted1]),
+    if
+        length(VotesGranted1) >= length(Cohort) div 2 + 1 ->
+            couch_log:notice("~p has enough votes to be leader in term ~B", [node(), Term]),
+            {next_state, leader, Data#{votesGranted => VotesGranted1}};
+        true ->
+            {keep_state, Data#{votesGranted => VotesGranted1}}
+    end;
+
+% Late votes for an election this node has already won or abandoned.
+handle_event(cast, #{type := 'RequestVoteResponse', term := Term, source := MSource}, State, #{term := Term}) ->
+    couch_log:debug("~p ignored RequestVoteResponse from ~p in state ~p, term ~B", [node(), MSource, State, Term]),
+    keep_state_and_data;
+
+handle_event(cast, #{type := 'AppendEntriesRequest', term := Term} = Msg, State, #{term := CurrentTerm} = Data)
+  when Term =< CurrentTerm ->
+    #{
+        source := MSource,
+        prevLogIndex := MPrevLogIndex,
+        prevLogTerm := MPrevLogTerm
+    } = Msg,
+    #{
+        log := Log
+    } = Data,
+    LastLogIndex = couch_raft_log:index(couch_raft_log:last(Log)),
+    LogOk = MPrevLogIndex == 0 orelse (MPrevLogIndex > 0 andalso MPrevLogIndex =< LastLogIndex andalso MPrevLogTerm == couch_raft_log:term(couch_raft_log:nth(MPrevLogIndex, Log))),
+    if
+        Term < CurrentTerm ->
+            % request from a deposed leader: refuse it, and do not let it
+            % postpone this node's own election timeout
+            cast(MSource, append_entries_response(CurrentTerm, false, 0), Data),
+            keep_state_and_data;
+        State == follower andalso not LogOk ->
+            cast(MSource, append_entries_response(CurrentTerm, false, 0), Data),
+            {keep_state_and_data, restart_election_timeout()};
+        State == candidate ->
+            {next_state, follower, Data, {next_event, cast, Msg}};
+        State == follower ->
+            accept_append_entries(Msg, Data)
+    end;
+
+handle_event(cast, #{type := 'AppendEntriesResponse', term := PastTerm}, _State, #{term := CurrentTerm}) when PastTerm < CurrentTerm ->
+    couch_log:notice("~p ignored AppendEntriesResponse from past term ~B", [node(), PastTerm]),
+    keep_state_and_data;
+
+handle_event(cast, #{type := 'AppendEntriesResponse', term := Term} = Msg, _State, #{term := Term} = Data) ->
+    #{success := MSuccess, matchIndex := MMatchIndex, source := MSource} = Msg,
+    #{nextIndex := NextIndex, matchIndex := MatchIndex} = Data,
+    couch_log:debug("~p received AppendEntriesResponse from ~p in current term ~B (Success:~p)", [node(), MSource, Term, MSuccess]),
+    SourceNextIndex = maps:get(MSource, NextIndex),
+    if
+        MSuccess ->
+            {keep_state, Data#{
+                nextIndex => NextIndex#{MSource => MMatchIndex + 1},
+                matchIndex => MatchIndex#{MSource => MMatchIndex}
+            }};
+        true ->
+            {keep_state, Data#{
+                nextIndex => NextIndex#{MSource => max(SourceNextIndex - 1, 1)}
+            }}
+    end;
+
+handle_event({call, From}, #{type := 'ClientRequest'} = Msg, leader, Data) ->
+    #{value := Value} = Msg,
+    #{term := Term, log := Log, froms := Froms} = Data,
+    EntryIndex = couch_raft_log:index(couch_raft_log:last(Log)) + 1,
+    Entry = {EntryIndex, Term, Value},
+    {keep_state, Data#{log => couch_raft_log:append(Log, [Entry]), froms => Froms#{EntryIndex => From}}};
+
+handle_event({call, From}, #{type := 'ClientRequest'}, _State, _Data) ->
+    {keep_state_and_data, {reply, From, {error, not_leader}}};
+
+handle_event(state_timeout, new_election, follower, Data) ->
+    #{term := Term} = Data,
+    couch_log:notice("~p election timeout in state ~p, term ~B", [node(), follower, Term]),
+    % the candidate enter clause starts the election and arms its timer;
+    % starting one here as well would bump the term twice
+    {next_state, candidate, Data};
+
+handle_event(state_timeout, new_election, candidate, Data) ->
+    #{term := Term} = Data,
+    couch_log:notice("~p election timeout in state ~p, term ~B", [node(), candidate, Term]),
+    % staying in candidate does not re-run the enter clause
+    {keep_state, start_election(Data), restart_election_timeout()};
+
+handle_event(state_timeout, heartbeat, leader, Data) ->
+    #{term := Term} = Data,
+    couch_log:debug("~p leader sending a heartbeat in term ~B", [node(), Term]),
+    ok = send_append_entries(Data),
+    {keep_state, advance_commit_index(Data), restart_heartbeat_timeout()};
+
+handle_event(EventType, EventContent, State, Data) ->
+    {stop, {unknown_event, EventType, EventContent, State, Data}}.
+
+% Follower handling of an AppendEntriesRequest from the current term's
+% leader whose prevLogIndex/prevLogTerm already match this node's log.
+accept_append_entries(#{entries := []} = Msg, Data) ->
+    % nothing (left) to append: the log matches the leader's up to prevLogIndex
+    #{source := MSource, prevLogIndex := MPrevLogIndex, commitIndex := MCommitIndex} = Msg,
+    #{term := CurrentTerm} = Data,
+    couch_log:debug("~p received heartbeat and everything matches, sending matchIndex:~p", [node(), MPrevLogIndex]),
+    cast(MSource, append_entries_response(CurrentTerm, true, MPrevLogIndex), Data),
+    Data1 = follower_commit_index(Data, MCommitIndex, MPrevLogIndex),
+    {keep_state, update_state_machine(Data1), restart_election_timeout()};
+accept_append_entries(#{entries := [FirstEntry | RestEntries] = MEntries} = Msg, Data) ->
+    #{prevLogIndex := MPrevLogIndex} = Msg,
+    #{log := Log} = Data,
+    Index = MPrevLogIndex + 1,
+    LastLogIndex = couch_raft_log:index(couch_raft_log:last(Log)),
+    FirstEntryTerm = couch_raft_log:term(FirstEntry),
+    if
+        LastLogIndex >= Index ->
+            NthLogTerm = couch_raft_log:term(couch_raft_log:nth(Index, Log)),
+            if
+                NthLogTerm == FirstEntryTerm ->
+                    % Only this first entry is known to match. Re-process
+                    % the rest of the batch instead of reporting all of it
+                    % as matched, which would inflate the leader's matchIndex.
+                    couch_log:notice("~p received entry:~p that's already in the log, skipping it", [node(), FirstEntry]),
+                    Next = Msg#{prevLogIndex => Index, prevLogTerm => FirstEntryTerm, entries => RestEntries},
+                    {keep_state_and_data, [{next_event, cast, Next}, restart_election_timeout()]};
+                true ->
+                    % delete the conflicting entry and everything after it
+                    couch_log:notice("~p received conflicting entry:~p, deleting it and all that follow", [node(), FirstEntry]),
+                    Truncated = couch_raft_log:sublist(Log, 1, MPrevLogIndex),
+                    {keep_state, Data#{log => Truncated}, [{next_event, cast, Msg}, restart_election_timeout()]}
+            end;
+        LastLogIndex == MPrevLogIndex ->
+            couch_log:notice("~p received new entries:~p, appending it to log", [node(), MEntries]),
+            {keep_state, Data#{log => couch_raft_log:append(Log, MEntries)}, [{next_event, cast, Msg}, restart_election_timeout()]}
+    end.
+
+% Raft follower commit rule: commitIndex becomes
+% min(leaderCommit, last index verified against the leader). It never moves
+% backwards (update_state_machine/1 cannot un-apply entries).
+follower_commit_index(#{commitIndex := CommitIndex} = Data, LeaderCommit, LastVerifiedIndex) ->
+    Data#{commitIndex => max(CommitIndex, min(LeaderCommit, LastVerifiedIndex))}.
+
+% Build an AppendEntriesResponse message from this node.
+append_entries_response(Term, Success, MatchIndex) ->
+    #{
+        type => 'AppendEntriesResponse',
+        term => Term,
+        success => Success,
+        matchIndex => MatchIndex,
+        source => node()
+    }.
+
+
+% Leader: cast an AppendEntriesRequest to every peer. Each request carries up
+% to ?BATCH_SIZE entries starting at that peer's nextIndex (none when the
+% peer is caught up, i.e. a heartbeat).
+send_append_entries(#{cohort := Cohort} = Data) ->
+    send_append_entries(peers(Cohort), Data).
+
+send_append_entries([], _Data) ->
+    ok;
+send_append_entries([Peer | Rest], Data) ->
+    #{term := Term, nextIndex := NextIndex, log := Log, commitIndex := CommitIndex} = Data,
+    PrevLogIndex = maps:get(Peer, NextIndex) - 1,
+    PrevLogTerm =
+        if
+            PrevLogIndex > 0 -> couch_raft_log:term(couch_raft_log:nth(PrevLogIndex, Log));
+            true -> 0
+        end,
+    Entries = couch_raft_log:sublist(Log, PrevLogIndex + 1, ?BATCH_SIZE),
+    % Index of the last entry carried by this message. The advertised commit
+    % index is capped here so a follower is never told to commit an index
+    % whose entry it was not sent (and verified) in this exchange. Otherwise
+    % it could apply a stale entry still sitting at that position.
+    LastEntry = PrevLogIndex + length(Entries),
+    Msg = #{
+        type => 'AppendEntriesRequest',
+        term => Term,
+        source => node(),
+        prevLogIndex => PrevLogIndex,
+        prevLogTerm => PrevLogTerm,
+        entries => Entries,
+        commitIndex => min(CommitIndex, LastEntry)
+    },
+    cast(Peer, Msg, Data),
+    send_append_entries(Rest, Data).
+
+% Leader commit rule: advance commitIndex to the highest index N stored on a
+% majority of the cohort, but only when the entry at N is from the current
+% term. Earlier-term entries become committed indirectly once a later
+% current-term entry commits; counting their replicas directly is unsafe.
+% commitIndex never moves backwards.
+advance_commit_index(Data) ->
+    #{
+        matchIndex := MatchIndex,
+        log := Log,
+        cohort := Cohort,
+        term := Term,
+        commitIndex := CommitIndex
+    } = Data,
+    LastIndex = couch_raft_log:index(couch_raft_log:last(Log)),
+    % ascending: this node's last index plus every peer's matchIndex
+    Indexes = lists:sort([LastIndex | maps:values(MatchIndex)]),
+    % A majority is (N div 2 + 1) nodes, so the (N - N div 2)-th smallest
+    % index is the largest one held by at least a majority. This holds for
+    % odd and even N alike.
+    N = length(Cohort),
+    NewCommitIndex = lists:nth(N - N div 2, Indexes),
+    % andalso short-circuits, so nth/2 is never called with index 0
+    case
+        NewCommitIndex > CommitIndex andalso
+            couch_raft_log:term(couch_raft_log:nth(NewCommitIndex, Log)) == Term
+    of
+        true ->
+            update_state_machine(Data#{commitIndex => NewCommitIndex});
+        false ->
+            Data
+    end.
+
+% Apply committed-but-unapplied entries to the state machine. The machine
+% value is a sha256 hash chain: each applied entry's Value is hashed onto
+% the previous machine value. Any client call waiting on an applied index
+% is answered with the resulting hash.
+%
+% Nothing to do when lastApplied has already reached (or passed) commitIndex.
+% Passing used to raise function_clause, e.g. when a new leader advertised a
+% commitIndex lower than what this node had already applied.
+update_state_machine(#{lastApplied := LastApplied, commitIndex := CommitIndex} = Data)
+    when LastApplied >= CommitIndex ->
+    Data;
+update_state_machine(#{lastApplied := LastApplied, commitIndex := CommitIndex} = Data) ->
+    #{log := Log, froms := Froms0, machine := Machine0} = Data,
+    FirstIndex = LastApplied + 1,
+    % never apply beyond the end of the local log
+    LastIndex = min(couch_raft_log:index(couch_raft_log:last(Log)), CommitIndex),
+    Apply = fun(Index, {Froms, Machine}) ->
+        Value = couch_raft_log:value(couch_raft_log:nth(Index, Log)),
+        Result = crypto:hash(sha256, <<Machine/binary, Value/binary>>),
+        case maps:take(Index, Froms) of
+            {Waiter, RemainingFroms} ->
+                gen_statem:reply(Waiter, Result),
+                {RemainingFroms, Result};
+            error ->
+                {Froms, Result}
+        end
+    end,
+    {Froms1, Machine1} = lists:foldl(Apply, {Froms0, Machine0}, lists:seq(FirstIndex, LastIndex)),
+    Data#{froms => Froms1, machine => Machine1, lastApplied => LastIndex}.
+
+% Begin an election for the next term: vote for ourselves and cast a
+% RequestVoteRequest, describing our last log entry, to every peer.
+start_election(Data) ->
+    #{term := OldTerm, cohort := Cohort, log := Log} = Data,
+    NewTerm = OldTerm + 1,
+    couch_log:notice("~p starting election in term ~B", [node(), NewTerm]),
+    Last = couch_raft_log:last(Log),
+    Request = #{
+        type => 'RequestVoteRequest',
+        term => NewTerm,
+        lastLogIndex => couch_raft_log:index(Last),
+        lastLogTerm => couch_raft_log:term(Last),
+        source => node()
+    },
+    _ = [cast(Peer, Request, Data) || Peer <- peers(Cohort)],
+    Data#{term => NewTerm, votedFor => node(), votesGranted => [node()]}.
+
+% Cast Message to the member registered under our own name on TargetNode.
+cast(TargetNode, Message, #{name := RegName}) ->
+    Dest = {RegName, TargetNode},
+    gen_statem:cast(Dest, Message).
+
+% Election state_timeout: ?ELECTION_DELAY plus a random splay of
+% 1..?ELECTION_SPLAY ms, so members rarely time out together.
+restart_election_timeout() ->
+    Timeout = ?ELECTION_DELAY + rand:uniform(?ELECTION_SPLAY),
+    {state_timeout, Timeout, new_election}.
+
+% Leader heartbeat state_timeout.
+restart_heartbeat_timeout() ->
+    {state_timeout, ?LEADER_HEARTBEAT, heartbeat}.
+
+% Every cohort member except this node.
+peers(Cohort) ->
+    lists:delete(node(), Cohort).
diff --git a/src/couch/src/couch_raft_log.erl b/src/couch/src/couch_raft_log.erl
new file mode 100644
index 000000000..987212457
--- /dev/null
+++ b/src/couch/src/couch_raft_log.erl
@@ -0,0 +1,52 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
+
+-module(couch_raft_log).
+
+-export([
+    new/0,
+    append/2,
+    sublist/3,
+    nth/2,
+    last/1,
+    index/1,
+    term/1,
+    value/1
+]).
+
+% In-memory raft log: a list of {Index, Term, Value} entries kept in append
+% order. couch_raft looks entries up with nth(Index, Log), so it assumes the
+% entry at list position N has index N.
+
+% An empty log.
+new() -> [].
+
+% Log with Items (a list of entries) added after its current last entry.
+append(Log, Items) -> Log ++ Items.
+
+% Up to Len entries starting at 1-based position Start.
+sublist(Log, Start, Len) -> lists:sublist(Log, Start, Len).
+
+% Entry at 1-based position N.
+nth(N, Log) -> lists:nth(N, Log).
+
+% Last entry, or the sentinel {0, 0, undefined} (index 0, term 0) for an
+% empty log, so callers can read index/term without special-casing.
+last(Log) ->
+    case Log of
+        [] -> {0, 0, undefined};
+        _ -> lists:last(Log)
+    end.
+
+% Field accessors for an entry tuple.
+index(Entry) -> erlang:element(1, Entry).
+
+term(Entry) -> erlang:element(2, Entry).
+
+value(Entry) -> erlang:element(3, Entry).
diff --git a/src/couch/test/eunit/couch_raft_SUITE.erl b/src/couch/test/eunit/couch_raft_SUITE.erl
new file mode 100644
index 000000000..1c3f8ebc2
--- /dev/null
+++ b/src/couch/test/eunit/couch_raft_SUITE.erl
@@ -0,0 +1,67 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
+
+-module(couch_raft_SUITE).
+
+-behaviour(ct_suite).
+
+-export([all/0]).
+-export([three_nodes/1]).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+all() ->
+    [three_nodes].
+
+% Boots three peer nodes and starts a couch_raft member named foo on each.
+% Then checks that:
+% - exactly one leader is elected;
+% - client calls return the expected sha256 hash chain;
+% - after the first leader is stopped, a different leader is elected and
+%   continues the same chain.
+%
+% The suite previously referred to the modules craft/craft3, which do not
+% exist; the implementation under test is couch_raft.
+%
+% NOTE(review): couch_raft logs through couch_log, but the peers only get
+% couch_raft's ebin directory on their code path. Confirm couch_log is
+% loadable and usable on the peer nodes.
+three_nodes(Config) when is_list(Config) ->
+    N = 3,
+    Args = ["-pa", filename:dirname(code:which(couch_raft))],
+    Peers = [?CT_PEER(#{wait_boot => {self(), tag}, args => Args}) || _ <- lists:seq(1, N)],
+    Cohort = [
+        receive
+            {tag, {started, Node, Peer}} -> Node
+        end
+     || {ok, Peer} <- Peers
+    ],
+
+    Rafts = [erpc:call(Node, couch_raft, start, [foo, Cohort]) || Node <- Cohort],
+
+    % wait for leader election
+    timer:sleep(500),
+
+    % verify only one leader elected
+    [{leader, FirstLeader}] = lists:filter(
+        fun({State, _Pid}) -> State == leader end,
+        [{element(1, sys:get_state(Pid)), Pid} || {ok, Pid} <- Rafts]
+    ),
+
+    % make a series of calls
+    Hash1 = crypto:hash(sha256, <<0, 1>>),
+    ?assertEqual(Hash1, couch_raft:call(FirstLeader, <<1>>)),
+
+    Hash2 = crypto:hash(sha256, <<Hash1/binary, 2>>),
+    ?assertEqual(Hash2, couch_raft:call(FirstLeader, <<2>>)),
+
+    Hash3 = crypto:hash(sha256, <<Hash2/binary, 3>>),
+    ?assertEqual(Hash3, couch_raft:call(FirstLeader, <<3>>)),
+
+    % force a re-election
+    couch_raft:stop(FirstLeader),
+    timer:sleep(500),
+
+    % verify new leader elected
+    [{leader, SecondLeader}] = lists:filter(
+        fun({State, _Pid}) -> State == leader end,
+        [{element(1, sys:get_state(Pid)), Pid} || {ok, Pid} <- Rafts, Pid /= FirstLeader]
+    ),
+    ?assertNotEqual(FirstLeader, SecondLeader),
+
+    % make another call
+    Hash4 = crypto:hash(sha256, <<Hash3/binary, 4>>),
+    ?assertEqual(Hash4, couch_raft:call(SecondLeader, <<4>>)),
+
+    [couch_raft:stop(Pid) || {ok, Pid} <- Rafts, Pid /= FirstLeader],
+    [peer:stop(Peer) || {ok, Peer} <- Peers].