Posted to users@kafka.apache.org by Shikhar Bhushan <sh...@confluent.io> on 2017/01/05 18:24:13 UTC

Re: Connect: SourceTask poll & commit interaction

I have created https://issues.apache.org/jira/browse/KAFKA-4598 for this.

On Wed, Dec 14, 2016 at 2:58 PM Shikhar Bhushan <sh...@confluent.io>
wrote:

> Hi Mathieu,
>
> I think you are right, there is currently no mutual exclusion between
> `task.commit()` and `task.poll()`. The solution you are thinking of,
> maintaining the committed offset state yourself, seems reasonable, though
> inconvenient.
>
> It probably makes sense to add a new parameterized `commit()` method
> carrying the offset map (and possibly deprecate the existing one).
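A minimal sketch of how a task might use such a parameterized hook. Nothing here is an existing Kafka Connect API: the interface `OffsetAwareCommit`, the class `ReplicationPointerTask`, and the map keys `"slot"` (source partition) and `"lsn"` (source offset) are hypothetical. The map shape only mirrors the `Map<String, ?>` source-partition and source-offset maps that `SourceRecord` already carries. The point is that the task derives its replication pointer from what the framework says it committed, instead of from state it tracked in `poll()`.

```java
import java.util.HashMap;
import java.util.Map;

// HYPOTHETICAL: a commit hook that receives the offsets the framework actually
// committed, keyed by source partition. This is not part of Kafka Connect.
interface OffsetAwareCommit {
    void commit(Map<Map<String, ?>, Map<String, ?>> committedOffsets);
}

// HYPOTHETICAL task that turns committed offsets into replication-stream pointers.
class ReplicationPointerTask implements OffsetAwareCommit {
    // Replication position per source partition ("slot" is an assumed key).
    private final Map<String, Long> pointers = new HashMap<>();

    @Override
    public void commit(Map<Map<String, ?>, Map<String, ?>> committedOffsets) {
        for (Map.Entry<Map<String, ?>, Map<String, ?>> e : committedOffsets.entrySet()) {
            String slot = (String) e.getKey().get("slot");
            long lsn = ((Number) e.getValue().get("lsn")).longValue(); // assumed key
            // Only move forward: a late or stale commit must never rewind the pointer.
            pointers.merge(slot, lsn, Long::max);
            // A real task would now tell the database it may discard logs up to lsn.
        }
    }

    Long pointerFor(String slot) {
        return pointers.get(slot);
    }

    public static void main(String[] args) {
        ReplicationPointerTask task = new ReplicationPointerTask();
        Map<Map<String, ?>, Map<String, ?>> offsets = new HashMap<>();
        offsets.put(Map.of("slot", "db1"), Map.of("lsn", 42L));
        task.commit(offsets);
        System.out.println(task.pointerFor("db1")); // prints 42
    }
}
```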
>
> Best,
>
> Shikhar
>
> On Sat, Dec 10, 2016 at 7:57 AM Mathieu Fenniak <
> mathieu.fenniak@replicon.com> wrote:
>
> Hi Kafka Users,
>
> I'm looking for a bit of clarification on the documentation for
> implementing a SourceTask.  I'm reading a replication stream from a
> database in my SourceTask, and I'd like to use commit or commitRecord to
> advance the other system's replication stream pointer so that it knows I
> have successfully read & committed the records to Kafka.  This allows the
> other system to discard unneeded transaction logs.
>
> But I'm uncertain how to use either of SourceTask's commit or commitRecord
> correctly.
>
> For commit, the documentation says that it should "Commit the offsets, up
> to the offsets that have been returned by poll().".  When commit() is
> executed, will poll() currently be running on another thread?  I assume it
> must be, because poll should block, and that would imply you can't commit
> the trailing end of some activity.  If commit is invoked while poll is being
> invoked, I'm concerned that I can't reliably determine where to advance my
> replication stream pointer to -- if I store the location at the end of
> poll, commit might be invoked while poll is still returning some records,
> and advance the pointer further than actually guaranteed.
>
> commitRecord on the other hand is invoked per-record.  The documentation
> says "Commit an individual SourceRecord when the callback from the producer
> client is received."  But if I'm producing to N partitions on different
> brokers, I believe that the producer callback is not called in any
> guaranteed order, so I can't advance my replication stream pointer to any
> single record since an older record being delivered to another partition
> may not have been committed.
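To make the ordering problem concrete, here is a toy trace of the tempting rule "advance the pointer to the highest position any callback has confirmed". The class `UnorderedCallbacks`, the positions 1 to 4, and the callback order are invented for illustration; the order is simply one that the producer is allowed to produce when records go to different partitions.

```java
import java.util.List;

// ILLUSTRATION ONLY: positions 1..4 were returned by poll() and sent to different
// partitions. The acknowledgement order below is an assumed, legal interleaving.
class UnorderedCallbacks {
    // Naive rule: move the pointer to the highest position acknowledged so far.
    static long naivePointer(List<Long> acked) {
        long pointer = 0;
        for (long p : acked) {
            pointer = Math.max(pointer, p);
        }
        return pointer;
    }

    public static void main(String[] args) {
        // Record 2's partition is slow, so its callback has not fired yet.
        List<Long> ackedSoFar = List.of(1L, 3L, 4L);
        System.out.println(naivePointer(ackedSoFar)); // prints 4
    }
}
```

The naive pointer reaches 4 while record 2 is still in flight, so discarding transaction logs up to 4 could lose record 2 if its send fails. The only safe pointer at that moment is 1.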
>
> The only solution I can see so far is to maintain the replication stream
> positions of all the source records that I've returned from poll, and
> advance the replication pointer in commitRecord only when the lowest
> outstanding record is committed.
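Mathieu's workaround can be sketched as a small tracker that remembers every position handed out by `poll()` and only advances past the lowest outstanding one. The class `ReplicationPositionTracker` and its methods are hypothetical, not part of Kafka Connect, and the sketch assumes each record carries a distinct replication position that increases in the order `poll()` returns records. The methods are `synchronized` because, as Shikhar confirms, nothing serializes `poll()` against the commit callbacks, and `commitRecord()` is driven by producer callbacks, which run on the producer's I/O thread rather than the `poll()` thread.

```java
import java.util.TreeMap;

// HYPOTHETICAL helper (not a Kafka API). Assumes distinct, increasing positions.
// Computes the highest position P such that every tracked record <= P is acknowledged.
class ReplicationPositionTracker {
    // In-flight positions in ascending order -> acknowledged yet?
    private final TreeMap<Long, Boolean> inFlight = new TreeMap<>();
    // Highest position that is safe to report back to the source database.
    private long safePosition;

    ReplicationPositionTracker(long startPosition) {
        this.safePosition = startPosition;
    }

    // Call from poll() for each record, before the batch is returned.
    synchronized void track(long position) {
        inFlight.put(position, Boolean.FALSE);
    }

    // Call from commitRecord(); returns the (possibly unchanged) safe position.
    synchronized long acknowledge(long position) {
        inFlight.replace(position, Boolean.TRUE); // ignores positions never tracked
        // Drain the acknowledged prefix, stopping at the lowest outstanding record.
        while (!inFlight.isEmpty() && inFlight.firstEntry().getValue()) {
            safePosition = inFlight.pollFirstEntry().getKey();
        }
        return safePosition;
    }

    synchronized long safePosition() {
        return safePosition;
    }

    public static void main(String[] args) {
        ReplicationPositionTracker tracker = new ReplicationPositionTracker(0L);
        for (long pos = 1; pos <= 4; pos++) {
            tracker.track(pos); // poll() returned positions 1..4
        }
        // Callbacks arrive out of order; record 2 is the last to be acknowledged.
        System.out.println(tracker.acknowledge(1)); // prints 1
        System.out.println(tracker.acknowledge(3)); // prints 1 (2 still outstanding)
        System.out.println(tracker.acknowledge(4)); // prints 1
        System.out.println(tracker.acknowledge(2)); // prints 4
    }
}
```

In a task, `poll()` would call `track` for each record it returns, and `commitRecord(SourceRecord record)` would read the position back from `record.sourceOffset()` (under whatever key the task wrote it) and pass it to `acknowledge`, forwarding the result to the database whenever it advances. Because the pointer is derived from acknowledgements rather than from the end of `poll()`, it cannot overshoot records that are still being returned or sent.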
>
> Is there anything I've misunderstood or misinterpreted?
>
> Thanks,
>
> Mathieu
>
>