You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@kudu.apache.org by Franco Venturi <fv...@comcast.net> on 2017/09/22 02:12:01 UTC

Change Data Capture (CDC) with Kudu


We are planning for a 50-100TB Kudu installation (about 200 tables or so). 

One of the requirements that we are working on is to have a secondary copy of our data in a Disaster Recovery data center in a different location. 




Since we are going to have inserts, updates, and deletes (for instance in the case the primary key is changed), we are trying to devise a process that will keep the secondary instance in sync with the primary one. The two instances do not have to be identical in real-time (i.e. we are not looking for synchronous writes to Kudu), but we would like to have some pretty good confidence that the secondary instance contains all the changes that the primary has up to say an hour before (or something like that). 




So far we considered a couple of options: 
- refreshing the seconday instance with a full copy of the primary one every so often, but that would mean having to transfer say 50TB of data between the two locations every time, and our network bandwidth constraints would prevent to do that even on a daily basis 
- having a column that contains the most recent time a row was updated, however this column couldn't be part of the primary key (because the primary key in Kudu is immutable), and therefore finding which rows have been changed every time would require a full scan of the table to be sync'd. It would also rely on the "last update timestamp" column to be always updated by the application (an assumption that we would like to avoid), and would need some other process to take into accounts the rows that are deleted. 




Since many of today's RDBMS (Oracle, MySQL, etc) allow for some sort of 'Change Data Capture' mechanism where only the 'deltas' are captured and applied to the secondary instance, we were wondering if there's any way in Kudu to achieve something like that (possibly mining the WALs, since my understanding is that each change gets applied to the WALs first). 




Thanks, 
Franco Venturi 

Re: Change Data Capture (CDC) with Kudu

Posted by Todd Lipcon <to...@cloudera.com>.
Hey all,

Franco and I ran into each other at Strata this week and chatted about this
a bit in person, but wanted to reply to the email as well to share with the
larger audience.

Regarding the idea to intercept the RPCs, I think it would only work in a
very simple (single-client) workload. For example, consider the case where
you have two clients performing operations at the same instant in time:

C1: INSERT INTO t (my_pk, my_val) VALUES ("hello", "world")
C2: INSERT INTO t (my_pk, my_val) VALUES ("hello", "universe")

In this case, the two RPCs may arrive at the process on two different
reactor threads reading from the network. Even if the network packets from
C1 arrived before C2, the reactor thread associated with C1 may be delayed
by a few milliseconds for a number of reasons and therefore enqueue the RPC
onto the worker pool after the RPC from C2. Even if they are enqueued to
the RPC workers in the order (C1, C2), the operations then travel to
another pool (the "prepare pool") before they actually acquire locks. Only
once the operations have acquired locks are they assigned the timestamps
which represent their finalized order.

Without inspecting the resulting WAL and associated COMMIT messages, it's
impossible to know from the RPC stream which order the two operations were
performed in, and therefore would not be possible to replicate the
resulting table consistently to another cluster.

So, I think the RPC-interception approach, while clever, is not really
feasible for most real-world workloads in which there are multiple clients
writing and the order of operations is important.

As for the other options, I think there are a few worth considering, but
all would require some relatively complex changes to the core code, as I
think Mike mentioned earlier in this thread.

The first option would be something like a WAL tailer -- either by adding a
new RPC to request entries from a running server's WAL, or by adding the
replication sink as a "follower" replica. Either would result in a stream
of WAL entries along with enough information to agree on the serialized
order of operations (the same mechanism by which normal in-cluster replicas
work). Currently we don't have any public APIs for reading WALs, nor the
ability to add follower-only "sink" replicas. Additionally there might be
some complexity here if the backup cluster aims to have a different
partitioning scheme than the original cluster, etc.

I think long term this is probably the most promising option, but also a
pretty complex implementation. In addition to the above WAL-tailing and
WAL-applying code, we'd need a bunch of tooling around setting up replicas,
monitoring replication, initiating a "failover", etc. I think it's probably
north of a person-year of effort to get something truly usable, but even
that is a wild guess.


The second option would be something more like an incremental copy job.
Currently we don't expose any method to scan deltas or to retrieve anything
like a "last update" column for a row, but we internally store enough
information to reconstruct these things, so long as the incremental copy
runs within the configured garbage-collection time window. One could
imagine a scanner which can be configured to scan a "diff" given a
timestamp round, which would return for each row an enum indicating whether
it was inserted, updated, or deleted, along with the updated columns and
timestamp.

Again this would be quite a bit of code, not a trivial project, and the
result would have some downsides as well (eg it would be "incremental copy"
rather than a true "async replica" style DR). Again it's probably many
person-months of effort to get something working beyond a simple prototype
here, but likely a bit simpler than the above.


Anyway, design-by-mailing-list-thread is probably not the best avenue
forward. I think until we have some contributors with enough time to
allocate to working on these things, the various application-level
workarounds are the best approach (eg keeping a last_updated column and
marking deleted rows with a 'deleted_as_of' boolean, and then using 'UPSERT
INTO backup_cluster_table SELECT * FROM original_table WHERE updated_on >=
$last_update_time' on a periodic basis.


Thanks
Todd

On Sat, Sep 23, 2017 at 11:33 AM, Franco Venturi <fv...@comcast.net>
wrote:

> Adar and Mike,
> first of all thanks for the replies and interesting suggestions.
>
>
> Over the last couple of days I came up with a different approach that
> wouldn't require to mess with the Kudu WALs (or much of the internals).
>
>
> The basic idea would be to capture the changes (insert, deletes, schema
> changes, etc) at the input, i.e. when they enter Kudu, instead of when they
> get written to the actual storage (i.e. WALs, data files).
> Let me explain better: since all the interactions between the Kudu
> services and their clients is via a set of well defined RPCs (see here
> https://github.com/cloudera/kudu/blob/master/docs/design-docs/rpc.md), my
> idea would be to interpose a thin transparent proxy layer between the
> clients and the Kudu services that basically sends everything though
> unchanged, but also captures the "interesting" RPCs and writes them aside
> to be replayed on the secondary instance.
>
>
> This approach would have several advantages including capturing changes at
> a much higher 'logical' level (while mining the WALs would probably result
> in a more 'physical' view of the changes).
>
>
> A few other advantages I thought of would be:
> - the RPC layer should probably be a much more stable interface than the
> internals of the WALs (since the clients interact with it and hence
> changing the RPC interfaces would cause a lot more problems than changing
> some of the Kudu internals)
> - the RPC layer has already a well defined serialization layout via
> Protobuf messages, and therefore it would very simple to just 'dump' to
> disk the full content of interesting RPC calls
> - the transparent RPC proxy could be written as a standalone independent
> process that listens to the Kudu ports (7050, 7051) and sends everything
> back to the 'real' Kudu ports (which would have to be changed to say 17050
> and 17051)
> - being a totally independent process it would not require any changes to
> the Kudu code base (or even the Kudu binary installation in case of a
> distribution), and therefore could be transparently "added" to an existing
> Kudu installation (if you need vendor support for a Kudu problem, I imagine
> you would have to first make sure you can replicate the issue without this
> proxy)
> - as a standalone application, it could also be marketed as a premium
> 'add-on' feature by a vendor, since most of the customers asking for this
> kind of replication are at the enterprise level, who would defintely see
> the added value offered by this tool
> - since the list of changes is sequential in nature (it could be ordered
> by 'call_id' value), it could be appended to some sort of "transaction" log
> file (which could be stored in HDFS, since these would be sequential
> writes); the file would be periodically sync'd up to the secondary instance
> (for instance via 'rsync' or by taking advantage of BDR if the file is
> stored in HDFS)
> - on the secondary site another process (acting as a Kudu client) would
> replay the RPC calls to the secondary Kudu instance following the 'call_id'
> sequence; given the strict integration between the 'replayer' process and
> the 'transparent RPC proxy' on the primary site, they could both be part of
> the same application binary (which could possibly be used in a
> bidirectional way for an active-active scenario, where an organization may
> want to have some sort of multi-master replication)
> - the secondary 'replayer' process too wouldn't require any changes to the
> existing Kudu installation as it would act just as an external RPC client
> from the Kudu point of view
>
>
> Of course the devil is in the details, and a few issues I can see are:
> - how to deal with errors on either side (for instance disk full on just
> one of the two instances); the number of error combinations could be
> potentially quite large
> - keeping track of the tablet id's on both sides (I imagine they would be
> different, so the 'replayer' process would have to 'map' the original
> tablet id's - and probably several other values - to the matching values on
> the secondary instance)
> - integration with Kerberos and security issues in general
>
>
> This morning I spent a few minutes looking at the list of Kudu RPCs (I
> pulled the source code from git and ran this command: find . -name \*.proto
> | grep -v 'rtest.proto' | xargs cat | grep '^ rpc ' | sort) and this is an
> initial list of the RPC methods that I think would have to captured:
> - AlterSchema
> - AlterTable
> - CreateTable
> - CreateTablet
> - DeleteTable
> - DeleteTablet
> - Write
>
>
> I think the first step would be to just write a fully transparent RPC
> proxy for Kudu that just logs each RPC method through it, to see how it
> goes.
>
>
> Franco
>
> ------------------------------
> *From: *"Mike Percy" <mp...@apache.org>
> *To: *user@kudu.apache.org
> *Sent: *Friday, September 22, 2017 5:32:41 PM
> *Subject: *Re: Change Data Capture (CDC) with Kudu
>
>
> Franco,
> I just realized that I suggested something you mentioned in your initial
> email. My mistake for not reading through to the end. It is probably the
> least-worst approach right now and it's probably what I would do if I were
> you.
>
> Mike
>
> On Fri, Sep 22, 2017 at 2:29 PM, Mike Percy <mp...@apache.org> wrote:
>
>> CDC is something that I would like to see in Kudu but we aren't there yet
>> with the underlying support in the Raft Consensus implementation. Once we
>> have higher availability re-replication support (KUDU-1097) we will be a
>> bit closer for a solution involving traditional WAL streaming to an
>> external consumer because we will have support for non-voting replicas. But
>> there would still be plenty of work to do to support CDC after that, at
>> least from an API perspective as well as a WAL management perspective (how
>> long to keep old log files).
>>
>> That said, what you really are asking for is a streaming backup solution,
>> which may or may not use the same mechanism (unfortunately it's not
>> designed or implemented yet).
>>
>> As an alternative to Adar's suggestions, a reasonable option for you at
>> this time may be an incremental backup. It takes a little schema design to
>> do it, though. You could consider doing something like the following:
>>
>>    1. Add a last_updated column to all your tables and update the column
>>    when you change the value. Ideally monotonic across the cluster but you
>>    could also go with local time and build in a "fudge factor" when reading in
>>    step 2
>>    2. Periodically scan the table for any changes newer than the
>>    previous scan in the last_updated column. This type of scan is more
>>    efficient to do in Kudu than in many other systems. With Impala you could
>>    run a query like: select * from table1 where last_updated > $prev_updated;
>>    3. Dump the results of this query to parquet
>>    4. Use distcp to copy the parquet files over to the other cluster
>>    periodically (maybe you can throttle this if needed to avoid saturating the
>>    pipe)
>>    5. Upsert the parquet data into Kudu on the remote end
>>
>> Hopefully some workaround like this would work for you until Kudu has a
>> reliable streaming backup solution.
>>
>> Like Adar said, as an Apache project we are always open to contributions
>> and it would be great to get some in this area. Please reach out if you're
>> interested in collaborating on a design.
>>
>> Mike
>>
>> On Fri, Sep 22, 2017 at 10:43 AM, Adar Lieber-Dembo <ad...@cloudera.com>
>> wrote:
>>
>>> Franco,
>>>
>>> Thanks for the detailed description of your problem.
>>>
>>> I'm afraid there's no such mechanism in Kudu today. Mining the WALs
>>> seems like a path fraught with land mines. Kudu GCs WAL segments
>>> aggressively so I'd be worried about a listening mechanism missing out on
>>> some row operations. Plus the WAL is Raft-specific as it includes both
>>> REPLICATE messages (reflecting a Write RPC from a client) and COMMIT
>>> messages (written out when a majority of replicas have written a
>>> REPLICATE); parsing and making sense of this would be challenging. Perhaps
>>> you could build something using Linux's inotify system for receiving file
>>> change notifications, but again I'd be worried about missing certain
>>> updates.
>>>
>>> Another option is to replicate the data at the OS level. For example,
>>> you could periodically rsync the entire cluster onto a standby cluster.
>>> There's bound to be data loss in the event of a failover, but I don't think
>>> you'll run into any corruption (though Kudu does take advantage of sparse
>>> files and hole punching, so you should verify that any tool you use
>>> supports that).
>>>
>>> Disaster Recovery is an oft-requested feature, but one that Kudu
>>> developers have been unable to prioritize yet. Would you or your someone on
>>> your team be interested in working on this?
>>>
>>> On Thu, Sep 21, 2017 at 7:12 PM Franco Venturi <fv...@comcast.net>
>>> wrote:
>>>
>>>> We are planning for a 50-100TB Kudu installation (about 200 tables or
>>>> so).
>>>>
>>>> One of the requirements that we are working on is to have a secondary
>>>> copy of our data in a Disaster Recovery data center in a different location.
>>>>
>>>>
>>>> Since we are going to have inserts, updates, and deletes (for instance
>>>> in the case the primary key is changed), we are trying to devise a process
>>>> that will keep the secondary instance in sync with the primary one. The two
>>>> instances do not have to be identical in real-time (i.e. we are not looking
>>>> for synchronous writes to Kudu), but we would like to have some pretty good
>>>> confidence that the secondary instance contains all the changes that the
>>>> primary has up to say an hour before (or something like that).
>>>>
>>>>
>>>> So far we considered a couple of options:
>>>> - refreshing the seconday instance with a full copy of the primary one
>>>> every so often, but that would mean having to transfer say 50TB of data
>>>> between the two locations every time, and our network bandwidth constraints
>>>> would prevent to do that even on a daily basis
>>>> - having a column that contains the most recent time a row was updated,
>>>> however this column couldn't be part of the primary key (because the
>>>> primary key in Kudu is immutable), and therefore finding which rows have
>>>> been changed every time would require a full scan of the table to be
>>>> sync'd. It would also rely on the "last update timestamp" column to be
>>>> always updated by the application (an assumption that we would like to
>>>> avoid), and would need some other process to take into accounts the rows
>>>> that are deleted.
>>>>
>>>>
>>>> Since many of today's RDBMS (Oracle, MySQL, etc) allow for some sort of
>>>> 'Change Data Capture' mechanism where only the 'deltas' are captured and
>>>> applied to the secondary instance, we were wondering if there's any way in
>>>> Kudu to achieve something like that (possibly mining the WALs, since my
>>>> understanding is that each change gets applied to the WALs first).
>>>>
>>>>
>>>> Thanks,
>>>> Franco Venturi
>>>>
>>>
>>
>
>


-- 
Todd Lipcon
Software Engineer, Cloudera

Re: Change Data Capture (CDC) with Kudu

Posted by Franco Venturi <fv...@comcast.net>.

Adar and Mike, 
first of all thanks for the replies and interesting suggestions. 




Over the last couple of days I came up with a different approach that wouldn't require to mess with the Kudu WALs (or much of the internals). 




The basic idea would be to capture the changes (insert, deletes, schema changes, etc) at the input, i.e. when they enter Kudu, instead of when they get written to the actual storage (i.e. WALs, data files). 
Let me explain better: since all the interactions between the Kudu services and their clients is via a set of well defined RPCs (see here https://github.com/cloudera/kudu/blob/master/docs/design-docs/rpc.md), my idea would be to interpose a thin transparent proxy layer between the clients and the Kudu services that basically sends everything though unchanged, but also captures the "interesting" RPCs and writes them aside to be replayed on the secondary instance. 




This approach would have several advantages including capturing changes at a much higher 'logical' level (while mining the WALs would probably result in a more 'physical' view of the changes). 




A few other advantages I thought of would be: 
- the RPC layer should probably be a much more stable interface than the internals of the WALs (since the clients interact with it and hence changing the RPC interfaces would cause a lot more problems than changing some of the Kudu internals) 
- the RPC layer has already a well defined serialization layout via Protobuf messages, and therefore it would very simple to just 'dump' to disk the full content of interesting RPC calls 
- the transparent RPC proxy could be written as a standalone independent process that listens to the Kudu ports (7050, 7051) and sends everything back to the 'real' Kudu ports (which would have to be changed to say 17050 and 17051) 
- being a totally independent process it would not require any changes to the Kudu code base (or even the Kudu binary installation in case of a distribution), and therefore could be transparently "added" to an existing Kudu installation (if you need vendor support for a Kudu problem, I imagine you would have to first make sure you can replicate the issue without this proxy) 
- as a standalone application, it could also be marketed as a premium 'add-on' feature by a vendor, since most of the customers asking for this kind of replication are at the enterprise level, who would defintely see the added value offered by this tool 
- since the list of changes is sequential in nature (it could be ordered by 'call_id' value), it could be appended to some sort of "transaction" log file (which could be stored in HDFS, since these would be sequential writes); the file would be periodically sync'd up to the secondary instance (for instance via 'rsync' or by taking advantage of BDR if the file is stored in HDFS) 
- on the secondary site another process (acting as a Kudu client) would replay the RPC calls to the secondary Kudu instance following the 'call_id' sequence; given the strict integration between the 'replayer' process and the 'transparent RPC proxy' on the primary site, they could both be part of the same application binary (which could possibly be used in a bidirectional way for an active-active scenario, where an organization may want to have some sort of multi-master replication) 
- the secondary 'replayer' process too wouldn't require any changes to the existing Kudu installation as it would act just as an external RPC client from the Kudu point of view 


Of course the devil is in the details, and a few issues I can see are: 
- how to deal with errors on either side (for instance disk full on just one of the two instances); the number of error combinations could be potentially quite large 
- keeping track of the tablet id's on both sides (I imagine they would be different, so the 'replayer' process would have to 'map' the original tablet id's - and probably several other values - to the matching values on the secondary instance) 
- integration with Kerberos and security issues in general 


This morning I spent a few minutes looking at the list of Kudu RPCs (I pulled the source code from git and ran this command: find . -name \*.proto | grep -v 'rtest.proto' | xargs cat | grep '^ rpc ' | sort) and this is an initial list of the RPC methods that I think would have to captured: 
- AlterSchema 
- AlterTable 
- CreateTable 
- CreateTablet 
- DeleteTable 
- DeleteTablet 
- Write 


I think the first step would be to just write a fully transparent RPC proxy for Kudu that just logs each RPC method through it, to see how it goes. 




Franco 

----- Original Message -----

From: "Mike Percy" <mp...@apache.org> 
To: user@kudu.apache.org 
Sent: Friday, September 22, 2017 5:32:41 PM 
Subject: Re: Change Data Capture (CDC) with Kudu 

Franco, 
I just realized that I suggested something you mentioned in your initial email. My mistake for not reading through to the end. It is probably the least-worst approach right now and it's probably what I would do if I were you. 

Mike 

On Fri, Sep 22, 2017 at 2:29 PM, Mike Percy < mpercy@apache.org > wrote: 



CDC is something that I would like to see in Kudu but we aren't there yet with the underlying support in the Raft Consensus implementation. Once we have higher availability re-replication support (KUDU-1097) we will be a bit closer for a solution involving traditional WAL streaming to an external consumer because we will have support for non-voting replicas. But there would still be plenty of work to do to support CDC after that, at least from an API perspective as well as a WAL management perspective (how long to keep old log files). 

That said, what you really are asking for is a streaming backup solution, which may or may not use the same mechanism (unfortunately it's not designed or implemented yet). 

As an alternative to Adar's suggestions, a reasonable option for you at this time may be an incremental backup. It takes a little schema design to do it, though. You could consider doing something like the following: 


    1. Add a last_updated column to all your tables and update the column when you change the value. Ideally monotonic across the cluster but you could also go with local time and build in a "fudge factor" when reading in step 2 
    2. Periodically scan the table for any changes newer than the previous scan in the last_updated column. This type of scan is more efficient to do in Kudu than in many other systems. With Impala you could run a query like: select * from table1 where last_updated > $prev_updated; 
    3. Dump the results of this query to parquet 
    4. Use distcp to copy the parquet files over to the other cluster periodically (maybe you can throttle this if needed to avoid saturating the pipe) 
    5. Upsert the parquet data into Kudu on the remote end 

Hopefully some workaround like this would work for you until Kudu has a reliable streaming backup solution. 

Like Adar said, as an Apache project we are always open to contributions and it would be great to get some in this area. Please reach out if you're interested in collaborating on a design. 

Mike 

On Fri, Sep 22, 2017 at 10:43 AM, Adar Lieber-Dembo < adar@cloudera.com > wrote: 

<blockquote>

Franco, 

Thanks for the detailed description of your problem. 

I'm afraid there's no such mechanism in Kudu today. Mining the WALs seems like a path fraught with land mines. Kudu GCs WAL segments aggressively so I'd be worried about a listening mechanism missing out on some row operations. Plus the WAL is Raft-specific as it includes both REPLICATE messages (reflecting a Write RPC from a client) and COMMIT messages (written out when a majority of replicas have written a REPLICATE); parsing and making sense of this would be challenging. Perhaps you could build something using Linux's inotify system for receiving file change notifications, but again I'd be worried about missing certain updates. 

Another option is to replicate the data at the OS level. For example, you could periodically rsync the entire cluster onto a standby cluster. There's bound to be data loss in the event of a failover, but I don't think you'll run into any corruption (though Kudu does take advantage of sparse files and hole punching, so you should verify that any tool you use supports that). 

Disaster Recovery is an oft-requested feature, but one that Kudu developers have been unable to prioritize yet. Would you or your someone on your team be interested in working on this? 

On Thu, Sep 21, 2017 at 7:12 PM Franco Venturi < fventuri@comcast.net > wrote: 

<blockquote>



We are planning for a 50-100TB Kudu installation (about 200 tables or so). 

One of the requirements that we are working on is to have a secondary copy of our data in a Disaster Recovery data center in a different location. 




Since we are going to have inserts, updates, and deletes (for instance in the case the primary key is changed), we are trying to devise a process that will keep the secondary instance in sync with the primary one. The two instances do not have to be identical in real-time (i.e. we are not looking for synchronous writes to Kudu), but we would like to have some pretty good confidence that the secondary instance contains all the changes that the primary has up to say an hour before (or something like that). 




So far we considered a couple of options: 
- refreshing the seconday instance with a full copy of the primary one every so often, but that would mean having to transfer say 50TB of data between the two locations every time, and our network bandwidth constraints would prevent to do that even on a daily basis 
- having a column that contains the most recent time a row was updated, however this column couldn't be part of the primary key (because the primary key in Kudu is immutable), and therefore finding which rows have been changed every time would require a full scan of the table to be sync'd. It would also rely on the "last update timestamp" column to be always updated by the application (an assumption that we would like to avoid), and would need some other process to take into accounts the rows that are deleted. 




Since many of today's RDBMS (Oracle, MySQL, etc) allow for some sort of 'Change Data Capture' mechanism where only the 'deltas' are captured and applied to the secondary instance, we were wondering if there's any way in Kudu to achieve something like that (possibly mining the WALs, since my understanding is that each change gets applied to the WALs first). 




Thanks, 
Franco Venturi 




</blockquote>



</blockquote>




Re: Change Data Capture (CDC) with Kudu

Posted by Mike Percy <mp...@apache.org>.
Franco,
I just realized that I suggested something you mentioned in your initial
email. My mistake for not reading through to the end. It is probably the
least-worst approach right now and it's probably what I would do if I were
you.

Mike

On Fri, Sep 22, 2017 at 2:29 PM, Mike Percy <mp...@apache.org> wrote:

> CDC is something that I would like to see in Kudu but we aren't there yet
> with the underlying support in the Raft Consensus implementation. Once we
> have higher availability re-replication support (KUDU-1097) we will be a
> bit closer for a solution involving traditional WAL streaming to an
> external consumer because we will have support for non-voting replicas. But
> there would still be plenty of work to do to support CDC after that, at
> least from an API perspective as well as a WAL management perspective (how
> long to keep old log files).
>
> That said, what you really are asking for is a streaming backup solution,
> which may or may not use the same mechanism (unfortunately it's not
> designed or implemented yet).
>
> As an alternative to Adar's suggestions, a reasonable option for you at
> this time may be an incremental backup. It takes a little schema design to
> do it, though. You could consider doing something like the following:
>
>    1. Add a last_updated column to all your tables and update the column
>    when you change the value. Ideally monotonic across the cluster but you
>    could also go with local time and build in a "fudge factor" when reading in
>    step 2
>    2. Periodically scan the table for any changes newer than the previous
>    scan in the last_updated column. This type of scan is more efficient to do
>    in Kudu than in many other systems. With Impala you could run a query like:
>    select * from table1 where last_updated > $prev_updated;
>    3. Dump the results of this query to parquet
>    4. Use distcp to copy the parquet files over to the other cluster
>    periodically (maybe you can throttle this if needed to avoid saturating the
>    pipe)
>    5. Upsert the parquet data into Kudu on the remote end
>
> Hopefully some workaround like this would work for you until Kudu has a
> reliable streaming backup solution.
>
> Like Adar said, as an Apache project we are always open to contributions
> and it would be great to get some in this area. Please reach out if you're
> interested in collaborating on a design.
>
> Mike
>
> On Fri, Sep 22, 2017 at 10:43 AM, Adar Lieber-Dembo <ad...@cloudera.com>
> wrote:
>
>> Franco,
>>
>> Thanks for the detailed description of your problem.
>>
>> I'm afraid there's no such mechanism in Kudu today. Mining the WALs seems
>> like a path fraught with land mines. Kudu GCs WAL segments aggressively so
>> I'd be worried about a listening mechanism missing out on some row
>> operations. Plus the WAL is Raft-specific as it includes both REPLICATE
>> messages (reflecting a Write RPC from a client) and COMMIT messages
>> (written out when a majority of replicas have written a REPLICATE); parsing
>> and making sense of this would be challenging. Perhaps you could build
>> something using Linux's inotify system for receiving file change
>> notifications, but again I'd be worried about missing certain updates.
>>
>> Another option is to replicate the data at the OS level. For example, you
>> could periodically rsync the entire cluster onto a standby cluster. There's
>> bound to be data loss in the event of a failover, but I don't think you'll
>> run into any corruption (though Kudu does take advantage of sparse files
>> and hole punching, so you should verify that any tool you use supports
>> that).
>>
>> Disaster Recovery is an oft-requested feature, but one that Kudu
>> developers have been unable to prioritize yet. Would you or your someone on
>> your team be interested in working on this?
>>
>> On Thu, Sep 21, 2017 at 7:12 PM Franco Venturi <fv...@comcast.net>
>> wrote:
>>
>>> We are planning for a 50-100TB Kudu installation (about 200 tables or
>>> so).
>>>
>>> One of the requirements that we are working on is to have a secondary
>>> copy of our data in a Disaster Recovery data center in a different location.
>>>
>>>
>>> Since we are going to have inserts, updates, and deletes (for instance
>>> in the case the primary key is changed), we are trying to devise a process
>>> that will keep the secondary instance in sync with the primary one. The two
>>> instances do not have to be identical in real-time (i.e. we are not looking
>>> for synchronous writes to Kudu), but we would like to have some pretty good
>>> confidence that the secondary instance contains all the changes that the
>>> primary has up to say an hour before (or something like that).
>>>
>>>
>>> So far we considered a couple of options:
>>> - refreshing the seconday instance with a full copy of the primary one
>>> every so often, but that would mean having to transfer say 50TB of data
>>> between the two locations every time, and our network bandwidth constraints
>>> would prevent to do that even on a daily basis
>>> - having a column that contains the most recent time a row was updated,
>>> however this column couldn't be part of the primary key (because the
>>> primary key in Kudu is immutable), and therefore finding which rows have
>>> been changed every time would require a full scan of the table to be
>>> sync'd. It would also rely on the "last update timestamp" column to be
>>> always updated by the application (an assumption that we would like to
>>> avoid), and would need some other process to take into accounts the rows
>>> that are deleted.
>>>
>>>
>>> Since many of today's RDBMS (Oracle, MySQL, etc) allow for some sort of
>>> 'Change Data Capture' mechanism where only the 'deltas' are captured and
>>> applied to the secondary instance, we were wondering if there's any way in
>>> Kudu to achieve something like that (possibly mining the WALs, since my
>>> understanding is that each change gets applied to the WALs first).
>>>
>>>
>>> Thanks,
>>> Franco Venturi
>>>
>>
>

Re: Change Data Capture (CDC) with Kudu

Posted by Mike Percy <mp...@apache.org>.
CDC is something that I would like to see in Kudu but we aren't there yet
with the underlying support in the Raft Consensus implementation. Once we
have higher availability re-replication support (KUDU-1097) we will be a
bit closer for a solution involving traditional WAL streaming to an
external consumer because we will have support for non-voting replicas. But
there would still be plenty of work to do to support CDC after that, at
least from an API perspective as well as a WAL management perspective (how
long to keep old log files).

That said, what you really are asking for is a streaming backup solution,
which may or may not use the same mechanism (unfortunately it's not
designed or implemented yet).

As an alternative to Adar's suggestions, a reasonable option for you at
this time may be an incremental backup. It takes a little schema design to
do it, though. You could consider doing something like the following:

   1. Add a last_updated column to all your tables and update the column
   when you change the value. Ideally monotonic across the cluster but you
   could also go with local time and build in a "fudge factor" when reading in
   step 2
   2. Periodically scan the table for any changes newer than the previous
   scan in the last_updated column. This type of scan is more efficient to do
   in Kudu than in many other systems. With Impala you could run a query like:
   select * from table1 where last_updated > $prev_updated;
   3. Dump the results of this query to parquet
   4. Use distcp to copy the parquet files over to the other cluster
   periodically (maybe you can throttle this if needed to avoid saturating the
   pipe)
   5. Upsert the parquet data into Kudu on the remote end

Hopefully some workaround like this would work for you until Kudu has a
reliable streaming backup solution.

Like Adar said, as an Apache project we are always open to contributions
and it would be great to get some in this area. Please reach out if you're
interested in collaborating on a design.

Mike

On Fri, Sep 22, 2017 at 10:43 AM, Adar Lieber-Dembo <ad...@cloudera.com>
wrote:

> Franco,
>
> Thanks for the detailed description of your problem.
>
> I'm afraid there's no such mechanism in Kudu today. Mining the WALs seems
> like a path fraught with land mines. Kudu GCs WAL segments aggressively so
> I'd be worried about a listening mechanism missing out on some row
> operations. Plus the WAL is Raft-specific as it includes both REPLICATE
> messages (reflecting a Write RPC from a client) and COMMIT messages
> (written out when a majority of replicas have written a REPLICATE); parsing
> and making sense of this would be challenging. Perhaps you could build
> something using Linux's inotify system for receiving file change
> notifications, but again I'd be worried about missing certain updates.
>
> Another option is to replicate the data at the OS level. For example, you
> could periodically rsync the entire cluster onto a standby cluster. There's
> bound to be data loss in the event of a failover, but I don't think you'll
> run into any corruption (though Kudu does take advantage of sparse files
> and hole punching, so you should verify that any tool you use supports
> that).
>
> Disaster Recovery is an oft-requested feature, but one that Kudu
> developers have been unable to prioritize yet. Would you or your someone on
> your team be interested in working on this?
>
> On Thu, Sep 21, 2017 at 7:12 PM Franco Venturi <fv...@comcast.net>
> wrote:
>
>> We are planning for a 50-100TB Kudu installation (about 200 tables or so).
>>
>> One of the requirements that we are working on is to have a secondary
>> copy of our data in a Disaster Recovery data center in a different location.
>>
>>
>> Since we are going to have inserts, updates, and deletes (for instance in
>> the case the primary key is changed), we are trying to devise a process
>> that will keep the secondary instance in sync with the primary one. The two
>> instances do not have to be identical in real-time (i.e. we are not looking
>> for synchronous writes to Kudu), but we would like to have some pretty good
>> confidence that the secondary instance contains all the changes that the
>> primary has up to say an hour before (or something like that).
>>
>>
>> So far we considered a couple of options:
>> - refreshing the seconday instance with a full copy of the primary one
>> every so often, but that would mean having to transfer say 50TB of data
>> between the two locations every time, and our network bandwidth constraints
>> would prevent to do that even on a daily basis
>> - having a column that contains the most recent time a row was updated,
>> however this column couldn't be part of the primary key (because the
>> primary key in Kudu is immutable), and therefore finding which rows have
>> been changed every time would require a full scan of the table to be
>> sync'd. It would also rely on the "last update timestamp" column to be
>> always updated by the application (an assumption that we would like to
>> avoid), and would need some other process to take into accounts the rows
>> that are deleted.
>>
>>
>> Since many of today's RDBMS (Oracle, MySQL, etc) allow for some sort of
>> 'Change Data Capture' mechanism where only the 'deltas' are captured and
>> applied to the secondary instance, we were wondering if there's any way in
>> Kudu to achieve something like that (possibly mining the WALs, since my
>> understanding is that each change gets applied to the WALs first).
>>
>>
>> Thanks,
>> Franco Venturi
>>
>

Re: Change Data Capture (CDC) with Kudu

Posted by Adar Lieber-Dembo <ad...@cloudera.com>.
Franco,

Thanks for the detailed description of your problem.

I'm afraid there's no such mechanism in Kudu today. Mining the WALs seems
like a path fraught with land mines. Kudu GCs WAL segments aggressively so
I'd be worried about a listening mechanism missing out on some row
operations. Plus the WAL is Raft-specific as it includes both REPLICATE
messages (reflecting a Write RPC from a client) and COMMIT messages
(written out when a majority of replicas have written a REPLICATE); parsing
and making sense of this would be challenging. Perhaps you could build
something using Linux's inotify system for receiving file change
notifications, but again I'd be worried about missing certain updates.

Another option is to replicate the data at the OS level. For example, you
could periodically rsync the entire cluster onto a standby cluster. There's
bound to be data loss in the event of a failover, but I don't think you'll
run into any corruption (though Kudu does take advantage of sparse files
and hole punching, so you should verify that any tool you use supports
that).

Disaster Recovery is an oft-requested feature, but one that Kudu developers
have been unable to prioritize yet. Would you or your someone on your team
be interested in working on this?

On Thu, Sep 21, 2017 at 7:12 PM Franco Venturi <fv...@comcast.net> wrote:

> We are planning for a 50-100TB Kudu installation (about 200 tables or so).
>
> One of the requirements that we are working on is to have a secondary copy
> of our data in a Disaster Recovery data center in a different location.
>
>
> Since we are going to have inserts, updates, and deletes (for instance in
> the case the primary key is changed), we are trying to devise a process
> that will keep the secondary instance in sync with the primary one. The two
> instances do not have to be identical in real-time (i.e. we are not looking
> for synchronous writes to Kudu), but we would like to have some pretty good
> confidence that the secondary instance contains all the changes that the
> primary has up to say an hour before (or something like that).
>
>
> So far we considered a couple of options:
> - refreshing the seconday instance with a full copy of the primary one
> every so often, but that would mean having to transfer say 50TB of data
> between the two locations every time, and our network bandwidth constraints
> would prevent to do that even on a daily basis
> - having a column that contains the most recent time a row was updated,
> however this column couldn't be part of the primary key (because the
> primary key in Kudu is immutable), and therefore finding which rows have
> been changed every time would require a full scan of the table to be
> sync'd. It would also rely on the "last update timestamp" column to be
> always updated by the application (an assumption that we would like to
> avoid), and would need some other process to take into accounts the rows
> that are deleted.
>
>
> Since many of today's RDBMS (Oracle, MySQL, etc) allow for some sort of
> 'Change Data Capture' mechanism where only the 'deltas' are captured and
> applied to the secondary instance, we were wondering if there's any way in
> Kudu to achieve something like that (possibly mining the WALs, since my
> understanding is that each change gets applied to the WALs first).
>
>
> Thanks,
> Franco Venturi
>