You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Sunny Shah <su...@tinyowl.co.in> on 2016/01/27 05:43:33 UTC

MongoDB Kafka Connect driver

Hi ,

We are trying to write a Kafka-connect connector for Mongodb. The issue is,
MongoDB does not provide an entire changed document for update operations,
It just provides the modified fields.

if Kafka allows custom log compaction then It is possible to eventually
merge an entire document and subsequent update to to create an entire
record again.

As Ewen pointed out to me on twitter, this is not possible, then What is
the Kafka-connect way of solving this issue?

@Ewen, Thanks a lot for a really quick answer on twitter.

-- 
Thanks and Regards,
 Sunny

-- 
The contents of this e-mail and any attachment(s) are confidential and 
intended for the named recipient(s) only. It shall not attach any liability 
on the originator or TinyOwl Technology Pvt. Ltd. or its affiliates. Any 
form of reproduction, dissemination, copying, disclosure, modification, 
distribution and / or publication of this message without the prior written 
consent of the author of this e-mail is strictly prohibited. If you have 
received this email in error please delete it and notify the sender 
immediately. You are liable to the company (TinyOwl Technology Pvt. Ltd.) in 
case of any breach in ​
​confidentialy (through any form of communication) wherein the company has 
the right to injunct legal action and an equitable relief for damages.

Re: MongoDB Kafka Connect driver

Posted by Sunny Shah <su...@tinyowl.co.in>.
Hello Everyone,

Thanks a lot for your valuable responses.

We will use external database to store Key and Kafka-offset, We won't set
any preference on which database to use, We will leave it to the
driver-user by using a flexible data-access-model like Apache Metamodel.

@James, Even for MongoDB connect driver till time T2 it won't be in the
consistent stage.

On Sat, Jan 30, 2016 at 3:43 AM, James Cheng <jc...@tivo.com> wrote:

> Not sure if this will help anything, but just throwing it out there.
>
> The Maxwell and mypipe projects both do CDC from MySQL and support
> bootstrapping. The way they do it is kind of "eventually consistent".
>
> 1) At time T1, record coordinates of the end of the binlog as of T1.
> 2) At time T2, do a full dump of the database into Kafka.
> 3) Connect back to the binlog in the coordinates recorded in step #1, and
> emit all those records into Kafka.
>
> As Jay mentioned, MySQL supports full row images. At the start of step #3,
> the kafka topic contains all rows as of time T2. It is possible that during
> step #3, that you will emit rows that changed between T1 and T2. From the
> point of view of the consumer of the kafka topic, they would see rows that
> went "back in time". However, as step #3 progresses, and the consumer keeps
> reading, those rows would eventually converge down to their final state.
>
> Maxwell: https://github.com/zendesk/maxwell
> mypipe: https://github.com/mardambey/mypipe
>
> Does that idea help in any way? Btw, a reason it is done this way is that
> it is "difficult" to do #1 and #2 above in a coordinated way without
> locking the database or without adding additional outside dependencies (LVM
> snapshots, being a specific one).
>
> Btw, I glanced at some docs about the Mongodb oplog. It seems that each
> oplog contains
> 1) A way to identify the document that the change applies to.
> 2) A series of mongodb commands (set, unset) to alter the document in #1
> to become the new document.
>
> Thoughts:
> For #1, does it identify a particular "version" of a document? (I don't
> know much about mongodb). If so, you might be able to use it to determine
> if the change should even be attempted to be applied to the object.
> For #2, doesn't that mean you'll need "understand" mongodb's syntax and
> commands? Although maybe it is simply sets/unsets/deletes, in which case
> it's maybe pretty simple.
>
> -James
>
> > On Jan 29, 2016, at 9:39 AM, Jay Kreps <ja...@confluent.io> wrote:
> >
> > Also, most database provide a "full logging" option that let's you
> capture
> > the whole row in the log (I know Oracle and MySQL have this) but it
> sounds
> > like Mongo doesn't yet. That would be the ideal solution.
> >
> > -Jay
> >
> > On Fri, Jan 29, 2016 at 9:38 AM, Jay Kreps <ja...@confluent.io> wrote:
> >
> >> Ah, agreed. This approach is actually quite common in change capture,
> >> though. For many use cases getting the final value is actually
> preferable
> >> to getting intermediates. The exception is usually if you want to do
> >> analytics on something like number of changes.
> >>
> >> On Fri, Jan 29, 2016 at 9:35 AM, Ewen Cheslack-Postava <
> ewen@confluent.io>
> >> wrote:
> >>
> >>> Jay,
> >>>
> >>> You can query after the fact, but you're not necessarily going to get
> the
> >>> same value back. There could easily be dozens of changes to the
> document
> >>> in
> >>> the oplog so the delta you see may not even make sense given the
> current
> >>> state of the document. Even if you can apply it the delta, you'd still
> be
> >>> seeing data that is newer than the update. You can of course take this
> >>> shortcut, but it won't give correct results. And if the data has been
> >>> deleted since then, you won't even be able to write the full record...
> As
> >>> far as I know, the way the op log is exposed won't let you do something
> >>> like pin a query to the state of the db at a specific point in the op
> log
> >>> and you may be reading from the beginning of the op log, so I don't
> think
> >>> there's a way to get correct results by just querying the DB for the
> full
> >>> documents.
> >>>
> >>> Strictly speaking you don't need to get all the data in memory, you
> just
> >>> need a record of the current set of values somewhere. This is what I
> was
> >>> describing following those two options -- if you do an initial dump to
> >>> Kafka, you could track only offsets in memory and read back full
> values as
> >>> needed to apply deltas, but this of course requires random reads into
> your
> >>> Kafka topic (but may perform fine in practice depending on the
> workload).
> >>>
> >>> -Ewen
> >>>
> >>> On Fri, Jan 29, 2016 at 9:12 AM, Jay Kreps <ja...@confluent.io> wrote:
> >>>
> >>>> Hey Ewen, how come you need to get it all in memory for approach (1)?
> I
> >>>> guess the obvious thing to do would just be to query for the record
> >>>> after-image when you get the diff--e.g. just read a batch of changes
> and
> >>>> multi-get the final values. I don't know how bad the overhead of this
> >>> would
> >>>> be...batching might reduce it a fair amount. The guarantees for this
> are
> >>>> slightly different than the pure oplog too (you get the current value
> >>> not
> >>>> every necessarily every intermediate) but that should be okay for most
> >>>> uses.
> >>>>
> >>>> -Jay
> >>>>
> >>>> On Fri, Jan 29, 2016 at 8:54 AM, Ewen Cheslack-Postava <
> >>> ewen@confluent.io>
> >>>> wrote:
> >>>>
> >>>>> Sunny,
> >>>>>
> >>>>> As I said on Twitter, I'm stoked to hear you're working on a Mongo
> >>>>> connector! It struck me as a pretty natural source to tackle since it
> >>>> does
> >>>>> such a nice job of cleanly exposing the op log.
> >>>>>
> >>>>> Regarding the problem of only getting deltas, unfortunately there is
> >>> not
> >>>> a
> >>>>> trivial solution here -- if you want to generate the full updated
> >>> record,
> >>>>> you're going to have to have a way to recover the original document.
> >>>>>
> >>>>> In fact, I'm curious how you were thinking of even bootstrapping. Are
> >>> you
> >>>>> going to do a full dump and then start reading the op log? Is there a
> >>>> good
> >>>>> way to do the dump and figure out the exact location in the op log
> >>> that
> >>>> the
> >>>>> query generating the dump was initially performed? I know that
> >>> internally
> >>>>> mongo effectively does these two steps, but I'm not sure if the
> >>> necessary
> >>>>> info is exposed via normal queries.
> >>>>>
> >>>>> If you want to reconstitute the data, I can think of a couple of
> >>> options:
> >>>>>
> >>>>> 1. Try to reconstitute inline in the connector. This seems difficult
> >>> to
> >>>>> make work in practice. At some point you basically have to query for
> >>> the
> >>>>> entire data set to bring it into memory and then the connector is
> >>>>> effectively just applying the deltas to its in memory copy and then
> >>> just
> >>>>> generating one output record containing the full document each time
> it
> >>>>> applies an update.
> >>>>> 2. Make the connector send just the updates and have a separate
> stream
> >>>>> processing job perform the reconstitution and send to another topic.
> >>> In
> >>>>> this case, the first topic should not be compacted, but the second
> one
> >>>>> could be.
> >>>>>
> >>>>> Unfortunately, without additional hooks into the database, there's
> not
> >>>> much
> >>>>> you can do besides this pretty heavyweight process. There may be some
> >>>>> tricks you can use to reduce the amount of memory used during the
> >>> process
> >>>>> (e.g. keep a small cache of actual records and for the rest only
> store
> >>>>> Kafka offsets for the last full value, performing a (possibly
> >>> expensive)
> >>>>> random read as necessary to get the full document value back), but to
> >>> get
> >>>>> full correctness you will need to perform this process.
> >>>>>
> >>>>> In terms of Kafka Connect supporting something like this, I'm not
> sure
> >>>> how
> >>>>> general it could be made, or that you even want to perform the
> process
> >>>>> inline with the Kafka Connect job. If it's an issue that repeatedly
> >>>> arises
> >>>>> across a variety of systems, then we should consider how to address
> it
> >>>> more
> >>>>> generally.
> >>>>>
> >>>>> -Ewen
> >>>>>
> >>>>> On Tue, Jan 26, 2016 at 8:43 PM, Sunny Shah <su...@tinyowl.co.in>
> >>> wrote:
> >>>>>
> >>>>>>
> >>>>>> Hi ,
> >>>>>>
> >>>>>> We are trying to write a Kafka-connect connector for Mongodb. The
> >>> issue
> >>>>>> is, MongoDB does not provide an entire changed document for update
> >>>>>> operations, It just provides the modified fields.
> >>>>>>
> >>>>>> if Kafka allows custom log compaction then It is possible to
> >>> eventually
> >>>>>> merge an entire document and subsequent update to to create an
> >>> entire
> >>>>>> record again.
> >>>>>>
> >>>>>> As Ewen pointed out to me on twitter, this is not possible, then
> >>> What
> >>>> is
> >>>>>> the Kafka-connect way of solving this issue?
> >>>>>>
> >>>>>> @Ewen, Thanks a lot for a really quick answer on twitter.
> >>>>>>
> >>>>>> --
> >>>>>> Thanks and Regards,
> >>>>>> Sunny
> >>>>>>
> >>>>>> The contents of this e-mail and any attachment(s) are confidential
> >>> and
> >>>>>> intended for the named recipient(s) only. It shall not attach any
> >>>>> liability
> >>>>>> on the originator or TinyOwl Technology Pvt. Ltd. or its affiliates.
> >>>> Any
> >>>>>> form of reproduction, dissemination, copying, disclosure,
> >>> modification,
> >>>>>> distribution and / or publication of this message without the prior
> >>>>> written
> >>>>>> consent of the author of this e-mail is strictly prohibited. If you
> >>>> have
> >>>>>> received this email in error please delete it and notify the sender
> >>>>>> immediately. You are liable to the company (TinyOwl Technology Pvt.
> >>>>> Ltd.) in
> >>>>>> case of any breach in ​
> >>>>>> ​confidentialy (through any form of communication) wherein the
> >>> company
> >>>>> has
> >>>>>> the right to injunct legal action and an equitable relief for
> >>> damages.
> >>>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> --
> >>>>> Thanks,
> >>>>> Ewen
> >>>>>
> >>>>
> >>>
> >>>
> >>>
> >>> --
> >>> Thanks,
> >>> Ewen
> >>>
> >>
> >>
>
>
> ________________________________
>
> This email and any attachments may contain confidential and privileged
> material for the sole use of the intended recipient. Any review, copying,
> or distribution of this email (or any attachments) by others is prohibited.
> If you are not the intended recipient, please contact the sender
> immediately and permanently delete this email and any attachments. No
> employee or agent of TiVo Inc. is authorized to conclude any binding
> agreement on behalf of TiVo Inc. by email. Binding agreements with TiVo
> Inc. may only be made by a signed written agreement.
>



-- 
Thanks and Regards,
 Sunny

-- 
The contents of this e-mail and any attachment(s) are confidential and 
intended for the named recipient(s) only. It shall not attach any liability 
on the originator or TinyOwl Technology Pvt. Ltd. or its affiliates. Any 
form of reproduction, dissemination, copying, disclosure, modification, 
distribution and / or publication of this message without the prior written 
consent of the author of this e-mail is strictly prohibited. If you have 
received this email in error please delete it and notify the sender 
immediately. You are liable to the company (TinyOwl Technology Pvt. Ltd.) in 
case of any breach in ​
​confidentialy (through any form of communication) wherein the company has 
the right to injunct legal action and an equitable relief for damages.

Re: MongoDB Kafka Connect driver

Posted by James Cheng <jc...@tivo.com>.
Not sure if this will help anything, but just throwing it out there.

The Maxwell and mypipe projects both do CDC from MySQL and support bootstrapping. The way they do it is kind of "eventually consistent".

1) At time T1, record coordinates of the end of the binlog as of T1.
2) At time T2, do a full dump of the database into Kafka.
3) Connect back to the binlog in the coordinates recorded in step #1, and emit all those records into Kafka.

As Jay mentioned, MySQL supports full row images. At the start of step #3, the kafka topic contains all rows as of time T2. It is possible that during step #3, that you will emit rows that changed between T1 and T2. From the point of view of the consumer of the kafka topic, they would see rows that went "back in time". However, as step #3 progresses, and the consumer keeps reading, those rows would eventually converge down to their final state.

Maxwell: https://github.com/zendesk/maxwell
mypipe: https://github.com/mardambey/mypipe

Does that idea help in any way? Btw, a reason it is done this way is that it is "difficult" to do #1 and #2 above in a coordinated way without locking the database or without adding additional outside dependencies (LVM snapshots, being a specific one).

Btw, I glanced at some docs about the Mongodb oplog. It seems that each oplog contains
1) A way to identify the document that the change applies to.
2) A series of mongodb commands (set, unset) to alter the document in #1 to become the new document.

Thoughts:
For #1, does it identify a particular "version" of a document? (I don't know much about mongodb). If so, you might be able to use it to determine if the change should even be attempted to be applied to the object.
For #2, doesn't that mean you'll need "understand" mongodb's syntax and commands? Although maybe it is simply sets/unsets/deletes, in which case it's maybe pretty simple.

-James

> On Jan 29, 2016, at 9:39 AM, Jay Kreps <ja...@confluent.io> wrote:
>
> Also, most database provide a "full logging" option that let's you capture
> the whole row in the log (I know Oracle and MySQL have this) but it sounds
> like Mongo doesn't yet. That would be the ideal solution.
>
> -Jay
>
> On Fri, Jan 29, 2016 at 9:38 AM, Jay Kreps <ja...@confluent.io> wrote:
>
>> Ah, agreed. This approach is actually quite common in change capture,
>> though. For many use cases getting the final value is actually preferable
>> to getting intermediates. The exception is usually if you want to do
>> analytics on something like number of changes.
>>
>> On Fri, Jan 29, 2016 at 9:35 AM, Ewen Cheslack-Postava <ew...@confluent.io>
>> wrote:
>>
>>> Jay,
>>>
>>> You can query after the fact, but you're not necessarily going to get the
>>> same value back. There could easily be dozens of changes to the document
>>> in
>>> the oplog so the delta you see may not even make sense given the current
>>> state of the document. Even if you can apply it the delta, you'd still be
>>> seeing data that is newer than the update. You can of course take this
>>> shortcut, but it won't give correct results. And if the data has been
>>> deleted since then, you won't even be able to write the full record... As
>>> far as I know, the way the op log is exposed won't let you do something
>>> like pin a query to the state of the db at a specific point in the op log
>>> and you may be reading from the beginning of the op log, so I don't think
>>> there's a way to get correct results by just querying the DB for the full
>>> documents.
>>>
>>> Strictly speaking you don't need to get all the data in memory, you just
>>> need a record of the current set of values somewhere. This is what I was
>>> describing following those two options -- if you do an initial dump to
>>> Kafka, you could track only offsets in memory and read back full values as
>>> needed to apply deltas, but this of course requires random reads into your
>>> Kafka topic (but may perform fine in practice depending on the workload).
>>>
>>> -Ewen
>>>
>>> On Fri, Jan 29, 2016 at 9:12 AM, Jay Kreps <ja...@confluent.io> wrote:
>>>
>>>> Hey Ewen, how come you need to get it all in memory for approach (1)? I
>>>> guess the obvious thing to do would just be to query for the record
>>>> after-image when you get the diff--e.g. just read a batch of changes and
>>>> multi-get the final values. I don't know how bad the overhead of this
>>> would
>>>> be...batching might reduce it a fair amount. The guarantees for this are
>>>> slightly different than the pure oplog too (you get the current value
>>> not
>>>> every necessarily every intermediate) but that should be okay for most
>>>> uses.
>>>>
>>>> -Jay
>>>>
>>>> On Fri, Jan 29, 2016 at 8:54 AM, Ewen Cheslack-Postava <
>>> ewen@confluent.io>
>>>> wrote:
>>>>
>>>>> Sunny,
>>>>>
>>>>> As I said on Twitter, I'm stoked to hear you're working on a Mongo
>>>>> connector! It struck me as a pretty natural source to tackle since it
>>>> does
>>>>> such a nice job of cleanly exposing the op log.
>>>>>
>>>>> Regarding the problem of only getting deltas, unfortunately there is
>>> not
>>>> a
>>>>> trivial solution here -- if you want to generate the full updated
>>> record,
>>>>> you're going to have to have a way to recover the original document.
>>>>>
>>>>> In fact, I'm curious how you were thinking of even bootstrapping. Are
>>> you
>>>>> going to do a full dump and then start reading the op log? Is there a
>>>> good
>>>>> way to do the dump and figure out the exact location in the op log
>>> that
>>>> the
>>>>> query generating the dump was initially performed? I know that
>>> internally
>>>>> mongo effectively does these two steps, but I'm not sure if the
>>> necessary
>>>>> info is exposed via normal queries.
>>>>>
>>>>> If you want to reconstitute the data, I can think of a couple of
>>> options:
>>>>>
>>>>> 1. Try to reconstitute inline in the connector. This seems difficult
>>> to
>>>>> make work in practice. At some point you basically have to query for
>>> the
>>>>> entire data set to bring it into memory and then the connector is
>>>>> effectively just applying the deltas to its in memory copy and then
>>> just
>>>>> generating one output record containing the full document each time it
>>>>> applies an update.
>>>>> 2. Make the connector send just the updates and have a separate stream
>>>>> processing job perform the reconstitution and send to another topic.
>>> In
>>>>> this case, the first topic should not be compacted, but the second one
>>>>> could be.
>>>>>
>>>>> Unfortunately, without additional hooks into the database, there's not
>>>> much
>>>>> you can do besides this pretty heavyweight process. There may be some
>>>>> tricks you can use to reduce the amount of memory used during the
>>> process
>>>>> (e.g. keep a small cache of actual records and for the rest only store
>>>>> Kafka offsets for the last full value, performing a (possibly
>>> expensive)
>>>>> random read as necessary to get the full document value back), but to
>>> get
>>>>> full correctness you will need to perform this process.
>>>>>
>>>>> In terms of Kafka Connect supporting something like this, I'm not sure
>>>> how
>>>>> general it could be made, or that you even want to perform the process
>>>>> inline with the Kafka Connect job. If it's an issue that repeatedly
>>>> arises
>>>>> across a variety of systems, then we should consider how to address it
>>>> more
>>>>> generally.
>>>>>
>>>>> -Ewen
>>>>>
>>>>> On Tue, Jan 26, 2016 at 8:43 PM, Sunny Shah <su...@tinyowl.co.in>
>>> wrote:
>>>>>
>>>>>>
>>>>>> Hi ,
>>>>>>
>>>>>> We are trying to write a Kafka-connect connector for Mongodb. The
>>> issue
>>>>>> is, MongoDB does not provide an entire changed document for update
>>>>>> operations, It just provides the modified fields.
>>>>>>
>>>>>> if Kafka allows custom log compaction then It is possible to
>>> eventually
>>>>>> merge an entire document and subsequent update to to create an
>>> entire
>>>>>> record again.
>>>>>>
>>>>>> As Ewen pointed out to me on twitter, this is not possible, then
>>> What
>>>> is
>>>>>> the Kafka-connect way of solving this issue?
>>>>>>
>>>>>> @Ewen, Thanks a lot for a really quick answer on twitter.
>>>>>>
>>>>>> --
>>>>>> Thanks and Regards,
>>>>>> Sunny
>>>>>>
>>>>>> The contents of this e-mail and any attachment(s) are confidential
>>> and
>>>>>> intended for the named recipient(s) only. It shall not attach any
>>>>> liability
>>>>>> on the originator or TinyOwl Technology Pvt. Ltd. or its affiliates.
>>>> Any
>>>>>> form of reproduction, dissemination, copying, disclosure,
>>> modification,
>>>>>> distribution and / or publication of this message without the prior
>>>>> written
>>>>>> consent of the author of this e-mail is strictly prohibited. If you
>>>> have
>>>>>> received this email in error please delete it and notify the sender
>>>>>> immediately. You are liable to the company (TinyOwl Technology Pvt.
>>>>> Ltd.) in
>>>>>> case of any breach in ​
>>>>>> ​confidentialy (through any form of communication) wherein the
>>> company
>>>>> has
>>>>>> the right to injunct legal action and an equitable relief for
>>> damages.
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Thanks,
>>>>> Ewen
>>>>>
>>>>
>>>
>>>
>>>
>>> --
>>> Thanks,
>>> Ewen
>>>
>>
>>


________________________________

This email and any attachments may contain confidential and privileged material for the sole use of the intended recipient. Any review, copying, or distribution of this email (or any attachments) by others is prohibited. If you are not the intended recipient, please contact the sender immediately and permanently delete this email and any attachments. No employee or agent of TiVo Inc. is authorized to conclude any binding agreement on behalf of TiVo Inc. by email. Binding agreements with TiVo Inc. may only be made by a signed written agreement.

Re: MongoDB Kafka Connect driver

Posted by Jay Kreps <ja...@confluent.io>.
Also, most database provide a "full logging" option that let's you capture
the whole row in the log (I know Oracle and MySQL have this) but it sounds
like Mongo doesn't yet. That would be the ideal solution.

-Jay

On Fri, Jan 29, 2016 at 9:38 AM, Jay Kreps <ja...@confluent.io> wrote:

> Ah, agreed. This approach is actually quite common in change capture,
> though. For many use cases getting the final value is actually preferable
> to getting intermediates. The exception is usually if you want to do
> analytics on something like number of changes.
>
> On Fri, Jan 29, 2016 at 9:35 AM, Ewen Cheslack-Postava <ew...@confluent.io>
> wrote:
>
>> Jay,
>>
>> You can query after the fact, but you're not necessarily going to get the
>> same value back. There could easily be dozens of changes to the document
>> in
>> the oplog so the delta you see may not even make sense given the current
>> state of the document. Even if you can apply it the delta, you'd still be
>> seeing data that is newer than the update. You can of course take this
>> shortcut, but it won't give correct results. And if the data has been
>> deleted since then, you won't even be able to write the full record... As
>> far as I know, the way the op log is exposed won't let you do something
>> like pin a query to the state of the db at a specific point in the op log
>> and you may be reading from the beginning of the op log, so I don't think
>> there's a way to get correct results by just querying the DB for the full
>> documents.
>>
>> Strictly speaking you don't need to get all the data in memory, you just
>> need a record of the current set of values somewhere. This is what I was
>> describing following those two options -- if you do an initial dump to
>> Kafka, you could track only offsets in memory and read back full values as
>> needed to apply deltas, but this of course requires random reads into your
>> Kafka topic (but may perform fine in practice depending on the workload).
>>
>> -Ewen
>>
>> On Fri, Jan 29, 2016 at 9:12 AM, Jay Kreps <ja...@confluent.io> wrote:
>>
>> > Hey Ewen, how come you need to get it all in memory for approach (1)? I
>> > guess the obvious thing to do would just be to query for the record
>> > after-image when you get the diff--e.g. just read a batch of changes and
>> > multi-get the final values. I don't know how bad the overhead of this
>> would
>> > be...batching might reduce it a fair amount. The guarantees for this are
>> > slightly different than the pure oplog too (you get the current value
>> not
>> > every necessarily every intermediate) but that should be okay for most
>> > uses.
>> >
>> > -Jay
>> >
>> > On Fri, Jan 29, 2016 at 8:54 AM, Ewen Cheslack-Postava <
>> ewen@confluent.io>
>> > wrote:
>> >
>> > > Sunny,
>> > >
>> > > As I said on Twitter, I'm stoked to hear you're working on a Mongo
>> > > connector! It struck me as a pretty natural source to tackle since it
>> > does
>> > > such a nice job of cleanly exposing the op log.
>> > >
>> > > Regarding the problem of only getting deltas, unfortunately there is
>> not
>> > a
>> > > trivial solution here -- if you want to generate the full updated
>> record,
>> > > you're going to have to have a way to recover the original document.
>> > >
>> > > In fact, I'm curious how you were thinking of even bootstrapping. Are
>> you
>> > > going to do a full dump and then start reading the op log? Is there a
>> > good
>> > > way to do the dump and figure out the exact location in the op log
>> that
>> > the
>> > > query generating the dump was initially performed? I know that
>> internally
>> > > mongo effectively does these two steps, but I'm not sure if the
>> necessary
>> > > info is exposed via normal queries.
>> > >
>> > > If you want to reconstitute the data, I can think of a couple of
>> options:
>> > >
>> > > 1. Try to reconstitute inline in the connector. This seems difficult
>> to
>> > > make work in practice. At some point you basically have to query for
>> the
>> > > entire data set to bring it into memory and then the connector is
>> > > effectively just applying the deltas to its in memory copy and then
>> just
>> > > generating one output record containing the full document each time it
>> > > applies an update.
>> > > 2. Make the connector send just the updates and have a separate stream
>> > > processing job perform the reconstitution and send to another topic.
>> In
>> > > this case, the first topic should not be compacted, but the second one
>> > > could be.
>> > >
>> > > Unfortunately, without additional hooks into the database, there's not
>> > much
>> > > you can do besides this pretty heavyweight process. There may be some
>> > > tricks you can use to reduce the amount of memory used during the
>> process
>> > > (e.g. keep a small cache of actual records and for the rest only store
>> > > Kafka offsets for the last full value, performing a (possibly
>> expensive)
>> > > random read as necessary to get the full document value back), but to
>> get
>> > > full correctness you will need to perform this process.
>> > >
>> > > In terms of Kafka Connect supporting something like this, I'm not sure
>> > how
>> > > general it could be made, or that you even want to perform the process
>> > > inline with the Kafka Connect job. If it's an issue that repeatedly
>> > arises
>> > > across a variety of systems, then we should consider how to address it
>> > more
>> > > generally.
>> > >
>> > > -Ewen
>> > >
>> > > On Tue, Jan 26, 2016 at 8:43 PM, Sunny Shah <su...@tinyowl.co.in>
>> wrote:
>> > >
>> > > >
>> > > > Hi ,
>> > > >
>> > > > We are trying to write a Kafka-connect connector for Mongodb. The
>> issue
>> > > > is, MongoDB does not provide an entire changed document for update
>> > > > operations, It just provides the modified fields.
>> > > >
>> > > > if Kafka allows custom log compaction then It is possible to
>> eventually
>> > > > merge an entire document and subsequent update to to create an
>> entire
>> > > > record again.
>> > > >
>> > > > As Ewen pointed out to me on twitter, this is not possible, then
>> What
>> > is
>> > > > the Kafka-connect way of solving this issue?
>> > > >
>> > > > @Ewen, Thanks a lot for a really quick answer on twitter.
>> > > >
>> > > > --
>> > > > Thanks and Regards,
>> > > >  Sunny
>> > > >
>> > > > The contents of this e-mail and any attachment(s) are confidential
>> and
>> > > > intended for the named recipient(s) only. It shall not attach any
>> > > liability
>> > > > on the originator or TinyOwl Technology Pvt. Ltd. or its affiliates.
>> > Any
>> > > > form of reproduction, dissemination, copying, disclosure,
>> modification,
>> > > > distribution and / or publication of this message without the prior
>> > > written
>> > > > consent of the author of this e-mail is strictly prohibited. If you
>> > have
>> > > > received this email in error please delete it and notify the sender
>> > > > immediately. You are liable to the company (TinyOwl Technology Pvt.
>> > > Ltd.) in
>> > > > case of any breach in ​
>> > > > ​confidentialy (through any form of communication) wherein the
>> company
>> > > has
>> > > > the right to injunct legal action and an equitable relief for
>> damages.
>> > > >
>> > >
>> > >
>> > >
>> > > --
>> > > Thanks,
>> > > Ewen
>> > >
>> >
>>
>>
>>
>> --
>> Thanks,
>> Ewen
>>
>
>

Re: MongoDB Kafka Connect driver

Posted by Jay Kreps <ja...@confluent.io>.
Ah, agreed. This approach is actually quite common in change capture,
though. For many use cases getting the final value is actually preferable
to getting intermediates. The exception is usually if you want to do
analytics on something like number of changes.

On Fri, Jan 29, 2016 at 9:35 AM, Ewen Cheslack-Postava <ew...@confluent.io>
wrote:

> Jay,
>
> You can query after the fact, but you're not necessarily going to get the
> same value back. There could easily be dozens of changes to the document in
> the oplog so the delta you see may not even make sense given the current
> state of the document. Even if you can apply it the delta, you'd still be
> seeing data that is newer than the update. You can of course take this
> shortcut, but it won't give correct results. And if the data has been
> deleted since then, you won't even be able to write the full record... As
> far as I know, the way the op log is exposed won't let you do something
> like pin a query to the state of the db at a specific point in the op log
> and you may be reading from the beginning of the op log, so I don't think
> there's a way to get correct results by just querying the DB for the full
> documents.
>
> Strictly speaking you don't need to get all the data in memory, you just
> need a record of the current set of values somewhere. This is what I was
> describing following those two options -- if you do an initial dump to
> Kafka, you could track only offsets in memory and read back full values as
> needed to apply deltas, but this of course requires random reads into your
> Kafka topic (but may perform fine in practice depending on the workload).
>
> -Ewen
>
> On Fri, Jan 29, 2016 at 9:12 AM, Jay Kreps <ja...@confluent.io> wrote:
>
> > Hey Ewen, how come you need to get it all in memory for approach (1)? I
> > guess the obvious thing to do would just be to query for the record
> > after-image when you get the diff--e.g. just read a batch of changes and
> > multi-get the final values. I don't know how bad the overhead of this
> would
> > be...batching might reduce it a fair amount. The guarantees for this are
> > slightly different than the pure oplog too (you get the current value not
> > every necessarily every intermediate) but that should be okay for most
> > uses.
> >
> > -Jay
> >
> > On Fri, Jan 29, 2016 at 8:54 AM, Ewen Cheslack-Postava <
> ewen@confluent.io>
> > wrote:
> >
> > > Sunny,
> > >
> > > As I said on Twitter, I'm stoked to hear you're working on a Mongo
> > > connector! It struck me as a pretty natural source to tackle since it
> > does
> > > such a nice job of cleanly exposing the op log.
> > >
> > > Regarding the problem of only getting deltas, unfortunately there is
> not
> > a
> > > trivial solution here -- if you want to generate the full updated
> record,
> > > you're going to have to have a way to recover the original document.
> > >
> > > In fact, I'm curious how you were thinking of even bootstrapping. Are
> you
> > > going to do a full dump and then start reading the op log? Is there a
> > good
> > > way to do the dump and figure out the exact location in the op log that
> > the
> > > query generating the dump was initially performed? I know that
> internally
> > > mongo effectively does these two steps, but I'm not sure if the
> necessary
> > > info is exposed via normal queries.
> > >
> > > If you want to reconstitute the data, I can think of a couple of
> options:
> > >
> > > 1. Try to reconstitute inline in the connector. This seems difficult to
> > > make work in practice. At some point you basically have to query for
> the
> > > entire data set to bring it into memory and then the connector is
> > > effectively just applying the deltas to its in memory copy and then
> just
> > > generating one output record containing the full document each time it
> > > applies an update.
> > > 2. Make the connector send just the updates and have a separate stream
> > > processing job perform the reconstitution and send to another topic. In
> > > this case, the first topic should not be compacted, but the second one
> > > could be.
> > >
> > > Unfortunately, without additional hooks into the database, there's not
> > much
> > > you can do besides this pretty heavyweight process. There may be some
> > > tricks you can use to reduce the amount of memory used during the
> process
> > > (e.g. keep a small cache of actual records and for the rest only store
> > > Kafka offsets for the last full value, performing a (possibly
> expensive)
> > > random read as necessary to get the full document value back), but to
> get
> > > full correctness you will need to perform this process.
> > >
> > > In terms of Kafka Connect supporting something like this, I'm not sure
> > how
> > > general it could be made, or that you even want to perform the process
> > > inline with the Kafka Connect job. If it's an issue that repeatedly
> > arises
> > > across a variety of systems, then we should consider how to address it
> > more
> > > generally.
> > >
> > > -Ewen
> > >
> > > On Tue, Jan 26, 2016 at 8:43 PM, Sunny Shah <su...@tinyowl.co.in>
> wrote:
> > >
> > > >
> > > > Hi ,
> > > >
> > > > We are trying to write a Kafka-connect connector for Mongodb. The
> issue
> > > > is, MongoDB does not provide an entire changed document for update
> > > > operations, It just provides the modified fields.
> > > >
> > > > if Kafka allows custom log compaction then It is possible to
> eventually
> > > > merge an entire document and subsequent update to to create an entire
> > > > record again.
> > > >
> > > > As Ewen pointed out to me on twitter, this is not possible, then What
> > is
> > > > the Kafka-connect way of solving this issue?
> > > >
> > > > @Ewen, Thanks a lot for a really quick answer on twitter.
> > > >
> > > > --
> > > > Thanks and Regards,
> > > >  Sunny
> > > >
> > > > The contents of this e-mail and any attachment(s) are confidential
> and
> > > > intended for the named recipient(s) only. It shall not attach any
> > > liability
> > > > on the originator or TinyOwl Technology Pvt. Ltd. or its affiliates.
> > Any
> > > > form of reproduction, dissemination, copying, disclosure,
> modification,
> > > > distribution and / or publication of this message without the prior
> > > written
> > > > consent of the author of this e-mail is strictly prohibited. If you
> > have
> > > > received this email in error please delete it and notify the sender
> > > > immediately. You are liable to the company (TinyOwl Technology Pvt.
> > > Ltd.) in
> > > > case of any breach in ​
> > > > ​confidentialy (through any form of communication) wherein the
> company
> > > has
> > > > the right to injunct legal action and an equitable relief for
> damages.
> > > >
> > >
> > >
> > >
> > > --
> > > Thanks,
> > > Ewen
> > >
> >
>
>
>
> --
> Thanks,
> Ewen
>

Re: MongoDB Kafka Connect driver

Posted by Ewen Cheslack-Postava <ew...@confluent.io>.
Jay,

You can query after the fact, but you're not necessarily going to get the
same value back. There could easily be dozens of changes to the document in
the oplog so the delta you see may not even make sense given the current
state of the document. Even if you can apply it the delta, you'd still be
seeing data that is newer than the update. You can of course take this
shortcut, but it won't give correct results. And if the data has been
deleted since then, you won't even be able to write the full record... As
far as I know, the way the op log is exposed won't let you do something
like pin a query to the state of the db at a specific point in the op log
and you may be reading from the beginning of the op log, so I don't think
there's a way to get correct results by just querying the DB for the full
documents.

Strictly speaking you don't need to get all the data in memory, you just
need a record of the current set of values somewhere. This is what I was
describing following those two options -- if you do an initial dump to
Kafka, you could track only offsets in memory and read back full values as
needed to apply deltas, but this of course requires random reads into your
Kafka topic (but may perform fine in practice depending on the workload).

-Ewen

On Fri, Jan 29, 2016 at 9:12 AM, Jay Kreps <ja...@confluent.io> wrote:

> Hey Ewen, how come you need to get it all in memory for approach (1)? I
> guess the obvious thing to do would just be to query for the record
> after-image when you get the diff--e.g. just read a batch of changes and
> multi-get the final values. I don't know how bad the overhead of this would
> be...batching might reduce it a fair amount. The guarantees for this are
> slightly different than the pure oplog too (you get the current value not
> every necessarily every intermediate) but that should be okay for most
> uses.
>
> -Jay
>
> On Fri, Jan 29, 2016 at 8:54 AM, Ewen Cheslack-Postava <ew...@confluent.io>
> wrote:
>
> > Sunny,
> >
> > As I said on Twitter, I'm stoked to hear you're working on a Mongo
> > connector! It struck me as a pretty natural source to tackle since it
> does
> > such a nice job of cleanly exposing the op log.
> >
> > Regarding the problem of only getting deltas, unfortunately there is not
> a
> > trivial solution here -- if you want to generate the full updated record,
> > you're going to have to have a way to recover the original document.
> >
> > In fact, I'm curious how you were thinking of even bootstrapping. Are you
> > going to do a full dump and then start reading the op log? Is there a
> good
> > way to do the dump and figure out the exact location in the op log that
> the
> > query generating the dump was initially performed? I know that internally
> > mongo effectively does these two steps, but I'm not sure if the necessary
> > info is exposed via normal queries.
> >
> > If you want to reconstitute the data, I can think of a couple of options:
> >
> > 1. Try to reconstitute inline in the connector. This seems difficult to
> > make work in practice. At some point you basically have to query for the
> > entire data set to bring it into memory and then the connector is
> > effectively just applying the deltas to its in memory copy and then just
> > generating one output record containing the full document each time it
> > applies an update.
> > 2. Make the connector send just the updates and have a separate stream
> > processing job perform the reconstitution and send to another topic. In
> > this case, the first topic should not be compacted, but the second one
> > could be.
> >
> > Unfortunately, without additional hooks into the database, there's not
> much
> > you can do besides this pretty heavyweight process. There may be some
> > tricks you can use to reduce the amount of memory used during the process
> > (e.g. keep a small cache of actual records and for the rest only store
> > Kafka offsets for the last full value, performing a (possibly expensive)
> > random read as necessary to get the full document value back), but to get
> > full correctness you will need to perform this process.
> >
> > In terms of Kafka Connect supporting something like this, I'm not sure
> how
> > general it could be made, or that you even want to perform the process
> > inline with the Kafka Connect job. If it's an issue that repeatedly
> arises
> > across a variety of systems, then we should consider how to address it
> more
> > generally.
> >
> > -Ewen
> >
> > On Tue, Jan 26, 2016 at 8:43 PM, Sunny Shah <su...@tinyowl.co.in> wrote:
> >
> > >
> > > Hi ,
> > >
> > > We are trying to write a Kafka-connect connector for Mongodb. The issue
> > > is, MongoDB does not provide an entire changed document for update
> > > operations, It just provides the modified fields.
> > >
> > > if Kafka allows custom log compaction then It is possible to eventually
> > > merge an entire document and subsequent update to to create an entire
> > > record again.
> > >
> > > As Ewen pointed out to me on twitter, this is not possible, then What
> is
> > > the Kafka-connect way of solving this issue?
> > >
> > > @Ewen, Thanks a lot for a really quick answer on twitter.
> > >
> > > --
> > > Thanks and Regards,
> > >  Sunny
> > >
> > > The contents of this e-mail and any attachment(s) are confidential and
> > > intended for the named recipient(s) only. It shall not attach any
> > liability
> > > on the originator or TinyOwl Technology Pvt. Ltd. or its affiliates.
> Any
> > > form of reproduction, dissemination, copying, disclosure, modification,
> > > distribution and / or publication of this message without the prior
> > written
> > > consent of the author of this e-mail is strictly prohibited. If you
> have
> > > received this email in error please delete it and notify the sender
> > > immediately. You are liable to the company (TinyOwl Technology Pvt.
> > Ltd.) in
> > > case of any breach in ​
> > > ​confidentialy (through any form of communication) wherein the company
> > has
> > > the right to injunct legal action and an equitable relief for damages.
> > >
> >
> >
> >
> > --
> > Thanks,
> > Ewen
> >
>



-- 
Thanks,
Ewen

Re: MongoDB Kafka Connect driver

Posted by Jay Kreps <ja...@confluent.io>.
Hey Ewen, how come you need to get it all in memory for approach (1)? I
guess the obvious thing to do would just be to query for the record
after-image when you get the diff--e.g. just read a batch of changes and
multi-get the final values. I don't know how bad the overhead of this would
be...batching might reduce it a fair amount. The guarantees for this are
slightly different than the pure oplog too (you get the current value not
every necessarily every intermediate) but that should be okay for most uses.

-Jay

On Fri, Jan 29, 2016 at 8:54 AM, Ewen Cheslack-Postava <ew...@confluent.io>
wrote:

> Sunny,
>
> As I said on Twitter, I'm stoked to hear you're working on a Mongo
> connector! It struck me as a pretty natural source to tackle since it does
> such a nice job of cleanly exposing the op log.
>
> Regarding the problem of only getting deltas, unfortunately there is not a
> trivial solution here -- if you want to generate the full updated record,
> you're going to have to have a way to recover the original document.
>
> In fact, I'm curious how you were thinking of even bootstrapping. Are you
> going to do a full dump and then start reading the op log? Is there a good
> way to do the dump and figure out the exact location in the op log that the
> query generating the dump was initially performed? I know that internally
> mongo effectively does these two steps, but I'm not sure if the necessary
> info is exposed via normal queries.
>
> If you want to reconstitute the data, I can think of a couple of options:
>
> 1. Try to reconstitute inline in the connector. This seems difficult to
> make work in practice. At some point you basically have to query for the
> entire data set to bring it into memory and then the connector is
> effectively just applying the deltas to its in memory copy and then just
> generating one output record containing the full document each time it
> applies an update.
> 2. Make the connector send just the updates and have a separate stream
> processing job perform the reconstitution and send to another topic. In
> this case, the first topic should not be compacted, but the second one
> could be.
>
> Unfortunately, without additional hooks into the database, there's not much
> you can do besides this pretty heavyweight process. There may be some
> tricks you can use to reduce the amount of memory used during the process
> (e.g. keep a small cache of actual records and for the rest only store
> Kafka offsets for the last full value, performing a (possibly expensive)
> random read as necessary to get the full document value back), but to get
> full correctness you will need to perform this process.
>
> In terms of Kafka Connect supporting something like this, I'm not sure how
> general it could be made, or that you even want to perform the process
> inline with the Kafka Connect job. If it's an issue that repeatedly arises
> across a variety of systems, then we should consider how to address it more
> generally.
>
> -Ewen
>
> On Tue, Jan 26, 2016 at 8:43 PM, Sunny Shah <su...@tinyowl.co.in> wrote:
>
> >
> > Hi ,
> >
> > We are trying to write a Kafka-connect connector for Mongodb. The issue
> > is, MongoDB does not provide an entire changed document for update
> > operations, It just provides the modified fields.
> >
> > if Kafka allows custom log compaction then It is possible to eventually
> > merge an entire document and subsequent update to to create an entire
> > record again.
> >
> > As Ewen pointed out to me on twitter, this is not possible, then What is
> > the Kafka-connect way of solving this issue?
> >
> > @Ewen, Thanks a lot for a really quick answer on twitter.
> >
> > --
> > Thanks and Regards,
> >  Sunny
> >
> > The contents of this e-mail and any attachment(s) are confidential and
> > intended for the named recipient(s) only. It shall not attach any
> liability
> > on the originator or TinyOwl Technology Pvt. Ltd. or its affiliates. Any
> > form of reproduction, dissemination, copying, disclosure, modification,
> > distribution and / or publication of this message without the prior
> written
> > consent of the author of this e-mail is strictly prohibited. If you have
> > received this email in error please delete it and notify the sender
> > immediately. You are liable to the company (TinyOwl Technology Pvt.
> Ltd.) in
> > case of any breach in ​
> > ​confidentialy (through any form of communication) wherein the company
> has
> > the right to injunct legal action and an equitable relief for damages.
> >
>
>
>
> --
> Thanks,
> Ewen
>

Re: MongoDB Kafka Connect driver

Posted by Ewen Cheslack-Postava <ew...@confluent.io>.
Sunny,

As I said on Twitter, I'm stoked to hear you're working on a Mongo
connector! It struck me as a pretty natural source to tackle since it does
such a nice job of cleanly exposing the op log.

Regarding the problem of only getting deltas, unfortunately there is not a
trivial solution here -- if you want to generate the full updated record,
you're going to have to have a way to recover the original document.

In fact, I'm curious how you were thinking of even bootstrapping. Are you
going to do a full dump and then start reading the op log? Is there a good
way to do the dump and figure out the exact location in the op log that the
query generating the dump was initially performed? I know that internally
mongo effectively does these two steps, but I'm not sure if the necessary
info is exposed via normal queries.

If you want to reconstitute the data, I can think of a couple of options:

1. Try to reconstitute inline in the connector. This seems difficult to
make work in practice. At some point you basically have to query for the
entire data set to bring it into memory and then the connector is
effectively just applying the deltas to its in memory copy and then just
generating one output record containing the full document each time it
applies an update.
2. Make the connector send just the updates and have a separate stream
processing job perform the reconstitution and send to another topic. In
this case, the first topic should not be compacted, but the second one
could be.

Unfortunately, without additional hooks into the database, there's not much
you can do besides this pretty heavyweight process. There may be some
tricks you can use to reduce the amount of memory used during the process
(e.g. keep a small cache of actual records and for the rest only store
Kafka offsets for the last full value, performing a (possibly expensive)
random read as necessary to get the full document value back), but to get
full correctness you will need to perform this process.

In terms of Kafka Connect supporting something like this, I'm not sure how
general it could be made, or that you even want to perform the process
inline with the Kafka Connect job. If it's an issue that repeatedly arises
across a variety of systems, then we should consider how to address it more
generally.

-Ewen

On Tue, Jan 26, 2016 at 8:43 PM, Sunny Shah <su...@tinyowl.co.in> wrote:

>
> Hi ,
>
> We are trying to write a Kafka-connect connector for Mongodb. The issue
> is, MongoDB does not provide an entire changed document for update
> operations, It just provides the modified fields.
>
> if Kafka allows custom log compaction then It is possible to eventually
> merge an entire document and subsequent update to to create an entire
> record again.
>
> As Ewen pointed out to me on twitter, this is not possible, then What is
> the Kafka-connect way of solving this issue?
>
> @Ewen, Thanks a lot for a really quick answer on twitter.
>
> --
> Thanks and Regards,
>  Sunny
>
> The contents of this e-mail and any attachment(s) are confidential and
> intended for the named recipient(s) only. It shall not attach any liability
> on the originator or TinyOwl Technology Pvt. Ltd. or its affiliates. Any
> form of reproduction, dissemination, copying, disclosure, modification,
> distribution and / or publication of this message without the prior written
> consent of the author of this e-mail is strictly prohibited. If you have
> received this email in error please delete it and notify the sender
> immediately. You are liable to the company (TinyOwl Technology Pvt. Ltd.) in
> case of any breach in ​
> ​confidentialy (through any form of communication) wherein the company has
> the right to injunct legal action and an equitable relief for damages.
>



-- 
Thanks,
Ewen