Posted to users@kafka.apache.org by Guozhang Wang <wa...@gmail.com> on 2016/04/01 20:15:43 UTC

Re: KStream-KTable join with the KTable given a "head start"

Hi Jeff,

Yes, the record timestamps are extracted once they are polled by the
consumer, and are used in multiple places including stream synchronization.

For details you can read this part of the docs:

http://docs.confluent.io/2.1.0-alpha1/streams/architecture.html#flow-control-with-timestamps

Note that this flow control mechanism is not perfect, i.e. if you have a
much later record in one of your streams, it may still be processed out of
order.
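Here is a rough illustration of this flow control as a minimal Java sketch. It is a toy model under simplifying assumptions, not the Kafka Streams implementation, and every name in it (FlowControlSketch, processInTimestampOrder, run) is hypothetical. Each input keeps a FIFO buffer of polled records, and the processor always takes the buffered head record with the smallest timestamp. Comparing only buffer heads simplifies the real per-partition bookkeeping.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of timestamp-based flow control (NOT the real Kafka Streams code):
// each input has a FIFO buffer of polled records, and the processor always
// takes the buffered head record with the smallest timestamp.
final class FlowControlSketch {

    static final class Rec {
        final String source;
        final long timestamp;

        Rec(String source, long timestamp) {
            this.source = source;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return source + "@" + timestamp;
        }
    }

    // Drains all buffers; within one buffer, records keep their FIFO (arrival) order.
    static List<Rec> processInTimestampOrder(Map<String, Deque<Rec>> buffers) {
        List<Rec> processed = new ArrayList<>();
        while (true) {
            Deque<Rec> next = null;
            for (Deque<Rec> queue : buffers.values()) {
                if (queue.isEmpty()) {
                    continue;
                }
                if (next == null || queue.peekFirst().timestamp < next.peekFirst().timestamp) {
                    next = queue;
                }
            }
            if (next == null) {
                return processed; // every buffer is empty
            }
            processed.add(next.pollFirst());
        }
    }

    // Builds one buffer per source and returns the processing order as strings.
    static List<String> run(long[] streamTimestamps, long[] tableTimestamps) {
        Map<String, Deque<Rec>> buffers = new LinkedHashMap<>();
        buffers.put("stream", toBuffer("stream", streamTimestamps));
        buffers.put("table", toBuffer("table", tableTimestamps));
        List<String> order = new ArrayList<>();
        for (Rec rec : processInTimestampOrder(buffers)) {
            order.add(rec.toString());
        }
        return order;
    }

    private static Deque<Rec> toBuffer(String source, long[] timestamps) {
        Deque<Rec> buffer = new ArrayDeque<>();
        for (long ts : timestamps) {
            buffer.addLast(new Rec(source, ts));
        }
        return buffer;
    }

    public static void main(String[] args) {
        System.out.println(run(new long[] {100, 300}, new long[] {200}));
        // [stream@100, table@200, stream@300]
        System.out.println(run(new long[] {100, 300}, new long[] {500, 150}));
        // [stream@100, stream@300, table@500, table@150]
    }
}
```

In the second call, the table record stamped 150 is queued behind one stamped 500. It is therefore processed after the stream record stamped 300. Records within one buffer keep their arrival order, which is one way processing can end up out of timestamp order.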

Guozhang



On Thu, Mar 31, 2016 at 6:50 AM, Jeff Klukas <jk...@simple.com> wrote:

>
>
> On Thu, Mar 31, 2016 at 9:03 AM, Jeff Klukas <jk...@simple.com> wrote:
>
>>
>>
>> On Wed, Mar 30, 2016 at 5:21 PM, Guozhang Wang <wa...@gmail.com>
>> wrote:
>>
>>> Hi Jeff,
>>>
>>> If you are using "KStream.leftJoin(KTable..)" then you cannot specify a
>>> join window, and the stream-table join would be implemented just as a table
>>> query upon each arrival of the stream record and a table update upon each
>>> arrival of the table record.
>>>
>>> You can use a join window if you call "KStream.join(KStream...)". Could
>>> you tell me which join operator you are planning to use?
>>>
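The stream-table left-join semantics described in the quoted reply above can be modeled in a few lines of Java. Under those semantics, an arriving stream record triggers a table lookup, and an arriving table record triggers a table update. This is a hypothetical sketch for illustration only, not Kafka Streams code. LeftJoinSketch and its methods are made-up names, and deletes (null table values) are ignored.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of KStream-KTable left-join semantics
// (not the actual Kafka Streams implementation; deletes are ignored).
final class LeftJoinSketch {
    private final Map<String, String> table = new HashMap<>();
    private final List<String> joined = new ArrayList<>();

    // A table record only updates the table's current value for its key.
    void onTableRecord(String key, String value) {
        table.put(key, value);
    }

    // A stream record queries the table once, at arrival time; a missing key
    // yields a null right-hand side (string concatenation renders it as "null").
    void onStreamRecord(String key, String value) {
        joined.add(key + ":" + value + "+" + table.get(key));
    }

    List<String> joined() {
        return joined;
    }

    public static void main(String[] args) {
        LeftJoinSketch join = new LeftJoinSketch();
        join.onStreamRecord("u1", "click"); // arrives before the table update
        join.onTableRecord("u1", "alice");
        join.onStreamRecord("u1", "view");
        System.out.println(join.joined()); // [u1:click+null, u1:view+alice]
    }
}
```

The first stream record joins with null because the table has not seen the key yet. This is the race condition described in the original question.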
>>
>> KStream.leftJoin(KTable..) makes sense, but then how do timestamps play
>> into it? The docs and your explanation both make it sound like the join is
>> going to happen as soon as the stream record arrives, so it's unclear to me
>> how modifying timestamps helps in this case.
>>
>
> I read over your reply a few more times, and I can see now that you're
> pointing out that Kafka Streams uses timestamps for more than just
> windowing. And indeed the Confluent docs mention that "timestamps are used
> to control the progress of streams". Does this mean that the application is
> monitoring timestamp progress and is throttling consumers to make better
> progress on the ones that are further behind?
>
> That's exciting, and goes beyond what I had expected Kafka Streams to do
> by default. Stream processing is still pretty new to me, so perhaps this is
> an assumed responsibility of a stream processing framework, but it seems
> like it could be a great selling point.
>
> Is there an obvious class to look at to better understand how Kafka
> Streams coordinates stream progress within a topology?
>
> I'll plan to post a resolution back to the user mailing list once I feel
> like I have a reasonable handle on how this works.
>
> And thank you so much for your help.
>
>
>>
>>> On Wed, Mar 30, 2016 at 2:15 PM, Jeff Klukas <jk...@simple.com> wrote:
>>>
>>>>
>>>>
>>>> ---------- Forwarded message ----------
>>>>> From: Jeff Klukas <jk...@simple.com>
>>>>> To: users@kafka.apache.org
>>>>> Cc:
>>>>> Date: Wed, 30 Mar 2016 11:14:53 -0400
>>>>> Subject: KStream-KTable join with the KTable given a "head start"
>>>>> I have a KStream that I want to enrich with some values from a lookup
>>>>> table. When a new key enters the KStream, there's likely to be a
>>>>> corresponding entry arriving on the KTable at the same time, so we end up
>>>>> with a race condition. If the KTable record arrives first, then its value
>>>>> is available for joining when the corresponding record arrives on the
>>>>> KStream. If the KStream record arrives first, however, we'll get a null
>>>>> join even if the KTable gets the corresponding record only milliseconds
>>>>> later.
>>>>>
>>>>> I'd like to give the KTable a "head start" of ~10 seconds, so that it gets
>>>>> a chance to get updated before the corresponding records arrive on the
>>>>> KStream. Could I achieve this using one of the existing windowing
>>>>> strategies?
>>>>>
>>>>>
>>>>> ---------- Forwarded message ----------
>>>>> From: Guozhang Wang <wa...@gmail.com>
>>>>> To: "users@kafka.apache.org" <us...@kafka.apache.org>
>>>>> Cc:
>>>>> Date: Wed, 30 Mar 2016 13:51:03 -0700
>>>>> Subject: Re: KStream-KTable join with the KTable given a "head start"
>>>>> Hi Jeff,
>>>>>
>>>>> This is a common issue with stream-table joins: the join results
>>>>> depend on the arrival order of records from these two sources.
>>>>>
>>>>> In Kafka Streams you can try to "synchronize" multiple input streams
>>>>> through the "TimestampExtractor" interface, which is used to assign a
>>>>> timestamp to each record polled from Kafka to start the processing. You
>>>>> can, for example, set the timestamps for your KStream within a later time
>>>>> interval and the timestamps for your KTable stream with an earlier time
>>>>> interval, so that the records from the table are likely to be processed
>>>>> first. Note that this is a best effort: we cannot guarantee global
>>>>> ordering across streams while processing, i.e. if you have a much later
>>>>> record coming from the KTable, it will not block earlier records from the
>>>>> KStream from being processed first. But we think this mechanism should be
>>>>> sufficient in practice.
>>>>>
>>>>> Let me know if it fits your scenario, and if not we can talk about how
>>>>> it could be improved.
>>>>>
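The timestamp-shifting suggestion in the quoted reply above can be sketched in Java. This is a toy model under stated assumptions, not the Kafka Streams API. The extract method is a hypothetical stand-in for a custom TimestampExtractor. HEAD_START_MS = 10000 ms mirrors the ~10 second head start asked about. Sorting a whole batch by extracted timestamp idealizes the buffered, best-effort ordering, so the sketch ignores the caveats about out-of-order records.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the "head start" suggestion (NOT the Kafka Streams API):
// a hypothetical per-record timestamp function, standing in for a custom
// TimestampExtractor, pushes stream-side timestamps HEAD_START_MS later so
// that timestamp-ordered processing applies nearby table updates first.
final class HeadStartSketch {
    static final long HEAD_START_MS = 10_000L; // assumed ~10 s head start

    static final class Rec {
        final boolean fromTable;
        final String key;
        final String value;
        final long timestamp;

        Rec(boolean fromTable, String key, String value, long timestamp) {
            this.fromTable = fromTable;
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Hypothetical extractor: shift only stream-side records, and only when enabled.
    static long extract(Rec rec, boolean headStart) {
        return (headStart && !rec.fromTable) ? rec.timestamp + HEAD_START_MS : rec.timestamp;
    }

    // Idealization: order a whole batch by extracted timestamp (List.sort is
    // stable, so ties keep poll order), then apply left-join semantics.
    static List<String> run(List<Rec> polled, boolean headStart) {
        List<Rec> ordered = new ArrayList<>(polled);
        ordered.sort(Comparator.comparingLong((Rec rec) -> extract(rec, headStart)));
        Map<String, String> table = new HashMap<>();
        List<String> joined = new ArrayList<>();
        for (Rec rec : ordered) {
            if (rec.fromTable) {
                table.put(rec.key, rec.value); // table update
            } else {
                joined.add(rec.key + ":" + rec.value + "+" + table.get(rec.key)); // lookup
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<Rec> polled = Arrays.asList(
                new Rec(false, "u1", "click", 1_000L), // stream record polled first
                new Rec(true, "u1", "alice", 1_005L)); // table record 5 ms later
        System.out.println(run(polled, false)); // [u1:click+null]
        System.out.println(run(polled, true));  // [u1:click+alice]
    }
}
```

Without the shift, the stream record (1000 ms) is processed before the table record (1005 ms) and joins with null. With the stream side pushed 10 seconds later, the table update is applied first. This sketch shifts the stream side later rather than the table side earlier, which is an arbitrary choice that also keeps all timestamps non-negative.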
>>>>
>>>>
>>>> What would I pass in for a window in this case? Or would I not pass in
>>>> a window?
>>>>
>>>> I don't want to put any lower limit on the KTable timestamps I'd be
>>>> willing to join on (the corresponding entry in the KTable could have been
>>>> from weeks ago, or it could be fired right at the same time as the KStream
>>>> event).
>>>>
>>>> Could I use JoinWindows.before() and pass in an arbitrarily long
>>>> interval?
>>>>
>>>>
>>>
>>>
>>> --
>>> -- Guozhang
>>>
>>
>>
>


-- 
-- Guozhang