Posted to dev@couchdb.apache.org by Nick Vatamaniuc <va...@apache.org> on 2019/04/10 22:21:40 UTC

[DISCUSS] FDB and CouchDB replication

I was thinking about how replication would work with FDB, and so far there are
two main issues I believe would need to be addressed. One deals with how we
monitor _replicator db docs for changes, and the other is how replication
jobs coordinate so we don't run multiple replication jobs for the same
replication document in a cluster.

1) Shard-level vs fabric-level notifications for _replicator db docs

Currently the replicator monitors and updates individual _replicator
shards. Change notifications are done via changes feeds (normal,
non-continuous) and couch event server callbacks:
https://github.com/apache/couchdb/blob/master/src/couch/src/couch_multidb_changes.erl#L180,
https://github.com/apache/couchdb/blob/master/src/couch/src/couch_multidb_changes.erl#L246.
With fdb we'd have to get these updates via fabric changes feeds and rely
on the global _db_updates. That could result in a performance impact and
would be something to keep an eye on.

2) Replicator job coordination

Replicator has a basic constraint that there should be only one replication
job running for each replicator doc per cluster.

Each replication currently has a single "owner" node. The owner is picked
to be one of the 3 nodes where the _replicator doc shards live. If nodes connect
or disconnect, the replicator will reshuffle replication jobs: some nodes
will stop running jobs that they don't "own" anymore and then proceed to
"rescan" all the replicator docs to possibly start new ones. However, with
fdb, there are no connected erlang nodes and no shards. All coordination
happens via fdb, so we'd have to somehow coordinate replication job
ownership through there.

For discussion, here is a proposal for a worker registration layer to do that
job coordination:

The basic idea is that erlang fabric nodes would declare, by writing to fdb,
that they can take on certain "roles"; "replicator" would be one such role.
So, for each role there is a list of nodes. Each node picks a fraction
of jobs based on how many other nodes of the same role are in the list.
When membership changes, nodes which are alive might have to pick up new
jobs or stop running existing jobs, since those would now be started by other
nodes.

For example, there are 3 nodes with "replicator" roles: [n1, n2, n3]. n1 is
currently down so the membership list is [n2, n3]. If there are 60
replication jobs then n2 might run 30, and n3 another 30. n1 comes online
and adds itself to the roles list, which now looks like [n1, n2, n3]. n1
then picks 20 replication jobs. At about the same time n2 and n3 notice n1
is online and decide to stop running the jobs that n1 would pick up and
they each would end up running roughly 20 jobs.
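
To make the "each node picks a fraction of jobs" part concrete, here is a rough
sketch in plain Erlang (all names are made up for illustration) of how every
node could independently agree on which jobs it owns, just by hashing job ids
onto the sorted membership list:

owner(JobId, Members) when Members =/= [] ->
    %% Every node runs the same pure function over the same membership list,
    %% so they all agree on ownership without any extra coordination.
    Sorted = lists:sort(Members),
    Index = erlang:phash2(JobId, length(Sorted)) + 1,
    lists:nth(Index, Sorted).

my_jobs(WId, Members, JobIds) ->
    %% The subset of jobs this worker should be running.
    [Id || Id <- JobIds, owner(Id, Members) =:= WId].

With [n2, n3] each node would own roughly half of the 60 jobs; when n1 rejoins,
about a third of the ids re-hash to n1 and n2/n3 stop those.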

The difficulty here comes from maintaining liveness. A node could stop at
any time without removing itself from the membership list of its roles.
That means all of a sudden a subset of jobs would stop running without
anyone picking them up. So, the idea is to have nodes periodically update
their health status in fdb to indicate they are alive. Once a node doesn't
update its status often enough it will be considered dead and others can
pick up its share of jobs.

To start the discussion, I sketched this data layout and pseudocode:

Data layout:

("couch_workers", Role, "members") = (MembersVersionStamp, [WId1, WId2,
...])
("couch_workers", Role, "health", WId) = (WorkerVersionStamp, Timeout,
Timestamp)

Role : In our case it would be "replicator", but it could be any other role.

WId : a worker ID. These should uniquely identify workers. It would be
nice if it were persisted, such that a worker doing a quick restart
ends up with the same id and the membership list doesn't change. However,
a random UUID would work as well.
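
For instance, a persisted id could be as simple as this sketch (the file name
and layout here are just assumptions for illustration):

get_worker_id(Dir) ->
    %% Reuse a previously saved id so a quick restart keeps the same WId;
    %% otherwise generate a random one and save it for next time.
    Path = filename:join(Dir, "worker_id"),
    case file:read_file(Path) of
        {ok, WId} when byte_size(WId) > 0 ->
            WId;
        _ ->
            NewId = new_worker_id(),
            ok = filelib:ensure_dir(Path),
            ok = file:write_file(Path, NewId),
            NewId
    end.

new_worker_id() ->
    %% 16 random bytes, hex encoded; a proper UUID would work just as well.
    Bytes = crypto:strong_rand_bytes(16),
    list_to_binary([io_lib:format("~2.16.0b", [B]) || <<B>> <= Bytes]).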

Timeout : This is the timeout declared by the nodes themselves. These need
not be the same for all nodes. Some nodes might decide they run slower, so
their timeouts would be larger. But they essentially promise to update
their health status at least that often.

Timestamp: The time of the last health report from that node. Timestamps
technically might not be needed, as a neighbor monitor could instead remember
how long it has been since it last saw the health value's version stamp change.
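
For illustration, the keys could be packed with the erlfdb tuple layer roughly
like this (assuming erlfdb_tuple:pack/1 behaves as in the couchdb-erlfdb
binding; the value encoding and versionstamp handling are hand-waved here):

members_key(Role) ->
    erlfdb_tuple:pack({<<"couch_workers">>, Role, <<"members">>}).

health_key(Role, WId) ->
    erlfdb_tuple:pack({<<"couch_workers">>, Role, <<"health">>, WId}).

write_health(Tx, Role, WId, Timeout) ->
    %% A real implementation would use the atomic versionstamp mutation for
    %% WorkerVersionStamp; that part is elided here.
    Now = erlang:system_time(second),
    erlfdb:set(Tx, health_key(Role, WId), term_to_binary({Timeout, Now})).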

Pseudocode:

init(Role) ->
    Members = tx(add_to_members(self(), Role)),
    spawn(fun() -> health_ping(Members, Role) end),
    spawn(fun() -> neighbor_check(Members, Role) end),
    loop(Role).

terminate(Role) ->
    tx(remove_self_from_members_and_health_list(Role)).

loop(Role) ->
    {Members, Watch} = tx(add_to_members(self(), Role), get_watch()),
    receive
        {Watch, NewMembers} ->
            case diff(Members, NewMembers) of
            no_diff ->
                ok;
            {Added, Removed} ->
                update_neighbor_check(NewMembers),
                fire_callbacks(Added, Removed)
            end
    end,
    loop(Role).

health_ping(Members, Role) ->
    tx(update_health(Role, self(), now())),
    sleep(Timeout / 3),
    health_ping(Members, Role).

neighbor_check(Members, Role) ->
    Neighbor = next_in_list(self(), Members),
    {Timeout, Timestamp} = tx(read_timestamp(Neighbor, Role)),
    case now() - Timestamp > Timeout of
    true ->
        NewMembers = tx(remove_neighbor(Neighbor, Role)),
        neighbor_check(NewMembers, Role);
    false ->
        sleep(Timeout),
        neighbor_check(Members, Role)
    end.


Description:

Nodes add themselves to a membership list for each role they participate
in. The membership list has a version stamp. It's there to ensure that the
watch created during the update will pick up any change occurring
after that update.

tx(...) is pseudocode for "runs in a transaction"
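
As a hedged sketch of what the tx(add_to_members(...), get_watch()) step could
look like with erlfdb (assuming erlfdb:transactional/2, erlfdb:get/2 +
erlfdb:wait/1, erlfdb:set/3 and erlfdb:watch/2 behave roughly as in the
couchdb-erlfdb binding):

members_and_watch(Db, Role, WId) ->
    Key = erlfdb_tuple:pack({<<"couch_workers">>, Role, <<"members">>}),
    erlfdb:transactional(Db, fun(Tx) ->
        %% Read the members list, add ourselves if missing, and register a
        %% watch on the same key in the same transaction, so no update can
        %% slip in between the read and the watch registration.
        Members0 = case erlfdb:wait(erlfdb:get(Tx, Key)) of
            not_found -> [];
            Bin -> binary_to_term(Bin)
        end,
        Members = lists:usort([WId | Members0]),
        erlfdb:set(Tx, Key, term_to_binary(Members)),
        Watch = erlfdb:watch(Tx, Key),
        {Members, Watch}
    end).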

neighbor_check() is how entries for dead workers are cleaned up. Each node
will monitor its neighbor's status. If it sees the neighbor has stopped
responding it will remove it from the list. That will update the membership
list and will fire the watch. Everyone will notice and rescan their
replicator docs.

fire_callbacks() is just reporting to the replicator app that membership
has changed and it might need to rescan. On top of this, the current code
has cluster stability logic that waits a bit before rescanning in
case there is a flurry of node membership changes, say on rolling node
reboots or cluster startup.

I am not entirely sure about the semantics of watches and how lightweight or
heavyweight they are. Creating a watch and a version stamp in the same
transaction will hopefully not lose updates; that is, all updates after that
transaction should fire the watch. Watches also seem to have limits, past which
I think we'd need to revert to polling
https://github.com/apple/foundationdb/blob/8472f1046957849a97538abb2f47b299e0ae2b2d/fdbserver/storageserver.actor.cpp#L789
which makes sense, but I'm wondering if we should just start with polling first
and larger poll intervals. I guess it depends on how many other places we'd
use watches and whether we'd ever come close to even needing to handle
that error case.
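
If we did start with polling, the fallback could be as simple as this sketch,
where ReadFun and Callback stand in for the real fdb read of the members key
and the replicator rescan hook:

poll_members(ReadFun, Callback, IntervalMs) ->
    poll_members(ReadFun, Callback, IntervalMs, ReadFun()).

poll_members(ReadFun, Callback, IntervalMs, Members) ->
    %% Re-read the membership list on a fixed interval and diff it against
    %% the previous value, instead of relying on a watch firing.
    timer:sleep(IntervalMs),
    case ReadFun() of
        Members ->
            poll_members(ReadFun, Callback, IntervalMs, Members);
        NewMembers ->
            Added = NewMembers -- Members,
            Removed = Members -- NewMembers,
            Callback(Added, Removed),
            poll_members(ReadFun, Callback, IntervalMs, NewMembers)
    end.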


What does everyone think? The idea would be to turn the proposal from 2)
into an RFC, but I wanted to open it up for general discussion first and see
what everyone thinks.

Re: [DISCUSS] FDB and CouchDB replication

Posted by Nick Vatamaniuc <va...@gmail.com>.
Hi Adam,

Great feedback. I'll respond in line.

On Wed, Apr 10, 2019 at 7:43 PM Adam Kocoloski <ko...@apache.org> wrote:

> Hi Nick,
>
> Good stuff. On the first topic, I wonder if it makes sense to use a
> dedicated code path for updates to _replicator DB docs, one that would
> automatically register these replication jobs in a queue in FDB as part of
> the update transaction. That’d save the overhead and latency of listening
> to _db_updates and then each _replicator DB’s _changes feed just to
> discover these updates (and then presumably create jobs in a job queue
> anyway).
>
>
I like the idea of having a notification service for _replicator docs; it
could perhaps apply to _users as well. It would replace the
https://github.com/apache/couchdb/blob/master/src/couch/src/couch_multidb_changes.erl
facility, and we'd have a list of suffixes (_replicator, _users) for which we'd
call handlers when those databases are updated. It would be like a "system dbs
and docs" index in a way.
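
A rough sketch of what such a suffix dispatcher could look like (the handler
bodies here are just placeholders; the real hook would run as part of, or right
after, the doc update transaction):

handlers() ->
    [{<<"_replicator">>, fun(DbName, DocId) ->
         io:format("replicator doc updated: ~s/~s~n", [DbName, DocId])
     end},
     {<<"_users">>, fun(DbName, DocId) ->
         io:format("users doc updated: ~s/~s~n", [DbName, DocId])
     end}].

maybe_notify(DbName, DocId) ->
    %% Call every handler whose suffix matches the updated database name.
    [Handler(DbName, DocId) || {Suffix, Handler} <- handlers(),
                               is_suffix(Suffix, DbName)],
    ok.

is_suffix(Suffix, DbName) ->
    Pos = byte_size(DbName) - byte_size(Suffix),
    Pos >= 0 andalso binary:part(DbName, Pos, byte_size(Suffix)) =:= Suffix.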

> On the second topic — is it important for each node declaring the
> replicator role to receive an equal allotment of jobs and manage its own
> queue, or can the replication worker processes on each node just grab the
> next job off the global queue in FDB whenever they free up? I could see the
> latter approach decreasing the tail latency for job execution, and I think
> there are good patterns for managing high contention dequeue operations in
> the case where we’ve got more worker processes than jobs to run.


In theory it's not important; however, in practice what we effectively have
is a sharded queue (a queue per node), and we might have been reaping
performance benefits from its decentralization. Centralizing it into one
global queue, we might hit limitations we didn't see before. But you're
correct that the downside is inefficient utilization of resources if, say, 10
"heavy" replication jobs land on node1 because their IDs happen to hash
there while node2 sits idle. Though I imagine over thousands of
jobs that would get smoothed out.

One way we could proceed is to start with a global queue, like you and
other contributors have suggested, and then add partitioning/sharding later
if needed. But it would be per role, so to speak: Role = "replicator" might
become Role = {1, "replicator"} and Role = {2, "replicator"} if we decide
contention is too hard to deal with at scale.
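
For example, the partition for a given job could just be derived from its id,
something like this sketch (the partition count of 8 is an arbitrary example):

partitioned_role(Role, JobId, NumPartitions) ->
    %% A job id always maps to the same partition, so whoever enqueues it and
    %% whoever dequeues from that partition agree without extra coordination.
    {erlang:phash2(JobId, NumPartitions) + 1, Role}.

%% partitioned_role(<<"replicator">>, RepDocId, 8) then yields one of
%% {1, <<"replicator">>} ... {8, <<"replicator">>} as the queue subspace.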



> Regardless, you make a good point about paying special attention to
> liveness checking now that we’re not relying on Erlang distribution for
> that purpose. I didn’t grok all the details of approach you have in mind
> for that yet because I wanted to bring up these two points above and get
> your perspective.


> Adam

Re: [DISCUSS] FDB and CouchDB replication

Posted by Nick Vatamaniuc <va...@gmail.com>.
Hi Jan,

On Thu, Apr 11, 2019 at 4:16 AM Jan Lehnardt <ja...@apache.org> wrote:

> An advanced implementation could maintain lists of [{node: n1, ratio:
> 0.2}, {node: n2, ratio: 0.4}, {node: n3, ratio: 0.4}] where nodes or admins
> can signify the ratio of jobs any given node should be handling. Of course
> this gets complicated quickly if the sum of ratio is <1 or >1, but I’m sure
> there are clever ways out of that. It might also be too complicated to get
> started with this, but it might be worth using a structured list, to allow
> extensions later.
>
>
I thought of adding an info or capacity field to each node / executor, and it
might be a good idea, although I'm also leaning toward going with a single
queue as other contributors have commented. With the current scheme, we have
max_jobs = 500 that will run at the same time. Any node can absorb a higher
number of course, and then they'd be scheduled on that node as needed. Under
the assumption that all nodes have about the same capacity, it ends up evening
out: out of, say, 10k jobs randomly distributed amongst 10 nodes, chances are
each node would get about 1000 jobs and have about the same load. Now if we
only had 10 jobs, then yes, one node could end up with 3 heavy replications
while most others sit idle.

> * * *
>
> Nick, I like your proposal. I was briefly worrying about Adam’s suggestion
> to add a fast-path for the replicator, because I use a similar owner model
> for the db-per-user server, and thought I wanted to advocate for a more
> general framework that can be used for various types of jobs (something
> that the Roles variable in the model already allows), but I now agree that
> the importance of making replication as efficient as possible is a
> worthwhile optimisation vs. the other jobs we already have, or could
> imagine running in the future.
>

Good point. Yeah, I remember couch-per-user was using a few similar patterns.
We read _changes from the _users shards on each node and maintain changes
feeds to all of them, then use the same "ownership" function to consistently
pick one of the nodes for each db update to handle db creation. Maybe
_replicator and _users should both participate in this optimized changes
notification. And would it then mean having a "db_per_user" role and a
separate queue with nodes executing jobs? It might make sense, but we should
think about it more in depth.


> Best
> Jan
> —
>
> > On 11. Apr 2019, at 01:51, Robert Newson <rn...@apache.org> wrote:
> >
> > As long as any given replicator node can grab as much work as it can
> handle, It doesn't need to be 'fair' in the way we currently do it. The
> notion of an 'owner' node drops away imo. As nick mentions, the fun part is
> recognising when jobs become unowned due to a resource failure somewhere
> but this is a very standard thing, a pool of workers competing over jobs.
> >
> > --
> >  Robert Samuel Newson
> >  rnewson@apache.org

Re: [DISCUSS] FDB and CouchDB replication

Posted by Nick Vatamaniuc <va...@gmail.com>.
On Thu, Apr 11, 2019 at 4:46 AM Garren Smith <ga...@apache.org> wrote:

> I quite like the idea that Adam brought up of having a global queue that
> allows workers to pull replication jobs from. That means that on a cluster
> with a lot of replications we could spin up a lot of workers for a short
> period of time to complete all replications before decreasing the workers
> again.
>

We'd have to see what tricks we'd have available to handle contention at
both insertion and removal. As I mentioned in reply to Adam's message, what
we have now is effectively a sharded queue, and while it is not efficient at
rebalancing jobs based on runtime capacity, it has minimal global
coordination, which is a nice property in a distributed system. I remember
seeing struggling clusters with thousands of _replicator db shards all
trying to process their documents and create replication jobs on node
startup. So that's something we'd need to guard against and make sure the
global queue is robust enough to handle.

Otherwise the idea is certainly cleaner and easier to reason about and I
think we should try it!
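
One simple guard against that kind of startup stampede, as a sketch (the delay
bound is a made-up number and ScanFun stands in for the node's rescan/enqueue
routine), is to add a random jitter before each node's initial pass:

delayed_scan(ScanFun, MaxDelayMs) ->
    %% Spread the initial "scan all _replicator docs and enqueue jobs" work
    %% over time so the whole cluster doesn't hit the global queue at once.
    timer:sleep(rand:uniform(MaxDelayMs)),
    ScanFun().

%% e.g. delayed_scan(fun() -> rescan_replicator_docs() end, 30000), where
%% rescan_replicator_docs/0 is a hypothetical rescan function.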



Re: [DISCUSS] FDB and CouchDB replication

Posted by Garren Smith <ga...@apache.org>.
I quite like the idea that Adam brought up of having a global queue that
workers can pull replication jobs from. That means that on a cluster
with a lot of replications we could spin up a lot of workers for a short
period of time to complete all replications before decreasing the workers
again.


Re: [DISCUSS] FDB and CouchDB replication

Posted by Jan Lehnardt <ja...@apache.org>.
An advanced implementation could maintain lists of [{node: n1, ratio: 0.2}, {node: n2, ratio: 0.4}, {node: n3, ratio: 0.4}] where nodes or admins can signify the ratio of jobs any given node should be handling. Of course this gets complicated quickly if the sum of the ratios is <1 or >1, but I’m sure there are clever ways out of that. It might also be too complicated to get started with this, but it might be worth using a structured list, to allow extensions later.
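
As a sketch of how such a structured list could drive the assignment (ratios
are normalised, so a sum that isn't exactly 1 still degrades gracefully; all
names here are illustrative):

owner(JobId, Members) ->
    %% Members is [{Node, Ratio}]. Map a hash of the job id to a point in
    %% [0, TotalRatio) and walk the cumulative ranges to pick the node.
    Total = lists:sum([R || {_, R} <- Members]),
    Point = erlang:phash2(JobId, 1 bsl 16) / (1 bsl 16) * Total,
    pick(Point, Members).

pick(Point, [{Node, Ratio} | Rest]) ->
    case Point < Ratio orelse Rest =:= [] of
        true -> Node;
        false -> pick(Point - Ratio, Rest)
    end.

%% owner(<<"job-42">>, [{n1, 0.2}, {n2, 0.4}, {n3, 0.4}]) deterministically
%% picks one node, with roughly 20/40/40% of job ids landing on n1/n2/n3.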

* * *

Nick, I like your proposal. I was briefly worrying about Adam’s suggestion to add a fast-path for the replicator, because I use a similar owner model for the db-per-user server, and thought I wanted to advocate for a more general framework that can be used for various types of jobs (something that the Roles variable in the model already allows), but I now agree that the importance of making replication as efficient as possible is a worthwhile optimisation vs. the other jobs we already have, or could imagine running in the future.

Best
Jan
—

-- 
Professional Support for Apache CouchDB:
https://neighbourhood.ie/couchdb-support/


Re: [DISCUSS] FDB and CouchDB replication

Posted by Robert Newson <rn...@apache.org>.
As long as any given replicator node can grab as much work as it can handle, it doesn't need to be 'fair' in the way we currently do it. The notion of an 'owner' node drops away imo. As Nick mentions, the fun part is recognising when jobs become unowned due to a resource failure somewhere, but this is a very standard thing: a pool of workers competing over jobs.

-- 
  Robert Samuel Newson
  rnewson@apache.org

Re: [DISCUSS] FDB and CouchDB replication

Posted by Adam Kocoloski <ko...@apache.org>.
Hi Nick,

Good stuff. On the first topic, I wonder if it makes sense to use a dedicated code path for updates to _replicator DB docs, one that would automatically register these replication jobs in a queue in FDB as part of the update transaction. That’d save the overhead and latency of listening to _db_updates and then each _replicator DB’s _changes feed just to discover these updates (and then presumably create jobs in a job queue anyway).
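
To illustrate, a sketch of what that dedicated path might do, using the tx(...) pseudocode convention from Nick's mail (write_doc, replication_id and enqueue_job are assumed helpers here, not existing functions):

update_replicator_doc(Db, Doc) ->
    %% persist the doc and register its replication job in the same
    %% FDB transaction, so no separate _db_updates/_changes scan is needed
    tx(write_doc(Db, Doc),
       enqueue_job("replicator", replication_id(Doc), Doc))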

On the second topic — is it important for each node declaring the replicator role to receive an equal allotment of jobs and manage its own queue, or can the replication worker processes on each node just grab the next job off the global queue in FDB whenever they free up? I could see the latter approach decreasing the tail latency for job execution, and I think there are good patterns for managing high contention dequeue operations in the case where we’ve got more worker processes than jobs to run.
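
A sketch of that grab-next approach, again in the same pseudocode style (claim_next_job, run_job and poll_interval are assumed names; the claim happens inside one transaction, so two workers cannot take the same entry):

worker_loop() ->
    case tx(claim_next_job("replicator", self())) of
    {ok, Job} ->
        spawn run_job(Job),
        worker_loop();
    empty ->
        sleep(poll_interval()),
        worker_loop()
    end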

Regardless, you make a good point about paying special attention to liveness checking now that we’re not relying on Erlang distribution for that purpose. I didn’t grok all the details of the approach you have in mind for that yet, because I wanted to bring up these two points above and get your perspective.

Adam


Re: [DISCUSS] FDB and CouchDB replication

Posted by Nick Vatamaniuc <va...@gmail.com>.
Hi all,

A quick note: I created an RFC related to this discussion:
https://github.com/apache/couchdb-documentation/pull/409/files

Thanks to everyone who participated: Garren, Adam, Paul, Robert, Jan, and
others.


Re: [DISCUSS] FDB and CouchDB replication

Posted by Nick Vatamaniuc <va...@gmail.com>.
I realized one more thing (that would be #3) that needs handling: _active_tasks,
_scheduler/docs, _scheduler/jobs, and POSTs to _replicate. Basically, anything
in chttpd_misc that ends up calling across the cluster (rpc:call, rpc:multicall,
gen_server:multi_call, etc.).

How the plumbing will look there depends on the shape of the background
jobs / tasks queue feature we've been discussing. Maybe one status table
per job role, or per worker...? The fundamental difference is that this
info currently lives in in-memory ETS tables and would have to move to
FDB, which will hopefully make it nicer and easier to handle.
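
To make that concrete, here is one possible key layout, purely a sketch in
the spirit of the data layout above (the "couch_jobs" prefix and field names
are made up for illustration):

("couch_jobs", Role, "status", WId, JobId) = (State, UpdatedAt, Info)

State could be something like pending / running / crashed, and _active_tasks
or _scheduler/jobs would then become a range read over the Role (or WId)
prefix instead of an rpc:multicall fan-out.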

And I think that also opens the possibility of persisting job / tasks
intermediate state between executions. For replications it might not matter
as they resume from the last checkpoint but other jobs might use that
option.

