Posted to issues@kudu.apache.org by "Andrew Wong (Jira)" <ji...@apache.org> on 2021/08/05 00:31:00 UTC

[jira] [Commented] (KUDU-3290) Implement Replicate table's data to Kafka(or other Storage System)

    [ https://issues.apache.org/jira/browse/KUDU-3290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17393526#comment-17393526 ] 

Andrew Wong commented on KUDU-3290:
-----------------------------------

Sorry for the late reply. I do think that, between the two, the learner replica seems more palatable, given it leaves the flexibility of decoupling the IO from the rest of the cluster (e.g. putting all learners on a single tablet server). That said, it also increases the amount of IO done, since we have to replicate to an extra node. Maybe that's fine though.

{quote}
we can trigger a full scan at the timestamp and replicate data to learner, and then recover the appendEntries flow
{quote}

I'm not sure I understand this part, but I think you're referring to the conceptual equivalent of performing a tablet copy. When there aren't enough WALs on the leader to catch up a replica, the leader sends a tablet copy request to the follower, the follower is "caught up" via a tablet copy, and then the tablet is scanned and sent to Kafka. Is that right?

In this case, for the remote learner, I wonder if in addition to a regular tablet copy to the local learner, there is room here to also rely on the incremental scan developed for backups. If the learner knows what index it has replicated to Kafka, it should also be able to keep track of the timestamp associated with that OpId. If so, Kudu should be able to perform a differential scan between that timestamp and the latest timestamp in the newly copied replica. Of course, if the retention window is too short, this wouldn't work.
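To make that concrete, here's a minimal sketch of that decision, under the assumption that we track the timestamp of the last Kafka-acked OpId. Everything here is illustrative rather than Kudu's actual API; the one real knob is the history retention window controlled by --tablet_history_max_age_sec.

{code:cpp}
#include <cstdint>

struct CatchUpPlan {
  bool use_incremental_scan;  // Diff scan between two timestamps vs. full scan.
  uint64_t start_ts;          // Timestamp of the last OpId acked by Kafka.
  uint64_t end_ts;            // Latest timestamp in the newly copied replica.
};

// last_replicated_ts: timestamp of the last OpId the learner pushed to Kafka.
// oldest_retained_ts: lower bound of retained history on the tablet.
CatchUpPlan PlanCatchUp(uint64_t last_replicated_ts,
                        uint64_t oldest_retained_ts,
                        uint64_t latest_ts) {
  CatchUpPlan plan;
  plan.start_ts = last_replicated_ts;
  plan.end_ts = latest_ts;
  // A diff scan only works if history back to last_replicated_ts is still
  // retained; otherwise fall back to a full scan of the copied replica.
  plan.use_incremental_scan = (last_replicated_ts >= oldest_retained_ts);
  return plan;
}
{code}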

Also, in your proposal you mentioned replicas and leaders keeping track of more state for the sake of catching up the external service. If so, it'd be great if you could clarify exactly what state we would need (most recently replicated OpId? maybe its timestamp? anything else?) and where that state would be stored (with consensus metadata? somewhere else?).
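For illustration only (none of these names exist in Kudu today), the state in question might look something like:

{code:cpp}
#include <cstdint>

// One possible shape for per-tablet progress of replication to Kafka.
struct ExternalReplicationProgress {
  int64_t last_replicated_term;   // Term of the last OpId acked by Kafka.
  int64_t last_replicated_index;  // Index of the last OpId acked by Kafka.
  uint64_t last_replicated_ts;    // Timestamp of that op, enabling diff scans.
};
{code}

Whether this lives with the consensus metadata or somewhere else is exactly the open question.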

> Implement Replicate table's data to Kafka(or other Storage System)
> ------------------------------------------------------------------
>
>                 Key: KUDU-3290
>                 URL: https://issues.apache.org/jira/browse/KUDU-3290
>             Project: Kudu
>          Issue Type: New Feature
>          Components: tserver
>            Reporter: shenxingwuying
>            Priority: Critical
>
> h1. background & problem
> We use Kudu to store user profile data. Because of business requirements, we exchange and share data among multi-tenant users, which is reasonable in our application scenario, so we need to replicate data from one system to another. We picked Kafka as the destination storage system because of our company's current architecture.
> So far we have two ideas for solving this.
> h1. two replication schemes
> Generally, a Raft group has three replicas: one leader and two followers. We'll add a replica whose role is Learner. The Learner receives all the data but does not participate in leader elections.
> The learner replica's state machine will be a plugin system, e.g.:
>  # We can support KuduEngine, which is just a data backup, like MongoDB's hidden replica.
>  # We can write to a third-party storage system, like Kafka or any other system we need, and replicate data to that system using its client.
> Paxos has a learner role, which only receives data; we need such a role for this new kind of membership.
> But in Kudu, Learner is already used for a tablet replica that is being copied (recovering), so we may need a new role name; for now we still use Learner to represent the new role. (We should think over a new role name.)
> In our application scenario, we will replicate data to Kafka, and I will explain the method below.
> h2. Learner replication
>  # Add a new replica role; as explained above, Paxos's learner only receives data, but Kudu already uses Learner for a replica undergoing tablet copy, so the name needs more thought. For now we still call it Learner.
>  # The safepoint for cleaning up obsolete WALs is min(leader's max WAL sequence number, followers' max WAL sequence numbers, learner's max WAL sequence number); see the sketch at the end of this section.
>  # The learner is not a voter and does not participate in elections.
>  # Raft can replicate data to the learner.
>  # The learner applies entries much like a Raft follower: log entries up to the committed index are replicated to Kafka, and once Kafka acknowledges them the apply index advances (see the sketch at the end of this section).
>  # We need a Kafka client; it would be added to Kudu as an option, maybe a compile-time option.
>  # When a kudu-tserver is decommissioned or its data is corrupted, the learner must move to a new kudu-tserver, so the leader should save the learner's applied OpId and replicate it to the followers; that way the learner can fail over even while the leader is down.
>  # The leader must save the learner's applied OpId and replicate it to the followers so that the learner's recovery loses no data if the leader goes down; if the leader did not save the apply index, the learner might lose data.
>  # Followers save the learner's apply index and term, because a follower may become leader.
>  # When the load balancer is running, we should support moving the learner to another kudu-tserver.
>  # Tables should have a switch option that determines whether the Raft group has a learner, settable when creating the table.
>  # Supporting ALTER TABLE to add learners is another idea, but it requires solving the base data migration problem.
>  # Base data migration: simple but costly. When the learner's max_OpId < committed_OpId (perhaps due to data loss, or because we altered an existing table to add learner replication), we can trigger a full scan at that timestamp, replicate the data to the learner, and then resume the AppendEntries flow.
>  # Kudu does not support split and merge, so we do not discuss them now. If Kudu supports split or merge, we can implement them using item 12, though a better method may exist.
>  # If we need this function, our cluster needs at least 4 tservers.
> If Kafka fails or the topic does not exist, the learner will stop replicating the WAL, which will occupy more disk space. If the learner is lost or corrupted, it can recover from the leader. We need to make sure the safepoint is maintained.
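> A rough sketch of items 2 and 5 above. All names (LogEntry, KafkaSink, ApplyToKafka, WalGcSafepoint) are hypothetical, not Kudu internals, and OpId indexes stand in for WAL segment sequence numbers; a real implementation would use a Kafka client such as librdkafka.
> {code:cpp}
> #include <algorithm>
> #include <cstdint>
> #include <vector>
> 
> struct LogEntry {
>   int64_t index;                 // OpId index of this WAL entry.
>   std::vector<uint8_t> payload;  // Serialized row operations.
> };
> 
> class KafkaSink {
>  public:
>   virtual ~KafkaSink() = default;
>   // Returns true once Kafka has acknowledged the record.
>   virtual bool Produce(const LogEntry& entry) = 0;
> };
> 
> // Item 5: apply entries up to the committed index; advance the apply index
> // only after Kafka acks, so an unacked entry is retried after a restart.
> int64_t ApplyToKafka(const std::vector<LogEntry>& log, int64_t committed_index,
>                      int64_t apply_index, KafkaSink* sink) {
>   for (const LogEntry& entry : log) {
>     if (entry.index <= apply_index) continue;  // Already replicated.
>     if (entry.index > committed_index) break;  // Not yet committed.
>     if (!sink->Produce(entry)) break;          // Kafka down: stop, retry later.
>     apply_index = entry.index;
>   }
>   return apply_index;
> }
> 
> // Item 2: a WAL entry is GC-able only once every voter and the learner have
> // passed it, so a lagging learner holds back WAL GC.
> int64_t WalGcSafepoint(int64_t leader_max,
>                        const std::vector<int64_t>& follower_max,
>                        int64_t learner_apply_index) {
>   int64_t safepoint = std::min(leader_max, learner_apply_index);
>   for (int64_t idx : follower_max) safepoint = std::min(safepoint, idx);
>   return safepoint;
> }
> {code}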
> h2. Leader replication
> We can replicate data to Kafka or any other storage system from the leader directly.
>  # We do not need a new role, but since the destination is Kafka, one of the PeerManager's peers differs from the others, which adds some complexity.
>  # Reusing the leader's WAL saves outbound network bandwidth compared to the method above.
>  # All replicas should maintain a record of the applied OpId at Kafka.
>  # The safepoint for cleaning up obsolete WALs is min(voters' max WAL sequence numbers, apply index at Kafka), which the leader saves; see the sketch at the end of this section.
>  # Any leader transfer must recover the applied OpId at Kafka.
>  # We need a Kafka client; it would be added to Kudu as an option, maybe a compile-time option.
>  # If the topic is missing or Kafka fails, print an error log and retain the WAL.
>  # Some processes are the same as in Learner replication, such as base data replication.
>  # If we need this function, our cluster needs at least 3 tservers.
> If Kafka fails or the topic does not exist, the leader will stop replicating the WAL to Kafka, which will occupy more disk space. We need to make sure the safepoint is maintained.
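> A rough sketch of the safepoint in this variant (item 4 above); names are hypothetical:
> {code:cpp}
> #include <algorithm>
> #include <cstdint>
> #include <vector>
> 
> // voter_max: highest durable WAL index on each voter (leader included).
> // kafka_apply_index: last index acknowledged by Kafka; the leader persists it
> // and replicates it to followers so a new leader can resume from it (item 5).
> int64_t LeaderWalGcSafepoint(const std::vector<int64_t>& voter_max,
>                              int64_t kafka_apply_index) {
>   int64_t safepoint = kafka_apply_index;
>   for (int64_t idx : voter_max) safepoint = std::min(safepoint, idx);
>   return safepoint;
> }
> {code}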


