Posted to commits@couchdb.apache.org by rn...@apache.org on 2022/08/17 21:05:05 UTC
[couchdb] branch raft_storemodule updated (c8b859569 -> 725b2cdc1)
This is an automated email from the ASF dual-hosted git repository.
rnewson pushed a change to branch raft_storemodule
in repository https://gitbox.apache.org/repos/asf/couchdb.git
discard c8b859569 separate follower and candidate timeouts
discard eec4a4c0d clear votesGranted when unused for readability
discard cb64903b5 don't track matchIndex/nextIndex in non-leaders, pointless
discard dfda5e000 introduce store abstraction (WIP)
discard b0b12b6d2 Integrate raft algorithm (WIP)
add 0156029fe Display main build status in README
add 17d359b04 Merge pull request #4063 from apache/big-r81-patch-1
add 592837abc Add Erlang 25 support
add 6f85bac20 Merge pull request #4060 from apache/big-r81-add-erlang-25-support
add 687b4e023 Enable replicating purge requests between nodes
add 63e09832a Ignore repeated interactive purges
add 428d280e4 Update CI to remove Erlang < 23
add b4aadedcb Update rebar config to remove Erlang < 23 support
add eb9884e95 Skip Erlang version != 20 in configure
add 11eea624b Set low erlang version in Jenkins.pr to 23 instead of 20
add 0a229d8fb Remove Erlang < 23 ifdefs and other macros
add 1d2ba3e5e Update erlfmt formatting for smoosh_server
add cb80c634a Lowest CI image version Erlang 23 doesn't have spidermonkey 1.8.5
add 6da9405ff Erlang 25 compatibility - http_uri:parse -> uri_string:parse
add 1bb3da87c Fix deprecated function warning
add 09737dfd6 Delete unused include_lib
add 507932ca3 Replace include with include_lib
add 19d2f9b69 Fix "variable bound multiple times" warning
add 52ef641f1 Fix full CI build
add 211509e58 Bypass macos builder for now while we sort the rebar issue
add 116c4e956 Fix couch_debug:opened_files* functions
add c417303ef Prevent error:function_clause in check_security/3 if roles claim is malformed (#4070)
add 47866dea3 - rename master to main - fixing links
add f164b2dd9 Adding commit message conventions and update links
add ab00be437 Use https as default protocol for links
add 0bfbb63e4 Backport commits from fdbmain into main (old 3.x)
add adf17140e Merge pull request #4079 from apache/backport-from-fdbmain
add 23b352d76 Small url fixes
add b424ad12a Merge pull request #4080 from apache/change-irc-url
add c605e0458 Fix Elixir 13 compatibility
add 2c351d62c Update vm.args for Erlang 23+
add ea5df65c5 Bring back POWER full builds
add eb2f8d998 Add Erlang 25 to PR CI pipeline and Ubuntu Jammy to full CI
add e41465ec8 Add an option to let custodian always use [cluster] n value
add 29ac7853f Optimize couch_util:to_hex/1
add 6a455c74b Implement winning_revs_only option for the replicator
add eb0b28a70 Fix flaky "validate doc update" elixir test
add 74017fd5d Skip uploading build logs for now
add 4fab0509d Skip nightly package uploads since nothing seems to be using them
add 5eef3fff5 Improve error handling in smoosh_utils:write_to_file/3
add 22f0b44ef Merge pull request #4093 from noahshaw11/fix-error-handling-smoosh
add b749b219b Add filepath to is_compacting
add 330703cae Remove some left-over local endpoint clauses in replicator
add 02c0c75c2 Clean up unused code and invalid spec from replicator
add 76dd66f40 Remove view compaction jobs recovery
add 005843a43 Fix not calling is_compacting test
add d0fd91529 Fix not_found error smoosh
add 7fb96d265 Add toggle for smoosh queue persistence
add daff65d8c Replace SHA-1 with SHA-256 for cookie authentication (#4094)
add 42be159c7 Trim X-Auth-CouchDB-Roles header after reading
add 9965289f2 Update elixir to 1.13
add c71239bf0 Update application description and dependencies
add ebbcc7ec2 Fix the flaky tests for `create_doc()`
add b3586f1f5 Fix stats endpoint
add 8c99dc530 make haproxy config valid again
add f4ff8aa12 Merge pull request #4123 from apache/dev-run-fix-haproxy-cfg
add a431b930f Turn document update mode atoms into defines
add 35b30385a Return a 400 response for a single new_edits=false doc update without revisions
add 419447cd1 Remove `couch_tests`
add 02ca8c62c Merge pull request #4125 from jiahuili430/couch-tests
add 3527d3047 Revert "Replace SHA-1 with SHA-256 for cookie authentication (#4094)"
add fff03ef8e Merge pull request #4128 from apache/revert-4094-for-now
add 963daf6ca Implement view_report function
add a45e82aa1 Merge pull request #4033 from noahshaw11/implement-view_report-function
add 2be1da823 Add io_priority classes
add c09cd8968 Add ioq io_priority functions and system class
add 74f12c74d Merge pull request #4106 from apache/4101-add-io-priority
add deef12eff Add ioq:call_search
add 7f1a33169 Merge pull request #4135 from apache/dedicated-ioq-search-function
add 1f1c56d5d Fix elixir :logger warnings
add 90f20c849 Add editors magic lines
add cfed4bb07 Merge pull request #4133 from noahshaw11/add-editors-magic-lines
add f8dad2fe6 Fix proxyauth_test and remove it from skipping tests (#4129)
add 96214780e Fix purge request timeouts
add 6c57f58dd Update couchdb-config to 2.2.0
add 9a5d0de97 Update couchdb-mochiweb to v3.1.0
new 5be100df2 Integrate raft algorithm (WIP)
new f2981b8ea introduce store abstraction (WIP)
new 7e5fc6439 don't track matchIndex/nextIndex in non-leaders, pointless
new 0b440b86f clear votesGranted when unused for readability
new 725b2cdc1 separate follower and candidate timeouts
This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version. This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:
* -- * -- B -- O -- O -- O   (c8b859569)
           \
            N -- N -- N   refs/heads/raft_storemodule (725b2cdc1)
You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.
Any revisions marked "omit" are not gone; other references still
refer to them. Any revisions marked "discard" are gone forever.
The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
.github/ISSUE_TEMPLATE/rfc.md | 2 +-
.gitignore | 3 +
CONTRIBUTING.md | 132 ++++---
Makefile | 2 +-
README.rst | 18 +-
build-aux/Jenkinsfile.full | 78 ++--
build-aux/Jenkinsfile.pr | 44 +--
build-aux/logfile-uploader.py | 1 +
config/config.exs | 4 +-
config/dev.exs | 2 +-
config/integration.exs | 6 +-
config/prod.exs | 2 +-
config/test.exs | 6 +-
configure | 4 +-
dev/run | 3 +-
mix.exs | 23 +-
mix.lock | 19 -
rebar.config.script | 21 +-
rel/overlay/etc/default.ini | 23 +-
rel/overlay/etc/local.ini | 2 +-
rel/overlay/etc/vm.args | 11 +-
src/chttpd/rebar.config | 2 +
src/chttpd/src/chttpd.app.src | 7 +-
src/chttpd/src/chttpd.erl | 52 +--
src/chttpd/src/chttpd_auth_cache.erl | 6 +-
src/chttpd/src/chttpd_db.erl | 14 +-
src/chttpd/src/chttpd_node.erl | 21 +-
src/chttpd/src/chttpd_stats.erl | 7 +-
src/chttpd/src/chttpd_sup.erl | 2 +-
src/chttpd/test/eunit/chttpd_db_test.erl | 56 +++
.../test/eunit/chttpd_open_revs_error_test.erl | 2 +-
src/couch/include/couch_db.hrl | 26 +-
src/couch/rebar.config.script | 2 +
src/couch/src/couch.app.src | 7 +-
src/couch/src/couch_app.erl | 2 -
src/couch/src/couch_bt_engine.erl | 4 +-
src/couch/src/couch_db.erl | 83 +++--
src/couch/src/couch_db_engine.erl | 4 +-
src/couch/src/couch_db_updater.erl | 2 +-
src/couch/src/couch_debug.erl | 138 +++++---
src/couch/src/couch_doc.erl | 2 +-
src/couch/src/couch_httpd.erl | 239 +++++++------
src/couch/src/couch_httpd_auth.erl | 8 +-
src/couch/src/couch_httpd_db.erl | 8 +-
src/couch/src/couch_httpd_vhost.erl | 2 +-
src/couch/src/couch_key_tree.erl | 4 +-
src/couch/src/couch_passwords.erl | 4 +-
src/couch/src/couch_proc_manager.erl | 10 +-
src/couch/src/couch_query_servers.erl | 9 +-
src/couch/src/couch_server.erl | 6 +-
src/couch/src/couch_util.erl | 112 +++---
src/couch/src/couch_uuids.erl | 2 +-
src/couch/src/test_util.erl | 4 +-
src/couch/test/eunit/couch_auth_cache_tests.erl | 2 +-
.../test/eunit/couch_bt_engine_compactor_tests.erl | 35 +-
src/couch/test/eunit/couch_db_plugin_tests.erl | 16 +-
src/couch/test/eunit/couch_js_tests.erl | 2 +-
src/couch/test/eunit/couch_key_tree_tests.erl | 2 +-
src/couch/test/eunit/couch_util_tests.erl | 43 +++
src/couch/test/eunit/couch_uuids_tests.erl | 4 +-
.../test/eunit/couchdb_update_conflicts_tests.erl | 4 +-
src/couch_dist/rebar.config | 2 +
src/couch_epi/rebar.config | 2 +
src/couch_epi/src/couch_epi.app.src.script | 24 +-
src/couch_epi/test/eunit/couch_epi_tests.erl | 2 +-
src/couch_event/rebar.config | 2 +
src/couch_index/rebar.config | 2 +
src/couch_index/src/couch_index.app.src | 2 +-
src/couch_index/src/couch_index.erl | 1 +
src/couch_index/src/couch_index_util.erl | 2 +-
src/couch_log/rebar.config | 2 +
src/couch_log/src/couch_log_monitor.erl | 12 -
src/couch_log/src/couch_log_trunc_io_fmt.erl | 2 +-
.../test/eunit/couch_log_formatter_test.erl | 4 +-
src/couch_mrview/rebar.config | 2 +
src/couch_mrview/src/couch_mrview_debug.erl | 391 ++++++++++++++++++++-
src/couch_mrview/src/couch_mrview_http.erl | 2 +-
.../test/eunit/couch_mrview_purge_docs_tests.erl | 6 +-
src/couch_peruser/src/couch_peruser.app.src | 2 +-
src/couch_peruser/src/couch_peruser.erl | 2 +
src/couch_prometheus/src/couch_prometheus_util.erl | 2 +-
src/couch_pse_tests/src/cpse_test_purge_docs.erl | 21 +-
src/couch_pse_tests/src/cpse_test_purge_seqs.erl | 2 +-
src/couch_pse_tests/src/cpse_util.erl | 6 +-
src/couch_replicator/src/couch_replicator.app.src | 3 +-
.../src/couch_replicator_api_wrap.erl | 55 +--
src/couch_replicator/src/couch_replicator_auth.erl | 2 +-
.../src/couch_replicator_auth_session.erl | 15 +-
.../src/couch_replicator_changes_reader.erl | 14 +-
.../src/couch_replicator_doc_processor_worker.erl | 2 +-
src/couch_replicator/src/couch_replicator_docs.erl | 18 +-
.../src/couch_replicator_httpc.erl | 3 +-
src/couch_replicator/src/couch_replicator_ids.erl | 48 ++-
.../src/couch_replicator_js_functions.hrl | 6 +
.../src/couch_replicator_scheduler.erl | 2 +-
.../src/couch_replicator_scheduler_job.erl | 93 +++--
.../src/couch_replicator_utils.erl | 6 +-
.../src/couch_replicator_worker.erl | 7 +-
src/couch_replicator/src/json_stream_parse.erl | 2 +-
.../couch_replicator_error_reporting_tests.erl | 2 +-
.../eunit/couch_replicator_many_leaves_tests.erl | 134 ++++---
src/couch_tests/rebar.config | 2 +
src/custodian/rebar.config.script | 2 +
src/custodian/src/custodian_util.erl | 9 +-
src/ddoc_cache/src/ddoc_cache_entry.erl | 5 +-
src/dreyfus/src/clouseau_rpc.erl | 2 +-
src/dreyfus/test/elixir/test/test_helper.exs | 2 +-
src/fabric/rebar.config | 2 +
src/fabric/src/fabric_doc_open.erl | 2 +-
src/fabric/src/fabric_doc_open_revs.erl | 2 +-
src/fabric/src/fabric_doc_purge.erl | 2 +-
src/fabric/src/fabric_doc_update.erl | 4 +-
src/fabric/src/fabric_rpc.erl | 42 ++-
src/fabric/src/fabric_util.erl | 14 -
src/fabric/src/fabric_view_all_docs.erl | 10 +-
src/fabric/test/eunit/fabric_db_create_tests.erl | 4 +-
src/global_changes/src/global_changes.app.src | 3 +-
src/global_changes/src/global_changes_server.erl | 5 +-
src/ioq/src/ioq.erl | 24 +-
src/jwtf/rebar.config | 2 +
src/jwtf/src/jwtf.erl | 23 --
src/ken/rebar.config.script | 2 +
src/ken/src/ken.app.src.script | 17 +-
src/mango/rebar.config.script | 2 +
src/mango/src/mango_cursor_view.erl | 2 +-
src/mango/src/mango_httpd.erl | 10 +-
src/mango/src/mango_selector.erl | 4 +-
src/mango/src/mango_selector_text.erl | 2 +-
src/mango/src/mango_util.erl | 6 +-
src/mem3/rebar.config | 2 +
src/mem3/rebar.config.script | 2 +
src/mem3/src/mem3.app.src | 3 +-
src/mem3/src/mem3.erl | 4 +-
src/mem3/src/mem3_bdu.erl | 2 +-
src/mem3/src/mem3_nodes.erl | 7 +-
src/mem3/src/mem3_rep.erl | 8 +-
src/mem3/src/mem3_reshard_httpd.erl | 4 +-
src/mem3/src/mem3_shards.erl | 6 +-
src/mem3/src/mem3_util.erl | 11 +-
src/rexi/rebar.config | 2 +
src/rexi/src/rexi_server.erl | 45 ++-
src/setup/src/setup.app.src | 29 +-
src/smoosh/rebar.config | 2 +
src/smoosh/src/smoosh_channel.erl | 87 +++--
src/smoosh/src/smoosh_priority_queue.erl | 2 +-
src/smoosh/src/smoosh_server.erl | 41 ++-
src/smoosh/src/smoosh_utils.erl | 61 +++-
src/smoosh/test/smoosh_tests.erl | 75 ++--
src/weatherreport/rebar.config | 2 +
src/weatherreport/src/weatherreport.app.src | 5 +-
test/elixir/README.md | 2 +-
test/elixir/config/config.exs | 2 +-
test/elixir/config/test.exs | 4 +-
test/elixir/lib/couch/{db_test.ex => dbtest.ex} | 0
test/elixir/lib/step/start.ex | 4 +-
test/elixir/lib/suite.ex | 2 +-
test/elixir/test/bulk_docs_test.exs | 4 +-
test/elixir/test/changes_test.exs | 4 +-
test/elixir/test/config/skip.elixir | 4 -
test/elixir/test/config/suite.elixir | 8 +-
test/elixir/test/cookie_auth_test.exs | 2 +-
test/elixir/test/design_docs_test.exs | 16 +-
test/elixir/test/proxyauth_test.exs | 95 ++---
163 files changed, 1918 insertions(+), 1103 deletions(-)
delete mode 100644 mix.lock
rename test/elixir/lib/couch/{db_test.ex => dbtest.ex} (100%)
[couchdb] 01/05: Integrate raft algorithm (WIP)
Posted by rn...@apache.org.
rnewson pushed a commit to branch raft_storemodule
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 5be100df24446dff44cb7eeeaa31f89eb0a9db75
Author: Robert Newson <rn...@apache.org>
AuthorDate: Sat May 14 20:28:05 2022 +0100
Integrate raft algorithm (WIP)
couch_raft.erl is a complete implementation of the raft algorithm but
currently only manages an in-memory state machine and log.
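The two rules at the heart of Raft leader election can be checked in isolation. The following is a minimal, hypothetical Erlang sketch (the module `raft_vote_sketch` and its functions are illustrative only, not part of this commit) that mirrors the `LogOk` expression in the `RequestVoteRequest` handler and the `length(Cohort) div 2 + 1` threshold in the `RequestVoteResponse` handler of couch_raft.erl below. It assumes a log position is represented as `{LastLogTerm, LastLogIndex}` and that granted votes are kept in a plain list.

```erlang
-module(raft_vote_sketch).
-export([log_ok/2, quorum/1, has_majority/2]).

%% Hypothetical helper: a log position is {LastLogTerm, LastLogIndex}.
%% A voter grants its vote only if the candidate's log is at least as
%% up-to-date as its own: a higher last term wins outright, and with
%% equal last terms the candidate's log must be at least as long.
log_ok({CandTerm, CandIndex}, {MyTerm, MyIndex}) ->
    CandTerm > MyTerm orelse
        (CandTerm == MyTerm andalso CandIndex >= MyIndex).

%% Smallest number of votes that is a strict majority of the cohort.
quorum(Cohort) when is_list(Cohort) ->
    length(Cohort) div 2 + 1.

%% Votes are a list of voter names; duplicates are removed before counting,
%% as the handler does with lists:usort/1.
has_majority(VotesGranted, Cohort) ->
    length(lists:usort(VotesGranted)) >= quorum(Cohort).
```

Because Erlang compares tuples element by element, `log_ok/2` is equivalent to `{CandTerm, CandIndex} >= {MyTerm, MyIndex}`; the explicit form is kept so it reads like the handler.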
Preliminary work is also here to add a new btree inside the `.couch`
files, which will be the real raft log. The intent is that log entries
can be removed from this log and applied to by_id and by_seq trees
atomically.
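To see how the new btree exposes the bounds of the log, here is a standalone, hypothetical Erlang sketch (module `raft_log_sketch`; `bounds/1` and `discard_keys/2` are invented for illustration) of the reduce function this commit adds as `raft_tree_reduce/2` in couch_bt_engine.erl. Each node reduces its `{Index, Term, Value}` entries to `{{FirstIndex, FirstTerm}, {LastIndex, LastTerm}}`, which is what `raft_last/1` and `raft_discard/2` read through `couch_btree:full_reduce/1`. Instead of a real `couch_btree`, `bounds/1` simulates leaf reductions followed by a rereduce at the root.

```erlang
-module(raft_log_sketch).
-export([reduce/2, bounds/1, discard_keys/2]).

%% Same shape as raft_tree_reduce/2: entries are {Index, Term, Value}.
%% Indexes are unique, so lists:min/max pick the first and last entries.
reduce(reduce, []) ->
    {{0, 0}, {0, 0}};
reduce(reduce, Entries) ->
    {MinIndex, MinTerm, _} = lists:min(Entries),
    {MaxIndex, MaxTerm, _} = lists:max(Entries),
    {{MinIndex, MinTerm}, {MaxIndex, MaxTerm}};
reduce(rereduce, Reds) ->
    {Mins, Maxs} = lists:unzip(Reds),
    {lists:min(Mins), lists:max(Maxs)}.

%% Hypothetical stand-in for a btree: reduce each leaf, then rereduce
%% the leaf results as the root would.
bounds(Leaves) ->
    reduce(rereduce, [reduce(reduce, Leaf) || Leaf <- Leaves]).

%% Keys to remove when discarding every entry up to and including UpTo;
%% empty when UpTo precedes the first retained index.
discard_keys({{FirstIndex, _FirstTerm}, _Last}, UpTo) when UpTo < FirstIndex ->
    [];
discard_keys({{FirstIndex, _FirstTerm}, _Last}, UpTo) ->
    lists:seq(FirstIndex, UpTo).
```

Unlike `raft_discard/2` in the diff, which calls `lists:seq(FirstIndex, UpTo)` directly, this sketch guards against `UpTo` preceding the first retained index, since `lists:seq/2` raises an error when the end is more than one below the start.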
The raft log is not preserved over compaction yet because reading the
compactor code hurts my eyes.
Anyway, it's progress and hopefully we're going somewhere cool.
---
src/couch/src/couch_bt_engine.erl | 95 +++++++-
src/couch/src/couch_bt_engine.hrl | 3 +-
src/couch/src/couch_bt_engine_header.erl | 7 +-
src/couch/src/couch_db.erl | 19 ++
src/couch/src/couch_db_engine.erl | 27 +++
src/couch/src/couch_db_updater.erl | 10 +
src/couch/src/couch_raft.erl | 350 ++++++++++++++++++++++++++++++
src/couch/src/couch_raft_log.erl | 52 +++++
src/couch/test/eunit/couch_raft_SUITE.erl | 67 ++++++
9 files changed, 621 insertions(+), 9 deletions(-)
diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl
index 0549de566..8c1a2756d 100644
--- a/src/couch/src/couch_bt_engine.erl
+++ b/src/couch/src/couch_bt_engine.erl
@@ -66,6 +66,11 @@
purge_docs/3,
copy_purge_infos/2,
+ raft_lookup/2,
+ raft_insert/2,
+ raft_discard/2,
+ raft_last/1,
+
commit_data/1,
open_write_stream/2,
@@ -102,7 +107,11 @@
purge_tree_join/2,
purge_tree_reduce/2,
purge_seq_tree_split/1,
- purge_seq_tree_join/2
+ purge_seq_tree_join/2,
+
+ raft_tree_split/1,
+ raft_tree_join/2,
+ raft_tree_reduce/2
]).
% Used by the compactor
@@ -631,6 +640,44 @@ count_changes_since(St, SinceSeq) ->
{ok, Changes} = couch_btree:fold_reduce(BTree, FoldFun, 0, Opts),
Changes.
+raft_insert(#st{} = St, Entries) when is_list(Entries) ->
+ #st{
+ raft_tree = RaftTree0
+ } = St,
+ {ok, RaftTree1} = couch_btree:add_remove(RaftTree0, Entries, []),
+ {ok, St#st{
+ raft_tree = RaftTree1,
+ needs_commit = true
+ }}.
+
+raft_lookup(#st{} = St, Indexes) ->
+ Results = couch_btree:lookup(St#st.raft_tree, Indexes),
+ lists:map(
+ fun
+ ({ok, Entry}) -> Entry;
+ (not_found) -> not_found
+ end,
+ Results
+ ).
+
+raft_discard(#st{} = St, UpTo) ->
+ #st{
+ raft_tree = RaftTree0
+ } = St,
+ {ok, {First, _Last}} = couch_btree:full_reduce(RaftTree0),
+ {FirstIndex, _FirstTerm} = First,
+ Remove = lists:seq(FirstIndex, UpTo),
+ {ok, RaftTree1} = couch_btree:add_remove(RaftTree0, [], Remove),
+ {ok, St#st{
+ raft_tree = RaftTree1,
+ needs_commit = true
+ }}.
+
+
+raft_last(#st{} = St) ->
+ {ok, {_First, Last}} = couch_btree:full_reduce(St#st.raft_tree),
+ Last.
+
start_compaction(St, DbName, Options, Parent) ->
Args = [St, DbName, Options, Parent],
Pid = spawn_link(couch_bt_engine_compactor, start, Args),
@@ -799,6 +846,23 @@ purge_tree_reduce(reduce, IdRevs) ->
purge_tree_reduce(rereduce, Reds) ->
lists:sum(Reds).
+raft_tree_split({Index, Term, Value}) ->
+ {Index, {Term, Value}}.
+
+raft_tree_join(Index, {Term, Value}) ->
+ {Index, Term, Value}.
+
+
+raft_tree_reduce(reduce, []) ->
+ {{0, 0}, {0, 0}};
+raft_tree_reduce(reduce, Entries) ->
+ {MinIndex, MinTerm, _} = lists:min(Entries),
+ {MaxIndex, MaxTerm, _} = lists:max(Entries),
+ {{MinIndex, MinTerm}, {MaxIndex, MaxTerm}};
+raft_tree_reduce(rereduce, Reds) ->
+ {Mins, Maxs} = lists:unzip(Reds),
+ {lists:min(Mins), lists:max(Maxs)}.
+
set_update_seq(#st{header = Header} = St, UpdateSeq) ->
{ok, St#st{
header = couch_bt_engine_header:set(Header, [
@@ -894,6 +958,13 @@ init_state(FilePath, Fd, Header0, Options) ->
{reduce, fun ?MODULE:purge_tree_reduce/2}
]),
+ RaftTreeState = couch_bt_engine_header:raft_tree_state(Header),
+ {ok, RaftTree} = couch_btree:open(RaftTreeState, Fd, [
+ {split, fun ?MODULE:raft_tree_split/1},
+ {join, fun ?MODULE:raft_tree_join/2},
+ {reduce, fun ?MODULE:raft_tree_reduce/2}
+ ]),
+
ok = couch_file:set_db_pid(Fd, self()),
St = #st{
@@ -907,7 +978,8 @@ init_state(FilePath, Fd, Header0, Options) ->
local_tree = LocalTree,
compression = Compression,
purge_tree = PurgeTree,
- purge_seq_tree = PurgeSeqTree
+ purge_seq_tree = PurgeSeqTree,
+ raft_tree = RaftTree
},
% If this is a new database we've just created a
@@ -927,7 +999,8 @@ update_header(St, Header) ->
{id_tree_state, couch_btree:get_state(St#st.id_tree)},
{local_tree_state, couch_btree:get_state(St#st.local_tree)},
{purge_tree_state, couch_btree:get_state(St#st.purge_tree)},
- {purge_seq_tree_state, couch_btree:get_state(St#st.purge_seq_tree)}
+ {purge_seq_tree_state, couch_btree:get_state(St#st.purge_seq_tree)},
+ {raft_tree_state, couch_btree:get_state(St#st.raft_tree)}
]).
increment_update_seq(#st{header = Header} = St) ->
@@ -1097,7 +1170,8 @@ active_size(#st{} = St, #size_info{} = SI) ->
St#st.seq_tree,
St#st.local_tree,
St#st.purge_tree,
- St#st.purge_seq_tree
+ St#st.purge_seq_tree,
+ St#st.raft_tree
],
lists:foldl(
fun(T, Acc) ->
@@ -1171,12 +1245,14 @@ fold_docs_reduce_to_count(Reds) ->
finish_compaction_int(#st{} = OldSt, #st{} = NewSt1) ->
#st{
filepath = FilePath,
- local_tree = OldLocal
+ local_tree = OldLocal,
+ raft_tree = OldRaft
} = OldSt,
#st{
filepath = CompactDataPath,
header = Header,
- local_tree = NewLocal1
+ local_tree = NewLocal1,
+ raft_tree = NewRaft1
} = NewSt1,
% suck up all the local docs into memory and write them to the new db
@@ -1186,13 +1262,18 @@ finish_compaction_int(#st{} = OldSt, #st{} = NewSt1) ->
{ok, _, LocalDocs} = couch_btree:foldl(OldLocal, LoadFun, []),
{ok, NewLocal2} = couch_btree:add(NewLocal1, LocalDocs),
+ % do the same for the raft log
+ {ok, _, RaftLog} = couch_btree:foldl(OldRaft, LoadFun, []),
+ {ok, NewRaft2} = couch_btree:add(NewRaft1, RaftLog),
+
{ok, NewSt2} = commit_data(NewSt1#st{
header = couch_bt_engine_header:set(Header, [
{compacted_seq, get_update_seq(OldSt)},
{revs_limit, get_revs_limit(OldSt)},
{purge_infos_limit, get_purge_infos_limit(OldSt)}
]),
- local_tree = NewLocal2
+ local_tree = NewLocal2,
+ raft_tree = NewRaft2
}),
% Rename our *.compact.data file to *.compact so that if we
diff --git a/src/couch/src/couch_bt_engine.hrl b/src/couch/src/couch_bt_engine.hrl
index e3c1d4983..0d347e99b 100644
--- a/src/couch/src/couch_bt_engine.hrl
+++ b/src/couch/src/couch_bt_engine.hrl
@@ -23,5 +23,6 @@
local_tree,
compression,
purge_tree,
- purge_seq_tree
+ purge_seq_tree,
+ raft_tree
}).
diff --git a/src/couch/src/couch_bt_engine_header.erl b/src/couch/src/couch_bt_engine_header.erl
index e28f07723..9e663b096 100644
--- a/src/couch/src/couch_bt_engine_header.erl
+++ b/src/couch/src/couch_bt_engine_header.erl
@@ -34,6 +34,7 @@
purge_tree_state/1,
purge_seq_tree_state/1,
purge_infos_limit/1,
+ raft_tree_state/1,
security_ptr/1,
revs_limit/1,
uuid/1,
@@ -69,7 +70,8 @@
epochs,
compacted_seq,
purge_infos_limit = 1000,
- props_ptr
+ props_ptr,
+ raft_tree_state = nil
}).
-define(PARTITION_DISK_VERSION, 8).
@@ -177,6 +179,9 @@ compacted_seq(Header) ->
purge_infos_limit(Header) ->
get_field(Header, purge_infos_limit).
+raft_tree_state(Header) ->
+ get_field(Header, raft_tree_state).
+
get_field(Header, Field) ->
get_field(Header, Field, undefined).
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index dd7e07517..e197f98a4 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -114,6 +114,11 @@
fold_purge_infos/4,
fold_purge_infos/5,
+ raft_insert/2,
+ raft_lookup/2,
+ raft_discard/2,
+ raft_last/1,
+
calculate_start_seq/3,
owner_of/2,
@@ -1822,6 +1827,20 @@ fold_purge_infos(Db, StartPurgeSeq, Fun, Acc) ->
fold_purge_infos(Db, StartPurgeSeq, UFun, UAcc, Opts) ->
couch_db_engine:fold_purge_infos(Db, StartPurgeSeq, UFun, UAcc, Opts).
+raft_insert(#db{main_pid = Pid} = Db, Entries) ->
+ check_is_admin(Db),
+ gen_server:call(Pid, {raft_insert, Entries}, infinity).
+
+raft_lookup(Db, Indexes) ->
+ couch_db_engine:raft_lookup(Db, Indexes).
+
+raft_discard(#db{main_pid = Pid} = Db, UpTo) ->
+ check_is_admin(Db),
+ gen_server:call(Pid, {raft_discard, UpTo}, infinity).
+
+raft_last(Db) ->
+ couch_db_engine:raft_last(Db).
+
count_changes_since(Db, SinceSeq) ->
couch_db_engine:count_changes_since(Db, SinceSeq).
diff --git a/src/couch/src/couch_db_engine.erl b/src/couch/src/couch_db_engine.erl
index 052a527e3..63b9d49a3 100644
--- a/src/couch/src/couch_db_engine.erl
+++ b/src/couch/src/couch_db_engine.erl
@@ -704,6 +704,11 @@
read_doc_body/2,
load_purge_infos/2,
+ raft_lookup/2,
+ raft_insert/2,
+ raft_discard/2,
+ raft_last/1,
+
serialize_doc/2,
write_doc_body/2,
write_doc_infos/3,
@@ -927,6 +932,28 @@ copy_purge_infos(#db{} = Db, Purges) ->
),
{ok, Db#db{engine = {Engine, NewSt}}}.
+raft_insert(#db{} = Db, Entries) ->
+ #db{engine = {Engine, EngineState}} = Db,
+ {ok, NewSt} = Engine:raft_insert(
+ EngineState, Entries
+ ),
+ {ok, Db#db{engine = {Engine, NewSt}}}.
+
+raft_lookup(#db{} = Db, Indexes) ->
+ #db{engine = {Engine, EngineState}} = Db,
+ Engine:raft_lookup(EngineState, Indexes).
+
+raft_discard(#db{} = Db, UpTo) ->
+ #db{engine = {Engine, EngineState}} = Db,
+ {ok, NewSt} = Engine:raft_discard(
+ EngineState, UpTo
+ ),
+ {ok, Db#db{engine = {Engine, NewSt}}}.
+
+raft_last(#db{} = Db) ->
+ #db{engine = {Engine, EngineState}} = Db,
+ Engine:raft_last(EngineState).
+
commit_data(#db{} = Db) ->
#db{engine = {Engine, EngineState}} = Db,
{ok, NewSt} = Engine:commit_data(EngineState),
diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl
index 0248c21ec..7c1f97804 100644
--- a/src/couch/src/couch_db_updater.erl
+++ b/src/couch/src/couch_db_updater.erl
@@ -118,6 +118,16 @@ handle_call({purge_docs, PurgeReqs0, Options}, _From, Db) ->
end,
{ok, NewDb, Replies} = purge_docs(Db, PurgeReqs),
{reply, {ok, Replies}, NewDb, idle_limit()};
+handle_call({raft_insert, Entries}, _From, Db) ->
+ {ok, Db2} = couch_db_engine:raft_insert(Db, Entries),
+ Db3 = commit_data(Db2),
+ ok = couch_server:db_updated(Db3),
+ {reply, ok, Db3, idle_limit()};
+handle_call({raft_discard, UpTo}, _From, Db) ->
+ {ok, Db2} = couch_db_engine:raft_discard(Db, UpTo),
+ Db3 = commit_data(Db2),
+ ok = couch_server:db_updated(Db3),
+ {reply, ok, Db3, idle_limit()};
handle_call(Msg, From, Db) ->
case couch_db_engine:handle_db_updater_call(Msg, From, Db) of
{reply, Resp, NewDb} ->
diff --git a/src/couch/src/couch_raft.erl b/src/couch/src/couch_raft.erl
new file mode 100644
index 000000000..f398b4f2a
--- /dev/null
+++ b/src/couch/src/couch_raft.erl
@@ -0,0 +1,350 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
+
+-module(couch_raft).
+-behaviour(gen_statem).
+
+-define(ELECTION_DELAY, 150).
+-define(ELECTION_SPLAY, 150).
+-define(LEADER_HEARTBEAT, 75).
+-define(CLIENT_TIMEOUT, 5_000).
+
+% maximum number of entries to send in one go.
+-define(BATCH_SIZE, 10).
+
+% public api
+
+-export([
+ start/2,
+ start_link/2,
+ stop/1,
+ call/2
+]).
+
+% mandatory gen_statem callbacks
+
+-export([
+ init/1,
+ callback_mode/0,
+ handle_event/4
+]).
+
+%% public api
+
+start(Name, Cohort) ->
+ gen_statem:start({local, Name}, ?MODULE, new(Name, Cohort), []).
+
+start_link(Name, Cohort) ->
+ gen_statem:start_link({local, Name}, ?MODULE, new(Name, Cohort), []).
+
+new(Name, Cohort) ->
+ Peers = peers(Cohort),
+ #{
+ name => Name,
+ cohort => Cohort,
+ term => 0,
+ votedFor => undefined,
+ votesGranted => #{},
+ nextIndex => maps:from_list([{Peer, 1} || Peer <- Peers]),
+ matchIndex => maps:from_list([{Peer, 0} || Peer <- Peers]),
+ log => couch_raft_log:new(),
+ commitIndex => 0,
+ froms => #{},
+ lastApplied => 0,
+ machine => <<0>>
+ }.
+
+stop(ServerRef) ->
+ gen_statem:stop(ServerRef).
+
+call(ServerRef, Value) ->
+ gen_statem:call(ServerRef, #{type => 'ClientRequest', value => Value}, ?CLIENT_TIMEOUT).
+
+init(Data) ->
+ {ok, follower, Data}.
+
+callback_mode() ->
+ [handle_event_function, state_enter].
+
+%% erlfmt-ignore
+handle_event(cast, #{term := FutureTerm} = Msg, _State, #{term := CurrentTerm} = Data) when FutureTerm > CurrentTerm ->
+ couch_log:notice("~p received message from future term ~B, moving to that term, becoming follower and clearing votedFor", [node(), FutureTerm]),
+ {next_state, follower, Data#{term => FutureTerm, votedFor => undefined}, {next_event, cast, Msg}};
+
+handle_event(enter, _OldState, follower, Data) ->
+ #{term := Term, froms := Froms} = Data,
+ couch_log:notice("~p became follower in term ~B", [node(), Term]),
+ Replies = [{reply, From, {error, deposed}} || From <- maps:values(Froms)],
+ {keep_state, Data#{votedFor => undefined, froms => #{}}, [restart_election_timeout() | Replies]};
+
+handle_event(enter, _OldState, candidate, Data) ->
+ #{term := Term} = Data,
+ couch_log:notice("~p became candidate in term ~B", [node(), Term]),
+ {keep_state, start_election(Data), restart_election_timeout()};
+
+handle_event(enter, _OldState, leader, Data) ->
+ #{log := Log, cohort := Cohort, term := Term} = Data,
+ couch_log:notice("~p became leader in term ~B", [node(), Term]),
+ Peers = peers(Cohort),
+ {keep_state, Data#{
+ nextIndex => maps:from_list([{Peer, couch_raft_log:index(couch_raft_log:last(Log)) + 1} || Peer <- Peers]),
+ matchIndex => maps:from_list([{Peer, 0} || Peer <- Peers])
+ }, restart_heartbeat_timeout()};
+
+handle_event(cast, #{type := 'RequestVoteRequest', term := Term} = Msg, _State, #{term := CurrentTerm} = Data)
+ when Term =< CurrentTerm ->
+ #{
+ source := MSource,
+ lastLogIndex := MLastLogIndex,
+ lastLogTerm := MLastLogTerm
+ } = Msg,
+ #{
+ log := Log,
+ votedFor := VotedFor
+ } = Data,
+ LogOk = MLastLogTerm > couch_raft_log:term(couch_raft_log:last(Log)) orelse (MLastLogTerm == couch_raft_log:term(couch_raft_log:last(Log)) andalso MLastLogIndex >= couch_raft_log:index(couch_raft_log:last(Log))),
+ Grant = Term == CurrentTerm andalso LogOk andalso (VotedFor == undefined orelse VotedFor == MSource),
+ couch_log:notice("~p received RequestVoteRequest from ~p in term ~B when in term ~B (Grant:~p, LogOk:~p, VotedFor:~p)", [node(), MSource, Term, CurrentTerm, Grant, LogOk, VotedFor]),
+ Reply = #{
+ type => 'RequestVoteResponse',
+ term => CurrentTerm,
+ voteGranted => Grant,
+ source => node()
+ },
+ cast(MSource, Reply, Data),
+ if
+ Grant ->
+ {keep_state, Data#{votedFor => MSource}, restart_election_timeout()};
+ true ->
+ {keep_state_and_data, restart_election_timeout()}
+ end;
+
+handle_event(cast, #{type := 'RequestVoteResponse', term := PastTerm}, _State, #{term := CurrentTerm}) when PastTerm < CurrentTerm ->
+ couch_log:notice("~p ignored RequestVoteResponse from past term ~B", [node(), PastTerm]),
+ keep_state_and_data;
+
+handle_event(cast, #{type := 'RequestVoteResponse', term := Term} = Msg, _State, #{term := Term} = Data) ->
+ #{source := MSource, voteGranted := MVoteGranted} = Msg,
+ #{cohort := Cohort, votesGranted := VotesGranted0} = Data,
+ VotesGranted1 = if MVoteGranted -> lists:usort([MSource | VotesGranted0]); true -> VotesGranted0 end,
+ couch_log:notice("~p received RequestVoteResponse from ~p in current term ~B (VotesGranted:~p)", [node(), MSource, Term, VotesGranted1]),
+ if
+ length(VotesGranted1) >= length(Cohort) div 2 + 1 ->
+ couch_log:notice("~p has enough votes to be leader in term ~B", [node(), Term]),
+ {next_state, leader, Data#{votesGranted => VotesGranted1}};
+ true ->
+ {keep_state, Data#{votesGranted => VotesGranted1}}
+ end;
+
+
+handle_event(cast, #{type := 'AppendEntriesRequest', term := Term} = Msg, State, #{term := CurrentTerm} = Data)
+ when Term =< CurrentTerm ->
+ #{
+ source := MSource,
+ prevLogIndex := MPrevLogIndex,
+ prevLogTerm := MPrevLogTerm,
+ entries := MEntries,
+ commitIndex := MCommitIndex
+ } = Msg,
+ #{
+ log := Log
+ } = Data,
+ LogOk = MPrevLogIndex == 0 orelse (MPrevLogIndex > 0 andalso MPrevLogIndex =< couch_raft_log:index(couch_raft_log:last(Log)) andalso MPrevLogTerm == couch_raft_log:term(couch_raft_log:nth(MPrevLogIndex,Log))),
+ if
+ Term < CurrentTerm orelse (Term == CurrentTerm andalso State == follower andalso not LogOk) ->
+ Reply = #{
+ type => 'AppendEntriesResponse',
+ term => CurrentTerm,
+ success => false,
+ matchIndex => 0,
+ source => node()
+ },
+ cast(MSource, Reply, Data),
+ if
+ State == leader ->
+ keep_state_and_data;
+ true ->
+ {keep_state_and_data, restart_election_timeout()}
+ end;
+ Term == CurrentTerm andalso State == candidate ->
+ {next_state, follower, Data, {next_event, cast, Msg}};
+ Term == CurrentTerm andalso State == follower andalso LogOk ->
+ if
+ MEntries == [] ->
+ Reply = #{
+ type => 'AppendEntriesResponse',
+ term => CurrentTerm,
+ success => true,
+ matchIndex => MPrevLogIndex,
+ source => node()
+ },
+ couch_log:debug("~p received heartbeat and everything matches, sending matchIndex:~p", [node(), MPrevLogIndex]),
+ cast(MSource, Reply, Data),
+ {keep_state, update_state_machine(Data#{commitIndex => MCommitIndex}), restart_election_timeout()};
+ true ->
+ Index = MPrevLogIndex + 1,
+ LastLogIndex = couch_raft_log:index(couch_raft_log:last(Log)),
+ if
+ LastLogIndex >= Index ->
+ NthLogTerm = couch_raft_log:term(couch_raft_log:nth(Index, Log)),
+ FirstEntryTerm = couch_raft_log:term(hd(MEntries)),
+ if
+ NthLogTerm == FirstEntryTerm ->
+ Reply = #{
+ type => 'AppendEntriesResponse',
+ term => CurrentTerm,
+ success => true,
+ matchIndex => MPrevLogIndex + length(MEntries),
+ source => node()
+ },
+ couch_log:notice("~p received entry:~p that's already applied, sending matchIndex:~p", [node(), MEntries, MPrevLogIndex + length(MEntries)]),
+ cast(MSource, Reply, Data),
+ {keep_state, update_state_machine(Data#{commitIndex => MCommitIndex}), restart_election_timeout()};
+ NthLogTerm /= FirstEntryTerm ->
+ couch_log:notice("~p received conflicting entry:~p, deleting it", [node(), MEntries]),
+ {keep_state, Data#{log => lists:sublist(Log, LastLogIndex - 1)}, [{next_event, cast, Msg}, restart_election_timeout()]}
+ end;
+ LastLogIndex == MPrevLogIndex ->
+ couch_log:notice("~p received new entries:~p, appending it to log", [node(), MEntries]),
+ {keep_state, Data#{log => couch_raft_log:append(Log, MEntries)}, [{next_event, cast, Msg}, restart_election_timeout()]}
+ end
+ end
+ end;
+
+handle_event(cast, #{type := 'AppendEntriesResponse', term := PastTerm}, _State, #{term := CurrentTerm}) when PastTerm < CurrentTerm ->
+ couch_log:notice("~p ignored AppendEntriesResponse from past term ~B", [node(), PastTerm]),
+ keep_state_and_data;
+
+handle_event(cast, #{type := 'AppendEntriesResponse', term := Term} = Msg, _State, #{term := Term} = Data) ->
+ #{success := MSuccess, matchIndex := MMatchIndex, source := MSource} = Msg,
+ #{nextIndex := NextIndex, matchIndex := MatchIndex} = Data,
+ couch_log:debug("~p received AppendEntriesResponse from ~p in current term ~B (Success:~p)", [node(), MSource, Term, MSuccess]),
+ SourceNextIndex = maps:get(MSource, NextIndex),
+ if
+ MSuccess ->
+ {keep_state, Data#{
+ nextIndex => NextIndex#{MSource => MMatchIndex + 1},
+ matchIndex => MatchIndex#{MSource => MMatchIndex}
+ }};
+ true ->
+ {keep_state, Data#{
+ nextIndex => NextIndex#{MSource => max(SourceNextIndex - 1, 1)}
+ }}
+ end;
+
+handle_event({call, From}, #{type := 'ClientRequest'} = Msg, leader, Data) ->
+ #{value := Value} = Msg,
+ #{term := Term, log := Log, froms := Froms} = Data,
+ EntryIndex = couch_raft_log:index(couch_raft_log:last(Log)) + 1,
+ Entry = {EntryIndex, Term, Value},
+ {keep_state, Data#{log => couch_raft_log:append(Log, [Entry]), froms => Froms#{EntryIndex => From}}};
+
+handle_event({call, From}, #{type := 'ClientRequest'}, _State, _Data) ->
+ {keep_state_and_data, {reply, From, {error, not_leader}}};
+
+handle_event(state_timeout, new_election, State, Data) when State == follower; State == candidate ->
+ #{term := Term} = Data,
+ couch_log:notice("~p election timeout in state ~p, term ~B", [node(), State, Term]),
+ {next_state, candidate, start_election(Data), restart_election_timeout()};
+
+handle_event(state_timeout, heartbeat, leader, Data) ->
+ #{term := Term} = Data,
+ couch_log:debug("~p leader sending a heartbeat in term ~B", [node(), Term]),
+ ok = send_append_entries(Data),
+ {keep_state, advance_commit_index(Data), restart_heartbeat_timeout()};
+
+handle_event(EventType, EventContent, State, Data) ->
+ {stop, {unknown_event, EventType, EventContent, State, Data}}.
+
+
+send_append_entries(#{cohort := Cohort} = Data) ->
+ send_append_entries(peers(Cohort), Data).
+
+send_append_entries([], _Data) ->
+ ok;
+send_append_entries([Peer | Rest], Data) ->
+ #{term := Term, nextIndex := NextIndex, log := Log, commitIndex := CommitIndex} = Data,
+ PrevLogIndex = maps:get(Peer, NextIndex) - 1,
+ PrevLogTerm = if PrevLogIndex > 0 -> couch_raft_log:term(couch_raft_log:nth(PrevLogIndex, Log)); true -> 0 end,
+ LastEntry = min(couch_raft_log:index(couch_raft_log:last(Log)), PrevLogIndex + 2),
+ Entries = couch_raft_log:sublist(Log, PrevLogIndex + 1, ?BATCH_SIZE),
+ Msg = #{
+ type => 'AppendEntriesRequest',
+ term => Term,
+ source => node(),
+ prevLogIndex => PrevLogIndex,
+ prevLogTerm => PrevLogTerm,
+ entries => Entries,
+ commitIndex => min(CommitIndex, LastEntry)
+ },
+ cast(Peer, Msg, Data),
+ send_append_entries(Rest, Data).
+
+advance_commit_index(Data) ->
+ #{matchIndex := MatchIndex, log := Log, cohort := Cohort, term := Term} = Data,
+ LastTerm = couch_raft_log:term(couch_raft_log:last(Log)),
+ LastIndexes = lists:sort([couch_raft_log:index(couch_raft_log:last(Log)) | maps:values(MatchIndex)]),
+ NewCommitIndex = lists:nth(length(Cohort) div 2 + 1, LastIndexes),
+ if
+ LastTerm == Term ->
+ update_state_machine(Data#{commitIndex => NewCommitIndex});
+ true ->
+ Data
+ end.
+
+update_state_machine(#{lastApplied := Same, commitIndex := Same} = Data) ->
+ Data;
+update_state_machine(#{lastApplied := LastApplied, commitIndex := CommitIndex} = Data) when LastApplied < CommitIndex ->
+ #{log := Log, froms := Froms0, machine := Machine0} = Data,
+ From = LastApplied + 1,
+ To = min(couch_raft_log:index(couch_raft_log:last(Log)), CommitIndex),
+ Fun = fun(Index, {Froms, Machine}) ->
+ Value = couch_raft_log:value(couch_raft_log:nth(Index, Log)),
+ Result = crypto:hash(sha256, <<Machine/binary, Value/binary>>),
+ case maps:is_key(Index, Froms) of
+ true ->
+ gen_statem:reply(maps:get(Index, Froms), Result),
+ {maps:remove(Index, Froms), Result};
+ false ->
+ {Froms, Result}
+ end
+ end,
+ {Froms1, Machine1} = lists:foldl(Fun, {Froms0, Machine0}, lists:seq(From, To)),
+ Data#{froms => Froms1, machine => Machine1, lastApplied => To}.
+
+start_election(Data) ->
+ #{term := Term, cohort := Cohort, log := Log} = Data,
+ ElectionTerm = Term + 1,
+ couch_log:notice("~p starting election in term ~B", [node(), ElectionTerm]),
+ RequestVote = #{
+ type => 'RequestVoteRequest',
+ term => ElectionTerm,
+ lastLogIndex => couch_raft_log:index(couch_raft_log:last(Log)),
+ lastLogTerm => couch_raft_log:term(couch_raft_log:last(Log)),
+ source => node()
+ },
+ lists:foreach(fun(Peer) -> cast(Peer, RequestVote, Data) end, peers(Cohort)),
+ Data#{term => ElectionTerm, votedFor => node(), votesGranted => [node()]}.
+
+cast(Node, Msg, #{name := Name}) ->
+ gen_statem:cast({Name, Node}, Msg).
+
+restart_election_timeout() ->
+ {state_timeout, ?ELECTION_DELAY + rand:uniform(?ELECTION_SPLAY), new_election}.
+
+restart_heartbeat_timeout() ->
+ {state_timeout, ?LEADER_HEARTBEAT, heartbeat}.
+
+peers(Cohort) ->
+ Cohort -- [node()].
diff --git a/src/couch/src/couch_raft_log.erl b/src/couch/src/couch_raft_log.erl
new file mode 100644
index 000000000..987212457
--- /dev/null
+++ b/src/couch/src/couch_raft_log.erl
@@ -0,0 +1,52 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
+
+-module(couch_raft_log).
+
+-export([
+ new/0,
+ append/2,
+ sublist/3,
+ nth/2,
+ last/1,
+ index/1,
+ term/1,
+ value/1
+]).
+
+new() ->
+ [].
+
+append(Log, Items) ->
+ lists:append(Log, Items).
+
+sublist(Log, Start, Len) ->
+ lists:sublist(Log, Start, Len).
+
+nth(N, Log) ->
+ lists:nth(N, Log).
+
+last([]) ->
+ {0, 0, undefined};
+last(Log) ->
+ lists:last(Log).
+
+index(Entry) ->
+ element(1, Entry).
+
+term(Entry) ->
+ element(2, Entry).
+
+value(Entry) ->
+ element(3, Entry).
diff --git a/src/couch/test/eunit/couch_raft_SUITE.erl b/src/couch/test/eunit/couch_raft_SUITE.erl
new file mode 100644
index 000000000..1c3f8ebc2
--- /dev/null
+++ b/src/couch/test/eunit/couch_raft_SUITE.erl
@@ -0,0 +1,67 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
+
+-module(couch_raft_SUITE).
+
+-behaviour(ct_suite).
+
+-export([all/0]).
+-export([three_nodes/1]).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+all() ->
+ [three_nodes].
+
+three_nodes(Config) when is_list(Config) ->
+ N = 3,
+ Args = ["-pa", filename:dirname(code:which(craft))],
+ Peers = [?CT_PEER(#{wait_boot => {self(), tag}, args => Args}) || _ <- lists:seq(1, N)],
+ Cohort = [receive {tag, {started, Node, Peer}} -> Node end || {ok, Peer} <- Peers],
+
+ Crafts = [erpc:call(Node, craft3, start, [foo, Cohort]) || Node <- Cohort],
+
+ % wait for leader election
+ timer:sleep(500),
+
+ % verify only one leader elected
+ [{leader, FirstLeader}] = lists:filter(fun({State, _Pid}) -> State == leader end,
+ [{element(1, sys:get_state(Pid)), Pid} || {ok, Pid} <- Crafts]),
+
+ % make a series of calls
+ Hash1 = crypto:hash(sha256, <<0, 1>>),
+ ?assertEqual(Hash1, craft3:call(FirstLeader, <<1>>)),
+
+ Hash2 = crypto:hash(sha256, <<Hash1/binary, 2>>),
+ ?assertEqual(Hash2, craft3:call(FirstLeader, <<2>>)),
+
+ Hash3 = crypto:hash(sha256, <<Hash2/binary, 3>>),
+ ?assertEqual(Hash3, craft3:call(FirstLeader, <<3>>)),
+
+ % force a re-election
+ craft3:stop(FirstLeader),
+ timer:sleep(500),
+
+ % verify new leader elected
+ [{leader, SecondLeader}] = lists:filter(fun({State, _Pid}) -> State == leader end,
+ [{element(1, sys:get_state(Pid)), Pid} || {ok, Pid} <- Crafts, Pid /= FirstLeader]),
+ ?assertNotEqual(FirstLeader, SecondLeader),
+
+ % make another call
+ Hash4 = crypto:hash(sha256, <<Hash3/binary, 4>>),
+ ?assertEqual(Hash4, craft3:call(SecondLeader, <<4>>)),
+
+ [craft3:stop(Pid) || {ok, Pid} <- Crafts, Pid /= FirstLeader],
+ [peer:stop(Peer) || {ok, Peer} <- Peers].
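The expected values in three_nodes/1 follow from the state machine in update_state_machine/1: each committed value is folded into a running SHA-256 digest, `crypto:hash(sha256, <<Machine/binary, Value/binary>>)`, starting from the initial `machine => <<0>>`, and the client whose entry was just applied receives the new digest. The sketch below reproduces that fold outside the gen_statem so the hashes the suite asserts can be checked directly. The module and function names (`hash_chain_sketch`, `apply_all/2`) are hypothetical and not part of the patch.

```erlang
-module(hash_chain_sketch).
-export([apply_all/2]).

%% Fold each committed value into the running digest, mirroring the fold
%% in update_state_machine/1. Returns the per-entry results (what each
%% client call would be replied with) in commit order.
apply_all(Machine0, Values) ->
    {Results, _FinalMachine} =
        lists:mapfoldl(
            fun(Value, Machine) ->
                Result = crypto:hash(sha256, <<Machine/binary, Value/binary>>),
                %% the new digest is both the reply and the next machine state
                {Result, Result}
            end,
            Machine0,
            Values
        ),
    Results.
```

For example, `hash_chain_sketch:apply_all(<<0>>, [<<1>>, <<2>>, <<3>>])` yields the Hash1, Hash2 and Hash3 the suite expects. Because the fold is deterministic, every replica that applies the same committed log reaches the same digest, which is what lets the test keep chaining hashes (Hash4) after a leader change.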
[couchdb] 05/05: separate follower and candidate timeouts
Posted by rn...@apache.org.
rnewson pushed a commit to branch raft_storemodule
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 725b2cdc1bff165ec4cfb4512440edaee575a470
Author: Robert Newson <rn...@apache.org>
AuthorDate: Wed Aug 17 21:57:53 2022 +0100
separate follower and candidate timeouts
From:
ARC: Analysis of Raft Consensus, section 4.2
"As the authors use the same timer range for candidates and followers,
in Figure 4.1 we are waiting a minimum of 150ms (and up to twice that)
before restarting an election, despite the fact that, on average, a
node receives all of its responses within 15ms"
We separate the timeouts and set the candidate timeout (16-30 ms) well
below the follower timeout (151-300 ms). In a contested election (where
multiple candidates each gain only a minority of votes) the candidates
retry sooner, so we should elect a leader faster than with a single
shared timeout range.
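The new state_timeout/1 in the patch below picks a random delay per role, using the fact that `rand:uniform(N)` returns an integer in 1..N. The sketch restates those constants with explicit inclusive bounds so the two ranges can be compared side by side. The module name `raft_timeouts_sketch` and the helpers `bounds/1` and `timeout_ms/1` are hypothetical; only the numbers and the `new_election`/`heartbeat` event names come from the patch.

```erlang
-module(raft_timeouts_sketch).
-export([bounds/1, timeout_ms/1, state_timeout/1]).

%% Inclusive {Min, Max} delay in milliseconds for each role.
bounds(follower) -> {151, 300};
bounds(candidate) -> {16, 30};
bounds(leader) -> {75, 75}.

%% Same arithmetic as state_timeout/1 in couch_raft.erl.
timeout_ms(follower) -> 150 + rand:uniform(150);
timeout_ms(candidate) -> 15 + rand:uniform(15);
timeout_ms(leader) -> 75.

%% gen_statem state_timeout action: followers and candidates start a new
%% election when it fires, a leader sends a heartbeat.
state_timeout(leader) -> {state_timeout, timeout_ms(leader), heartbeat};
state_timeout(Role) -> {state_timeout, timeout_ms(Role), new_election}.
```

With these ranges even the slowest candidate retry (30 ms) fires well before the fastest follower timeout (151 ms), so a stalled election is retried roughly an order of magnitude sooner than a follower that just granted a vote would time out.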
---
src/couch/src/couch_raft.erl | 41 +++++++++++++++++++++--------------------
1 file changed, 21 insertions(+), 20 deletions(-)
diff --git a/src/couch/src/couch_raft.erl b/src/couch/src/couch_raft.erl
index 06025784e..98fb6f926 100644
--- a/src/couch/src/couch_raft.erl
+++ b/src/couch/src/couch_raft.erl
@@ -15,9 +15,6 @@
-module(couch_raft).
-behaviour(gen_statem).
--define(ELECTION_DELAY, 150).
--define(ELECTION_SPLAY, 150).
--define(LEADER_HEARTBEAT, 75).
-define(CLIENT_TIMEOUT, 5_000).
% maximum number of entries to send in one go.
@@ -78,12 +75,12 @@ handle_event(enter, _OldState, follower, Data) ->
couch_log:notice("~p became follower in term ~B", [node(), Term]),
Replies = [{reply, From, {error, deposed}} || From <- maps:values(Froms)],
persist({keep_state, maps:without([nextIndex, matchIndex], Data#{votedFor => undefined, votesGranted => undefined, froms => #{}}),
- [restart_election_timeout() | Replies]});
+ [state_timeout(follower) | Replies]});
handle_event(enter, _OldState, candidate, Data) ->
#{term := Term} = Data,
couch_log:notice("~p became candidate in term ~B", [node(), Term]),
- persist({keep_state, start_election(Data), restart_election_timeout()});
+ persist({keep_state, start_election(Data), state_timeout(candidate)});
handle_event(enter, _OldState, leader, Data) ->
#{store_module := StoreModule, cohort := Cohort, term := Term} = Data,
@@ -93,9 +90,9 @@ handle_event(enter, _OldState, leader, Data) ->
{keep_state, Data#{
nextIndex => maps:from_list([{Peer, LastIndex + 1} || Peer <- Peers]),
matchIndex => maps:from_list([{Peer, 0} || Peer <- Peers])
- }, restart_heartbeat_timeout()};
+ }, state_timeout(leader)};
-handle_event(cast, #{type := 'RequestVoteRequest', term := Term} = Msg, _State, #{term := CurrentTerm} = Data)
+handle_event(cast, #{type := 'RequestVoteRequest', term := Term} = Msg, State, #{term := CurrentTerm} = Data)
when Term =< CurrentTerm ->
#{
source := MSource,
@@ -119,9 +116,9 @@ handle_event(cast, #{type := 'RequestVoteRequest', term := Term} = Msg, _State,
cast(MSource, Reply, Data),
if
Grant ->
- persist({keep_state, Data#{votedFor => MSource}, restart_election_timeout()});
+ persist({keep_state, Data#{votedFor => MSource}, state_timeout(State)});
true ->
- {keep_state_and_data, restart_election_timeout()}
+ {keep_state_and_data, state_timeout(State)}
end;
handle_event(cast, #{type := 'RequestVoteResponse', term := PastTerm}, _State, #{term := CurrentTerm}) when PastTerm < CurrentTerm ->
@@ -171,7 +168,7 @@ handle_event(cast, #{type := 'AppendEntriesRequest', term := Term} = Msg, State,
State == leader ->
keep_state_and_data;
true ->
- {keep_state_and_data, restart_election_timeout()}
+ {keep_state_and_data, state_timeout(State)}
end;
Term == CurrentTerm andalso State == candidate ->
{next_state, follower, Data, {next_event, cast, Msg}};
@@ -187,7 +184,7 @@ handle_event(cast, #{type := 'AppendEntriesRequest', term := Term} = Msg, State,
},
couch_log:debug("~p received heartbeat and everything matches, sending matchIndex:~p", [node(), MPrevLogIndex]),
cast(MSource, Reply, Data),
- {keep_state, update_state_machine(Data#{commitIndex => MCommitIndex}), restart_election_timeout()};
+ {keep_state, update_state_machine(Data#{commitIndex => MCommitIndex}), state_timeout(State)};
true ->
Index = MPrevLogIndex + 1,
if
@@ -205,12 +202,12 @@ handle_event(cast, #{type := 'AppendEntriesRequest', term := Term} = Msg, State,
},
couch_log:notice("~p received entry:~p that's already applied, sending matchIndex:~p", [node(), MEntries, MPrevLogIndex + length(MEntries)]),
cast(MSource, Reply, Data),
- {keep_state, update_state_machine(Data#{commitIndex => MCommitIndex}), restart_election_timeout()};
+ {keep_state, update_state_machine(Data#{commitIndex => MCommitIndex}), state_timeout(State)};
NthLogTerm /= FirstEntryTerm ->
couch_log:notice("~p received conflicting entry:~p, deleting it", [node(), MEntries]),
case StoreModule:truncate(LastIndex - 1, Data) of
{ok, NewData} ->
- {keep_state, NewData, [{next_event, cast, Msg}, restart_election_timeout()]};
+ {keep_state, NewData, [{next_event, cast, Msg}, state_timeout(State)]};
{error, Reason} ->
{stop, Reason}
end
@@ -219,7 +216,7 @@ handle_event(cast, #{type := 'AppendEntriesRequest', term := Term} = Msg, State,
couch_log:notice("~p received new entries:~p, appending it to log", [node(), MEntries]),
case StoreModule:append(MEntries, Data) of
{ok, _EntryIndex, NewData} ->
- {keep_state, NewData, [{next_event, cast, Msg}, restart_election_timeout()]};
+ {keep_state, NewData, [{next_event, cast, Msg}, state_timeout(State)]};
{error, Reason} ->
{stop, Reason}
end
@@ -268,13 +265,13 @@ handle_event({call, From}, #{type := 'ClientRequest'}, _State, _Data) ->
handle_event(state_timeout, new_election, State, Data) when State == follower; State == candidate ->
#{term := Term} = Data,
couch_log:notice("~p election timeout in state ~p, term ~B", [node(), State, Term]),
- persist({next_state, candidate, start_election(Data), restart_election_timeout()});
+ persist({next_state, candidate, start_election(Data), state_timeout(State)});
handle_event(state_timeout, heartbeat, leader, Data) ->
#{term := Term} = Data,
couch_log:debug("~p leader sending a heartbeat in term ~B", [node(), Term]),
ok = send_append_entries(Data),
- {keep_state, advance_commit_index(Data), restart_heartbeat_timeout()};
+ {keep_state, advance_commit_index(Data), state_timeout(leader)};
handle_event(EventType, EventContent, State, Data) ->
{stop, {unknown_event, EventType, EventContent, State, Data}}.
@@ -360,11 +357,15 @@ start_election(Data) ->
cast(Node, Msg, #{name := Name}) ->
gen_statem:cast({Name, Node}, Msg).
-restart_election_timeout() ->
- {state_timeout, ?ELECTION_DELAY + rand:uniform(?ELECTION_SPLAY), new_election}.
-restart_heartbeat_timeout() ->
- {state_timeout, ?LEADER_HEARTBEAT, heartbeat}.
+state_timeout(follower) ->
+ {state_timeout, 150 + rand:uniform(150), new_election};
+
+state_timeout(candidate) ->
+ {state_timeout, 15 + rand:uniform(15), new_election};
+
+state_timeout(leader) ->
+ {state_timeout, 75, heartbeat}.
peers(Cohort) ->
Cohort -- [node()].
[couchdb] 02/05: introduce store abstraction (WIP)
Posted by rn...@apache.org.
rnewson pushed a commit to branch raft_storemodule
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit f2981b8ea032a7a565205dc97f68c85f9533a387
Author: Robert Newson <rn...@apache.org>
AuthorDate: Sat Jun 18 10:27:41 2022 +0100
introduce store abstraction (WIP)
---
Makefile | 10 ++
src/couch/src/couch_bt_engine.erl | 2 -
src/couch/src/couch_raft.erl | 154 +++++++++++++++---------
src/couch/src/couch_raft_log.erl | 52 --------
src/couch/src/couch_raft_store.erl | 35 ++++++
src/couch/src/couch_raft_store_sha256.erl | 80 ++++++++++++
src/couch/test/{eunit => }/couch_raft_SUITE.erl | 40 ++++--
7 files changed, 246 insertions(+), 127 deletions(-)
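With this change couch_raft no longer owns a log; it calls a store module for it. The sketch below is a minimal in-memory store, list-based like the deleted couch_raft_log, showing one way to satisfy the calls couch_raft.erl makes in the diff below: `last/1` returning `{Index, Term}`, `lookup/2` returning a `{Term, Value}` entry or `not_found`, `range/3`, `append/2` returning `{ok, LastIndex, NewState}`, `truncate/2`, `apply/2` returning `{Result, NewState}`, and `save_state/1`. The module name `raft_store_list_sketch` is hypothetical, the return shapes are inferred from those call sites rather than from the full couch_raft_store behaviour (only partly shown here), and it is not the couch_raft_store_sha256 module added by this commit.

```erlang
-module(raft_store_list_sketch).
%% apply/2 would otherwise clash with the auto-imported erlang:apply/2.
-compile({no_auto_import, [apply/2]}).

-export([init/1, save_state/1, last/1, lookup/2, range/3,
         append/2, truncate/2, apply/2]).

%% Initial store state; couch_raft:new/3 merges this map into its own data.
init(Cohort) when is_list(Cohort) ->
    {ok, #{
        cohort => Cohort,
        term => 0,
        votedFor => undefined,
        commitIndex => 0,
        lastApplied => 0,
        machine => <<0>>,
        log => []
    }}.

%% Nothing is durable here; a real store would sync term and votedFor.
save_state(#{} = _State) ->
    ok.

%% {Index, Term} of the last entry, {0, 0} for an empty log.
last(#{log := []}) ->
    {0, 0};
last(#{log := Log}) ->
    {Term, _Value} = lists:last(Log),
    {length(Log), Term}.

%% Entries are {Term, Value}; indexes are 1-based.
lookup(N, #{log := Log}) when N >= 1, N =< length(Log) ->
    lists:nth(N, Log);
lookup(_N, _State) ->
    not_found.

%% Up to Len entries starting at index Start ([] past the end).
range(Start, _Len, #{log := Log}) when Start > length(Log) ->
    [];
range(Start, Len, #{log := Log}) ->
    lists:sublist(Log, Start, Len).

%% Returns the index of the last appended entry.
append(Entries, #{log := Log} = State) ->
    NewLog = Log ++ Entries,
    {ok, length(NewLog), State#{log => NewLog}}.

%% Keep only the first To entries.
truncate(To, #{log := Log} = State) when To >= 0 ->
    {ok, State#{log => lists:sublist(Log, To)}}.

%% Same SHA-256 hash-chain state machine the patch removes from couch_raft.
apply(Value, #{machine := Machine} = State) when is_binary(Value) ->
    Result = crypto:hash(sha256, <<Machine/binary, Value/binary>>),
    {Result, State#{machine => Result}}.
```

Keeping the machine digest inside the store state means `apply/2` and `save_state/1` see the same map, which matches how update_state_machine/1 threads Data through the fold in the diff below.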
diff --git a/Makefile b/Makefile
index 82c2b335b..e7a389469 100644
--- a/Makefile
+++ b/Makefile
@@ -176,6 +176,16 @@ eunit: couch
COUCHDB_VERSION=$(COUCHDB_VERSION) COUCHDB_GIT_SHA=$(COUCHDB_GIT_SHA) $(REBAR) -r eunit $(EUNIT_OPTS) apps=$$dir || exit 1; \
done
+.PHONY: ct
+ct: export BUILDDIR = $(shell pwd)
+ct: export ERL_AFLAGS = -config $(shell pwd)/rel/files/eunit.config
+ct: export COUCHDB_QUERY_SERVER_JAVASCRIPT = $(shell pwd)/bin/couchjs $(shell pwd)/share/server/main.js
+ct: export COUCHDB_TEST_ADMIN_PARTY_OVERRIDE=1
+ct: couch
+ @COUCHDB_VERSION=$(COUCHDB_VERSION) COUCHDB_GIT_SHA=$(COUCHDB_GIT_SHA) $(REBAR) setup_eunit 2> /dev/null
+ @for dir in $(subdirs); do \
+ COUCHDB_VERSION=$(COUCHDB_VERSION) COUCHDB_GIT_SHA=$(COUCHDB_GIT_SHA) $(REBAR) -r ct $(EUNIT_OPTS) apps=$$dir || exit 1; \
+ done
.PHONY: exunit
# target: exunit - Run ExUnit tests
diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl
index 8c1a2756d..d93071c1e 100644
--- a/src/couch/src/couch_bt_engine.erl
+++ b/src/couch/src/couch_bt_engine.erl
@@ -673,7 +673,6 @@ raft_discard(#st{} = St, UpTo) ->
needs_commit = true
}}.
-
raft_last(#st{} = St) ->
{ok, {_First, Last}} = couch_btree:full_reduce(St#st.raft_tree),
Last.
@@ -852,7 +851,6 @@ raft_tree_split({Index, Term, Value}) ->
raft_tree_join(Index, {Term, Value}) ->
{Index, Term, Value}.
-
raft_tree_reduce(reduce, []) ->
{{0, 0}, {0, 0}};
raft_tree_reduce(reduce, Entries) ->
diff --git a/src/couch/src/couch_raft.erl b/src/couch/src/couch_raft.erl
index f398b4f2a..fda19cc22 100644
--- a/src/couch/src/couch_raft.erl
+++ b/src/couch/src/couch_raft.erl
@@ -26,8 +26,8 @@
% public api
-export([
- start/2,
- start_link/2,
+ start/3,
+ start_link/3,
stop/1,
call/2
]).
@@ -42,28 +42,23 @@
%% public api
-start(Name, Cohort) ->
- gen_statem:start({local, Name}, ?MODULE, new(Name, Cohort), []).
+start(Name, StoreModule, StoreState) ->
+ gen_statem:start({local, Name}, ?MODULE, new(Name, StoreModule, StoreState), []).
-start_link(Name, Cohort) ->
- gen_statem:start_link({local, Name}, ?MODULE, new(Name, Cohort), []).
+start_link(Name, StoreModule, StoreState) ->
+ gen_statem:start_link({local, Name}, ?MODULE, new(Name, StoreModule, StoreState), []).
-new(Name, Cohort) ->
+new(Name, StoreModule, StoreState) ->
+ #{cohort := Cohort} = StoreState,
Peers = peers(Cohort),
- #{
+ maps:merge(#{
name => Name,
- cohort => Cohort,
- term => 0,
- votedFor => undefined,
+ store_module => StoreModule,
votesGranted => #{},
nextIndex => maps:from_list([{Peer, 1} || Peer <- Peers]),
matchIndex => maps:from_list([{Peer, 0} || Peer <- Peers]),
- log => couch_raft_log:new(),
- commitIndex => 0,
- froms => #{},
- lastApplied => 0,
- machine => <<0>>
- }.
+ froms => #{}
+ }, StoreState).
stop(ServerRef) ->
gen_statem:stop(ServerRef).
@@ -80,25 +75,26 @@ callback_mode() ->
%% erlfmt-ignore
handle_event(cast, #{term := FutureTerm} = Msg, _State, #{term := CurrentTerm} = Data) when FutureTerm > CurrentTerm ->
couch_log:notice("~p received message from future term ~B, moving to that term, becoming follower and clearing votedFor", [node(), FutureTerm]),
- {next_state, follower, Data#{term => FutureTerm, votedFor => undefined}, {next_event, cast, Msg}};
+ persist({next_state, follower, Data#{term => FutureTerm, votedFor => undefined}, {next_event, cast, Msg}});
handle_event(enter, _OldState, follower, Data) ->
#{term := Term, froms := Froms} = Data,
couch_log:notice("~p became follower in term ~B", [node(), Term]),
Replies = [{reply, From, {error, deposed}} || From <- maps:values(Froms)],
- {keep_state, Data#{votedFor => undefined, froms => #{}}, [restart_election_timeout() | Replies]};
+ persist({keep_state, Data#{votedFor => undefined, froms => #{}}, [restart_election_timeout() | Replies]});
handle_event(enter, _OldState, candidate, Data) ->
#{term := Term} = Data,
couch_log:notice("~p became candidate in term ~B", [node(), Term]),
- {keep_state, start_election(Data), restart_election_timeout()};
+ persist({keep_state, start_election(Data), restart_election_timeout()});
handle_event(enter, _OldState, leader, Data) ->
- #{log := Log, cohort := Cohort, term := Term} = Data,
+ #{store_module := StoreModule, cohort := Cohort, term := Term} = Data,
couch_log:notice("~p became leader in term ~B", [node(), Term]),
Peers = peers(Cohort),
+ {LastIndex, _} = StoreModule:last(Data),
{keep_state, Data#{
- nextIndex => maps:from_list([{Peer, couch_raft_log:index(couch_raft_log:last(Log)) + 1} || Peer <- Peers]),
+ nextIndex => maps:from_list([{Peer, LastIndex + 1} || Peer <- Peers]),
matchIndex => maps:from_list([{Peer, 0} || Peer <- Peers])
}, restart_heartbeat_timeout()};
@@ -110,10 +106,11 @@ handle_event(cast, #{type := 'RequestVoteRequest', term := Term} = Msg, _State,
lastLogTerm := MLastLogTerm
} = Msg,
#{
- log := Log,
+ store_module := StoreModule,
votedFor := VotedFor
} = Data,
- LogOk = MLastLogTerm > couch_raft_log:term(couch_raft_log:last(Log)) orelse (MLastLogTerm == couch_raft_log:term(couch_raft_log:last(Log)) andalso MLastLogIndex >= couch_raft_log:index(couch_raft_log:last(Log))),
+ {LastIndex, LastTerm} = StoreModule:last(Data),
+ LogOk = MLastLogTerm > LastTerm orelse (MLastLogTerm == LastTerm andalso MLastLogIndex >= LastIndex),
Grant = Term == CurrentTerm andalso LogOk andalso (VotedFor == undefined orelse VotedFor == MSource),
couch_log:notice("~p received RequestVoteRequest from ~p in term ~B when in term ~B (Grant:~p, LogOk:~p, VotedFor:~p)", [node(), MSource, Term, CurrentTerm, Grant, LogOk, VotedFor]),
Reply = #{
@@ -125,7 +122,7 @@ handle_event(cast, #{type := 'RequestVoteRequest', term := Term} = Msg, _State,
cast(MSource, Reply, Data),
if
Grant ->
- {keep_state, Data#{votedFor => MSource}, restart_election_timeout()};
+ persist({keep_state, Data#{votedFor => MSource}, restart_election_timeout()});
true ->
{keep_state_and_data, restart_election_timeout()}
end;
@@ -158,9 +155,11 @@ handle_event(cast, #{type := 'AppendEntriesRequest', term := Term} = Msg, State,
commitIndex := MCommitIndex
} = Msg,
#{
- log := Log
+ store_module := StoreModule
} = Data,
- LogOk = MPrevLogIndex == 0 orelse (MPrevLogIndex > 0 andalso MPrevLogIndex =< couch_raft_log:index(couch_raft_log:last(Log)) andalso MPrevLogTerm == couch_raft_log:term(couch_raft_log:nth(MPrevLogIndex,Log))),
+ {LastIndex, _LastTerm} = StoreModule:last(Data),
+ {NthTerm, _} = StoreModule:lookup(MPrevLogIndex, Data),
+ LogOk = MPrevLogIndex == 0 orelse (MPrevLogIndex > 0 andalso MPrevLogIndex =< LastIndex andalso MPrevLogTerm == NthTerm),
if
Term < CurrentTerm orelse (Term == CurrentTerm andalso State == follower andalso not LogOk) ->
Reply = #{
@@ -194,11 +193,10 @@ handle_event(cast, #{type := 'AppendEntriesRequest', term := Term} = Msg, State,
{keep_state, update_state_machine(Data#{commitIndex => MCommitIndex}), restart_election_timeout()};
true ->
Index = MPrevLogIndex + 1,
- LastLogIndex = couch_raft_log:index(couch_raft_log:last(Log)),
if
- LastLogIndex >= Index ->
- NthLogTerm = couch_raft_log:term(couch_raft_log:nth(Index, Log)),
- FirstEntryTerm = couch_raft_log:term(hd(MEntries)),
+ LastIndex >= Index ->
+ {NthLogTerm, _} = StoreModule:lookup(Index, Data),
+ {FirstEntryTerm, _} = hd(MEntries),
if
NthLogTerm == FirstEntryTerm ->
Reply = #{
@@ -213,11 +211,21 @@ handle_event(cast, #{type := 'AppendEntriesRequest', term := Term} = Msg, State,
{keep_state, update_state_machine(Data#{commitIndex => MCommitIndex}), restart_election_timeout()};
NthLogTerm /= FirstEntryTerm ->
couch_log:notice("~p received conflicting entry:~p, deleting it", [node(), MEntries]),
- {keep_state, Data#{log => lists:sublist(Log, LastLogIndex - 1)}, [{next_event, cast, Msg}, restart_election_timeout()]}
+ case StoreModule:truncate(LastIndex - 1, Data) of
+ {ok, NewData} ->
+ {keep_state, NewData, [{next_event, cast, Msg}, restart_election_timeout()]};
+ {error, Reason} ->
+ {stop, Reason}
+ end
end;
- LastLogIndex == MPrevLogIndex ->
+ LastIndex == MPrevLogIndex ->
couch_log:notice("~p received new entries:~p, appending it to log", [node(), MEntries]),
- {keep_state, Data#{log => couch_raft_log:append(Log, MEntries)}, [{next_event, cast, Msg}, restart_election_timeout()]}
+ case StoreModule:append(MEntries, Data) of
+ {ok, _EntryIndex, NewData} ->
+ {keep_state, NewData, [{next_event, cast, Msg}, restart_election_timeout()]};
+ {error, Reason} ->
+ {stop, Reason}
+ end
end
end
end;
@@ -245,10 +253,14 @@ handle_event(cast, #{type := 'AppendEntriesResponse', term := Term} = Msg, _Stat
handle_event({call, From}, #{type := 'ClientRequest'} = Msg, leader, Data) ->
#{value := Value} = Msg,
- #{term := Term, log := Log, froms := Froms} = Data,
- EntryIndex = couch_raft_log:index(couch_raft_log:last(Log)) + 1,
- Entry = {EntryIndex, Term, Value},
- {keep_state, Data#{log => couch_raft_log:append(Log, [Entry]), froms => Froms#{EntryIndex => From}}};
+ #{term := Term, store_module := StoreModule, froms := Froms} = Data,
+ Entry = {Term, Value},
+ case StoreModule:append([Entry], Data) of
+ {ok, EntryIndex, NewData} ->
+ {keep_state, NewData#{froms => Froms#{EntryIndex => From}}};
+ {error, Reason} ->
+ {stop_and_reply, Reason, {reply, From, {error, Reason}}}
+ end;
handle_event({call, From}, #{type := 'ClientRequest'}, _State, _Data) ->
{keep_state_and_data, {reply, From, {error, not_leader}}};
@@ -256,7 +268,7 @@ handle_event({call, From}, #{type := 'ClientRequest'}, _State, _Data) ->
handle_event(state_timeout, new_election, State, Data) when State == follower; State == candidate ->
#{term := Term} = Data,
couch_log:notice("~p election timeout in state ~p, term ~B", [node(), State, Term]),
- {next_state, candidate, start_election(Data), restart_election_timeout()};
+ persist({next_state, candidate, start_election(Data), restart_election_timeout()});
handle_event(state_timeout, heartbeat, leader, Data) ->
#{term := Term} = Data,
@@ -267,18 +279,22 @@ handle_event(state_timeout, heartbeat, leader, Data) ->
handle_event(EventType, EventContent, State, Data) ->
{stop, {unknown_event, EventType, EventContent, State, Data}}.
-
send_append_entries(#{cohort := Cohort} = Data) ->
send_append_entries(peers(Cohort), Data).
send_append_entries([], _Data) ->
ok;
send_append_entries([Peer | Rest], Data) ->
- #{term := Term, nextIndex := NextIndex, log := Log, commitIndex := CommitIndex} = Data,
+ #{term := Term, nextIndex := NextIndex, store_module := StoreModule, commitIndex := CommitIndex} = Data,
PrevLogIndex = maps:get(Peer, NextIndex) - 1,
- PrevLogTerm = if PrevLogIndex > 0 -> couch_raft_log:term(couch_raft_log:nth(PrevLogIndex, Log)); true -> 0 end,
- LastEntry = min(couch_raft_log:index(couch_raft_log:last(Log)), PrevLogIndex + 2),
- Entries = couch_raft_log:sublist(Log, PrevLogIndex + 1, ?BATCH_SIZE),
+ PrevLogTerm =
+ if
+ PrevLogIndex > 0 -> {NthTerm, _} = StoreModule:lookup(PrevLogIndex, Data), NthTerm;
+ true -> 0
+ end,
+ {LastIndex, _} = StoreModule:last(Data),
+ LastEntry = min(LastIndex, PrevLogIndex + 2),
+ Entries = StoreModule:range(PrevLogIndex + 1, ?BATCH_SIZE, Data),
Msg = #{
type => 'AppendEntriesRequest',
term => Term,
@@ -292,9 +308,9 @@ send_append_entries([Peer | Rest], Data) ->
send_append_entries(Rest, Data).
advance_commit_index(Data) ->
- #{matchIndex := MatchIndex, log := Log, cohort := Cohort, term := Term} = Data,
- LastTerm = couch_raft_log:term(couch_raft_log:last(Log)),
- LastIndexes = lists:sort([couch_raft_log:index(couch_raft_log:last(Log)) | maps:values(MatchIndex)]),
+ #{matchIndex := MatchIndex, store_module := StoreModule, cohort := Cohort, term := Term} = Data,
+ {LastIndex, LastTerm} = StoreModule:last(Data),
+ LastIndexes = lists:sort([LastIndex | maps:values(MatchIndex)]),
NewCommitIndex = lists:nth(length(Cohort) div 2 + 1, LastIndexes),
if
LastTerm == Term ->
@@ -305,33 +321,37 @@ advance_commit_index(Data) ->
update_state_machine(#{lastApplied := Same, commitIndex := Same} = Data) ->
Data;
-update_state_machine(#{lastApplied := LastApplied, commitIndex := CommitIndex} = Data) when LastApplied < CommitIndex ->
- #{log := Log, froms := Froms0, machine := Machine0} = Data,
+update_state_machine(#{lastApplied := LastApplied, commitIndex := CommitIndex} = Data0) when
+ LastApplied < CommitIndex
+->
+ #{store_module := StoreModule, froms := Froms0} = Data0,
From = LastApplied + 1,
- To = min(couch_raft_log:index(couch_raft_log:last(Log)), CommitIndex),
- Fun = fun(Index, {Froms, Machine}) ->
- Value = couch_raft_log:value(couch_raft_log:nth(Index, Log)),
- Result = crypto:hash(sha256, <<Machine/binary, Value/binary>>),
+ {LastIndex, _} = StoreModule:last(Data0),
+ To = min(LastIndex, CommitIndex),
+ Fun = fun(Index, {Froms, Data}) ->
+ {_, Value} = StoreModule:lookup(Index, Data),
+ {Result, NewData} = StoreModule:apply(Value, Data),
case maps:is_key(Index, Froms) of
true ->
gen_statem:reply(maps:get(Index, Froms), Result),
- {maps:remove(Index, Froms), Result};
+ {maps:remove(Index, Froms), NewData};
false ->
- {Froms, Result}
+ {Froms, NewData}
end
end,
- {Froms1, Machine1} = lists:foldl(Fun, {Froms0, Machine0}, lists:seq(From, To)),
- Data#{froms => Froms1, machine => Machine1, lastApplied => To}.
+ {Froms1, Data1} = lists:foldl(Fun, {Froms0, Data0}, lists:seq(From, To)),
+ Data1#{froms => Froms1, lastApplied => To}.
start_election(Data) ->
- #{term := Term, cohort := Cohort, log := Log} = Data,
+ #{term := Term, cohort := Cohort, store_module := StoreModule} = Data,
ElectionTerm = Term + 1,
couch_log:notice("~p starting election in term ~B", [node(), ElectionTerm]),
+ {LastLogIndex, LastLogTerm} = StoreModule:last(Data),
RequestVote = #{
type => 'RequestVoteRequest',
term => ElectionTerm,
- lastLogIndex => couch_raft_log:index(couch_raft_log:last(Log)),
- lastLogTerm => couch_raft_log:term(couch_raft_log:last(Log)),
+ lastLogIndex => LastLogIndex,
+ lastLogTerm => LastLogTerm,
source => node()
},
lists:foreach(fun(Peer) -> cast(Peer, RequestVote, Data) end, peers(Cohort)),
@@ -348,3 +368,17 @@ restart_heartbeat_timeout() ->
peers(Cohort) ->
Cohort -- [node()].
+
+persist({next_state, _NextState, NewData, _Actions} = HandleEventResult) ->
+ persist(NewData, HandleEventResult);
+persist({keep_state, NewData, _Actions} = HandleEventResult) ->
+ persist(NewData, HandleEventResult).
+
+persist(Data, HandleEventResult) ->
+ #{store_module := StoreModule} = Data,
+ case StoreModule:save_state(Data) of
+ ok ->
+ HandleEventResult;
+ {error, Reason} ->
+ {stop, Reason}
+ end.
diff --git a/src/couch/src/couch_raft_log.erl b/src/couch/src/couch_raft_log.erl
deleted file mode 100644
index 987212457..000000000
--- a/src/couch/src/couch_raft_log.erl
+++ /dev/null
@@ -1,52 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
-%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
-
--module(couch_raft_log).
-
--export([
- new/0,
- append/2,
- sublist/3,
- nth/2,
- last/1,
- index/1,
- term/1,
- value/1
-]).
-
-new() ->
- [].
-
-append(Log, Items) ->
- lists:append(Log, Items).
-
-sublist(Log, Start, Len) ->
- lists:sublist(Log, Start, Len).
-
-nth(N, Log) ->
- lists:nth(N, Log).
-
-last([]) ->
- {0, 0, undefined};
-last(Log) ->
- lists:last(Log).
-
-index(Entry) ->
- element(1, Entry).
-
-term(Entry) ->
- element(2, Entry).
-
-value(Entry) ->
- element(3, Entry).
diff --git a/src/couch/src/couch_raft_store.erl b/src/couch/src/couch_raft_store.erl
new file mode 100644
index 000000000..81ebe684e
--- /dev/null
+++ b/src/couch/src/couch_raft_store.erl
@@ -0,0 +1,35 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
+
+-module(couch_raft_store).
+
+-callback init(Args :: term()) -> {ok, State :: #{}} | {stop, Reason :: term()}.
+
+% raft state callbacks
+
+-callback save_state(State :: #{}) -> ok | {error, Reason :: term()}.
+
+%% log callbacks
+-type log_entry() :: {Term :: non_neg_integer(), Value :: term()}.
+-callback last(State :: #{}) -> {Index :: non_neg_integer(), Term :: non_neg_integer()}.
+-callback lookup(N :: non_neg_integer(), State :: #{}) -> log_entry() | not_found.
+-callback range(Start :: non_neg_integer(), Len :: non_neg_integer(), State :: #{}) -> [log_entry() | not_found].
+-callback append(Entries :: [log_entry()], State :: #{}) ->
+ {ok, Index :: non_neg_integer(), NewState :: #{}} | {error, Reason :: term()}.
+-callback truncate(To :: non_neg_integer(), State :: #{}) -> {ok, NewState :: #{}} | {error, Reason :: term()}.
+-callback discard(UpTo :: non_neg_integer(), State :: #{}) ->
+ {ok, NewState :: #{}} | {error, Reason :: term()}.
+
+%% state machine callbacks
+-callback apply(Args :: term(), State :: #{}) -> {Result :: term(), NewState :: #{}}.
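The couch_raft_store behaviour exposes the log through lookup/2, truncate/2 and append/2 instead of the removed couch_raft_log list helpers. The following is a minimal sketch of how a follower might reconcile the entries carried by an AppendEntries request (Raft section 5.3: on a term conflict at some index, drop that entry and everything after it, then append the leader's entries). It assumes a plain list of {Term, Value} entries with 1-based indexes, the representation couch_raft_store_sha256 uses, and assumes the PrevLogIndex/PrevLogTerm consistency check has already passed. merge_entries/3 is a hypothetical name.

```erlang
-module(raft_log_merge_sketch).
-export([merge_entries/3]).

%% Log is a list of {Term, Value}; the entry at list position I has
%% Raft index I. Entries are meant to occupy indexes PrevLogIndex + 1
%% onwards.
merge_entries(Log, PrevLogIndex, Entries) ->
    merge(Log, PrevLogIndex + 1, Entries, length(Log)).

%% Every incoming entry is already present: keep the log unchanged
%% (existing entries beyond them are not truncated).
merge(Log, _Index, [], _Len) ->
    Log;
%% Past the end of the local log: append whatever remains.
merge(Log, Index, Entries, Len) when Index > Len ->
    Log ++ Entries;
%% Same index: keep the local entry if the terms match, otherwise cut
%% the log just before the conflict and append the leader's entries.
merge(Log, Index, [{Term, _Value} | Rest] = Entries, Len) ->
    case lists:nth(Index, Log) of
        {Term, _} ->
            merge(Log, Index + 1, Rest, Len);
        _Conflict ->
            lists:sublist(Log, Index - 1) ++ Entries
    end.
```

In store terms, the conflict branch corresponds to calling truncate/2 with Index - 1 and then append/2 with the remaining entries.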
diff --git a/src/couch/src/couch_raft_store_sha256.erl b/src/couch/src/couch_raft_store_sha256.erl
new file mode 100644
index 000000000..e313da3e2
--- /dev/null
+++ b/src/couch/src/couch_raft_store_sha256.erl
@@ -0,0 +1,80 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
+
+%% A non-persistent implementation of the couch_raft_store behaviour, for testing purposes.
+
+-module(couch_raft_store_sha256).
+-behaviour(couch_raft_store).
+
+-export([
+ init/1,
+ save_state/1,
+ %% log
+ last/1,
+ lookup/2,
+ range/3,
+ append/2,
+ truncate/2,
+ discard/2,
+ %% state machine
+ apply/2
+]).
+
+init(Cohort) ->
+ {ok, #{
+ cohort => Cohort,
+ commitIndex => 0,
+ lastApplied => 0,
+ log => [],
+ machine => <<0>>,
+ term => 0,
+ votedFor => undefined
+ }}.
+
+% raft state callbacks
+
+save_state(#{} = State) ->
+ _WouldPersist = maps:with([cohort, term, votedFor, lastApplied, machine], State),
+ ok.
+
+%% log callbacks
+last(#{log := []}) ->
+ {0, 0};
+last(#{log := Log}) ->
+ {LastTerm, _} = lists:last(Log),
+ {length(Log), LastTerm}.
+
+lookup(0, #{}) ->
+ {0, 0};
+lookup(N, #{log := Log}) when N > 0 ->
+ lists:nth(N, Log).
+
+range(Start, Len, #{log := Log}) when Start > 0, Len > 0 ->
+ lists:sublist(Log, Start, Len).
+
+append(Entries, #{log := Log} = State) when is_list(Entries) ->
+ NewLog = lists:append(Log, Entries),
+ {ok, length(NewLog), State#{log => NewLog}}.
+
+truncate(To, #{log := Log} = State) ->
+ {ok, State#{log => lists:sublist(Log, To)}}.
+
+discard(_UpTo, #{}) ->
+ {error, not_implemented}.
+
+%% state machine callbacks
+
+apply(Bin, #{machine := Machine0} = State) when is_binary(Bin), is_binary(Machine0) ->
+ Machine1 = crypto:hash(sha256, <<Machine0/binary, Bin/binary>>),
+ {Machine1, State#{machine => Machine1}}.
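Because apply/2 hashes the previous machine value together with each new input, the state is a hash chain: any node that applies the same committed entries in the same order, starting from the <<0>> set by init/1, ends with the same binary. This sketch replays that chain outside the state machine; it is the same computation the test suite spells out step by step as Hash1..Hash4. The module name is hypothetical.

```erlang
-module(sha256_replay_sketch).
-export([replay/1]).

%% Folds each input through the same step as
%% couch_raft_store_sha256:apply/2, starting from the initial
%% machine value <<0>>.
replay(Inputs) ->
    lists:foldl(
        fun(Bin, Machine) when is_binary(Bin) ->
            crypto:hash(sha256, <<Machine/binary, Bin/binary>>)
        end,
        <<0>>,
        Inputs
    ).
```

For example, replay([<<1>>, <<2>>, <<3>>]) should equal the Hash3 computed in three_nodes/1.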
diff --git a/src/couch/test/eunit/couch_raft_SUITE.erl b/src/couch/test/couch_raft_SUITE.erl
similarity index 57%
rename from src/couch/test/eunit/couch_raft_SUITE.erl
rename to src/couch/test/couch_raft_SUITE.erl
index 1c3f8ebc2..42e1f4ab3 100644
--- a/src/couch/test/eunit/couch_raft_SUITE.erl
+++ b/src/couch/test/couch_raft_SUITE.erl
@@ -27,41 +27,55 @@ all() ->
three_nodes(Config) when is_list(Config) ->
N = 3,
- Args = ["-pa", filename:dirname(code:which(craft))],
+ Args = [
+ "-pa", filename:dirname(code:which(couch_raft)),
+ "-pa", filename:dirname(code:which(couch_log)),
+ "-pa", filename:dirname(code:which(couch_stats))
+ ],
Peers = [?CT_PEER(#{wait_boot => {self(), tag}, args => Args}) || _ <- lists:seq(1, N)],
- Cohort = [receive {tag, {started, Node, Peer}} -> Node end || {ok, Peer} <- Peers],
+ Cohort = [
+ receive
+ {tag, {started, Node, Peer}} -> Node
+ end
+ || {ok, Peer} <- Peers
+ ],
- Crafts = [erpc:call(Node, craft3, start, [foo, Cohort]) || Node <- Cohort],
+ {ok, InitialState} = couch_raft_store_sha256:init(Cohort),
+ Crafts = [erpc:call(Node, couch_raft, start, [foo, couch_raft_store_sha256, InitialState]) || Node <- Cohort],
% wait for leader election
timer:sleep(500),
% verify only one leader elected
- [{leader, FirstLeader}] = lists:filter(fun({State, _Pid}) -> State == leader end,
- [{element(1, sys:get_state(Pid)), Pid} || {ok, Pid} <- Crafts]),
+ [{leader, FirstLeader}] = lists:filter(
+ fun({State, _Pid}) -> State == leader end,
+ [{element(1, sys:get_state(Pid)), Pid} || {ok, Pid} <- Crafts]
+ ),
% make a series of calls
Hash1 = crypto:hash(sha256, <<0, 1>>),
- ?assertEqual(Hash1, craft3:call(FirstLeader, <<1>>)),
+ ?assertEqual(Hash1, couch_raft:call(FirstLeader, <<1>>)),
Hash2 = crypto:hash(sha256, <<Hash1/binary, 2>>),
- ?assertEqual(Hash2, craft3:call(FirstLeader, <<2>>)),
+ ?assertEqual(Hash2, couch_raft:call(FirstLeader, <<2>>)),
Hash3 = crypto:hash(sha256, <<Hash2/binary, 3>>),
- ?assertEqual(Hash3, craft3:call(FirstLeader, <<3>>)),
+ ?assertEqual(Hash3, couch_raft:call(FirstLeader, <<3>>)),
% force a re-election
- craft3:stop(FirstLeader),
+ couch_raft:stop(FirstLeader),
timer:sleep(500),
% verify new leader elected
- [{leader, SecondLeader}] = lists:filter(fun({State, _Pid}) -> State == leader end,
- [{element(1, sys:get_state(Pid)), Pid} || {ok, Pid} <- Crafts, Pid /= FirstLeader]),
+ [{leader, SecondLeader}] = lists:filter(
+ fun({State, _Pid}) -> State == leader end,
+ [{element(1, sys:get_state(Pid)), Pid} || {ok, Pid} <- Crafts, Pid /= FirstLeader]
+ ),
?assertNotEqual(FirstLeader, SecondLeader),
% make another call
Hash4 = crypto:hash(sha256, <<Hash3/binary, 4>>),
- ?assertEqual(Hash4, craft3:call(SecondLeader, <<4>>)),
+ ?assertEqual(Hash4, couch_raft:call(SecondLeader, <<4>>)),
- [craft3:stop(Pid) || {ok, Pid} <- Crafts, Pid /= FirstLeader],
+ [couch_raft:stop(Pid) || {ok, Pid} <- Crafts, Pid /= FirstLeader],
[peer:stop(Peer) || {ok, Peer} <- Peers].
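three_nodes/1 waits a fixed timer:sleep(500) before asserting that exactly one node is leader, which can be timing-sensitive on a slow CI host. One possible alternative, sketched here under the assumption that the suite would pass fun(P) -> element(1, sys:get_state(P)) end as StateFun and [Pid || {ok, Pid} <- Crafts] as Pids, is to poll until a single leader appears. The module, function name, poll interval and retry count are all hypothetical.

```erlang
-module(raft_ct_wait_sketch).
-export([wait_for_single_leader/3]).

%% StateFun(Pid) returns the gen_statem state name of Pid.
%% Polls every 50 ms, at most Retries times.
wait_for_single_leader(_StateFun, _Pids, 0) ->
    {error, no_single_leader};
wait_for_single_leader(StateFun, Pids, Retries) when Retries > 0 ->
    case [Pid || Pid <- Pids, StateFun(Pid) == leader] of
        [Leader] ->
            {ok, Leader};
        _NoneOrSeveral ->
            timer:sleep(50),
            wait_for_single_leader(StateFun, Pids, Retries - 1)
    end.
```

In the suite, the first lists:filter/2 match could then become {ok, FirstLeader} = wait_for_single_leader(StateFun, Pids, 20).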
[couchdb] 03/05: don't track matchIndex/nextIndex in non-leaders, pointless
rnewson pushed a commit to branch raft_storemodule
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 7e5fc643913ccd7616fc1029473193a343d182e2
Author: Robert Newson <rn...@apache.org>
AuthorDate: Mon Jul 4 23:23:36 2022 +0100
don't track matchIndex/nextIndex in non-leaders, pointless
---
src/couch/src/couch_raft.erl | 12 ++++++------
1 file changed, 6 insertions(+), 6 deletions(-)
diff --git a/src/couch/src/couch_raft.erl b/src/couch/src/couch_raft.erl
index fda19cc22..c580346fc 100644
--- a/src/couch/src/couch_raft.erl
+++ b/src/couch/src/couch_raft.erl
@@ -49,14 +49,10 @@ start_link(Name, StoreModule, StoreState) ->
gen_statem:start_link({local, Name}, ?MODULE, new(Name, StoreModule, StoreState), []).
new(Name, StoreModule, StoreState) ->
- #{cohort := Cohort} = StoreState,
- Peers = peers(Cohort),
maps:merge(#{
name => Name,
store_module => StoreModule,
votesGranted => #{},
- nextIndex => maps:from_list([{Peer, 1} || Peer <- Peers]),
- matchIndex => maps:from_list([{Peer, 0} || Peer <- Peers]),
froms => #{}
}, StoreState).
@@ -81,7 +77,8 @@ handle_event(enter, _OldState, follower, Data) ->
#{term := Term, froms := Froms} = Data,
couch_log:notice("~p became follower in term ~B", [node(), Term]),
Replies = [{reply, From, {error, deposed}} || From <- maps:values(Froms)],
- persist({keep_state, Data#{votedFor => undefined, froms => #{}}, [restart_election_timeout() | Replies]});
+ persist({keep_state, maps:without([nextIndex, matchIndex], Data#{votedFor => undefined, froms => #{}}),
+ [restart_election_timeout() | Replies]});
handle_event(enter, _OldState, candidate, Data) ->
#{term := Term} = Data,
@@ -234,7 +231,7 @@ handle_event(cast, #{type := 'AppendEntriesResponse', term := PastTerm}, _State,
couch_log:notice("~p ignored AppendEntriesResponse from past term ~B", [node(), PastTerm]),
keep_state_and_data;
-handle_event(cast, #{type := 'AppendEntriesResponse', term := Term} = Msg, _State, #{term := Term} = Data) ->
+handle_event(cast, #{type := 'AppendEntriesResponse', term := Term} = Msg, leader, #{term := Term} = Data) ->
#{success := MSuccess, matchIndex := MMatchIndex, source := MSource} = Msg,
#{nextIndex := NextIndex, matchIndex := MatchIndex} = Data,
couch_log:debug("~p received AppendEntriesResponse from ~p in current term ~B (Success:~p)", [node(), MSource, Term, MSuccess]),
@@ -251,6 +248,9 @@ handle_event(cast, #{type := 'AppendEntriesResponse', term := Term} = Msg, _Stat
}}
end;
+handle_event(cast, #{type := 'AppendEntriesResponse'}, _State, _Data) ->
+ keep_state_and_data;
+
handle_event({call, From}, #{type := 'ClientRequest'} = Msg, leader, Data) ->
#{value := Value} = Msg,
#{term := Term, store_module := StoreModule, froms := Froms} = Data,
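With this commit nextIndex and matchIndex exist only while a node is leader, so they have to be (re)built whenever a node enters the leader state; that code is not shown in this excerpt. The sketch below follows Raft's Figure 2 (nextIndex starts at the leader's last log index + 1, matchIndex at 0) and adds the usual way a leader derives the highest index stored on a majority from matchIndex. Names are hypothetical. Raft additionally requires the entry at that index to belong to the current term before the leader treats it as committed (section 5.4.2); this helper only computes the replicated index.

```erlang
-module(raft_leader_index_sketch).
-export([init_leader_indexes/2, quorum_match_index/2]).

%% Fresh per-peer bookkeeping for a newly elected leader.
init_leader_indexes(Peers, LastLogIndex) ->
    #{
        nextIndex => maps:from_list([{P, LastLogIndex + 1} || P <- Peers]),
        matchIndex => maps:from_list([{P, 0} || P <- Peers])
    }.

%% Highest log index known to be stored on a majority of the cohort,
%% counting the leader's own last index as one replica.
quorum_match_index(LeaderLastIndex, MatchIndex) ->
    Desc = lists:reverse(lists:sort([LeaderLastIndex | maps:values(MatchIndex)])),
    Quorum = length(Desc) div 2 + 1,
    %% The Quorum-th largest value is held by at least Quorum nodes.
    lists:nth(Quorum, Desc).
```

For a three-node cohort where the leader is at index 5 and the peers report 4 and 1, quorum_match_index/2 returns 4.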
[couchdb] 04/05: clear votesGranted when unused for readability
rnewson pushed a commit to branch raft_storemodule
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 0b440b86f5d570ebdc7e7fa0677ae85481143044
Author: Robert Newson <rn...@apache.org>
AuthorDate: Wed Aug 17 21:53:01 2022 +0100
clear votesGranted when unused for readability
---
src/couch/src/couch_raft.erl | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
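This commit sets votesGranted to undefined whenever a node is not collecting votes. How the candidate fills it in is outside this excerpt; the sketch below assumes, hypothetically, that a candidate starts a fresh map containing its own vote and records each granted reply by source node. One side effect of using undefined rather than #{} is that calling these helpers outside the candidate state fails loudly: maps:get/3 raises a badmap error on undefined.

```erlang
-module(raft_votes_sketch).
-export([new_tally/0, record_vote/3, has_majority/2]).

%% A candidate votes for itself when it starts an election.
new_tally() ->
    #{node() => true}.

%% Only granted votes are recorded; refusals leave the tally unchanged.
record_vote(Source, true, Tally) ->
    Tally#{Source => true};
record_vote(_Source, false, Tally) ->
    Tally.

%% Strict majority of the whole cohort (including this node).
has_majority(Tally, Cohort) ->
    Granted = [N || N <- Cohort, maps:get(N, Tally, false)],
    length(Granted) > length(Cohort) div 2.
```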
diff --git a/src/couch/src/couch_raft.erl b/src/couch/src/couch_raft.erl
index c580346fc..06025784e 100644
--- a/src/couch/src/couch_raft.erl
+++ b/src/couch/src/couch_raft.erl
@@ -52,7 +52,7 @@ new(Name, StoreModule, StoreState) ->
maps:merge(#{
name => Name,
store_module => StoreModule,
- votesGranted => #{},
+ votesGranted => undefined,
froms => #{}
}, StoreState).
@@ -71,13 +71,13 @@ callback_mode() ->
%% erlfmt-ignore
handle_event(cast, #{term := FutureTerm} = Msg, _State, #{term := CurrentTerm} = Data) when FutureTerm > CurrentTerm ->
couch_log:notice("~p received message from future term ~B, moving to that term, becoming follower and clearing votedFor", [node(), FutureTerm]),
- persist({next_state, follower, Data#{term => FutureTerm, votedFor => undefined}, {next_event, cast, Msg}});
+ persist({next_state, follower, Data#{term => FutureTerm, votedFor => undefined, votesGranted => undefined}, {next_event, cast, Msg}});
handle_event(enter, _OldState, follower, Data) ->
#{term := Term, froms := Froms} = Data,
couch_log:notice("~p became follower in term ~B", [node(), Term]),
Replies = [{reply, From, {error, deposed}} || From <- maps:values(Froms)],
- persist({keep_state, maps:without([nextIndex, matchIndex], Data#{votedFor => undefined, froms => #{}}),
+ persist({keep_state, maps:without([nextIndex, matchIndex], Data#{votedFor => undefined, votesGranted => undefined, froms => #{}}),
[restart_election_timeout() | Replies]});
handle_event(enter, _OldState, candidate, Data) ->