Posted to dev@kafka.apache.org by Per Steffensen <pe...@gmail.com> on 2018/10/10 13:49:58 UTC

[DISCUSS] KIP-381 Connect: Tell about records that had their offsets flushed in callback

Please help make the proposed changes in KIP-381 become reality. Please 
comment.

KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-381%3A+Connect%3A+Tell+about+records+that+had+their+offsets+flushed+in+callback

JIRA: https://issues.apache.org/jira/browse/KAFKA-5716

PR: https://github.com/apache/kafka/pull/3872

Thanks!



Re: [DISCUSS] KIP-381 Connect: Tell about records that had their offsets flushed in callback

Posted by Per Steffensen <pe...@gmail.com>.
On 18/10/2018 16.46, Ryanne Dolan wrote:
> Per Steffensen, getting sequence numbers correct is definitely difficult,
> but this is not Connect's fault. I'd like to see Connect implement
> exactly-once from end-to-end, but that requires coordination between
> sources and sinks along the lines that you allude to, using sequence
> numbers and transactions and whatnot.
>
> The problem with commit() is knowing when it's okay to delete the files in
> your example. I don't believe that issue has anything to do with avoiding
> dupes or assigning unique sequence numbers. I believe it is safe to delete
> a file if you know it has been delivered successfully, which the present
> API exposes.
OK, I believe you read too much into my example. It was just the simplest 
scenario I could come up with that could be explained fairly easily, and 
where it matters that you know for which records the offsets have been 
flushed. The example may not have been good enough to fulfill its 
purpose: showing that knowing exactly which records have had their 
offsets flushed is important.

You might argue that you, as a source-connector developer, do not need 
to know when offsets are flushed at all. I will argue that in many 
cases you do, and that you may need to know exactly which records have 
had their offsets flushed. For that, the current "commit" method is 
useless. You do not know which records had their offsets flushed:
* It is not necessarily all the records returned from "poll" at the 
point where "commit" is called, even though the current JavaDoc claims so
* It is not necessarily all the records for which "commitRecord" has 
been called. It may be more records, or it may be fewer
Bottom line: the current "commit" cannot be used for much. You may 
argue that it should just be removed, but I definitely would not like to 
see that. I use "commit" in several of my source-connectors (pretending 
that it works), and could not live without it.
As I see it, the offsets are kind of your accounting: "information about 
how to pick up from where you left off". Kafka Connect offers to keep 
that accounting for me, in alignment with the outgoing data it relates 
to. Of course I could just deal with all of that myself, but then a lot 
of the niceness of Kafka Connect would be gone, and I might as well just 
do everything myself.
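
To make the contrast concrete, here is a minimal sketch of the callback 
shape this KIP proposes next to the current one (name and signature taken 
from the KIP/PR discussed in this thread; a sketch, not a released Kafka 
Connect API, and all other members of the real SourceTask are elided):

import java.util.List;
import org.apache.kafka.connect.source.SourceRecord;

public abstract class ProposedSourceTask {

    // Today: called around offset flushes, but the task learns nothing
    // about which records the flush actually covered.
    public void commit() throws InterruptedException {
    }

    // Proposed: called with exactly the records (in poll() order) whose
    // data has been acked by Kafka AND whose offsets have been flushed.
    public void commit(List<SourceRecord> recordsWithFlushedOffsets)
            throws InterruptedException {
    }
}

With the second form, the accounting can be reconciled record by record 
instead of guessed at.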
>
> That said, I'm not opposed to your proposed callbacks, and I agree that
> commit() and commitRecord() are poorly named. I just don't believe the
> present API is incorrect.
I definitely do. Currently there is a callback "commit" whose JavaDoc 
lies, and that essentially cannot be used for anything except making you 
confused. You know nothing about the state when it is called.

But as long as you do not oppose the proposed solution, we probably 
should not spend too much time arguing back and forth about opinions.
>
> Ryanne
Regards, and thanks for participating in the discussion
Per Steffensen

Re: [DISCUSS] KIP-381 Connect: Tell about records that had their offsets flushed in callback

Posted by Ryanne Dolan <ry...@gmail.com>.
Per Steffensen, getting sequence numbers correct is definitely difficult,
but this is not Connect's fault. I'd like to see Connect implement
exactly-once from end-to-end, but that requires coordination between
sources and sinks along the lines that you allude to, using sequence
numbers and transactions and whatnot.

The problem with commit() is knowing when it's okay to delete the files in
your example. I don't believe that issue has anything to do with avoiding
dupes or assigning unique sequence numbers. I believe it is safe to delete
a file if you know it has been delivered successfully, which the present
API exposes.

That said, I'm not opposed to your proposed callbacks, and I agree that
commit() and commitRecord() are poorly named. I just don't believe the
present API is incorrect.

Ryanne



On Thu, Oct 18, 2018 at 7:04 AM Per Steffensen <pe...@gmail.com> wrote:

> On 17/10/2018 18.17, Ryanne Dolan wrote:
>
> > this does not guarantee that the
> > offsets of R have been written/flushed at the next commit() call
>
> True, but does it matter? So long as you can guarantee the records are
> delivered to the downstream Kafka cluster, it shouldn't matter if they have
> been committed or not.
>
> The worst that can happen is that the worker gets bounced and asks for the
> same records a second time. Even if those records have since been dropped
> from the upstream data source, it doesn't matter cuz you know they were
> previously delivered successfully.
>
> You are kinda arguing that offsets are not usable at all. I think they
> are. Below I will explain a fairly simple source-connector, and how it
> would be misled by the way the source-connector framework currently works,
> and how my fix would help it not be. The source-connector is picked out of
> thin air, but not too far from what I have had to deal with in real life.
>
> Let's assume I write a fairly simple source-connector that picks up data
> from files in a given folder. For simplicity let's just assume that each
> file fits in a Kafka-message. My source-connector just sorts the files by
> timestamp and sends out the data in the files, oldest file first. It is
> possible that the receiving side of the data my source-connector sends out
> will get the same data twice, for one of the following reasons:
> * There were actually two input-files that contained exactly the same data
> (in that case the receiving side should handle it twice)
> * The data from the same file may be sent twice in two Kafka-messages,
> due to global atomicity being impossible (in that case the receiving side
> should only handle the data once)
> In order to allow the receiving side to know when two consecutive messages
> are essentially the same, so that it will know to handle only one of them,
> I introduce a simple sequence-numbering system in my source-connector. I
> simply write a sequence-number in the Kafka-messages, and I use
> Kafka-connect offsets to keep track of the next sequence-number to be used,
> so that I can pick up with the correct sequence-number in case of a
> crash/restart. If there are no offsets when the source-connector starts
> (first start), it will just start with sequence-number 1.
>
> *Assume the following files are in the input-folder:*
> * 2018-01-01_10_00_00-<GUID1>.data
> * 2018-01-01_10_00_00-<GUID2>.data
> * 2018-01-01_10_00_01-<GUID3>.data
> * 2018-01-01_10_00_02-<GUID4>.data
> …
>
> *Now this sequence of events is possible*
> * mySourceConnector.poll() —> [
>   R1 = record({seq: 1, data: <data from
> 2018-01-01_10_00_00-<GUID1>.data>}, { nextSeq=2 }),
>   R2 = record({seq: 2, data: <data from
> 2018-01-01_10_00_00-<GUID2>.data>}, { nextSeq=3 })
> ]
> * data of R1 was sent and acknowledged
> * mySourceConnector.commitRecord(R1)
> * data of R2 was sent and acknowledged
> * mySourceConnector.commitRecord(R2)
> * offsets-committer kicks in around here and picks up the offsets from R1
> and R2, so the merged offsets to be written and flushed are { nextSeq=3 }
> * mySourceConnector.poll() —> [
>   R3 = record({seq: 3, data: <data from
> 2018-01-01_10_00_01-<GUID3>.data>}, { nextSeq=4 })
> ]
> * data of R3 was sent and acknowledged
> * mySourceConnector.commitRecord(R3)
> * offsets-committer finishes writing and flushing offsets { nextSeq=3 }
> * mySourceConnector.commit()
>
> In the mySourceConnector.commit() implementation I believe that the data
> and offsets for R1, R2 and R3 have been sent/written/flushed/acknowledged,
> and therefore I delete the following files
> * 2018-01-01_10_00_00-<GUID1>.data
> * 2018-01-01_10_00_00-<GUID2>.data
> * 2018-01-01_10_00_01-<GUID3>.data
> But the truth is that the data for R1, R2 and R3 has been sent with
> sequence-numbers 1, 2 and 3 respectively, while the flushed offsets say
> { nextSeq=3 }, and not { nextSeq=4 } as I would implicitly expect.
> If the system crashes here, upon restart I will get { nextSeq=3 }, but the
> file containing the data that was supposed to get sequence-number 3 has
> already been deleted. Therefore I will end up with this next poll
> * poll() —> [
>   R4 = record({seq: 3, data: <data from
> 2018-01-01_10_00_02-<GUID4>.data>}, { nextSeq=4 })
> ]
> If my system had worked, I should have ended up with this next poll
> * poll() —> [
>   R4 = record({seq: 4, data: <data from
> 2018-01-01_10_00_02-<GUID4>.data>}, { nextSeq=5 })
> ]
> The receiving side of my data will get two messages containing the same
> sequence-number 3. It will therefore incorrectly ignore the second message.
> Even if it double-checks by looking at the actual data of the two messages,
> and the contents of <data from 2018-01-01_10_00_01-<GUID3>.data> and <data
> from 2018-01-01_10_00_02-<GUID4>.data> were actually identical, it has no
> way of figuring out that the right thing is to handle both messages.
>
> *With my fix to the problem*, the call to commit() would have been
> mySourceConnector.commit([R1, R2])
> I would know only to delete the following files
> * 2018-01-01_10_00_00-<GUID1>.data
> * 2018-01-01_10_00_00-<GUID2>.data
> And after crash/restart I would end up sending the correct next message
> mySourceConnector.poll() —> [
>   R3 = record({seq: 3, data: <data from
> 2018-01-01_10_00_01-<GUID3>.data>}, { nextSeq=4 })
> ]
>
>

Re: [DISCUSS] KIP-381 Connect: Tell about records that had their offsets flushed in callback

Posted by Per Steffensen <pe...@gmail.com>.
On 17/10/2018 18.17, Ryanne Dolan wrote:
> > this does not guarantee that the
> > offsets of R have been written/flushed at the next commit() call
>
> True, but does it matter? So long as you can guarantee the records are 
> delivered to the downstream Kafka cluster, it shouldn't matter if they 
> have been committed or not.
>
> The worst that can happen is that the worker gets bounced and asks for 
> the same records a second time. Even if those records have since been 
> dropped from the upstream data source, it doesn't matter cuz you know 
> they were previously delivered successfully.
>
You are kinda arguing that offsets are not usable at all. I think they 
are. Below I will explain a fairly simple source-connector, and how it 
would be misled by the way the source-connector framework currently 
works, and how my fix would help it not be. The source-connector is 
picked out of thin air, but not too far from what I have had to deal 
with in real life.

Let's assume I write a fairly simple source-connector that picks up data 
from files in a given folder. For simplicity let's just assume that each 
file fits in a Kafka-message. My source-connector just sorts the files 
by timestamp and sends out the data in the files, oldest file first. It 
is possible that the receiving side of the data my source-connector 
sends out will get the same data twice, for one of the following reasons:
* There were actually two input-files that contained exactly the same 
data (in that case the receiving side should handle it twice)
* The data from the same file may be sent twice in two Kafka-messages, 
due to global atomicity being impossible (in that case the receiving side 
should only handle the data once)
In order to allow the receiving side to know when two consecutive 
messages are essentially the same, so that it will know to handle 
only one of them, I introduce a simple sequence-numbering system in my 
source-connector. I simply write a sequence-number in the 
Kafka-messages, and I use Kafka-connect offsets to keep track of the 
next sequence-number to be used, so that I can pick up with the correct 
sequence-number in case of a crash/restart. If there are no offsets when 
the source-connector starts (first start), it will just start with 
sequence-number 1.
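
To make that bookkeeping concrete, a minimal sketch of such a task (class 
name, topic and the file-scanning helper are made up; offset restore and 
payload building are simplified):

import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class FileSeqSourceTask extends SourceTask {

    private static final Map<String, String> PARTITION =
            Collections.singletonMap("input", "my-folder");

    private long nextSeq;

    @Override
    public void start(Map<String, String> props) {
        // Restore the next sequence-number from the offset store,
        // or start at 1 on first start.
        Map<String, Object> offset = context.offsetStorageReader().offset(PARTITION);
        nextSeq = (offset == null) ? 1L : (Long) offset.get("nextSeq");
    }

    @Override
    public List<SourceRecord> poll() {
        // (A real task would block/throttle when there is nothing to send.)
        List<SourceRecord> records = new ArrayList<>();
        for (Path file : filesOldestFirst()) {
            records.add(new SourceRecord(
                    PARTITION,
                    Collections.singletonMap("nextSeq", nextSeq + 1), // offset to flush
                    "my-topic",
                    Schema.STRING_SCHEMA,
                    "{seq: " + nextSeq + ", data: <contents of " + file + ">}"));
            nextSeq++;
        }
        return records;
    }

    // Hypothetical helper: scan the folder and sort files oldest first.
    private List<Path> filesOldestFirst() {
        return Collections.emptyList();
    }

    @Override
    public void stop() {
    }

    @Override
    public String version() {
        return "sketch";
    }
}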

*Assume the following files are in the input-folder:*
* 2018-01-01_10_00_00-<GUID1>.data
* 2018-01-01_10_00_00-<GUID2>.data
* 2018-01-01_10_00_01-<GUID3>.data
* 2018-01-01_10_00_02-<GUID4>.data
…

*Now this sequence of events is possible*
* mySourceConnector.poll() —> [
   R1 = record({seq: 1, data: <data from 
2018-01-01_10_00_00-<GUID1>.data>}, { nextSeq=2 }),
   R2 = record({seq: 2, data: <data from 
2018-01-01_10_00_00-<GUID2>.data>}, { nextSeq=3 })
]
* data of R1 was sent and acknowledged
* mySourceConnector.commitRecord(R1)
* data of R2 was sent and acknowledged
* mySourceConnector.commitRecord(R2)
* offsets-committer kicks in around here and picks up the offsets from 
R1 and R2, so the merged offsets to be written and flushed are 
{ nextSeq=3 }
* mySourceConnector.poll() —> [
   R3 = record({seq: 3, data: <data from 
2018-01-01_10_00_01-<GUID3>.data>}, { nextSeq=4 })
]
* data of R3 was sent and acknowledged
* mySourceConnector.commitRecord(R3)
* offsets-committer finishes writing and flushing offsets { nextSeq=3 }
* mySourceConnector.commit()

In the mySourceConnector.commit() implementation I believe that the data 
and offsets for R1, R2 and R3 have been sent/written/flushed/acknowledged, 
and therefore I delete the following files
* 2018-01-01_10_00_00-<GUID1>.data
* 2018-01-01_10_00_00-<GUID2>.data
* 2018-01-01_10_00_01-<GUID3>.data
But the truth is that the data for R1, R2 and R3 has been sent with 
sequence-numbers 1, 2 and 3 respectively, while the flushed offsets say 
{ nextSeq=3 }, and not { nextSeq=4 } as I would implicitly expect.
If the system crashes here, upon restart I will get { nextSeq=3 }, but 
the file containing the data that was supposed to get sequence-number 3 
has already been deleted. Therefore I will end up with this next poll
* poll() —> [
   R4 = record({seq: 3, data: <data from 
2018-01-01_10_00_02-<GUID4>.data>}, { nextSeq=4 })
]
If my system had worked, I should have ended up with this next poll
* poll() —> [
   R4 = record({seq: 4, data: <data from 
2018-01-01_10_00_02-<GUID4>.data>}, { nextSeq=5 })
]
The receiving side of my data will get two messages containing the same 
sequence-number 3. It will therefore incorrectly ignore the second 
message. Even if it double-checks by looking at the actual data of the 
two messages, and the contents of <data from 
2018-01-01_10_00_01-<GUID3>.data> and <data from 
2018-01-01_10_00_02-<GUID4>.data> were actually identical, it has no 
way of figuring out that the right thing is to handle both messages.

*With my fix to the problem*, the call to commit() would have been
mySourceConnector.commit([R1, R2])
I would know only to delete the following files
* 2018-01-01_10_00_00-<GUID1>.data
* 2018-01-01_10_00_00-<GUID2>.data
And after crash/restart I would end up sending the correct next message
mySourceConnector.poll() —> [
   R3 = record({seq: 3, data: <data from 
2018-01-01_10_00_01-<GUID3>.data>}, { nextSeq=4 })
]
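
With the record-aware callback, the cleanup I actually want is a few 
lines. A sketch (assuming the task remembers which input-file each polled 
record was built from; the callback shape is the one proposed in this KIP):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.source.SourceRecord;

public class CleanupSketch {

    // Populated in poll(): which input-file each emitted record came from.
    private final Map<SourceRecord, Path> recordToFile = new IdentityHashMap<>();

    // Proposed callback: only the records passed in are known to have had
    // both their data acked and their offsets flushed.
    public void commit(List<SourceRecord> recordsWithFlushedOffsets) throws IOException {
        for (SourceRecord record : recordsWithFlushedOffsets) {
            Path file = recordToFile.remove(record);
            if (file != null) {
                Files.deleteIfExists(file); // safe: data acked AND offsets flushed
            }
        }
    }
}

In the scenario above, commit([R1, R2]) deletes only the two 
2018-01-01_10_00_00-* files, and the GUID3 file survives the crash.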


Re: [DISCUSS] KIP-381 Connect: Tell about records that had their offsets flushed in callback

Posted by Ryanne Dolan <ry...@gmail.com>.
> this does not guarantee that the
> offsets of R have been written/flushed at the next commit() call

True, but does it matter? So long as you can guarantee the records are
delivered to the downstream Kafka cluster, it shouldn't matter if they have
been committed or not.

The worst that can happen is that the worker gets bounced and asks for the
same records a second time. Even if those records have since been dropped
from the upstream data source, it doesn't matter cuz you know they were
previously delivered successfully.

I do agree that commit() and commitRecord() are poorly named, but I don't
think there's anything fundamentally missing from the API.

Ryanne

On Wed, Oct 17, 2018 at 10:24 AM Per Steffensen <pe...@gmail.com> wrote:

> On 17/10/2018 16.43, Ryanne Dolan wrote:
> > I see, thanks.
> > On the other hand, the commitRecord() callback provides the functionality
> > you require in this case. In commitRecord() your SourceTask can track the
> > offsets of records that have been ack'd by the producer client, and then
> in
> > commit() you can be sure that those offsets have been flushed.
> That is the trick I am currently using - more or less.
> But unfortunately it does not work 100% either. It is possible that
> commitRecord() is called with a record R, and then commit() is called
> after that, without the offsets of R having been written/flushed. The
> call to commitRecord() means that the "actual data" of R has been
> sent/acknowledged, but unfortunately this does not guarantee that the
> offsets of R have been written/flushed at the next commit() call
>
>

Re: [DISCUSS] KIP-381 Connect: Tell about records that had their offsets flushed in callback

Posted by Per Steffensen <pe...@gmail.com>.
On 17/10/2018 16.43, Ryanne Dolan wrote:
> I see, thanks.
> On the other hand, the commitRecord() callback provides the functionality
> you require in this case. In commitRecord() your SourceTask can track the
> offsets of records that have been ack'd by the producer client, and then in
> commit() you can be sure that those offsets have been flushed.
That is the trick I am currently using - more or less.
But unfortunately it does not work 100% either. It is possible that 
commitRecord() is called with a record R, and then commit() is called 
after that, without the offsets of R having been written/flushed. The 
call to commitRecord() means that the "actual data" of R has been 
sent/acknowledged, but unfortunately this does not guarantee that the 
offsets of R have been written/flushed at the next commit() call


Re: [DISCUSS] KIP-381 Connect: Tell about records that had their offsets flushed in callback

Posted by Ryanne Dolan <ry...@gmail.com>.
> There is no guarantee that the data in R has been sent/acknowledged
> to/by Kafka, nor that the offsets in R have been flushed to offset-store
(it
> is likely, though).

I see, thanks.

On the other hand, the commitRecord() callback provides the functionality
you require in this case. In commitRecord() your SourceTask can track the
offsets of records that have been ack'd by the producer client, and then in
commit() you can be sure that those offsets have been flushed.
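
In code, that workaround looks roughly like this (a sketch; release() is a 
hypothetical cleanup hook, and the assumption in commit() is exactly what 
the rest of this thread questions):

import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public abstract class TrackingSourceTask extends SourceTask {

    private final List<SourceRecord> acked = new ArrayList<>();

    @Override
    public synchronized void commitRecord(SourceRecord record) {
        acked.add(record); // data of this record was acked by the producer
    }

    @Override
    public synchronized void commit() {
        // Assumption (not actually guaranteed by the framework): the offsets
        // of everything in 'acked' have been flushed by now.
        for (SourceRecord record : acked) {
            release(record);
        }
        acked.clear();
    }

    // Hypothetical hook: release/delete the source data behind a record.
    protected abstract void release(SourceRecord record);
}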

I'm not opposed, however, to baking this into the framework and exposing a
new callback. Otherwise every correct SourceConnector would need to
implement similar logic.

Ryanne

On Wed, Oct 17, 2018 at 7:25 AM Per Steffensen <pe...@gmail.com> wrote:

> Let's use X for the point in time where commit() is called. Let's use
> Rs(X) for the records returned by poll()s up to time X.
> At time X, it is not necessarily true that all records in Rs(X) have been
> sent to Kafka (and acknowledged) and had their offsets flushed to
> offset-store.
>
> Example
> * Time X-1: poll() is called and one record R is returned
> * Time X: commit() is called. There is no guarantee that the data in R has
> been sent/acknowledged to/by Kafka, nor that the offsets in R have been
> flushed to offset-store (it is likely, though).
>
> Due to the synchronization necessary, it is probably hard to make that
> guarantee without reducing throughput significantly. But it is feasible to
> change commit() so that it is given (via argument) a list/collection of
> the records for which the guarantee holds. That's what my current fix does
> (see the PR).
>
>
> On 16/10/2018 19.33, Ryanne Dolan wrote:
>
> Steff,
>
> > Guess people have used it, assuming that all records that have been polled
> > at the time of callback to "commit", have also had their offsets committed.
> > But that is not true.
>
> (excerpt from KIP)
>
> The documentation for SourceTask.commit() reads:
>
> > Commit the offsets, up to the offsets that have been returned by {@link
> > #poll()}. This method should block until the commit is complete.
>
> I'm confused by these seemingly contradictory statements. My assumption
> (as you say) is that all records returned by poll() will have been
> committed before commit() is invoked by the framework. Is that not the case?
>
> Ryanne
>
> On Wed, Oct 10, 2018 at 8:50 AM Per Steffensen <pe...@gmail.com> wrote:
>
>> Please help make the proposed changes in KIP-381 become reality. Please
>> comment.
>>
>> KIP:
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-381%3A+Connect%3A+Tell+about+records+that+had+their+offsets+flushed+in+callback
>>
>> JIRA: https://issues.apache.org/jira/browse/KAFKA-5716
>>
>> PR: https://github.com/apache/kafka/pull/3872
>>
>> Thanks!
>>
>>
>>
>

Re: [DISCUSS] KIP-381 Connect: Tell about records that had their offsets flushed in callback

Posted by Per Steffensen <pe...@gmail.com>.
Let's use X for the point in time where commit() is called. Let's use 
Rs(X) for the records returned by poll()s up to time X.
At time X, it is not necessarily true that all records in Rs(X) have 
been sent to Kafka (and acknowledged) and had their offsets flushed to 
offset-store.

Example
* Time X-1: poll() is called and one record R is returned
* Time X: commit() is called. There is no guarantee that the data in R 
has been sent/acknowledged to/by Kafka, nor that the offsets in R have 
been flushed to offset-store (it is likely, though).

Due to the synchronization necessary, it is probably hard to make that 
guarantee without reducing throughput significantly. But it is feasible 
to change commit() so that it is given (via argument) a list/collection 
of the records for which the guarantee holds. That's what my current 
fix does (see the PR).
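
To sketch why that is feasible: the worker-side bookkeeping only needs to 
snapshot, at the moment the offsets-committer reads the offsets to flush, 
which acked records that snapshot covers (hypothetical and simplified; 
not the actual WorkerSourceTask code from the PR):

import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.connect.source.SourceRecord;

class OffsetFlushBookkeeping {

    private final List<SourceRecord> ackedSinceLastSnapshot = new ArrayList<>();

    // Producer callback: the data of this record has been acked.
    synchronized void onRecordAcked(SourceRecord record) {
        ackedSinceLastSnapshot.add(record);
    }

    // Called when the offsets-committer snapshots the offsets to flush.
    // Only records acked before this point are covered by that flush.
    synchronized List<SourceRecord> beginFlush() {
        List<SourceRecord> covered = new ArrayList<>(ackedSinceLastSnapshot);
        ackedSinceLastSnapshot.clear();
        return covered;
    }
}

Once the flush succeeds, the worker hands the snapshot to the task's 
callback; records acked after the snapshot simply wait for the next flush.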

On 16/10/2018 19.33, Ryanne Dolan wrote:
> Steff,
>
> > Guess people have used it, assuming that all records that have been polled
> > at the time of callback to "commit", have also had their offsets committed.
> > But that is not true.
>
> (excerpt from KIP)
>
> The documentation for SourceTask.commit() reads:
>
> > Commit the offsets, up to the offsets that have been returned by
> > {@link #poll()}. This method should block until the commit is complete.
>
> I'm confused by these seemingly contradictory statements. My 
> assumption (as you say) is that all records returned by poll() will 
> have been committed before commit() is invoked by the framework. Is 
> that not the case?
>
> Ryanne
>
> On Wed, Oct 10, 2018 at 8:50 AM Per Steffensen <persteff@gmail.com> wrote:
>
>     Please help make the proposed changes in KIP-381 become reality.
>     Please
>     comment.
>
>     KIP:
>     https://cwiki.apache.org/confluence/display/KAFKA/KIP-381%3A+Connect%3A+Tell+about+records+that+had+their+offsets+flushed+in+callback
>
>     JIRA: https://issues.apache.org/jira/browse/KAFKA-5716
>
>     PR: https://github.com/apache/kafka/pull/3872
>
>     Thanks!
>
>


Re: [DISCUSS] KIP-381 Connect: Tell about records that had their offsets flushed in callback

Posted by Ryanne Dolan <ry...@gmail.com>.
Steff,

> Guess people have used it, assuming that all records that have been polled
> at the time of callback to "commit", have also had their offsets committed.
> But that is not true.

(excerpt from KIP)

The documentation for SourceTask.commit() reads:

> Commit the offsets, up to the offsets that have been returned by {@link
> #poll()}. This method should block until the commit is complete.

I'm confused by these seemingly contradictory statements. My assumption (as
you say) is that all records returned by poll() will have been committed
before commit() is invoked by the framework. Is that not the case?

Ryanne

On Wed, Oct 10, 2018 at 8:50 AM Per Steffensen <pe...@gmail.com> wrote:

> Please help make the proposed changes in KIP-381 become reality. Please
> comment.
>
> KIP:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-381%3A+Connect%3A+Tell+about+records+that+had+their+offsets+flushed+in+callback
>
> JIRA: https://issues.apache.org/jira/browse/KAFKA-5716
>
> PR: https://github.com/apache/kafka/pull/3872
>
> Thanks!
>
>
>

Re: [DISCUSS] KIP-381 Connect: Tell about records that had their offsets flushed in callback

Posted by Konstantine Karantasis <ko...@confluent.io>.
Indeed, implying that the flushing and acknowledgement of records happens
in order reveals an implementation detail that is not required by the
interface. Strictly speaking, if that were required, then you'd only need a
single record as an argument to offsetsFlushedAndAcked to indicate up to
which record from the list of polled records the task managed to flush and
ack. But this wouldn't be elegant, nor is it certain that the task is
aware, at the point when offsetsFlushedAndAcked is called, of the order of
the records that it returned with poll.

I don't feel strongly about it, and I acknowledge the symmetry with poll. I
made this comment since we are passing a complete list of flushed records
and retaining the order seemed to restrict future implementations. But it
probably makes things simpler for source connectors too.

Regarding the text sections, I was mainly referring to importing
information from the jira discussion into the KIP. And my intention is the
same: that people will understand the improvement and the motivation just
by reading the KIP, without having to go over the code changes or the jira
comments. I'll leave you to it, and let's see where we are with
implementation later in the current KIP cycle. I find it a useful
improvement, and I hope it makes it in.

-Konstantine



On Wed, Oct 17, 2018 at 5:55 AM Per Steffensen <pe...@gmail.com> wrote:

> The fix will definitely "facilitate" the source-connectors I have
> written. It will make them work 100% correctly. Today they don't.
>
> Fine for me to change "Acknowledged" to "Acked" in the method-naming.
>
> Not sure I would like to give a Collection instead of a List as the
> argument to offsetsFlushedAndAck(nowledg)ed. poll() returns a List
> (ordered records), the records are handled in that order, and I would
> like to hand the records back in that order as well. Handing back a
> Collection may indicate that order does not matter. Besides that, it is
> likely to help the implementation of offsetsFlushedAndAck(nowledg)ed
> that you get the records back in order.
>
> Regarding adding stuff to the "rejected approaches" and "motivation"
> sections of the KIP, I am not sure I will get the time anytime soon.
> Please feel free to help add this to the KIP. This way we also have
> at least two persons who really understand what this is about. Sometimes
> you only really understand what something is about when you are forced
> to write about it (at least that is my excuse).
>
> Regards, Per Steffensen
>
> On 16/10/2018 05.57, Konstantine Karantasis wrote:
> > This is a significant improvement to the semantics of source connectors.
> > I'm expecting that it will facilitate source connector implementations and
> > even enrich the application use cases that we see. I only have a few minor
> > suggestions at the moment.
> >
> > I believe that Acked is a common abbreviation for Acknowledged and that we
> > could use it in this context. And this suggestion is coming from a big
> > proponent of complete words in variables and method names. Thus, feel free
> > to consider 'offsetsFlushedAndAcked' as well as 'recordSentAndAcked'. Since
> > this is a public interface, I'd also make the implementation-specific
> > comment that a Collection<SourceRecord> might be more versatile than
> > List<SourceRecord> as the argument of offsetsFlushedAndAcknowledged.
> >
> > The rejected approaches section could use some of the material in the
> > original jira ticket, which is pretty insightful in order to understand
> > how we arrived at this KIP. For example, I think it'd be useful to state
> > briefly why the 'commit' method is not being removed completely but is
> > substituted with 'offsetsFlushedAndAcked'. Also useful, I believe, would
> > be to include in the motivation section some info on why and how a source
> > system could use these method calls to safely recycle data that has
> > certainly been imported to Kafka. I see this type of use case having an
> > increased importance as Kafka is used more and more as the source of
> > truth and persistence layer for an increasing number of applications.
> >
> > These suggestions, although not strictly required in order to move
> > forward with this improvement, can, I believe, help a lot in
> > understanding the context of this proposed change, without having to
> > read the complete history in the jira ticket.
> >
> > Thanks for the KIP Per!
> >
> > -Konstantine
> >
> >
> > On Wed, Oct 10, 2018 at 6:50 AM Per Steffensen <pe...@gmail.com>
> wrote:
> >
> >> Please help make the proposed changes in KIP-381 become reality. Please
> >> comment.
> >>
> >> KIP:
> >>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-381%3A+Connect%3A+Tell+about+records+that+had+their+offsets+flushed+in+callback
> >>
> >> JIRA: https://issues.apache.org/jira/browse/KAFKA-5716
> >>
> >> PR: https://github.com/apache/kafka/pull/3872
> >>
> >> Thanks!
> >>
> >>
> >>
>
>

Re: [DISCUSS] KIP-381 Connect: Tell about records that had their offsets flushed in callback

Posted by Per Steffensen <pe...@gmail.com>.
The fix will definitely "facilitate" the source-connectors I have 
written. It will make them work 100% correctly. Today they don't.

Fine for me to change "Acknowledged" to "Acked" in the method-naming.

Not sure I would like to give a Collection instead of a List as the 
argument to offsetsFlushedAndAck(nowledg)ed. poll() returns a List 
(ordered records), the records are handled in that order, and I would 
like to hand the records back in that order as well. Handing back a 
Collection may indicate that order does not matter. Besides that, it is 
likely to help the implementation of offsetsFlushedAndAck(nowledg)ed 
that you get the records back in order.

Regarding adding stuff to the "rejected approaches" and "motivation" 
sections of the KIP, I am not sure I will get the time anytime soon. 
Please feel free to help add this to the KIP. This way we also have 
at least two persons who really understand what this is about. Sometimes 
you only really understand what something is about when you are forced 
to write about it (at least that is my excuse).

Regards, Per Steffensen

On 16/10/2018 05.57, Konstantine Karantasis wrote:
> This is a significant improvement to the semantics of source connectors.
> I'm expecting that it will facilitate source connector implementations and
> even enrich the application use cases that we see. I only have a few minor
> suggestions at the moment.
>
> I believe that Acked is a common abbreviation for Acknowledged and that we
> could use it in this context. And this suggestion is coming from a big
> proponent of complete words in variables and method names. Thus, feel free
> to consider 'offsetsFlushedAndAcked' as well as 'recordSentAndAcked'. Since
> this is a public interface, I'd also make the implementation-specific
> comment that a Collection<SourceRecord> might be more versatile than
> List<SourceRecord> as the argument of offsetsFlushedAndAcknowledged.
>
> The rejected approaches section could use some of the material in the
> original jira ticket, which is pretty insightful in order to understand how
> we arrived at this KIP. For example, I think it'd be useful to state
> briefly why the 'commit' method is not being removed completely but is
> substituted with 'offsetsFlushedAndAcked'. Also useful, I believe, would be
> to include in the motivation section some info on why and how a source
> system could use these method calls to safely recycle data that has
> certainly been imported to Kafka. I see this type of use case having an
> increased importance as Kafka is used more and more as the source of truth
> and persistence layer for an increasing number of applications.
>
> These suggestions, although not strictly required in order to move forward
> with this improvement, can, I believe, help a lot in understanding the
> context of this proposed change, without having to read the complete
> history in the jira ticket.
>
> Thanks for the KIP Per!
>
> -Konstantine
>
>
> On Wed, Oct 10, 2018 at 6:50 AM Per Steffensen <pe...@gmail.com> wrote:
>
>> Please help make the proposed changes in KIP-381 become reality. Please
>> comment.
>>
>> KIP:
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-381%3A+Connect%3A+Tell+about+records+that+had+their+offsets+flushed+in+callback
>>
>> JIRA: https://issues.apache.org/jira/browse/KAFKA-5716
>>
>> PR: https://github.com/apache/kafka/pull/3872
>>
>> Thanks!
>>
>>
>>


Re: [DISCUSS] KIP-381 Connect: Tell about records that had their offsets flushed in callback

Posted by Konstantine Karantasis <ko...@confluent.io>.
This is a significant improvement to the semantics of source connectors.
I'm expecting that it will facilitate source connector implementations and
even enrich the application use cases that we see. I only have a few minor
suggestions at the moment.

I believe that Acked is a common abbreviation for Acknowledged and that we
could use it in this context. And this suggestion is coming from a big
proponent of complete words in variables and method names. Thus, feel free
to consider 'offsetsFlushedAndAcked' as well as 'recordSentAndAcked'. Since
this is a public interface, I'd also make the implementation-specific
comment that a Collection<SourceRecord> might be more versatile than
List<SourceRecord> as the argument of offsetsFlushedAndAcknowledged.
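
Putting those naming and signature suggestions together, the callbacks
would look something like this (a sketch of this suggestion only, not the
final API the KIP may settle on):

import java.util.Collection;
import org.apache.kafka.connect.source.SourceRecord;

public interface SuggestedSourceTaskCallbacks {

    // Would replace commit(): the records whose data has been acked and
    // whose offsets have been flushed to the offset store.
    void offsetsFlushedAndAcked(Collection<SourceRecord> records);

    // Would replace commitRecord(): a single record whose data has been
    // sent and acked by the producer.
    void recordSentAndAcked(SourceRecord record);
}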

The rejected approaches section could use some of the material in the
original jira ticket, which is pretty insightful in order to understand how
we arrived at this KIP. For example, I think it'd be useful to state
briefly why the 'commit' method is not being removed completely but is
substituted with 'offsetsFlushedAndAcked'. Also useful, I believe, would be
to include in the motivation section some info on why and how a source
system could use these method calls to safely recycle data that has
certainly been imported to Kafka. I see this type of use case having an
increased importance as Kafka is used more and more as the source of truth
and persistence layer for an increasing number of applications.

These suggestions, although not strictly required in order to move forward
with this improvement, can, I believe, help a lot in understanding the
context of this proposed change, without having to read the complete
history in the jira ticket.

Thanks for the KIP Per!

-Konstantine


On Wed, Oct 10, 2018 at 6:50 AM Per Steffensen <pe...@gmail.com> wrote:

> Please help make the proposed changes in KIP-381 become reality. Please
> comment.
>
> KIP:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-381%3A+Connect%3A+Tell+about+records+that+had+their+offsets+flushed+in+callback
>
> JIRA: https://issues.apache.org/jira/browse/KAFKA-5716
>
> PR: https://github.com/apache/kafka/pull/3872
>
> Thanks!
>
>
>