Posted to dev@kafka.apache.org by "Matthias J. Sax" <ma...@confluent.io> on 2019/11/25 00:48:20 UTC

Re: [DISCUSS] KIP-408: Add Asynchronous Processing to Kafka Streams

Moved this KIP into status "inactive". Feel free to resume it at any time.

-Matthias

On 1/4/19 3:42 PM, Richard Yu wrote:
> Hi all,
> 
> Just want to hear some opinions on this KIP from the PMC members. It would be
> nice if we got input from them.
> Don't want to drag this KIP out for too long! :)
> 
> Hope we get some input :)
> 
> Thanks,
> Richard
> 
> On Thu, Jan 3, 2019 at 8:26 PM Richard Yu <yo...@gmail.com>
> wrote:
> 
>> Hi Boyang,
>>
>> Interesting article, although something crossed my mind. When skipping bad
>> records, we couldn't go back and process them again while guaranteeing
>> ordering, i.e. neither exactly-once nor at-least-once would be supported,
>> only at-most-once. Also, in Kafka, when it comes to individually acking every
>> single record, the resulting latency is horrible (from what I heard). We
>> actually discussed something like this in
>> https://issues.apache.org/jira/browse/KAFKA-7432. It might give you some
>> insight since it is a related issue.
>>
>> I hope this helps,
>> Richard
>>
>>
>>
>>
>> On Thu, Jan 3, 2019 at 7:29 PM Boyang Chen <bc...@outlook.com> wrote:
>>
>>> Hey Richard,
>>>
>>> thanks for the explanation. Recently I read an interesting blog post
>>> <https://streaml.io/blog/pulsar-streaming-queuing> from Apache Pulsar
>>> (written a long time ago), where they define the concept of an individual
>>> ack, which means we could skip records and leave certain records on the
>>> queue for later processing. This seems similar to KIP-408 and shares some
>>> of the motivation for us to invest here.
>>>
>>> Boyang
>>>
>>> ________________________________
>>> From: Richard Yu <yo...@gmail.com>
>>> Sent: Friday, January 4, 2019 5:42 AM
>>> To: dev@kafka.apache.org
>>> Subject: Re: [DISCUSS] KIP-408: Add Asynchronous Processing to Kafka
>>> Streams
>>>
>>> Hi all,
>>>
>>> Just bumping this KIP. Would be great if we got some discussion.
>>>
>>>
>>> On Sun, Dec 30, 2018 at 5:13 PM Richard Yu <yo...@gmail.com>
>>> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I made some recent changes to the KIP. It should be more relevant to the
>>>> issue now (it involves the Processor API in detail).
>>>> It would be great if you could comment.
>>>>
>>>> Thanks,
>>>> Richard
>>>>
>>>> On Wed, Dec 26, 2018 at 10:01 PM Richard Yu <yohan.richard.yu@gmail.com> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> Just changing the title of the KIP. I discovered it wasn't right.
>>>>> That's about it. :)
>>>>>
>>>>> On Mon, Dec 24, 2018 at 7:57 PM Richard Yu <yohan.richard.yu@gmail.com> wrote:
>>>>>
>>>>>> Sorry, just making a correction.
>>>>>>
>>>>>> Even if we are processing records out of order, we will still have to
>>>>>> checkpoint offset ranges.
>>>>>> So it doesn't really change anything even if we are doing in-order
>>>>>> processing.
>>>>>>
>>>>>> Thinking this over, I'm leaning slightly towards maintaining the
>>>>>> ordering guarantee, although when implementing this change, there might
>>>>>> be some kinks that we have not thought about which could throw a monkey
>>>>>> wrench into the works.
>>>>>>
>>>>>> But definitely worth trying out,
>>>>>> Richard
>>>>>>
>>>>>> On Mon, Dec 24, 2018 at 6:51 PM Richard Yu <yohan.richard.yu@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Boyang,
>>>>>>>
>>>>>>> I could see where you are going with this. Well, I suppose I should
>>>>>>> have added this to alternatives, but I might as well mention it now.
>>>>>>>
>>>>>>> It had crossed my mind that we could consider returning results in
>>>>>>> order even if there are multiple threads processing on the same task.
>>>>>>> But for this to happen, we must block on the offsets in between which
>>>>>>> have not been processed yet. For example, say offsets 1-50 are being
>>>>>>> processed by thread1, while offsets 51-100 are being processed by
>>>>>>> thread2. We would have to wait for thread1 to finish processing its
>>>>>>> offsets before we return the records processed by thread2. In other
>>>>>>> words, once thread1 is done, thread2's work up to that point will be
>>>>>>> returned in one go, but not before that.
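>>>>>>>
>>>>>>> To make this concrete, here is a rough sketch (in Java) of the kind of
>>>>>>> gate I have in mind. None of these names are real Kafka Streams APIs;
>>>>>>> it just illustrates releasing results only once every earlier offset
>>>>>>> has also finished:
>>>>>>>
>>>>>>> import java.util.Map;
>>>>>>> import java.util.TreeMap;
>>>>>>> import java.util.function.BiConsumer;
>>>>>>>
>>>>>>> class OrderedResultGate<V> {
>>>>>>>     // Finished-but-not-yet-released results, keyed by offset.
>>>>>>>     private final TreeMap<Long, V> completed = new TreeMap<>();
>>>>>>>     private long nextOffsetToRelease;
>>>>>>>
>>>>>>>     OrderedResultGate(long firstOffset) {
>>>>>>>         this.nextOffsetToRelease = firstOffset;
>>>>>>>     }
>>>>>>>
>>>>>>>     // Called by any worker thread when it finishes one record.
>>>>>>>     synchronized void complete(long offset, V result, BiConsumer<Long, V> emit) {
>>>>>>>         completed.put(offset, result);
>>>>>>>         // Release the longest contiguous prefix of finished offsets, in order.
>>>>>>>         while (!completed.isEmpty() && completed.firstKey() == nextOffsetToRelease) {
>>>>>>>             Map.Entry<Long, V> head = completed.pollFirstEntry();
>>>>>>>             emit.accept(head.getKey(), head.getValue());
>>>>>>>             nextOffsetToRelease++;
>>>>>>>         }
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>> So thread2 can keep calling complete() for offsets 51-100, but nothing
>>>>>>> from its range is emitted until thread1 has completed offsets 1-50.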
>>>>>>>
>>>>>>> I suppose this could work, but the client will have to wait some time
>>>>>>> before the advantages of multithreaded processing can be seen (i.e. the
>>>>>>> first thread has to finish processing its segment of the records first
>>>>>>> before any others are returned, to guarantee ordering). Another point I
>>>>>>> would like to make is that the threads are asynchronous. So for us to
>>>>>>> know when a thread is done processing a certain segment, we will
>>>>>>> probably have a similar policy to how getMetadataAsync() works (i.e.
>>>>>>> have a parent thread be notified when the children threads are done).
>>>>>>> [image: image.png]
>>>>>>> Just pulling this from the KIP. But here, we would apply this pattern
>>>>>>> to metadata segments rather than just a callback.
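>>>>>>>
>>>>>>> For the parent/child notification part, something like the standard
>>>>>>> CompletableFuture pattern would probably do (again, just an
>>>>>>> illustrative sketch, not a proposed API):
>>>>>>>
>>>>>>> import java.util.ArrayList;
>>>>>>> import java.util.List;
>>>>>>> import java.util.concurrent.CompletableFuture;
>>>>>>> import java.util.concurrent.ExecutorService;
>>>>>>> import java.util.concurrent.Executors;
>>>>>>>
>>>>>>> class SegmentDispatcher {
>>>>>>>     private final ExecutorService workers = Executors.newFixedThreadPool(4);
>>>>>>>
>>>>>>>     // Hand each segment of records to a child thread; the parent is
>>>>>>>     // called back once every child segment has completed.
>>>>>>>     void processSegments(List<Runnable> segments, Runnable onAllDone) {
>>>>>>>         List<CompletableFuture<Void>> children = new ArrayList<>();
>>>>>>>         for (Runnable segment : segments) {
>>>>>>>             children.add(CompletableFuture.runAsync(segment, workers));
>>>>>>>         }
>>>>>>>         CompletableFuture.allOf(children.toArray(new CompletableFuture[0])).thenRun(onAllDone);
>>>>>>>     }
>>>>>>> }
>>>>>>>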
>>>>>>> I don't know whether or not the tradeoffs are acceptable to the client.
>>>>>>> Ordering could be guaranteed, but it would be hard to do. For example,
>>>>>>> if there was a crash, we might lose track of which offset numbers and
>>>>>>> ranges we are processing for each child thread, so somehow we need to
>>>>>>> find a way to checkpoint those as well (like committing them to a Kafka
>>>>>>> topic).
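>>>>>>>
>>>>>>> As a very rough sketch of what "committing them to a Kafka topic" could
>>>>>>> look like (the class and topic names here are made up, and a real
>>>>>>> implementation would live inside Streams itself):
>>>>>>>
>>>>>>> import java.util.Properties;
>>>>>>> import org.apache.kafka.clients.producer.KafkaProducer;
>>>>>>> import org.apache.kafka.clients.producer.ProducerConfig;
>>>>>>> import org.apache.kafka.clients.producer.ProducerRecord;
>>>>>>> import org.apache.kafka.common.serialization.StringSerializer;
>>>>>>>
>>>>>>> class RangeCheckpointer implements AutoCloseable {
>>>>>>>     // Hypothetical compacted internal topic holding the latest range per worker.
>>>>>>>     private static final String CHECKPOINT_TOPIC = "async-processing-checkpoints";
>>>>>>>     private final KafkaProducer<String, String> producer;
>>>>>>>
>>>>>>>     RangeCheckpointer(String bootstrapServers) {
>>>>>>>         Properties props = new Properties();
>>>>>>>         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
>>>>>>>         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
>>>>>>>         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
>>>>>>>         this.producer = new KafkaProducer<>(props);
>>>>>>>     }
>>>>>>>
>>>>>>>     // Record the offset range a worker thread of a task is currently processing,
>>>>>>>     // keyed by task + worker so compaction keeps only the latest range per worker.
>>>>>>>     void checkpoint(String taskId, int workerId, long startOffset, long endOffset) {
>>>>>>>         String key = taskId + "-" + workerId;
>>>>>>>         String value = startOffset + "-" + endOffset;
>>>>>>>         producer.send(new ProducerRecord<>(CHECKPOINT_TOPIC, key, value));
>>>>>>>     }
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public void close() {
>>>>>>>         producer.close();
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>> On restart, a consumer of that topic would tell us which ranges were
>>>>>>> in flight for each child thread and may need to be reprocessed.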
>>>>>>>
>>>>>>> Let me know your thoughts on this approach. It would work, but the
>>>>>>> implementation details could be a mess.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Richard
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Dec 24, 2018 at 4:59 PM Boyang Chen <bc...@outlook.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hey Richard,
>>>>>>>>
>>>>>>>> thanks for the explanation! After some thinking, I understand more
>>>>>>>> about this KIP. The motivation is to increase throughput and move
>>>>>>>> heavy-lifting RPC calls or IO operations to the background. While I
>>>>>>>> feel the ordering is hard to guarantee for async tasks, it would be
>>>>>>>> better to make it configurable for the end users.
>>>>>>>>
>>>>>>>> An example use case I could think of: for every 500 records processed,
>>>>>>>> we need an RPC to external storage that takes non-trivial time, and
>>>>>>>> until it finishes, the 499 records before it should not be visible to
>>>>>>>> the end user. In such a case, we need fine-grained control over
>>>>>>>> visibility to the downstream consumer, so that our async task plants a
>>>>>>>> barrier while still letting the 499 records be processed and sent
>>>>>>>> downstream without blocking. Eventually, when the heavy RPC is done, we
>>>>>>>> commit this record to remove the barrier and make all 500 records
>>>>>>>> available downstream. So here we still need to guarantee the ordering
>>>>>>>> within the 500 records, while at the same time the consumer semantics
>>>>>>>> do not need to change.
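>>>>>>>>
>>>>>>>> To sketch the barrier idea (purely illustrative Java, not a proposed
>>>>>>>> API; the commit hook is hypothetical, and I'm assuming visibility
>>>>>>>> downstream is gated by the commit, e.g. EOS plus a read_committed
>>>>>>>> consumer):
>>>>>>>>
>>>>>>>> import java.util.concurrent.CompletableFuture;
>>>>>>>> import java.util.function.LongConsumer;
>>>>>>>>
>>>>>>>> class CommitBarrier {
>>>>>>>>     private final LongConsumer commitUpTo;  // hypothetical hook into the task's commit logic
>>>>>>>>
>>>>>>>>     CommitBarrier(LongConsumer commitUpTo) {
>>>>>>>>         this.commitUpTo = commitUpTo;
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     // Normal records: processed and forwarded immediately, but nothing is
>>>>>>>>     // committed yet, so the batch is not yet visible to the end consumer.
>>>>>>>>     void onRecordProcessed(long offset) {
>>>>>>>>         // no-op: the barrier record decides when the batch becomes visible
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     // Every 500th record: kick off the heavy RPC asynchronously and only
>>>>>>>>     // commit up to this offset (lifting the barrier for the whole batch)
>>>>>>>>     // once the RPC has completed.
>>>>>>>>     void onBarrierRecord(long offset, CompletableFuture<Void> heavyRpc) {
>>>>>>>>         heavyRpc.thenRun(() -> commitUpTo.accept(offset));
>>>>>>>>     }
>>>>>>>> }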
>>>>>>>>
>>>>>>>> Am I making the point clear here? I just want to have more discussion
>>>>>>>> on the ordering guarantee, since I feel it wouldn't be a good idea to
>>>>>>>> break the consumer ordering guarantee by default.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Boyang
>>>>>>>>
>>>>>>>> ________________________________
>>>>>>>> From: Richard Yu <yo...@gmail.com>
>>>>>>>> Sent: Saturday, December 22, 2018 9:08 AM
>>>>>>>> To: dev@kafka.apache.org
>>>>>>>> Subject: Re: KIP-408: Add Asynchronous Processing to Kafka Streams
>>>>>>>>
>>>>>>>> Hi Boyang,
>>>>>>>>
>>>>>>>> Thanks for pointing out the possibility of skipping bad records (it
>>>>>>>> never crossed my mind). I suppose we could make it an option for the
>>>>>>>> user to skip a bad record. Deciding whether or not to do that was never
>>>>>>>> the intention of this KIP, though. I could log a JIRA for such an
>>>>>>>> issue, but I think it is out of the KIP's scope.
>>>>>>>>
>>>>>>>> As for the ordering guarantees: if you are using the standard Kafka
>>>>>>>> design of one thread per task, then everything will pretty much remain
>>>>>>>> the same. However, if we are talking about using multiple threads per
>>>>>>>> task (which is something that this KIP proposes), then we should
>>>>>>>> probably expect the behavior to be somewhat similar to Samza's async
>>>>>>>> task, as stated in the JIRA for this KIP (second-last comment).
>>>>>>>> Ordering would no longer be possible (so yeah, basically no guarantee
>>>>>>>> at all).
>>>>>>>>
>>>>>>>> And how the user handles out-of-order messages is not something I'm
>>>>>>>> well versed in. I guess they can try to put the messages back in order
>>>>>>>> some time later on. But I honestly don't know what they will do.
>>>>>>>> It would be good if you could give me some insight into this.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Richard
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Dec 21, 2018 at 4:24 PM Boyang Chen <bc...@outlook.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks Richard for proposing this feature! We have also encountered a
>>>>>>>>> similar feature request, where we want to define a generic async
>>>>>>>>> processing API<https://issues.apache.org/jira/browse/KAFKA-7566>.
>>>>>>>>>
>>>>>>>>> However, I guess the motivation here is that we should skip big
>>>>>>>>> records during normal processing, or let a separate task handle those
>>>>>>>>> records that take P99 processing time. My feeling is: if some edge
>>>>>>>>> cases happen, could we just skip the bad record and continue
>>>>>>>>> processing the next record?
>>>>>>>>>
>>>>>>>>> Also, I want to understand what kind of ordering guarantee we are
>>>>>>>>> going to provide with this new API, or whether there is no ordering
>>>>>>>>> guarantee at all. Could we discuss any potential issues if the
>>>>>>>>> consumer needs to process out-of-order messages?
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Boyang
>>>>>>>>> ________________________________
>>>>>>>>> From: Richard Yu <yo...@gmail.com>
>>>>>>>>> Sent: Saturday, December 22, 2018 2:00 AM
>>>>>>>>> To: dev@kafka.apache.org
>>>>>>>>> Subject: KIP-408: Add Asynchronous Processing to Kafka Streams
>>>>>>>>>
>>>>>>>>> Hi all,
>>>>>>>>>
>>>>>>>>> Lately, there has been considerable interest in adding asynchronous
>>>>>>>>> processing to Kafka Streams.
>>>>>>>>> Here is the KIP for such an addition:
>>>>>>>>>
>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-408%3A+Add+Asynchronous+Processing+To+Kafka+Streams
>>>>>>>>>
>>>>>>>>> I wish to discuss the best ways to approach this problem.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Richard Yu
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>
>>
>