Posted to users@kafka.apache.org by Randall Hauch <rh...@gmail.com> on 2016/01/28 07:11:06 UTC

Accumulating data in Kafka Connect source tasks

I’m creating a custom Kafka Connect source connector, and I’m running into a situation for which Kafka Connect doesn’t seem to provide a solution out of the box. I thought I’d first post to the users list in case I’m just missing a feature that’s already there.

My connector’s SourceTask implementation is reading a relational database transaction log. That log contains schema changes and row changes, and the row changes include a reference to the table and the row values. Thus, as the task processes the log, it has to use any schema changes in the log to adjust how it converts subsequent row changes into Kafka source records. Should the task stop and be restarted elsewhere, it can continue reading the transaction log where it left off only if that new task instance can recover the schema state accumulated by an earlier task.
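
A minimal sketch of the bookkeeping described above. It is written in Python for brevity; a real connector would be Java against the Kafka Connect API. The `SchemaChange`, `RowChange`, and `SchemaTracker` names are hypothetical. Each schema change is assumed to be already reduced to the table's full, ordered column list rather than raw DDL text. The sketch also shows why the accumulated state matters: a tracker that has lost it cannot interpret a positional row.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Union


@dataclass
class SchemaChange:
    """Hypothetical parsed schema event: the table's full column list after the DDL."""
    table: str
    columns: List[str]


@dataclass
class RowChange:
    """Hypothetical parsed row event: a table reference plus positional values."""
    table: str
    values: List[Any]


@dataclass
class SchemaTracker:
    """Accumulates per-table column lists as schema changes are read from the log."""
    tables: Dict[str, List[str]] = field(default_factory=dict)

    def apply(self, change: SchemaChange) -> None:
        self.tables[change.table] = list(change.columns)

    def convert(self, row: RowChange) -> Dict[str, Any]:
        columns = self.tables.get(row.table)
        if columns is None:
            raise KeyError(f"no schema known for table {row.table!r}")
        if len(columns) != len(row.values):
            raise ValueError(
                f"{row.table}: {len(row.values)} values but {len(columns)} columns"
            )
        # Values arrive in column order, so position is the only link to a name.
        return dict(zip(columns, row.values))


def process(events: List[Union[SchemaChange, RowChange]],
            tracker: SchemaTracker) -> List[Dict[str, Any]]:
    """Apply schema changes in log order; convert each row with the schema in effect."""
    converted = []
    for event in events:
        if isinstance(event, SchemaChange):
            tracker.apply(event)
        else:
            converted.append(tracker.convert(event))
    return converted


log = [
    SchemaChange("customers", ["id", "name"]),
    RowChange("customers", [1, "Alice"]),
    SchemaChange("customers", ["id", "name", "email"]),
    RowChange("customers", [2, "Bob", "bob@example.com"]),
]
rows = process(log, SchemaTracker())
```

Restarting at the last event with a fresh tracker (`process(log[3:], SchemaTracker())`) raises `KeyError`. That failure is exactly the state-recovery problem the post asks about.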

While I certainly can use a custom solution to store this state somewhere, it seems like other connectors might benefit from having Kafka Connect include something out of the box. And, this accumulated state (and its history with respect to the source offset at which the state changes) seems like a perfect fit for storing in a Kafka topic.

Does Kafka Connect already have a mechanism for tasks to store and recover arbitrary state? If not, then is there interest in adding this capability to Kafka Connect? (If there is interest, then perhaps the dev list is a better venue.)

Best regards,

Randall Hauch

Re: Accumulating data in Kafka Connect source tasks

Posted by James Cheng <jc...@tivo.com>.
> On Jan 28, 2016, at 5:06 PM, Ewen Cheslack-Postava <ew...@confluent.io> wrote:
>
> Randall,
>
> Great question. Ideally you wouldn't need this type of state since it
> should really be available in the source system. In your case, it might
> actually make sense to be able to grab that information from the DB itself,
> although that will also have issues if, for example, there have been
> multiple schema changes and you can no longer get a previous schema from
> the current state of the tables.
>
> The offset storage is probably pretty close to what you're looking for,
> although we obviously structure that very narrowly. Adding in some sort of
> other state store is an interesting idea, though I'd be curious how many
> other systems encounter similar challenges. I think one way to do this
> without huge changes and in a way that properly handles offset commits
> would be to expose a small API for setting local state and have Connect
> store that state right in the same topic (and message) as offsets. To
> handle offset commits and reviving tasks that hit a fault, we would just
> grab the current state as part of the process of committing offsets. Then
> offsets would just be a special case of that more general state.
>
> However, I'm also wary of doing something like this. Right now every worker
> has to consume the entire offsets topic. This works fine because offsets,
> while capable of being pretty complex, are generally pretty small such that
> there's no concern having to tail it on all workers (and no concern for the
> load on brokers leading those partitions). Once you provide a generic state
> storage mechanism without clear constraints on how it should be used,
> someone will come along and abuse it. Also, with offsets it is very clear
> (to the connector developer) which task should write to which keys (where
> the key here is the partition of the source partitioned stream). With
> support for arbitrary state, ownership of different subsets of the key
> space is very unclear. I think you may not have that particular problem
> because you probably only have 1 partition anyway since you are reading a
> transaction log.
>
> In any case, you're right that this doesn't exist today. There is one very
> hacky way to get around this, which is to store that schema information in
> your "offsets". This may not be *too* bad -- it'll increase the size of
> offset data, but probably doesn't affect much else. The data size may not
> be that bad as long as offsets aren't committed too frequently. In terms of
> performance, I'm assuming these schema changes are relatively rare, and you
> can just include the same schema object in every offset you create during
> the periods between schema changes so you (and the GC) are probably only
> doing a negligible amount of extra work.
>
> Re: creating a consumer, Connect doesn't provide any utilities to do that
> since the goal is to handle everything Kafka-related for the connector
> developer so they can just focus on getting the data from the other system!
> We could consider exposing some of the worker config though, which I
> imagine is all you really need -- it'd just be convenient to have the
> connection info for the Kafka brokers.
>
> Finally, I'd love to know which DB you're reading the transaction log from
> and if you're planning on open sourcing the connector:)
>

+1! I'd like to know what DB you're working on too!

-James

> -Ewen
>
> On Thu, Jan 28, 2016 at 6:12 AM, Randall Hauch <rh...@gmail.com> wrote:
>
>> Rather than leave this thread so open-ended, perhaps I can narrow it down to
>> what I think is the best approach. These accumulations are really just
>> additional information from the source that don’t get written to the normal
>> topics. Instead, each change to the accumulated state can be emitted as
>> source records on a dedicated topic. That is very straightforward with the
>> existing Kafka Connect.
>>
>> The challenge I’m struggling with is how a task can/should, upon startup,
>> *consume* that stream to rebuild its state. I can set up my own Kafka
>> consumer for that topic, but IIUC my connector config then has to include
>> much of the same information that is already in the Kafka Connect worker
>> configuration.
>>
>> Am I just missing how a connector can see the worker configuration
>> properties? Or is there a way that Kafka Connect can help me create a Kafka
>> consumer?
>>
>> Best regards,
>>
>> Randall Hauch
>>
>> On January 28, 2016 at 12:11:07 AM, Randall Hauch (rhauch@gmail.com)
>> wrote:
>> I’m creating a custom Kafka Connect source connector, and I’m running into
>> a situation for which Kafka Connect doesn’t seem to provide a solution out
>> of the box. I thought I’d first post to the users list in case I’m just
>> missing a feature that’s already there.
>>
>> My connector’s SourceTask implementation is reading a relational database
>> transaction log. That log contains schema changes and row changes, and the
>> row changes include a reference to the table and the row values. Thus, as
>> the task processes the log, it has to use any schema changes in the log to
>> adjust how it converts subsequent row changes into Kafka source records.
>> Should the task stop and be restarted elsewhere, it can continue reading
>> the transaction log where it left off only if that new task instance can
>> recover the schema state accumulated by an earlier task.
>>
>> While I certainly can use a custom solution to store this state somewhere,
>> it seems like other connectors might benefit from having Kafka Connect
>> include something out of the box. And, this accumulated state (and its
>> history with respect to the source offset at which the state changes) seems
>> like a perfect fit for storing in a Kafka topic.
>>
>> Does Kafka Connect already have a mechanism for tasks to store and recover
>> arbitrary state? If not, then is there interest in adding this capability
>> to Kafka Connect? (If there is interest, then perhaps the dev list is a
>> better venue.)
>>
>> Best regards,
>>
>> Randall Hauch
>>
>
>
>
> --
> Thanks,
> Ewen


________________________________

This email and any attachments may contain confidential and privileged material for the sole use of the intended recipient. Any review, copying, or distribution of this email (or any attachments) by others is prohibited. If you are not the intended recipient, please contact the sender immediately and permanently delete this email and any attachments. No employee or agent of TiVo Inc. is authorized to conclude any binding agreement on behalf of TiVo Inc. by email. Binding agreements with TiVo Inc. may only be made by a signed written agreement.

Re: Accumulating data in Kafka Connect source tasks

Posted by James Cheng <jc...@tivo.com>.
> On Jan 29, 2016, at 7:06 AM, Randall Hauch <rh...@gmail.com> wrote:
>
> On January 28, 2016 at 7:07:02 PM, Ewen Cheslack-Postava (ewen@confluent.io) wrote:
> Randall,
>
> Great question. Ideally you wouldn't need this type of state since it
> should really be available in the source system. In your case, it might
> actually make sense to be able to grab that information from the DB itself,
> although that will also have issues if, for example, there have been
> multiple schema changes and you can no longer get a previous schema from
> the current state of the tables.
> I agree that ideally connectors would be stateless, or at least have no need for maintaining state across restarts. Unfortunately, that’s not always possible.
>
> Reading the log but using the current schema does pose a problem if/when the schema has evolved since the point in the log that we’re currently reading. This is far more of an issue if you’re playing catch-up and there have been incompatible schema changes.
>
> Case in point: when MySQL inserts/updates/removes a row from a table, it writes an event in the log that includes (a) a table identifier and (b) the row values in column-order. There is no other information. Column renames might be okay, but adding or removing columns will likely result in mismatching the row values to the appropriate columns.
>
> Luckily, MySQL includes the DDL statements in the log, so my connector parses these as part of processing and builds up the schema state as it goes along. This works beautifully, with the only issue being how to persist and recover this after restarts.
>
>
> The offset storage is probably pretty close to what you're looking for,
> although we obviously structure that very narrowly. Adding in some sort of
> other state store is an interesting idea, though I'd be curious how many
> other systems encounter similar challenges. I think one way to do this
> without huge changes and in a way that properly handles offset commits
> would be to expose a small API for setting local state and have Connect
> store that state right in the same topic (and message) as offsets. To
> handle offset commits and reviving tasks that hit a fault, we would just
> grab the current state as part of the process of committing offsets. Then
> offsets would just be a special case of that more general state.
>
> However, I'm also wary of doing something like this. Right now every worker
> has to consume the entire offsets topic. This works fine because offsets,
> while capable of being pretty complex, are generally pretty small such that
> there's no concern having to tail it on all workers (and no concern for the
> load on brokers leading those partitions). Once you provide a generic state
> storage mechanism without clear constraints on how it should be used,
> someone will come along and abuse it. Also, with offsets it is very clear
> (to the connector developer) which task should write to which keys (where
> the key here is the partition of the source partitioned stream). With
> support for arbitrary state, ownership of different subsets of the key
> space is very unclear. I think you may not have that particular problem
> because you probably only have 1 partition anyway since you are reading a
> transaction log.
>
> In any case, you're right that this doesn't exist today. There is one very
> hacky way to get around this, which is to store that schema information in
> your "offsets". This may not be *too* bad -- it'll increase the size of
> offset data, but probably doesn't affect much else. The data size may not
> be that bad as long as offsets aren't committed too frequently. In terms of
> performance, I'm assuming these schema changes are relatively rare, and you
> can just include the same schema object in every offset you create during
> the periods between schema changes so you (and the GC) are probably only
> doing a negligible amount of extra work.
>
> Hmm, it sounds like hammering accumulated state into the offsets could be pretty problematic and potentially risky, especially if the state has very different size and frequency characteristics from the offsets.
>
> Re: creating a consumer, Connect doesn't provide any utilities to do that
> since the goal is to handle everything Kafka-related for the connector
> developer so they can just focus on getting the data from the other system!
> We could consider exposing some of the worker config though, which I
> imagine is all you really need -- it'd just be convenient to have the
> connection info for the Kafka brokers.
> Having a way to get the worker config would be awesome, and IMO it’s a nice, minimal approach. If you think this is a good idea, I can log a JIRA and take it to the dev list. I’m willing to work on it, too.
>
> I’m starting to think that storing state on a separate dedicated topic is the best option, at least for me. First, connector tasks can easily record their state by simply adding more SourceRecord instances during polling. Second, that information might be useful for downstream consumers. And third, recording state this way requires no changes to the current Kafka Connect. I’d hate to add a feature to Kafka Connect that is not useful to others.
>
> Recovery would require consuming this topic upon restart. If the state were incremental, then a restart might require consuming the entire topic. If the state were snapshots, then simply reading the last entry might be sufficient. There’s also the option of doing primarily incremental with periodic snapshots.
>
> For my connector, I can easily store each DDL statement, making it incremental. Consuming the whole topic upon restart shouldn’t be that intensive. And if it is, then that only adds a bit to the restart time; no big deal.
>
> Finally, I'd love to know which DB you're reading the transaction log from
> and if you're planning on open sourcing the connector:)
>
> As I mentioned, it’s MySQL at the moment and part of a very (!) new OSS project called Debezium (http://debezium.io), and the MySQL connector is the first of hopefully many connectors for a variety of databases. (Contributors welcome!)
>

Randall, I am also working on a MySQL connector. https://github.com/wushujames/kafka-mysql-connector

It is a Kafka connector
which uses Maxwell (https://github.com/zendesk/maxwell) to do MySQL DDL parsing,
which uses open-replicator (https://github.com/zendesk/open-replicator),
of which https://github.com/shyiko/mysql-binlog-connector-java is a fork.

So it seems we're along the same path. Maxwell in particular is doing the things you are talking about in this thread. It captures MySQL schema changes and stores them in such a way that they are correlated with the binlog coordinates, and so they can be reloaded when the CDC process comes back up.
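
A rough sketch of correlating schema state with binlog coordinates follows. It assumes the conventional `<basename>.<number>` binlog file naming. `BinlogPosition` and `SchemaHistory` are hypothetical names, and this is not how Maxwell itself stores its schemas. Positions compare by file number and then byte offset. A lookup returns the schema produced by the last DDL strictly before a given restart position, because the DDL at the restart position itself will be read again.

```python
import bisect
from dataclasses import dataclass
from typing import Dict, List


@dataclass(frozen=True, order=True)
class BinlogPosition:
    """Binlog coordinates; field order makes comparison (file number, byte offset)."""
    file_number: int
    offset: int

    @classmethod
    def parse(cls, filename: str, offset: int) -> "BinlogPosition":
        # Assumes names like "mysql-bin.000003", where the suffix increments.
        return cls(int(filename.rsplit(".", 1)[1]), offset)


class SchemaHistory:
    """Full schema snapshots, each keyed by the position of the DDL that produced it."""

    def __init__(self) -> None:
        self._positions: List[BinlogPosition] = []
        self._schemas: List[Dict[str, List[str]]] = []

    def record(self, pos: BinlogPosition, schema: Dict[str, List[str]]) -> None:
        if self._positions and pos <= self._positions[-1]:
            raise ValueError("entries must be recorded in binlog order")
        self._positions.append(pos)
        self._schemas.append(schema)

    def schema_before(self, restart: BinlogPosition) -> Dict[str, List[str]]:
        """Schema in effect for a reader that resumes at `restart`."""
        # Number of recorded DDL events strictly before the restart position.
        i = bisect.bisect_left(self._positions, restart)
        return self._schemas[i - 1] if i > 0 else {}


history = SchemaHistory()
history.record(BinlogPosition.parse("mysql-bin.000001", 120), {"t": ["id"]})
history.record(BinlogPosition.parse("mysql-bin.000002", 40), {"t": ["id", "name"]})
```

Parsing the numeric suffix, rather than comparing file names as strings, keeps the ordering correct even if the suffixes are not zero-padded to the same width.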

Let me know if you'd like to talk more about this. It'd be great to collaborate.

And if you're interested, here is a list of other MySQL CDC projects. https://github.com/wushujames/mysql-cdc-projects

-James

> Best regards,
>
> Randall



Re: Accumulating data in Kafka Connect source tasks

Posted by Randall Hauch <rh...@gmail.com>.
On January 29, 2016 at 11:59:28 AM, Ewen Cheslack-Postava (ewen@confluent.io) wrote:

> On Fri, Jan 29, 2016 at 7:06 AM, Randall Hauch <rh...@gmail.com> wrote:
>> Luckily, MySQL includes the DDL statements in the log, so my connector parses these as part of processing and builds up the schema state as it goes along. This works beautifully, with the only issue being how to persist and recover this after restarts.
>
> Yeah, this is a common complaint about the MySQL binlog. I know James mentioned this as well. It's a bit crazy that you need a full parser for the DDL to make this work :/

Yes, it’s a bit unfortunate, but entirely understandable. Log readers are usually other MySQL servers that can easily handle the DDL.

Since the transaction logs of several other DBMSes were also exposed, at least initially, to enable replication, I suspect that needing to read DDL may be the case for them, too.

>> Having a way to get the worker config would be awesome, and IMO it’s a nice, minimal approach. If you think this is a good idea, I can log a JIRA and take it to the dev list. I’m willing to work on it, too.
>
> I think this is not going to be commonly used, but I think it'd be fine to expose it.

I’ll send a message on the dev list and, if I don’t hear otherwise, create a JIRA.
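
The following rough illustration shows why access to the worker config would be enough. It is a hypothetical helper that derives settings for a task-owned consumer of the history topic from the worker's properties. Which keys to copy is an assumption: the broker list plus `ssl.`/`sasl.` security settings. The `consumer.` prefix handling mirrors how Connect workers let users override client settings. `history_consumer_config` is not an existing Connect API.

```python
from typing import Dict

# Assumed to be the worker settings a task-owned consumer needs to reach the brokers.
SHARED_KEYS = {"bootstrap.servers", "security.protocol"}
SHARED_PREFIXES = ("ssl.", "sasl.")
OVERRIDE_PREFIX = "consumer."


def history_consumer_config(worker: Dict[str, str]) -> Dict[str, str]:
    """Hypothetical: build consumer properties for re-reading a schema-history topic."""
    config = {
        key: value
        for key, value in worker.items()
        if key in SHARED_KEYS or key.startswith(SHARED_PREFIXES)
    }
    # Worker settings prefixed with "consumer." win over the shared ones.
    for key, value in worker.items():
        if key.startswith(OVERRIDE_PREFIX):
            config[key[len(OVERRIDE_PREFIX):]] = value
    # The task rebuilds its state from the start of the topic on every startup,
    # so it neither needs nor wants committed consumer offsets.
    config["enable.auto.commit"] = "false"
    config["auto.offset.reset"] = "earliest"
    return config
```

Worker-internal settings such as `offset.storage.topic` are deliberately left out, since they mean nothing to a plain consumer.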

>> I’m starting to think that storing state on a separate dedicated topic is the best option, at least for me. First, connector tasks can easily record their state by simply adding more SourceRecord instances during polling. Second, that information might be useful for downstream consumers. And third, recording state this way requires no changes to the current Kafka Connect. I’d hate to add a feature to Kafka Connect that is not useful to others.
>>
>> Recovery would require consuming this topic upon restart. If the state were incremental, then a restart might require consuming the entire topic. If the state were snapshots, then simply reading the last entry might be sufficient. There’s also the option of doing primarily incremental with periodic snapshots.
>>
>> For my connector, I can easily store each DDL statement, making it incremental. Consuming the whole topic upon restart shouldn’t be that intensive. And if it is, then that only adds a bit to the restart time; no big deal.
>
> This all sounds like it should work fine, just remember that you'll have to store enough info with the DDL statement to indicate when it should be applied. The case you want to be careful of is if your connector processes the change and crashes before offsets are committed. When the process comes back up, you need to make sure that if it recovers and starts at an earlier offset that it will still be able to get the previous schema. That's the drawback of not being able to tie writes of state in with offset commits -- you need to handle potential inconsistencies.

Ack. I can easily handle this by storing the source offsets of the DDL statement events. This way, when the task comes back, it can recover the schema as of whatever source position the task is supposed to start reading from.
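
A minimal sketch of that recovery step follows. It assumes each history record carries the source offset of its DDL event, with a plain integer standing in for binlog coordinates. For simplicity, each record also holds the table's full column list after the change. `HistoryEntry` and `recover_schema` are hypothetical. Sorting by offset and stopping at the restart offset handles the crash Ewen describes. A DDL written to the history topic whose offset was never committed is skipped, and a duplicate written while reprocessing is harmless.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass(frozen=True)
class HistoryEntry:
    offset: int                # source offset of the DDL event in the transaction log
    table: str
    columns: Tuple[str, ...]   # simplified: full column list after the DDL


def recover_schema(history: List[HistoryEntry],
                   restart_offset: int) -> Dict[str, List[str]]:
    """Rebuild the schema in effect for a task that resumes reading at restart_offset."""
    schema: Dict[str, List[str]] = {}
    # A crash between writing a history record and committing the source offset
    # means the same DDL can be re-processed and written again after restart,
    # so topic order is not guaranteed to be log order. Sort by source offset
    # (a stable sort, so identical duplicates simply stay adjacent).
    for entry in sorted(history, key=lambda e: e.offset):
        if entry.offset >= restart_offset:
            break  # the task will read this DDL again from the log itself
        schema[entry.table] = list(entry.columns)
    return schema


# The DDL at offset 30 reached the history topic, but the task crashed before
# committing its source offset, restarted at offset 15, and has so far
# re-processed only the DDL at offset 20.
history = [
    HistoryEntry(10, "t", ("id",)),
    HistoryEntry(20, "t", ("id", "name")),
    HistoryEntry(30, "t", ("id", "name", "email")),
    HistoryEntry(20, "t", ("id", "name")),
]
```

Simply replaying the topic in order would end on the duplicate at offset 20, and a later restart past offset 30 would then get the wrong schema. Sorting first avoids that problem.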

>> As I mentioned, it’s MySQL at the moment and part of a very (!) new OSS project called Debezium (http://debezium.io), and the MySQL connector is the first of hopefully many connectors for a variety of databases. (Contributors welcome!)
>
> Looks cool! I see you have at least skeletons for a couple of different DBs. Will the Kafka Connect adapter work with all of them?

That’s the plan, though, yes, the others are mostly skeletons. MongoDB is next, and I hope to adopt Bottled Water for PostgreSQL (I saw that Martin wanted to use Kafka Connect).

The one thing I’m unsure of is the best way (other than strings) to put JSON-like values inside Structs, but that’s another topic.

Thanks for the help!

Randall

Re: Accumulating data in Kafka Connect source tasks

Posted by Ewen Cheslack-Postava <ew...@confluent.io>.
On Fri, Jan 29, 2016 at 7:06 AM, Randall Hauch <rh...@gmail.com> wrote:

> On January 28, 2016 at 7:07:02 PM, Ewen Cheslack-Postava (
> ewen@confluent.io) wrote:
>
> Randall,
>
> Great question. Ideally you wouldn't need this type of state since it
> should really be available in the source system. In your case, it might
> actually make sense to be able to grab that information from the DB itself,
> although that will also have issues if, for example, there have been
> multiple schema changes and you can no longer get a previous schema from
> the current state of the tables.
>
> I agree that ideally connectors would be stateless, or at least have no
> need for maintaining state across restarts. Unfortunately, that’s not
> always possible.
>
> Reading the log but using the current schema does pose a problem if/when
> the schema has evolved since the point in the log that we’re currently
> reading. This is far more of an issue if you’re playing catch-up and
> there have been incompatible schema changes.
>
> Case in point: when MySQL inserts/updates/removes a row from a table, it
> writes an event in the log that includes (a) a table identifier and (b) the
> row values in column-order. There is no other information. Column renames
> might be okay, but adding or removing columns will likely result in
> mismatching the row values to the appropriate columns.
>
> Luckily, MySQL includes the DDL statements in the log, so my connector
> parses these as part of processing and builds up the schema state as it
> goes along. This works beautifully, with the only issue being how to
> persist and recover this after restarts.
>

Yeah, this is a common complaint about the MySQL binlog. I know James
mentioned this as well. It's a bit crazy that you need a full parser for
the DDL to make this work :/


>
> The offset storage is probably pretty close to what you're looking for,
> although we obviously structure that very narrowly. Adding in some sort of
> other state store is an interesting idea, though I'd be curious how many
> other systems encounter similar challenges. I think one way to do this
> without huge changes and in a way that properly handles offset commits
> would be to expose a small API for setting local state and have Connect
> store that state right in the same topic (and message) as offsets. To
> handle offset commits and reviving tasks that hit a fault, we would just
> grab the current state as part of the process of committing offsets. Then
> offsets would just be a special case of that more general state.
>
> However, I'm also wary of doing something like this. Right now every worker
> has to consume the entire offsets topic. This works fine because offsets,
> while capable of being pretty complex, are generally pretty small such that
> there's no concern having to tail it on all workers (and no concern for the
> load on brokers leading those partitions). Once you provide a generic state
> storage mechanism without clear constraints on how it should be used,
> someone will come along and abuse it. Also, with offsets it is very clear
> (to the connector developer) which task should write to which keys (where
> the key here is the partition of the source partitioned stream). With
> support for arbitrary state, ownership of different subsets of the key
> space is very unclear. I think you may not have that particular problem
> because you probably only have 1 partition anyway since you are reading a
> transaction log.
>
> In any case, you're right that this doesn't exist today. There is one very
> hacky way to get around this, which is to store that schema information in
> your "offsets". This may not be *too* bad -- it'll increase the size of
> offset data, but probably doesn't affect much else. The data size may not
> be that bad as long as offsets aren't committed too frequently. In terms of
> performance, I'm assuming these schema changes are relatively rare, and you
> can just include the same schema object in every offset you create during
> the periods between schema changes so you (and the GC) are probably only
> doing a negligible amount of extra work.
>
>
> Hmm, it sounds like hammering accumulated state into the offsets could be
> pretty problematic and potentially risky, especially if the state has very
> different size and frequency characteristics from the offsets.
>

Yes, as I said, it is a hack. I mainly mentioned it here because I think
for the amount of metadata you need to carry through, it could be a
workable solution.
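
For comparison, here is a rough sketch of that hack: carrying the schema inside every source offset, so that whatever offset Connect last committed also restores the schema. The class and key names (`OffsetEmbeddingTask`, `"pos"`, `"schema"`) are hypothetical. Real offsets are serialized on each commit, which is where the extra size cost comes from. The schema map is replaced rather than mutated on each change. As a result, every offset handed out between two schema changes references the same object, and earlier offsets are never altered.

```python
from typing import Any, Dict, List, Optional


class OffsetEmbeddingTask:
    """Hypothetical task that stores its schema state inside each source offset."""

    def __init__(self, committed: Optional[Dict[str, Any]] = None) -> None:
        # On restart, the last committed offset also restores the schema.
        self.schema: Dict[str, List[str]] = dict(committed["schema"]) if committed else {}

    def on_schema_change(self, table: str, columns: List[str]) -> None:
        # Copy-on-write: build a new map so offsets already emitted keep the old schema.
        self.schema = {**self.schema, table: list(columns)}

    def offset_for(self, position: int) -> Dict[str, Any]:
        # Between schema changes, every offset shares the same schema object.
        return {"pos": position, "schema": self.schema}


task = OffsetEmbeddingTask()
task.on_schema_change("t", ["id"])
o1 = task.offset_for(100)
o2 = task.offset_for(101)
task.on_schema_change("t", ["id", "name"])
o3 = task.offset_for(102)
restored = OffsetEmbeddingTask(committed=o3)
```

Because the schema is committed atomically with the position, restoring from `o2` yields the older schema. This avoids the crash-before-commit inconsistency that a separate history topic has to handle.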


>
> Re: creating a consumer, Connect doesn't provide any utilities to do that
> since the goal is to handle everything Kafka-related for the connector
> developer so they can just focus on getting the data from the other system!
> We could consider exposing some of the worker config though, which I
> imagine is all you really need -- it'd just be convenient to have the
> connection info for the Kafka brokers.
>
> Having a way to get the worker config would be awesome, and IMO it’s a nice,
> minimal approach. If you think this is a good idea, I can log a JIRA
> and take it to the dev list. I’m willing to work on it, too.
>
I think this is not going to be commonly used, but I think it'd be fine to
expose it.

> I’m starting to think that storing state on a separate dedicated topic is
> the best option, at least for me. First, connector tasks can easily record
> their state by simply adding more SourceRecord instances during polling.
> Second, that information might be useful for downstream consumers. And
> third, recording state this way requires no changes to the current Kafka
> Connect. I’d hate to add a feature to Kafka Connect that is not useful to
> others.
>
> Recovery would require consuming this topic upon restart. If the state
> were incremental, then a restart might require consuming the entire topic.
> If the state were snapshots, then simply reading the last entry might be
> sufficient. There’s also the option of doing primarily incremental with
> periodic snapshots.
>
> For my connector, I can easily store each DDL statement, making it
> incremental. Consuming the whole topic upon restart shouldn’t be that
> intensive. And if it is, then that only adds a bit to the restart time;
> no big deal.
>
This all sounds like it should work fine, just remember that you'll have to
store enough info with the DDL statement to indicate when it should be
applied. The case you want to be careful of is if your connector processes
the change and crashes before offsets are committed. When the process comes
back up, you need to make sure that if it recovers and starts at an earlier
offset that it will still be able to get the previous schema. That's the
drawback of not being able to tie writes of state in with offset commits --
you need to handle potential inconsistencies.


> Finally, I'd love to know which DB you're reading the transaction log from
> and if you're planning on open sourcing the connector:)
>
>
> As I mentioned, it’s MySQL at the moment and part of a very (!) new OSS
> project called Debezium (http://debezium.io), and the MySQL connector is
> the first of hopefully many connectors for a variety of databases.
> (Contributors welcome!)
>

Looks cool! I see you have at least skeletons for a couple of different
DBs. Will the Kafka Connect adapter work with all of them?

-Ewen


>
> Best regards,
>
> Randall
>



-- 
Thanks,
Ewen

Re: Accumulating data in Kafka Connect source tasks

Posted by Randall Hauch <rh...@gmail.com>.
On January 28, 2016 at 7:07:02 PM, Ewen Cheslack-Postava (ewen@confluent.io) wrote:
> Randall,
>
> Great question. Ideally you wouldn't need this type of state since it
> should really be available in the source system. In your case, it might
> actually make sense to be able to grab that information from the DB itself,
> although that will also have issues if, for example, there have been
> multiple schema changes and you can no longer get a previous schema from
> the current state of the tables.

I agree that ideally connectors would be stateless, or at least have no need for maintaining state across restarts. Unfortunately, that’s not always possible.

Reading the log but using the current schema does pose a problem if/when the schema has evolved since the point in the log that we’re currently reading. This is far more of an issue if you’re playing catch-up and there have been incompatible schema changes.

Case in point: when MySQL inserts/updates/removes a row from a table, it writes an event in the log that includes (a) a table identifier and (b) the row values in column-order. There is no other information. Column renames might be okay, but adding or removing columns will likely result in mismatching the row values to the appropriate columns.

Luckily, MySQL includes the DDL statements in the log, so my connector parses these as part of processing and builds up the schema state as it goes along. This works beautifully, with the only issue being how to persist and recover this after restarts.


> The offset storage is probably pretty close to what you're looking for,
> although we obviously structure that very narrowly. Adding in some sort of
> other state store is an interesting idea, though I'd be curious how many
> other systems encounter similar challenges. I think one way to do this
> without huge changes and in a way that properly handles offset commits
> would be to expose a small API for setting local state and have Connect
> store that state right in the same topic (and message) as offsets. To
> handle offset commits and reviving tasks that hit a fault, we would just
> grab the current state as part of the process of committing offsets. Then
> offsets would just be a special case of that more general state.

> However, I'm also wary of doing something like this. Right now every worker
> has to consume the entire offsets topic. This works fine because offsets,
> while capable of being pretty complex, are generally pretty small such that
> there's no concern having to tail it on all workers (and no concern for the
> load on brokers leading those partitions). Once you provide a generic state
> storage mechanism without clear constraints on how it should be used,
> someone will come along and abuse it. Also, with offsets it is very clear
> (to the connector developer) which task should write to which keys (where
> the key here is the partition of the source partitioned stream). With
> support for arbitrary state, ownership of different subsets of the key
> space is very unclear. I think you may not have that particular problem
> because you probably only have 1 partition anyway since you are reading a
> transaction log.

> In any case, you're right that this doesn't exist today. There is one very
> hacky way to get around this, which is to store that schema information in
> your "offsets". This may not be *too* bad -- it'll increase the size of
> offset data, but probably doesn't affect much else. The data size may not
> be that bad as long as offsets aren't committed too frequently. In terms of
> performance, I'm assuming these schema changes are relatively rare, and you
> can just include the same schema object in every offset you create during
> the periods between schema changes so you (and the GC) are probably only
> doing a negligible amount of extra work.

Hmm, it sounds like hammering accumulated state into the offsets could be pretty problematic and potentially risky, especially if the state has very different size and frequency characteristics from the offsets.

Re: creating a consumer, Connect doesn't provide any utilities to do that 
since the goal is to handle everything Kafka-related for the connector 
developer so they can just focus on getting the data from the other system! 
We could consider exposing some of the worker config though, which I 
imagine is all you really need -- it'd just be convenient to have the 
connection info for the Kafka brokers. 

Having a way to get the worker config would be awesome, and IMO it's a nice minimalistic approach. If you think this is a good idea, I can log a JIRA and take it to the dev list. I’m willing to work on it, too.

I’m starting to think that storing state on a separate dedicated topic is the best option, at least for me. First, connector tasks can easily record their state by simply adding more SourceRecord instances during polling. Second, that information might be useful for downstream consumers. And third, recording state this way requires no changes to the current Kafka Connect. I’d hate to add a feature to Kafka Connect that is not useful to others.
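
A minimal Python model of that idea follows. It assumes the task sees an ordered list of (position, kind, payload) log events. `Record` is a hypothetical stand-in for Connect's Java `SourceRecord`, and the schema topic name is made up. DDL events go to the dedicated topic, and row events go to per-table topics. Both kinds carry the same style of source offset, so they are committed through the normal offset mechanism.

```python
from dataclasses import dataclass
from typing import Any

# Hypothetical stand-in for org.apache.kafka.connect.source.SourceRecord.
@dataclass
class Record:
    source_partition: dict
    source_offset: dict
    topic: str
    value: Any

SCHEMA_TOPIC = "db1.schema-changes"  # hypothetical dedicated state topic

def poll_batch(events, server="db1"):
    """Convert (pos, kind, payload) log events into records for one poll() call."""
    records = []
    for pos, kind, payload in events:
        partition = {"server": server}
        offset = {"pos": pos}
        if kind == "ddl":
            # State change: emitted like any other record, but to the state topic.
            records.append(Record(partition, offset, SCHEMA_TOPIC, payload))
        else:
            table, row = payload
            records.append(Record(partition, offset, f"{server}.{table}", row))
    return records
```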

Recovery would require consuming this topic upon restart. If the state were incremental, then a restart might require consuming the entire topic. If the state were snapshots, then simply reading the last entry might be sufficient. There’s also the option of doing primarily incremental with periodic snapshots.
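
The "incremental with periodic snapshots" option can be sketched as a fold over the state topic's entries. This assumes each entry is either ("snapshot", full_state) or ("delta", (key, value)), with a `None` value meaning delete; that encoding is an illustration only. Recovery starts from the latest snapshot and applies only the deltas after it.

```python
def recover(entries):
    """Rebuild state from ordered ("snapshot", dict) / ("delta", (key, value)) entries."""
    # Find the most recent snapshot by scanning from the end.
    last = next(
        (i for i in range(len(entries) - 1, -1, -1) if entries[i][0] == "snapshot"),
        None,
    )
    state = dict(entries[last][1]) if last is not None else {}
    start = 0 if last is None else last + 1
    # Everything after the last snapshot is a delta.
    for _kind, (key, value) in entries[start:]:
        if value is None:
            state.pop(key, None)  # None acts as a delete marker
        else:
            state[key] = value
    return state
```

With only deltas this degenerates to replaying the whole topic. With frequent snapshots it approaches "read the last entry".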

For my connector, I can easily store each DDL statement, making it incremental. Consuming the whole topic upon restart shouldn’t be that intensive, and even if it is, it only adds a bit to the restart time, which is no big deal.
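
Because the history is recorded against source offsets, a restarted task should rebuild the schema only up to the position it resumes from. Offsets are committed periodically, so the history topic can already hold DDL recorded after the last committed offset, and applying that DDL early would give the wrong schema for the rows that get re-read. A sketch, assuming history entries are (position, ddl) pairs in log order. `apply_ddl` is a deliberately toy parser for two statement shapes, not real SQL handling.

```python
def schema_as_of(history, resume_pos):
    """Replay (pos, ddl) history entries with pos <= resume_pos into {table: [columns]}."""
    tables = {}
    for pos, ddl in history:
        if pos > resume_pos:
            break  # history is in log order; later DDL is not yet in effect
        apply_ddl(tables, ddl)
    return tables

def apply_ddl(tables, ddl):
    # Toy parser: handles "CREATE TABLE t (c1 TYPE, ...)" and "ALTER TABLE t ADD c TYPE" only.
    words = ddl.replace("(", " ").replace(")", " ").replace(",", " ").split()
    if words[:2] == ["CREATE", "TABLE"]:
        tables[words[2]] = words[3::2]  # column names alternate with their types
    elif words[:2] == ["ALTER", "TABLE"] and words[3:4] == ["ADD"] and len(words) > 4:
        tables[words[2]].append(words[4])
```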

Finally, I'd love to know which DB you're reading the transaction log from 
and if you're planning on open sourcing the connector:) 

As I mentioned, it’s MySQL at the moment and part of a very (!) new OSS project called Debezium (http://debezium.io), and the MySQL connector is the first of hopefully many connectors for a variety of databases. (Contributors welcome!)

Best regards,

Randall

Re: Accumulating data in Kafka Connect source tasks

Posted by Ewen Cheslack-Postava <ew...@confluent.io>.
Randall,

Great question. Ideally you wouldn't need this type of state since it
should really be available in the source system. In your case, it might
actually make sense to be able to grab that information from the DB itself,
although that will also have issues if, for example, there have been
multiple schema changes and you can no longer get a previous schema from
the current state of the tables.

The offset storage is probably pretty close to what you're looking for,
although we obviously structure that very narrowly. Adding in some sort of
other state store is an interesting idea, though I'd be curious how many
other systems encounter similar challenges. I think one way to do this
without huge changes and in a way that properly handles offset commits
would be to expose a small API for setting local state and have Connect
store that state right in the same topic (and message) as offsets. To
handle offset commits and reviving tasks that hit a fault, we would just
grab the current state as part of the process of committing offsets. Then
offsets would just be a special case of that more general state.
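
To make the proposal concrete, here is a toy model of such an API. `StatefulTaskContext`, `set_state`, `commit` and `restore` are hypothetical names; as the thread says, nothing like this exists in Connect today. The point it shows is that state is captured in the same message as the offsets at commit time. A revived task therefore always gets state consistent with the offset it resumes from, and plain offsets are just the case where the state is empty.

```python
import json

class StatefulTaskContext:
    """Hypothetical model of the proposed API; not part of Kafka Connect."""

    def __init__(self):
        self._state = {}
        self.committed = []  # stands in for messages written to the offsets topic

    def set_state(self, key, value):
        self._state[key] = value

    def commit(self, offsets):
        # Serialize offsets and the current state together, in one message.
        message = {"offsets": offsets, "state": self._state}
        self.committed.append(json.dumps(message, sort_keys=True))

    def restore(self):
        # A revived task reads the last committed message.
        if not self.committed:
            return {}, {}
        last = json.loads(self.committed[-1])
        return last["offsets"], last["state"]
```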

However, I'm also wary of doing something like this. Right now every worker
has to consume the entire offsets topic. This works fine because offsets,
while capable of being pretty complex, are generally pretty small such that
there's no concern having to tail it on all workers (and no concern for the
load on brokers leading those partitions). Once you provide a generic state
storage mechanism without clear constraints on how it should be used,
someone will come along and abuse it. Also, with offsets it is very clear
(to the connector developer) which task should write to which keys (where
the key here is the partition of the source partitioned stream). With
support for arbitrary state, ownership of different subsets of the key
space is very unclear. I think you may not have that particular problem
because you probably only have 1 partition anyway since you are reading a
transaction log.
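
The two points above, every worker tailing the whole topic and clear key ownership, can be illustrated by how a worker could materialize the offsets topic. It keeps only the latest value per key, much as a compacted topic would. The (connector, partition) key shape is a simplification, not Connect's actual serialization, and partitions are plain strings here rather than maps.

```python
def materialize(log):
    """Replay (key, value) messages into a latest-value-per-key map; None is a tombstone."""
    latest = {}
    for key, value in log:
        if value is None:
            latest.pop(key, None)
        else:
            latest[key] = value
    return latest

def owned_by(view, connector, partitions):
    """Entries belonging to a task assigned the given source partitions."""
    return {k: v for k, v in view.items() if k[0] == connector and k[1] in partitions}
```

The map's size is bounded by the number of source partitions, which is why tailing it everywhere is cheap. Each key maps to exactly one source partition, and therefore to one task. Arbitrary state has neither property.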

In any case, you're right that this doesn't exist today. There is one very
hacky way to get around this, which is to store that schema information in
your "offsets". This may not be *too* bad -- it'll increase the size of
offset data, but probably doesn't affect much else. The data size may not
be that bad as long as offsets aren't committed too frequently. In terms of
performance, I'm assuming these schema changes are relatively rare, and you
can just include the same schema object in every offset you create during
the periods between schema changes so you (and the GC) are probably only
doing a negligible amount of extra work.
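
Below is a sketch of the "schema in the offsets" workaround, assuming the connector tracks a simple table-to-columns map. The class and field names are hypothetical. It also assumes offset values must be primitive types, so the schema is stored as a JSON string. That string is rebuilt only when DDL arrives and the same object is reused in every offset until the next change, which is why the extra work stays small.

```python
import json

class SchemaInOffsets:
    """Hypothetical helper: embeds the current schema, as a JSON string, in each offset."""

    def __init__(self):
        self._tables = {}
        self._schema_json = json.dumps(self._tables)

    def on_ddl(self, table, columns):
        self._tables[table] = list(columns)
        # Re-serialize only on a schema change; offsets created between
        # changes all share this single string object.
        self._schema_json = json.dumps(self._tables, sort_keys=True)

    def offset_for(self, pos):
        return {"pos": pos, "schema": self._schema_json}

    @staticmethod
    def restore(offset):
        # On restart, the committed offset alone yields the position and the schema.
        return offset["pos"], json.loads(offset["schema"])
```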

Re: creating a consumer, Connect doesn't provide any utilities to do that
since the goal is to handle everything Kafka-related for the connector
developer so they can just focus on getting the data from the other system!
We could consider exposing some of the worker config though, which I
imagine is all you really need -- it'd just be convenient to have the
connection info for the Kafka brokers.
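
If the worker config were exposed, a connector could derive a consumer config from it roughly like this. The helper and its prefix list are assumptions. The property names themselves (`bootstrap.servers`, `security.protocol`, `ssl.*`, `sasl.*`, `enable.auto.commit`, `auto.offset.reset`, `group.id`) are standard Kafka client settings. Worker-only settings such as converters and internal topic names are left out.

```python
# Assumed set of worker properties that describe how to reach the brokers.
CONNECTION_PREFIXES = ("bootstrap.servers", "security.protocol", "ssl.", "sasl.")

def consumer_config_from_worker(worker_props, group_id=None):
    """Hypothetical helper: copy only broker-connection settings from worker properties."""
    config = {k: v for k, v in worker_props.items() if k.startswith(CONNECTION_PREFIXES)}
    # A task rebuilding its state reads the topic from the beginning itself,
    # so it does not need the consumer to commit offsets.
    config["enable.auto.commit"] = "false"
    config["auto.offset.reset"] = "earliest"
    if group_id is not None:
        config["group.id"] = group_id
    return config
```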

Finally, I'd love to know which DB you're reading the transaction log from
and if you're planning on open sourcing the connector:)

-Ewen

On Thu, Jan 28, 2016 at 6:12 AM, Randall Hauch <rh...@gmail.com> wrote:

> Rather than leave this thread so open ended, perhaps I can narrow down to
> what I think is the best approach. These accumulations are really just
> additional information from the source that don’t get written to the normal
> topics. Instead, each change to the accumulated state can be emitted as
> source records on a dedicated topic. That is very straightforward with the
> existing Kafka Connect.
>
> The challenge I’m struggling with is how a task can/should, upon startup,
> *consume* that stream to rebuild its state. I can set up my own Kafka
> consumer for that topic, but IIUC now my connector config has to include
> much of the same information included in the Kafka Connect workers
> configuration.
>
> Am I just missing how a connector can see the worker configuration
> properties? Or is there a way that Kafka Connect can help me create a Kafka
> consumer?
>
> Best regards,
>
> Randall Hauch



-- 
Thanks,
Ewen

Re: Accumulating data in Kafka Connect source tasks

Posted by Randall Hauch <rh...@gmail.com>.
Rather than leave this thread so open ended, perhaps I can narrow down to what I think is the best approach. These accumulations are really just additional information from the source that don’t get written to the normal topics. Instead, each change to the accumulated state can be emitted as source records on a dedicated topic. That is very straightforward with the existing Kafka Connect.

The challenge I’m struggling with is how a task can/should, upon startup, *consume* that stream to rebuild its state. I can set up my own Kafka consumer for that topic, but IIUC now my connector config has to include much of the same information included in the Kafka Connect workers configuration. 

Am I just missing how a connector can see the worker configuration properties? Or is there a way that Kafka Connect can help me create a Kafka consumer?

Best regards,

Randall Hauch
