You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Ali Akhtar <al...@gmail.com> on 2017/03/18 17:29:36 UTC

Out of order message processing with Kafka Streams

Is it possible to have Kafka Streams order messages correctly by their
timestamps, even if they arrived out of order?

E.g, say Message A with a timestamp of 5:00 PM and Message B with a
timestamp of 5:15 PM, are sent.

Message B arrives sooner than Message A, due to network issues.

Is it possible to make sure that, across all consumers of Kafka Streams
(even if they are across different servers, but have the same consumer
group), Message A is consumed first, before Message B?

Thanks.

Re: Out of order message processing with Kafka Streams

Posted by Damian Guy <da...@gmail.com>.
Hi Ali,

(My use case is, i receive a stream of messages. Messages need to be stored
> and sorted into 'buckets', to indicate 'sessions'. Each time there's a gap
> of 30 mins or more since the last message (under a key), a new 'session'
> (bucket) should be started, and future messages should belong to that
> 'session', until the next 30+ min gap).
>
>
This sounds like you might want to use the SessionWindows feature in
KafkaStreams.
https://kafka.apache.org/0102/javadoc/org/apache/kafka/streams/kstream/SessionWindows.html

Re: Out of order message processing with Kafka Streams

Posted by Ali Akhtar <al...@gmail.com>.
Hans,

Which class's javadocs should i look at? From my initial look at the
javadocs and discussion with Michael, it doesn't seem possible.

On Tue, Mar 21, 2017 at 10:44 PM, Hans Jespersen <ha...@confluent.io> wrote:

> Yes, and yes!
>
> -hans
>
>
>
> > On Mar 21, 2017, at 7:45 AM, Ali Akhtar <al...@gmail.com> wrote:
> >
> > That would require
> >
> > - Knowing the current window's id (or some other identifier) to
> > differentiate it from other windows
> >
> > - Being able to process individual messages in a window
> >
> > Are those 2 things possible w/ kafka streams? (java)
> >
> > On Tue, Mar 21, 2017 at 7:43 PM, Hans Jespersen <ha...@confluent.io>
> wrote:
> >
> >> While it's not exactly the same as the window start/stop time you can
> >> store (in the state store) the earliest and latest timestamps of any
> >> messages in each window and use that as a good approximation for the
> window
> >> boundary times.
> >>
> >> -hans
> >>
> >>> On Mar 20, 2017, at 1:00 PM, Ali Akhtar <al...@gmail.com> wrote:
> >>>
> >>> Yeah, windowing seems perfect, if only I could find out the current
> >>> window's start time (so I can log the current bucket's start & end
> times)
> >>> and process window messages individually rather than as aggregates.
> >>>
> >>> It doesn't seem like i can get this metadata from ProcessorContext
> >> though,
> >>> from looking over the javadocs
> >>>
> >>>> On Tue, Mar 21, 2017 at 12:38 AM, Michael Noll <mi...@confluent.io>
> >> wrote:
> >>>>
> >>>> Ali,
> >>>>
> >>>> what you describe is (roughly!) how Kafka Streams implements the
> >> internal
> >>>> state stores to support windowing.
> >>>>
> >>>> Some users have been following a similar approach as you outlined,
> using
> >>>> the Processor API.
> >>>>
> >>>>
> >>>>
> >>>>> On Mon, Mar 20, 2017 at 7:54 PM, Ali Akhtar <al...@gmail.com>
> >> wrote:
> >>>>>
> >>>>> It would be helpful to know the 'start' and 'end' of the current
> >>>> metadata,
> >>>>> so if an out of order message arrives late, and is being processed in
> >>>>> foreach(), you'd know which window / bucket it belongs to, and can
> >> handle
> >>>>> it accordingly.
> >>>>>
> >>>>> I'm guessing that's not possible at the moment.
> >>>>>
> >>>>> (My use case is, i receive a stream of messages. Messages need to be
> >>>> stored
> >>>>> and sorted into 'buckets', to indicate 'sessions'. Each time there's
> a
> >>>> gap
> >>>>> of 30 mins or more since the last message (under a key), a new
> >> 'session'
> >>>>> (bucket) should be started, and future messages should belong to that
> >>>>> 'session', until the next 30+ min gap).
> >>>>>
> >>>>> On Mon, Mar 20, 2017 at 11:44 PM, Michael Noll <michael@confluent.io
> >
> >>>>> wrote:
> >>>>>
> >>>>>>> Can windows only be used for aggregations, or can they also be used
> >>>> for
> >>>>>> foreach(),
> >>>>>> and such?
> >>>>>>
> >>>>>> As of today, you can use windows only in aggregations.
> >>>>>>
> >>>>>>> And is it possible to get metadata on the message, such as whether
> or
> >>>>>> not its
> >>>>>> late, its index/position within the other messages, etc?
> >>>>>>
> >>>>>> If you use the Processor API of Kafka Streams, you can have access
> to
> >>>> an
> >>>>>> incoming record's topic, partition, offset, etc. via the so-called
> >>>>>> ProcessorContext (which is updated for every new incoming record):
> >>>>>>
> >>>>>> http://docs.confluent.io/current/streams/javadocs/org/
> >>>>>> apache/kafka/streams/processor/Processor.html
> >>>>>> - You can get/store a reference to the ProcessorContext from
> >>>>>> `Processor#init()`.
> >>>>>>
> >>>>>> http://docs.confluent.io/current/streams/javadocs/org/
> >>>>>> apache/kafka/streams/processor/ProcessorContext.html
> >>>>>> - The context can then be used within `Processor#process()` when you
> >>>>>> process a new record.  As I said, the context is updated behind the
> >>>>> scenes
> >>>>>> to match the record that is currently being processed.
> >>>>>>
> >>>>>>
> >>>>>> Best,
> >>>>>> Michael
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On Mon, Mar 20, 2017 at 5:59 PM, Ali Akhtar <al...@gmail.com>
> >>>>> wrote:
> >>>>>>
> >>>>>>> Can windows only be used for aggregations, or can they also be used
> >>>> for
> >>>>>>> foreach(), and such?
> >>>>>>>
> >>>>>>> And is it possible to get metadata on the message, such as whether
> or
> >>>>> not
> >>>>>>> its late, its index/position within the other messages, etc?
> >>>>>>>
> >>>>>>> On Mon, Mar 20, 2017 at 9:44 PM, Michael Noll <
> michael@confluent.io>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> And since you asked for a pointer, Ali:
> >>>>>>>> http://docs.confluent.io/current/streams/concepts.html#windowing
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Mon, Mar 20, 2017 at 5:43 PM, Michael Noll <
> >>>> michael@confluent.io>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Late-arriving and out-of-order data is only treated specially for
> >>>>>>>> windowed
> >>>>>>>>> aggregations.
> >>>>>>>>>
> >>>>>>>>> For stateless operations such as `KStream#foreach()` or
> >>>>>>> `KStream#map()`,
> >>>>>>>>> records are processed in the order they arrive (per partition).
> >>>>>>>>>
> >>>>>>>>> -Michael
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Sat, Mar 18, 2017 at 10:47 PM, Ali Akhtar <
> >>>> ali.rac200@gmail.com
> >>>>>>
> >>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>>> later when message A arrives it will put that message back
> >>>> into
> >>>>>>>>>>> the right temporal context and publish an amended result for
> >>>> the
> >>>>>>>> proper
> >>>>>>>>>>> time/session window as if message B were consumed in the
> >>>>> timestamp
> >>>>>>>> order
> >>>>>>>>>>> before message A.
> >>>>>>>>>>
> >>>>>>>>>> Does this apply to the aggregation Kafka stream methods then,
> >>>> and
> >>>>>> not
> >>>>>>> to
> >>>>>>>>>> e.g foreach?
> >>>>>>>>>>
> >>>>>>>>>> On Sun, Mar 19, 2017 at 2:40 AM, Hans Jespersen <
> >>>>> hans@confluent.io>
> >>>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Yes stream processing and CEP are subtlety different things.
> >>>>>>>>>>>
> >>>>>>>>>>> Kafka Streams helps you write stateful apps and allows that
> >>>>> state
> >>>>>> to
> >>>>>>>> be
> >>>>>>>>>>> preserved on disk (a local State store) as well as distributed
> >>>>> for
> >>>>>>> HA
> >>>>>>>> or
> >>>>>>>>>>> for parallel partitioned processing (via Kafka topic
> >>>> partitions
> >>>>>> and
> >>>>>>>>>>> consumer groups) as well as in memory (as a performance
> >>>>>>> enhancement).
> >>>>>>>>>>>
> >>>>>>>>>>> However a classical CEP engine with a pre-modeled state
> >>>> machine
> >>>>>> and
> >>>>>>>>>>> pattern matching rules is something different from stream
> >>>>>>> processing.
> >>>>>>>>>>>
> >>>>>>>>>>> It is on course possible to build a CEP system on top on Kafka
> >>>>>>> Streams
> >>>>>>>>>> and
> >>>>>>>>>>> get the best of both worlds.
> >>>>>>>>>>>
> >>>>>>>>>>> -hans
> >>>>>>>>>>>
> >>>>>>>>>>>> On Mar 18, 2017, at 11:36 AM, Sabarish Sasidharan <
> >>>>>>>>>>> sabarish.spk@gmail.com> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>> Hans
> >>>>>>>>>>>>
> >>>>>>>>>>>> What you state would work for aggregations, but not for
> >>>> state
> >>>>>>>> machines
> >>>>>>>>>>> and
> >>>>>>>>>>>> CEP.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Regards
> >>>>>>>>>>>> Sab
> >>>>>>>>>>>>
> >>>>>>>>>>>>> On 19 Mar 2017 12:01 a.m., "Hans Jespersen" <
> >>>>> hans@confluent.io
> >>>>>>>
> >>>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> The only way to make sure A is consumed first would be to
> >>>>> delay
> >>>>>>> the
> >>>>>>>>>>>>> consumption of message B for at least 15 minutes which
> >>>> would
> >>>>>> fly
> >>>>>>> in
> >>>>>>>>>> the
> >>>>>>>>>>>>> face of the principals of a true streaming platform so the
> >>>>>> short
> >>>>>>>>>> answer
> >>>>>>>>>>> to
> >>>>>>>>>>>>> your question is "no" because that would be batch
> >>>> processing
> >>>>>> not
> >>>>>>>>>> stream
> >>>>>>>>>>>>> processing.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> However, Kafka Streams does handle late arriving data. So
> >>>> if
> >>>>>> you
> >>>>>>>> had
> >>>>>>>>>>> some
> >>>>>>>>>>>>> analytics that computes results on a time window or a
> >>>> session
> >>>>>>>> window
> >>>>>>>>>>> then
> >>>>>>>>>>>>> Kafka streams will compute on the stream in real time
> >>>>>> (processing
> >>>>>>>>>>> message
> >>>>>>>>>>>>> B) and then later when message A arrives it will put that
> >>>>>> message
> >>>>>>>>>> back
> >>>>>>>>>>> into
> >>>>>>>>>>>>> the right temporal context and publish an amended result
> >>>> for
> >>>>>> the
> >>>>>>>>>> proper
> >>>>>>>>>>>>> time/session window as if message B were consumed in the
> >>>>>>> timestamp
> >>>>>>>>>> order
> >>>>>>>>>>>>> before message A. The end result of this flow is that you
> >>>>>>>> eventually
> >>>>>>>>>> get
> >>>>>>>>>>>>> the same results you would get in a batch processing system
> >>>>> but
> >>>>>>>> with
> >>>>>>>>>> the
> >>>>>>>>>>>>> added benefit of getting intermediary result at much lower
> >>>>>>> latency.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> -hans
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> /**
> >>>>>>>>>>>>> * Hans Jespersen, Principal Systems Engineer, Confluent
> >>>> Inc.
> >>>>>>>>>>>>> * hans@confluent.io (650)924-2670
> >>>>>>>>>>>>> */
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Sat, Mar 18, 2017 at 10:29 AM, Ali Akhtar <
> >>>>>>>> ali.rac200@gmail.com>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Is it possible to have Kafka Streams order messages
> >>>>> correctly
> >>>>>> by
> >>>>>>>>>> their
> >>>>>>>>>>>>>> timestamps, even if they arrived out of order?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> E.g, say Message A with a timestamp of 5:00 PM and
> >>>> Message B
> >>>>>>> with
> >>>>>>>> a
> >>>>>>>>>>>>>> timestamp of 5:15 PM, are sent.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Message B arrives sooner than Message A, due to network
> >>>>>> issues.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Is it possible to make sure that, across all consumers of
> >>>>>> Kafka
> >>>>>>>>>> Streams
> >>>>>>>>>>>>>> (even if they are across different servers, but have the
> >>>>> same
> >>>>>>>>>> consumer
> >>>>>>>>>>>>>> group), Message A is consumed first, before Message B?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>
>
>

Re: Out of order message processing with Kafka Streams

Posted by Hans Jespersen <ha...@confluent.io>.
Yes, and yes!

-hans



> On Mar 21, 2017, at 7:45 AM, Ali Akhtar <al...@gmail.com> wrote:
> 
> That would require
> 
> - Knowing the current window's id (or some other identifier) to
> differentiate it from other windows
> 
> - Being able to process individual messages in a window
> 
> Are those 2 things possible w/ kafka streams? (java)
> 
> On Tue, Mar 21, 2017 at 7:43 PM, Hans Jespersen <ha...@confluent.io> wrote:
> 
>> While it's not exactly the same as the window start/stop time you can
>> store (in the state store) the earliest and latest timestamps of any
>> messages in each window and use that as a good approximation for the window
>> boundary times.
>> 
>> -hans
>> 
>>> On Mar 20, 2017, at 1:00 PM, Ali Akhtar <al...@gmail.com> wrote:
>>> 
>>> Yeah, windowing seems perfect, if only I could find out the current
>>> window's start time (so I can log the current bucket's start & end times)
>>> and process window messages individually rather than as aggregates.
>>> 
>>> It doesn't seem like i can get this metadata from ProcessorContext
>> though,
>>> from looking over the javadocs
>>> 
>>>> On Tue, Mar 21, 2017 at 12:38 AM, Michael Noll <mi...@confluent.io>
>> wrote:
>>>> 
>>>> Ali,
>>>> 
>>>> what you describe is (roughly!) how Kafka Streams implements the
>> internal
>>>> state stores to support windowing.
>>>> 
>>>> Some users have been following a similar approach as you outlined, using
>>>> the Processor API.
>>>> 
>>>> 
>>>> 
>>>>> On Mon, Mar 20, 2017 at 7:54 PM, Ali Akhtar <al...@gmail.com>
>> wrote:
>>>>> 
>>>>> It would be helpful to know the 'start' and 'end' of the current
>>>> metadata,
>>>>> so if an out of order message arrives late, and is being processed in
>>>>> foreach(), you'd know which window / bucket it belongs to, and can
>> handle
>>>>> it accordingly.
>>>>> 
>>>>> I'm guessing that's not possible at the moment.
>>>>> 
>>>>> (My use case is, i receive a stream of messages. Messages need to be
>>>> stored
>>>>> and sorted into 'buckets', to indicate 'sessions'. Each time there's a
>>>> gap
>>>>> of 30 mins or more since the last message (under a key), a new
>> 'session'
>>>>> (bucket) should be started, and future messages should belong to that
>>>>> 'session', until the next 30+ min gap).
>>>>> 
>>>>> On Mon, Mar 20, 2017 at 11:44 PM, Michael Noll <mi...@confluent.io>
>>>>> wrote:
>>>>> 
>>>>>>> Can windows only be used for aggregations, or can they also be used
>>>> for
>>>>>> foreach(),
>>>>>> and such?
>>>>>> 
>>>>>> As of today, you can use windows only in aggregations.
>>>>>> 
>>>>>>> And is it possible to get metadata on the message, such as whether or
>>>>>> not its
>>>>>> late, its index/position within the other messages, etc?
>>>>>> 
>>>>>> If you use the Processor API of Kafka Streams, you can have access to
>>>> an
>>>>>> incoming record's topic, partition, offset, etc. via the so-called
>>>>>> ProcessorContext (which is updated for every new incoming record):
>>>>>> 
>>>>>> http://docs.confluent.io/current/streams/javadocs/org/
>>>>>> apache/kafka/streams/processor/Processor.html
>>>>>> - You can get/store a reference to the ProcessorContext from
>>>>>> `Processor#init()`.
>>>>>> 
>>>>>> http://docs.confluent.io/current/streams/javadocs/org/
>>>>>> apache/kafka/streams/processor/ProcessorContext.html
>>>>>> - The context can then be used within `Processor#process()` when you
>>>>>> process a new record.  As I said, the context is updated behind the
>>>>> scenes
>>>>>> to match the record that is currently being processed.
>>>>>> 
>>>>>> 
>>>>>> Best,
>>>>>> Michael
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> On Mon, Mar 20, 2017 at 5:59 PM, Ali Akhtar <al...@gmail.com>
>>>>> wrote:
>>>>>> 
>>>>>>> Can windows only be used for aggregations, or can they also be used
>>>> for
>>>>>>> foreach(), and such?
>>>>>>> 
>>>>>>> And is it possible to get metadata on the message, such as whether or
>>>>> not
>>>>>>> its late, its index/position within the other messages, etc?
>>>>>>> 
>>>>>>> On Mon, Mar 20, 2017 at 9:44 PM, Michael Noll <mi...@confluent.io>
>>>>>>> wrote:
>>>>>>> 
>>>>>>>> And since you asked for a pointer, Ali:
>>>>>>>> http://docs.confluent.io/current/streams/concepts.html#windowing
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On Mon, Mar 20, 2017 at 5:43 PM, Michael Noll <
>>>> michael@confluent.io>
>>>>>>>> wrote:
>>>>>>>> 
>>>>>>>>> Late-arriving and out-of-order data is only treated specially for
>>>>>>>> windowed
>>>>>>>>> aggregations.
>>>>>>>>> 
>>>>>>>>> For stateless operations such as `KStream#foreach()` or
>>>>>>> `KStream#map()`,
>>>>>>>>> records are processed in the order they arrive (per partition).
>>>>>>>>> 
>>>>>>>>> -Michael
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On Sat, Mar 18, 2017 at 10:47 PM, Ali Akhtar <
>>>> ali.rac200@gmail.com
>>>>>> 
>>>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>>>> later when message A arrives it will put that message back
>>>> into
>>>>>>>>>>> the right temporal context and publish an amended result for
>>>> the
>>>>>>>> proper
>>>>>>>>>>> time/session window as if message B were consumed in the
>>>>> timestamp
>>>>>>>> order
>>>>>>>>>>> before message A.
>>>>>>>>>> 
>>>>>>>>>> Does this apply to the aggregation Kafka stream methods then,
>>>> and
>>>>>> not
>>>>>>> to
>>>>>>>>>> e.g foreach?
>>>>>>>>>> 
>>>>>>>>>> On Sun, Mar 19, 2017 at 2:40 AM, Hans Jespersen <
>>>>> hans@confluent.io>
>>>>>>>>>> wrote:
>>>>>>>>>> 
>>>>>>>>>>> Yes stream processing and CEP are subtlety different things.
>>>>>>>>>>> 
>>>>>>>>>>> Kafka Streams helps you write stateful apps and allows that
>>>>> state
>>>>>> to
>>>>>>>> be
>>>>>>>>>>> preserved on disk (a local State store) as well as distributed
>>>>> for
>>>>>>> HA
>>>>>>>> or
>>>>>>>>>>> for parallel partitioned processing (via Kafka topic
>>>> partitions
>>>>>> and
>>>>>>>>>>> consumer groups) as well as in memory (as a performance
>>>>>>> enhancement).
>>>>>>>>>>> 
>>>>>>>>>>> However a classical CEP engine with a pre-modeled state
>>>> machine
>>>>>> and
>>>>>>>>>>> pattern matching rules is something different from stream
>>>>>>> processing.
>>>>>>>>>>> 
>>>>>>>>>>> It is on course possible to build a CEP system on top on Kafka
>>>>>>> Streams
>>>>>>>>>> and
>>>>>>>>>>> get the best of both worlds.
>>>>>>>>>>> 
>>>>>>>>>>> -hans
>>>>>>>>>>> 
>>>>>>>>>>>> On Mar 18, 2017, at 11:36 AM, Sabarish Sasidharan <
>>>>>>>>>>> sabarish.spk@gmail.com> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>> Hans
>>>>>>>>>>>> 
>>>>>>>>>>>> What you state would work for aggregations, but not for
>>>> state
>>>>>>>> machines
>>>>>>>>>>> and
>>>>>>>>>>>> CEP.
>>>>>>>>>>>> 
>>>>>>>>>>>> Regards
>>>>>>>>>>>> Sab
>>>>>>>>>>>> 
>>>>>>>>>>>>> On 19 Mar 2017 12:01 a.m., "Hans Jespersen" <
>>>>> hans@confluent.io
>>>>>>> 
>>>>>>>>>> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> The only way to make sure A is consumed first would be to
>>>>> delay
>>>>>>> the
>>>>>>>>>>>>> consumption of message B for at least 15 minutes which
>>>> would
>>>>>> fly
>>>>>>> in
>>>>>>>>>> the
>>>>>>>>>>>>> face of the principals of a true streaming platform so the
>>>>>> short
>>>>>>>>>> answer
>>>>>>>>>>> to
>>>>>>>>>>>>> your question is "no" because that would be batch
>>>> processing
>>>>>> not
>>>>>>>>>> stream
>>>>>>>>>>>>> processing.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> However, Kafka Streams does handle late arriving data. So
>>>> if
>>>>>> you
>>>>>>>> had
>>>>>>>>>>> some
>>>>>>>>>>>>> analytics that computes results on a time window or a
>>>> session
>>>>>>>> window
>>>>>>>>>>> then
>>>>>>>>>>>>> Kafka streams will compute on the stream in real time
>>>>>> (processing
>>>>>>>>>>> message
>>>>>>>>>>>>> B) and then later when message A arrives it will put that
>>>>>> message
>>>>>>>>>> back
>>>>>>>>>>> into
>>>>>>>>>>>>> the right temporal context and publish an amended result
>>>> for
>>>>>> the
>>>>>>>>>> proper
>>>>>>>>>>>>> time/session window as if message B were consumed in the
>>>>>>> timestamp
>>>>>>>>>> order
>>>>>>>>>>>>> before message A. The end result of this flow is that you
>>>>>>>> eventually
>>>>>>>>>> get
>>>>>>>>>>>>> the same results you would get in a batch processing system
>>>>> but
>>>>>>>> with
>>>>>>>>>> the
>>>>>>>>>>>>> added benefit of getting intermediary result at much lower
>>>>>>> latency.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> -hans
>>>>>>>>>>>>> 
>>>>>>>>>>>>> /**
>>>>>>>>>>>>> * Hans Jespersen, Principal Systems Engineer, Confluent
>>>> Inc.
>>>>>>>>>>>>> * hans@confluent.io (650)924-2670
>>>>>>>>>>>>> */
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Sat, Mar 18, 2017 at 10:29 AM, Ali Akhtar <
>>>>>>>> ali.rac200@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Is it possible to have Kafka Streams order messages
>>>>> correctly
>>>>>> by
>>>>>>>>>> their
>>>>>>>>>>>>>> timestamps, even if they arrived out of order?
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> E.g, say Message A with a timestamp of 5:00 PM and
>>>> Message B
>>>>>>> with
>>>>>>>> a
>>>>>>>>>>>>>> timestamp of 5:15 PM, are sent.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Message B arrives sooner than Message A, due to network
>>>>>> issues.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Is it possible to make sure that, across all consumers of
>>>>>> Kafka
>>>>>>>>>> Streams
>>>>>>>>>>>>>> (even if they are across different servers, but have the
>>>>> same
>>>>>>>>>> consumer
>>>>>>>>>>>>>> group), Message A is consumed first, before Message B?
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>> 


Re: Out of order message processing with Kafka Streams

Posted by Ali Akhtar <al...@gmail.com>.
That would require

- Knowing the current window's id (or some other identifier) to
differentiate it from other windows

- Being able to process individual messages in a window

Are those 2 things possible w/ kafka streams? (java)

On Tue, Mar 21, 2017 at 7:43 PM, Hans Jespersen <ha...@confluent.io> wrote:

> While it's not exactly the same as the window start/stop time you can
> store (in the state store) the earliest and latest timestamps of any
> messages in each window and use that as a good approximation for the window
> boundary times.
>
> -hans
>
> > On Mar 20, 2017, at 1:00 PM, Ali Akhtar <al...@gmail.com> wrote:
> >
> > Yeah, windowing seems perfect, if only I could find out the current
> > window's start time (so I can log the current bucket's start & end times)
> > and process window messages individually rather than as aggregates.
> >
> > It doesn't seem like i can get this metadata from ProcessorContext
> though,
> > from looking over the javadocs
> >
> >> On Tue, Mar 21, 2017 at 12:38 AM, Michael Noll <mi...@confluent.io>
> wrote:
> >>
> >> Ali,
> >>
> >> what you describe is (roughly!) how Kafka Streams implements the
> internal
> >> state stores to support windowing.
> >>
> >> Some users have been following a similar approach as you outlined, using
> >> the Processor API.
> >>
> >>
> >>
> >>> On Mon, Mar 20, 2017 at 7:54 PM, Ali Akhtar <al...@gmail.com>
> wrote:
> >>>
> >>> It would be helpful to know the 'start' and 'end' of the current
> >> metadata,
> >>> so if an out of order message arrives late, and is being processed in
> >>> foreach(), you'd know which window / bucket it belongs to, and can
> handle
> >>> it accordingly.
> >>>
> >>> I'm guessing that's not possible at the moment.
> >>>
> >>> (My use case is, i receive a stream of messages. Messages need to be
> >> stored
> >>> and sorted into 'buckets', to indicate 'sessions'. Each time there's a
> >> gap
> >>> of 30 mins or more since the last message (under a key), a new
> 'session'
> >>> (bucket) should be started, and future messages should belong to that
> >>> 'session', until the next 30+ min gap).
> >>>
> >>> On Mon, Mar 20, 2017 at 11:44 PM, Michael Noll <mi...@confluent.io>
> >>> wrote:
> >>>
> >>>>> Can windows only be used for aggregations, or can they also be used
> >> for
> >>>> foreach(),
> >>>> and such?
> >>>>
> >>>> As of today, you can use windows only in aggregations.
> >>>>
> >>>>> And is it possible to get metadata on the message, such as whether or
> >>>> not its
> >>>> late, its index/position within the other messages, etc?
> >>>>
> >>>> If you use the Processor API of Kafka Streams, you can have access to
> >> an
> >>>> incoming record's topic, partition, offset, etc. via the so-called
> >>>> ProcessorContext (which is updated for every new incoming record):
> >>>>
> >>>> http://docs.confluent.io/current/streams/javadocs/org/
> >>>> apache/kafka/streams/processor/Processor.html
> >>>> - You can get/store a reference to the ProcessorContext from
> >>>> `Processor#init()`.
> >>>>
> >>>> http://docs.confluent.io/current/streams/javadocs/org/
> >>>> apache/kafka/streams/processor/ProcessorContext.html
> >>>> - The context can then be used within `Processor#process()` when you
> >>>> process a new record.  As I said, the context is updated behind the
> >>> scenes
> >>>> to match the record that is currently being processed.
> >>>>
> >>>>
> >>>> Best,
> >>>> Michael
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> On Mon, Mar 20, 2017 at 5:59 PM, Ali Akhtar <al...@gmail.com>
> >>> wrote:
> >>>>
> >>>>> Can windows only be used for aggregations, or can they also be used
> >> for
> >>>>> foreach(), and such?
> >>>>>
> >>>>> And is it possible to get metadata on the message, such as whether or
> >>> not
> >>>>> its late, its index/position within the other messages, etc?
> >>>>>
> >>>>> On Mon, Mar 20, 2017 at 9:44 PM, Michael Noll <mi...@confluent.io>
> >>>>> wrote:
> >>>>>
> >>>>>> And since you asked for a pointer, Ali:
> >>>>>> http://docs.confluent.io/current/streams/concepts.html#windowing
> >>>>>>
> >>>>>>
> >>>>>> On Mon, Mar 20, 2017 at 5:43 PM, Michael Noll <
> >> michael@confluent.io>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Late-arriving and out-of-order data is only treated specially for
> >>>>>> windowed
> >>>>>>> aggregations.
> >>>>>>>
> >>>>>>> For stateless operations such as `KStream#foreach()` or
> >>>>> `KStream#map()`,
> >>>>>>> records are processed in the order they arrive (per partition).
> >>>>>>>
> >>>>>>> -Michael
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On Sat, Mar 18, 2017 at 10:47 PM, Ali Akhtar <
> >> ali.rac200@gmail.com
> >>>>
> >>>>>> wrote:
> >>>>>>>
> >>>>>>>>> later when message A arrives it will put that message back
> >> into
> >>>>>>>>> the right temporal context and publish an amended result for
> >> the
> >>>>>> proper
> >>>>>>>>> time/session window as if message B were consumed in the
> >>> timestamp
> >>>>>> order
> >>>>>>>>> before message A.
> >>>>>>>>
> >>>>>>>> Does this apply to the aggregation Kafka stream methods then,
> >> and
> >>>> not
> >>>>> to
> >>>>>>>> e.g foreach?
> >>>>>>>>
> >>>>>>>> On Sun, Mar 19, 2017 at 2:40 AM, Hans Jespersen <
> >>> hans@confluent.io>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Yes stream processing and CEP are subtlety different things.
> >>>>>>>>>
> >>>>>>>>> Kafka Streams helps you write stateful apps and allows that
> >>> state
> >>>> to
> >>>>>> be
> >>>>>>>>> preserved on disk (a local State store) as well as distributed
> >>> for
> >>>>> HA
> >>>>>> or
> >>>>>>>>> for parallel partitioned processing (via Kafka topic
> >> partitions
> >>>> and
> >>>>>>>>> consumer groups) as well as in memory (as a performance
> >>>>> enhancement).
> >>>>>>>>>
> >>>>>>>>> However a classical CEP engine with a pre-modeled state
> >> machine
> >>>> and
> >>>>>>>>> pattern matching rules is something different from stream
> >>>>> processing.
> >>>>>>>>>
> >>>>>>>>> It is on course possible to build a CEP system on top on Kafka
> >>>>> Streams
> >>>>>>>> and
> >>>>>>>>> get the best of both worlds.
> >>>>>>>>>
> >>>>>>>>> -hans
> >>>>>>>>>
> >>>>>>>>>> On Mar 18, 2017, at 11:36 AM, Sabarish Sasidharan <
> >>>>>>>>> sabarish.spk@gmail.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>> Hans
> >>>>>>>>>>
> >>>>>>>>>> What you state would work for aggregations, but not for
> >> state
> >>>>>> machines
> >>>>>>>>> and
> >>>>>>>>>> CEP.
> >>>>>>>>>>
> >>>>>>>>>> Regards
> >>>>>>>>>> Sab
> >>>>>>>>>>
> >>>>>>>>>>> On 19 Mar 2017 12:01 a.m., "Hans Jespersen" <
> >>> hans@confluent.io
> >>>>>
> >>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> The only way to make sure A is consumed first would be to
> >>> delay
> >>>>> the
> >>>>>>>>>>> consumption of message B for at least 15 minutes which
> >> would
> >>>> fly
> >>>>> in
> >>>>>>>> the
> >>>>>>>>>>> face of the principals of a true streaming platform so the
> >>>> short
> >>>>>>>> answer
> >>>>>>>>> to
> >>>>>>>>>>> your question is "no" because that would be batch
> >> processing
> >>>> not
> >>>>>>>> stream
> >>>>>>>>>>> processing.
> >>>>>>>>>>>
> >>>>>>>>>>> However, Kafka Streams does handle late arriving data. So
> >> if
> >>>> you
> >>>>>> had
> >>>>>>>>> some
> >>>>>>>>>>> analytics that computes results on a time window or a
> >> session
> >>>>>> window
> >>>>>>>>> then
> >>>>>>>>>>> Kafka streams will compute on the stream in real time
> >>>> (processing
> >>>>>>>>> message
> >>>>>>>>>>> B) and then later when message A arrives it will put that
> >>>> message
> >>>>>>>> back
> >>>>>>>>> into
> >>>>>>>>>>> the right temporal context and publish an amended result
> >> for
> >>>> the
> >>>>>>>> proper
> >>>>>>>>>>> time/session window as if message B were consumed in the
> >>>>> timestamp
> >>>>>>>> order
> >>>>>>>>>>> before message A. The end result of this flow is that you
> >>>>>> eventually
> >>>>>>>> get
> >>>>>>>>>>> the same results you would get in a batch processing system
> >>> but
> >>>>>> with
> >>>>>>>> the
> >>>>>>>>>>> added benefit of getting intermediary result at much lower
> >>>>> latency.
> >>>>>>>>>>>
> >>>>>>>>>>> -hans
> >>>>>>>>>>>
> >>>>>>>>>>> /**
> >>>>>>>>>>> * Hans Jespersen, Principal Systems Engineer, Confluent
> >> Inc.
> >>>>>>>>>>> * hans@confluent.io (650)924-2670
> >>>>>>>>>>> */
> >>>>>>>>>>>
> >>>>>>>>>>>> On Sat, Mar 18, 2017 at 10:29 AM, Ali Akhtar <
> >>>>>> ali.rac200@gmail.com>
> >>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>> Is it possible to have Kafka Streams order messages
> >>> correctly
> >>>> by
> >>>>>>>> their
> >>>>>>>>>>>> timestamps, even if they arrived out of order?
> >>>>>>>>>>>>
> >>>>>>>>>>>> E.g, say Message A with a timestamp of 5:00 PM and
> >> Message B
> >>>>> with
> >>>>>> a
> >>>>>>>>>>>> timestamp of 5:15 PM, are sent.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Message B arrives sooner than Message A, due to network
> >>>> issues.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Is it possible to make sure that, across all consumers of
> >>>> Kafka
> >>>>>>>> Streams
> >>>>>>>>>>>> (even if they are across different servers, but have the
> >>> same
> >>>>>>>> consumer
> >>>>>>>>>>>> group), Message A is consumed first, before Message B?
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks.
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
>

Re: Out of order message processing with Kafka Streams

Posted by Hans Jespersen <ha...@confluent.io>.
While it's not exactly the same as the window start/stop time you can store (in the state store) the earliest and latest timestamps of any messages in each window and use that as a good approximation for the window boundary times.  

-hans

> On Mar 20, 2017, at 1:00 PM, Ali Akhtar <al...@gmail.com> wrote:
> 
> Yeah, windowing seems perfect, if only I could find out the current
> window's start time (so I can log the current bucket's start & end times)
> and process window messages individually rather than as aggregates.
> 
> It doesn't seem like i can get this metadata from ProcessorContext though,
> from looking over the javadocs
> 
>> On Tue, Mar 21, 2017 at 12:38 AM, Michael Noll <mi...@confluent.io> wrote:
>> 
>> Ali,
>> 
>> what you describe is (roughly!) how Kafka Streams implements the internal
>> state stores to support windowing.
>> 
>> Some users have been following a similar approach as you outlined, using
>> the Processor API.
>> 
>> 
>> 
>>> On Mon, Mar 20, 2017 at 7:54 PM, Ali Akhtar <al...@gmail.com> wrote:
>>> 
>>> It would be helpful to know the 'start' and 'end' of the current
>> metadata,
>>> so if an out of order message arrives late, and is being processed in
>>> foreach(), you'd know which window / bucket it belongs to, and can handle
>>> it accordingly.
>>> 
>>> I'm guessing that's not possible at the moment.
>>> 
>>> (My use case is, i receive a stream of messages. Messages need to be
>> stored
>>> and sorted into 'buckets', to indicate 'sessions'. Each time there's a
>> gap
>>> of 30 mins or more since the last message (under a key), a new 'session'
>>> (bucket) should be started, and future messages should belong to that
>>> 'session', until the next 30+ min gap).
>>> 
>>> On Mon, Mar 20, 2017 at 11:44 PM, Michael Noll <mi...@confluent.io>
>>> wrote:
>>> 
>>>>> Can windows only be used for aggregations, or can they also be used
>> for
>>>> foreach(),
>>>> and such?
>>>> 
>>>> As of today, you can use windows only in aggregations.
>>>> 
>>>>> And is it possible to get metadata on the message, such as whether or
>>>> not its
>>>> late, its index/position within the other messages, etc?
>>>> 
>>>> If you use the Processor API of Kafka Streams, you can have access to
>> an
>>>> incoming record's topic, partition, offset, etc. via the so-called
>>>> ProcessorContext (which is updated for every new incoming record):
>>>> 
>>>> http://docs.confluent.io/current/streams/javadocs/org/
>>>> apache/kafka/streams/processor/Processor.html
>>>> - You can get/store a reference to the ProcessorContext from
>>>> `Processor#init()`.
>>>> 
>>>> http://docs.confluent.io/current/streams/javadocs/org/
>>>> apache/kafka/streams/processor/ProcessorContext.html
>>>> - The context can then be used within `Processor#process()` when you
>>>> process a new record.  As I said, the context is updated behind the
>>> scenes
>>>> to match the record that is currently being processed.
>>>> 
>>>> 
>>>> Best,
>>>> Michael
>>>> 
>>>> 
>>>> 
>>>> 
>>>> On Mon, Mar 20, 2017 at 5:59 PM, Ali Akhtar <al...@gmail.com>
>>> wrote:
>>>> 
>>>>> Can windows only be used for aggregations, or can they also be used
>> for
>>>>> foreach(), and such?
>>>>> 
>>>>> And is it possible to get metadata on the message, such as whether or
>>> not
>>>>> its late, its index/position within the other messages, etc?
>>>>> 
>>>>> On Mon, Mar 20, 2017 at 9:44 PM, Michael Noll <mi...@confluent.io>
>>>>> wrote:
>>>>> 
>>>>>> And since you asked for a pointer, Ali:
>>>>>> http://docs.confluent.io/current/streams/concepts.html#windowing
>>>>>> 
>>>>>> 
>>>>>> On Mon, Mar 20, 2017 at 5:43 PM, Michael Noll <
>> michael@confluent.io>
>>>>>> wrote:
>>>>>> 
>>>>>>> Late-arriving and out-of-order data is only treated specially for
>>>>>> windowed
>>>>>>> aggregations.
>>>>>>> 
>>>>>>> For stateless operations such as `KStream#foreach()` or
>>>>> `KStream#map()`,
>>>>>>> records are processed in the order they arrive (per partition).
>>>>>>> 
>>>>>>> -Michael
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> On Sat, Mar 18, 2017 at 10:47 PM, Ali Akhtar <
>> ali.rac200@gmail.com
>>>> 
>>>>>> wrote:
>>>>>>> 
>>>>>>>>> later when message A arrives it will put that message back
>> into
>>>>>>>>> the right temporal context and publish an amended result for
>> the
>>>>>> proper
>>>>>>>>> time/session window as if message B were consumed in the
>>> timestamp
>>>>>> order
>>>>>>>>> before message A.
>>>>>>>> 
>>>>>>>> Does this apply to the aggregation Kafka stream methods then,
>> and
>>>> not
>>>>> to
>>>>>>>> e.g foreach?
>>>>>>>> 
>>>>>>>> On Sun, Mar 19, 2017 at 2:40 AM, Hans Jespersen <
>>> hans@confluent.io>
>>>>>>>> wrote:
>>>>>>>> 
>>>>>>>>> Yes stream processing and CEP are subtlety different things.
>>>>>>>>> 
>>>>>>>>> Kafka Streams helps you write stateful apps and allows that
>>> state
>>>> to
>>>>>> be
>>>>>>>>> preserved on disk (a local State store) as well as distributed
>>> for
>>>>> HA
>>>>>> or
>>>>>>>>> for parallel partitioned processing (via Kafka topic
>> partitions
>>>> and
>>>>>>>>> consumer groups) as well as in memory (as a performance
>>>>> enhancement).
>>>>>>>>> 
>>>>>>>>> However a classical CEP engine with a pre-modeled state
>> machine
>>>> and
>>>>>>>>> pattern matching rules is something different from stream
>>>>> processing.
>>>>>>>>> 
>>>>>>>>> It is on course possible to build a CEP system on top on Kafka
>>>>> Streams
>>>>>>>> and
>>>>>>>>> get the best of both worlds.
>>>>>>>>> 
>>>>>>>>> -hans
>>>>>>>>> 
>>>>>>>>>> On Mar 18, 2017, at 11:36 AM, Sabarish Sasidharan <
>>>>>>>>> sabarish.spk@gmail.com> wrote:
>>>>>>>>>> 
>>>>>>>>>> Hans
>>>>>>>>>> 
>>>>>>>>>> What you state would work for aggregations, but not for
>> state
>>>>>> machines
>>>>>>>>> and
>>>>>>>>>> CEP.
>>>>>>>>>> 
>>>>>>>>>> Regards
>>>>>>>>>> Sab
>>>>>>>>>> 
>>>>>>>>>>> On 19 Mar 2017 12:01 a.m., "Hans Jespersen" <
>>> hans@confluent.io
>>>>> 
>>>>>>>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> The only way to make sure A is consumed first would be to
>>> delay
>>>>> the
>>>>>>>>>>> consumption of message B for at least 15 minutes which
>> would
>>>> fly
>>>>> in
>>>>>>>> the
>>>>>>>>>>> face of the principals of a true streaming platform so the
>>>> short
>>>>>>>> answer
>>>>>>>>> to
>>>>>>>>>>> your question is "no" because that would be batch
>> processing
>>>> not
>>>>>>>> stream
>>>>>>>>>>> processing.
>>>>>>>>>>> 
>>>>>>>>>>> However, Kafka Streams does handle late arriving data. So
>> if
>>>> you
>>>>>> had
>>>>>>>>> some
>>>>>>>>>>> analytics that computes results on a time window or a
>> session
>>>>>> window
>>>>>>>>> then
>>>>>>>>>>> Kafka streams will compute on the stream in real time
>>>> (processing
>>>>>>>>> message
>>>>>>>>>>> B) and then later when message A arrives it will put that
>>>> message
>>>>>>>> back
>>>>>>>>> into
>>>>>>>>>>> the right temporal context and publish an amended result
>> for
>>>> the
>>>>>>>> proper
>>>>>>>>>>> time/session window as if message B were consumed in the
>>>>> timestamp
>>>>>>>> order
>>>>>>>>>>> before message A. The end result of this flow is that you
>>>>>> eventually
>>>>>>>> get
>>>>>>>>>>> the same results you would get in a batch processing system
>>> but
>>>>>> with
>>>>>>>> the
>>>>>>>>>>> added benefit of getting intermediary result at much lower
>>>>> latency.
>>>>>>>>>>> 
>>>>>>>>>>> -hans
>>>>>>>>>>> 
>>>>>>>>>>> /**
>>>>>>>>>>> * Hans Jespersen, Principal Systems Engineer, Confluent
>> Inc.
>>>>>>>>>>> * hans@confluent.io (650)924-2670
>>>>>>>>>>> */
>>>>>>>>>>> 
>>>>>>>>>>>> On Sat, Mar 18, 2017 at 10:29 AM, Ali Akhtar <
>>>>>> ali.rac200@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>> Is it possible to have Kafka Streams order messages
>>> correctly
>>>> by
>>>>>>>> their
>>>>>>>>>>>> timestamps, even if they arrived out of order?
>>>>>>>>>>>> 
>>>>>>>>>>>> E.g, say Message A with a timestamp of 5:00 PM and
>> Message B
>>>>> with
>>>>>> a
>>>>>>>>>>>> timestamp of 5:15 PM, are sent.
>>>>>>>>>>>> 
>>>>>>>>>>>> Message B arrives sooner than Message A, due to network
>>>> issues.
>>>>>>>>>>>> 
>>>>>>>>>>>> Is it possible to make sure that, across all consumers of
>>>> Kafka
>>>>>>>> Streams
>>>>>>>>>>>> (even if they are across different servers, but have the
>>> same
>>>>>>>> consumer
>>>>>>>>>>>> group), Message A is consumed first, before Message B?
>>>>>>>>>>>> 
>>>>>>>>>>>> Thanks.
>>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> 

Re: Out of order message processing with Kafka Streams

Posted by Ali Akhtar <al...@gmail.com>.
Yeah, windowing seems perfect, if only I could find out the current
window's start time (so I can log the current bucket's start & end times)
and process window messages individually rather than as aggregates.

It doesn't seem like i can get this metadata from ProcessorContext though,
from looking over the javadocs

On Tue, Mar 21, 2017 at 12:38 AM, Michael Noll <mi...@confluent.io> wrote:

> Ali,
>
> what you describe is (roughly!) how Kafka Streams implements the internal
> state stores to support windowing.
>
> Some users have been following a similar approach as you outlined, using
> the Processor API.
>
>
>
> On Mon, Mar 20, 2017 at 7:54 PM, Ali Akhtar <al...@gmail.com> wrote:
>
> > It would be helpful to know the 'start' and 'end' of the current
> metadata,
> > so if an out of order message arrives late, and is being processed in
> > foreach(), you'd know which window / bucket it belongs to, and can handle
> > it accordingly.
> >
> > I'm guessing that's not possible at the moment.
> >
> > (My use case is, i receive a stream of messages. Messages need to be
> stored
> > and sorted into 'buckets', to indicate 'sessions'. Each time there's a
> gap
> > of 30 mins or more since the last message (under a key), a new 'session'
> > (bucket) should be started, and future messages should belong to that
> > 'session', until the next 30+ min gap).
> >
> > On Mon, Mar 20, 2017 at 11:44 PM, Michael Noll <mi...@confluent.io>
> > wrote:
> >
> > > > Can windows only be used for aggregations, or can they also be used
> for
> > > foreach(),
> > > and such?
> > >
> > > As of today, you can use windows only in aggregations.
> > >
> > > > And is it possible to get metadata on the message, such as whether or
> > > not its
> > > late, its index/position within the other messages, etc?
> > >
> > > If you use the Processor API of Kafka Streams, you can have access to
> an
> > > incoming record's topic, partition, offset, etc. via the so-called
> > > ProcessorContext (which is updated for every new incoming record):
> > >
> > > http://docs.confluent.io/current/streams/javadocs/org/
> > > apache/kafka/streams/processor/Processor.html
> > > - You can get/store a reference to the ProcessorContext from
> > > `Processor#init()`.
> > >
> > > http://docs.confluent.io/current/streams/javadocs/org/
> > > apache/kafka/streams/processor/ProcessorContext.html
> > > - The context can then be used within `Processor#process()` when you
> > > process a new record.  As I said, the context is updated behind the
> > scenes
> > > to match the record that is currently being processed.
> > >
> > >
> > > Best,
> > > Michael
> > >
> > >
> > >
> > >
> > > On Mon, Mar 20, 2017 at 5:59 PM, Ali Akhtar <al...@gmail.com>
> > wrote:
> > >
> > > > Can windows only be used for aggregations, or can they also be used
> for
> > > > foreach(), and such?
> > > >
> > > > And is it possible to get metadata on the message, such as whether or
> > not
> > > > its late, its index/position within the other messages, etc?
> > > >
> > > > On Mon, Mar 20, 2017 at 9:44 PM, Michael Noll <mi...@confluent.io>
> > > > wrote:
> > > >
> > > > > And since you asked for a pointer, Ali:
> > > > > http://docs.confluent.io/current/streams/concepts.html#windowing
> > > > >
> > > > >
> > > > > On Mon, Mar 20, 2017 at 5:43 PM, Michael Noll <
> michael@confluent.io>
> > > > > wrote:
> > > > >
> > > > > > Late-arriving and out-of-order data is only treated specially for
> > > > > windowed
> > > > > > aggregations.
> > > > > >
> > > > > > For stateless operations such as `KStream#foreach()` or
> > > > `KStream#map()`,
> > > > > > records are processed in the order they arrive (per partition).
> > > > > >
> > > > > > -Michael
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Sat, Mar 18, 2017 at 10:47 PM, Ali Akhtar <
> ali.rac200@gmail.com
> > >
> > > > > wrote:
> > > > > >
> > > > > >> > later when message A arrives it will put that message back
> into
> > > > > >> > the right temporal context and publish an amended result for
> the
> > > > > proper
> > > > > >> > time/session window as if message B were consumed in the
> > timestamp
> > > > > order
> > > > > >> > before message A.
> > > > > >>
> > > > > >> Does this apply to the aggregation Kafka stream methods then,
> and
> > > not
> > > > to
> > > > > >> e.g foreach?
> > > > > >>
> > > > > >> On Sun, Mar 19, 2017 at 2:40 AM, Hans Jespersen <
> > hans@confluent.io>
> > > > > >> wrote:
> > > > > >>
> > > > > >> > Yes stream processing and CEP are subtlety different things.
> > > > > >> >
> > > > > >> > Kafka Streams helps you write stateful apps and allows that
> > state
> > > to
> > > > > be
> > > > > >> > preserved on disk (a local State store) as well as distributed
> > for
> > > > HA
> > > > > or
> > > > > >> > for parallel partitioned processing (via Kafka topic
> partitions
> > > and
> > > > > >> > consumer groups) as well as in memory (as a performance
> > > > enhancement).
> > > > > >> >
> > > > > >> > However a classical CEP engine with a pre-modeled state
> machine
> > > and
> > > > > >> > pattern matching rules is something different from stream
> > > > processing.
> > > > > >> >
> > > > > >> > It is on course possible to build a CEP system on top on Kafka
> > > > Streams
> > > > > >> and
> > > > > >> > get the best of both worlds.
> > > > > >> >
> > > > > >> > -hans
> > > > > >> >
> > > > > >> > > On Mar 18, 2017, at 11:36 AM, Sabarish Sasidharan <
> > > > > >> > sabarish.spk@gmail.com> wrote:
> > > > > >> > >
> > > > > >> > > Hans
> > > > > >> > >
> > > > > >> > > What you state would work for aggregations, but not for
> state
> > > > > machines
> > > > > >> > and
> > > > > >> > > CEP.
> > > > > >> > >
> > > > > >> > > Regards
> > > > > >> > > Sab
> > > > > >> > >
> > > > > >> > >> On 19 Mar 2017 12:01 a.m., "Hans Jespersen" <
> > hans@confluent.io
> > > >
> > > > > >> wrote:
> > > > > >> > >>
> > > > > >> > >> The only way to make sure A is consumed first would be to
> > delay
> > > > the
> > > > > >> > >> consumption of message B for at least 15 minutes which
> would
> > > fly
> > > > in
> > > > > >> the
> > > > > >> > >> face of the principals of a true streaming platform so the
> > > short
> > > > > >> answer
> > > > > >> > to
> > > > > >> > >> your question is "no" because that would be batch
> processing
> > > not
> > > > > >> stream
> > > > > >> > >> processing.
> > > > > >> > >>
> > > > > >> > >> However, Kafka Streams does handle late arriving data. So
> if
> > > you
> > > > > had
> > > > > >> > some
> > > > > >> > >> analytics that computes results on a time window or a
> session
> > > > > window
> > > > > >> > then
> > > > > >> > >> Kafka streams will compute on the stream in real time
> > > (processing
> > > > > >> > message
> > > > > >> > >> B) and then later when message A arrives it will put that
> > > message
> > > > > >> back
> > > > > >> > into
> > > > > >> > >> the right temporal context and publish an amended result
> for
> > > the
> > > > > >> proper
> > > > > >> > >> time/session window as if message B were consumed in the
> > > > timestamp
> > > > > >> order
> > > > > >> > >> before message A. The end result of this flow is that you
> > > > > eventually
> > > > > >> get
> > > > > >> > >> the same results you would get in a batch processing system
> > but
> > > > > with
> > > > > >> the
> > > > > >> > >> added benefit of getting intermediary result at much lower
> > > > latency.
> > > > > >> > >>
> > > > > >> > >> -hans
> > > > > >> > >>
> > > > > >> > >> /**
> > > > > >> > >> * Hans Jespersen, Principal Systems Engineer, Confluent
> Inc.
> > > > > >> > >> * hans@confluent.io (650)924-2670
> > > > > >> > >> */
> > > > > >> > >>
> > > > > >> > >>> On Sat, Mar 18, 2017 at 10:29 AM, Ali Akhtar <
> > > > > ali.rac200@gmail.com>
> > > > > >> > wrote:
> > > > > >> > >>>
> > > > > >> > >>> Is it possible to have Kafka Streams order messages
> > correctly
> > > by
> > > > > >> their
> > > > > >> > >>> timestamps, even if they arrived out of order?
> > > > > >> > >>>
> > > > > >> > >>> E.g, say Message A with a timestamp of 5:00 PM and
> Message B
> > > > with
> > > > > a
> > > > > >> > >>> timestamp of 5:15 PM, are sent.
> > > > > >> > >>>
> > > > > >> > >>> Message B arrives sooner than Message A, due to network
> > > issues.
> > > > > >> > >>>
> > > > > >> > >>> Is it possible to make sure that, across all consumers of
> > > Kafka
> > > > > >> Streams
> > > > > >> > >>> (even if they are across different servers, but have the
> > same
> > > > > >> consumer
> > > > > >> > >>> group), Message A is consumed first, before Message B?
> > > > > >> > >>>
> > > > > >> > >>> Thanks.
> > > > > >> > >>>
> > > > > >> > >>
> > > > > >> >
> > > > > >>
> > > > > >
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: Out of order message processing with Kafka Streams

Posted by Michael Noll <mi...@confluent.io>.
Ali,

what you describe is (roughly!) how Kafka Streams implements the internal
state stores to support windowing.

Some users have been following a similar approach as you outlined, using
the Processor API.



On Mon, Mar 20, 2017 at 7:54 PM, Ali Akhtar <al...@gmail.com> wrote:

> It would be helpful to know the 'start' and 'end' of the current metadata,
> so if an out of order message arrives late, and is being processed in
> foreach(), you'd know which window / bucket it belongs to, and can handle
> it accordingly.
>
> I'm guessing that's not possible at the moment.
>
> (My use case is, i receive a stream of messages. Messages need to be stored
> and sorted into 'buckets', to indicate 'sessions'. Each time there's a gap
> of 30 mins or more since the last message (under a key), a new 'session'
> (bucket) should be started, and future messages should belong to that
> 'session', until the next 30+ min gap).
>
> On Mon, Mar 20, 2017 at 11:44 PM, Michael Noll <mi...@confluent.io>
> wrote:
>
> > > Can windows only be used for aggregations, or can they also be used for
> > foreach(),
> > and such?
> >
> > As of today, you can use windows only in aggregations.
> >
> > > And is it possible to get metadata on the message, such as whether or
> > not its
> > late, its index/position within the other messages, etc?
> >
> > If you use the Processor API of Kafka Streams, you can have access to an
> > incoming record's topic, partition, offset, etc. via the so-called
> > ProcessorContext (which is updated for every new incoming record):
> >
> > http://docs.confluent.io/current/streams/javadocs/org/
> > apache/kafka/streams/processor/Processor.html
> > - You can get/store a reference to the ProcessorContext from
> > `Processor#init()`.
> >
> > http://docs.confluent.io/current/streams/javadocs/org/
> > apache/kafka/streams/processor/ProcessorContext.html
> > - The context can then be used within `Processor#process()` when you
> > process a new record.  As I said, the context is updated behind the
> scenes
> > to match the record that is currently being processed.
> >
> >
> > Best,
> > Michael
> >
> >
> >
> >
> > On Mon, Mar 20, 2017 at 5:59 PM, Ali Akhtar <al...@gmail.com>
> wrote:
> >
> > > Can windows only be used for aggregations, or can they also be used for
> > > foreach(), and such?
> > >
> > > And is it possible to get metadata on the message, such as whether or
> not
> > > its late, its index/position within the other messages, etc?
> > >
> > > On Mon, Mar 20, 2017 at 9:44 PM, Michael Noll <mi...@confluent.io>
> > > wrote:
> > >
> > > > And since you asked for a pointer, Ali:
> > > > http://docs.confluent.io/current/streams/concepts.html#windowing
> > > >
> > > >
> > > > On Mon, Mar 20, 2017 at 5:43 PM, Michael Noll <mi...@confluent.io>
> > > > wrote:
> > > >
> > > > > Late-arriving and out-of-order data is only treated specially for
> > > > windowed
> > > > > aggregations.
> > > > >
> > > > > For stateless operations such as `KStream#foreach()` or
> > > `KStream#map()`,
> > > > > records are processed in the order they arrive (per partition).
> > > > >
> > > > > -Michael
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Sat, Mar 18, 2017 at 10:47 PM, Ali Akhtar <ali.rac200@gmail.com
> >
> > > > wrote:
> > > > >
> > > > >> > later when message A arrives it will put that message back into
> > > > >> > the right temporal context and publish an amended result for the
> > > > proper
> > > > >> > time/session window as if message B were consumed in the
> timestamp
> > > > order
> > > > >> > before message A.
> > > > >>
> > > > >> Does this apply to the aggregation Kafka stream methods then, and
> > not
> > > to
> > > > >> e.g foreach?
> > > > >>
> > > > >> On Sun, Mar 19, 2017 at 2:40 AM, Hans Jespersen <
> hans@confluent.io>
> > > > >> wrote:
> > > > >>
> > > > >> > Yes stream processing and CEP are subtlety different things.
> > > > >> >
> > > > >> > Kafka Streams helps you write stateful apps and allows that
> state
> > to
> > > > be
> > > > >> > preserved on disk (a local State store) as well as distributed
> for
> > > HA
> > > > or
> > > > >> > for parallel partitioned processing (via Kafka topic partitions
> > and
> > > > >> > consumer groups) as well as in memory (as a performance
> > > enhancement).
> > > > >> >
> > > > >> > However a classical CEP engine with a pre-modeled state machine
> > and
> > > > >> > pattern matching rules is something different from stream
> > > processing.
> > > > >> >
> > > > >> > It is on course possible to build a CEP system on top on Kafka
> > > Streams
> > > > >> and
> > > > >> > get the best of both worlds.
> > > > >> >
> > > > >> > -hans
> > > > >> >
> > > > >> > > On Mar 18, 2017, at 11:36 AM, Sabarish Sasidharan <
> > > > >> > sabarish.spk@gmail.com> wrote:
> > > > >> > >
> > > > >> > > Hans
> > > > >> > >
> > > > >> > > What you state would work for aggregations, but not for state
> > > > machines
> > > > >> > and
> > > > >> > > CEP.
> > > > >> > >
> > > > >> > > Regards
> > > > >> > > Sab
> > > > >> > >
> > > > >> > >> On 19 Mar 2017 12:01 a.m., "Hans Jespersen" <
> hans@confluent.io
> > >
> > > > >> wrote:
> > > > >> > >>
> > > > >> > >> The only way to make sure A is consumed first would be to
> delay
> > > the
> > > > >> > >> consumption of message B for at least 15 minutes which would
> > fly
> > > in
> > > > >> the
> > > > >> > >> face of the principals of a true streaming platform so the
> > short
> > > > >> answer
> > > > >> > to
> > > > >> > >> your question is "no" because that would be batch processing
> > not
> > > > >> stream
> > > > >> > >> processing.
> > > > >> > >>
> > > > >> > >> However, Kafka Streams does handle late arriving data. So if
> > you
> > > > had
> > > > >> > some
> > > > >> > >> analytics that computes results on a time window or a session
> > > > window
> > > > >> > then
> > > > >> > >> Kafka streams will compute on the stream in real time
> > (processing
> > > > >> > message
> > > > >> > >> B) and then later when message A arrives it will put that
> > message
> > > > >> back
> > > > >> > into
> > > > >> > >> the right temporal context and publish an amended result for
> > the
> > > > >> proper
> > > > >> > >> time/session window as if message B were consumed in the
> > > timestamp
> > > > >> order
> > > > >> > >> before message A. The end result of this flow is that you
> > > > eventually
> > > > >> get
> > > > >> > >> the same results you would get in a batch processing system
> but
> > > > with
> > > > >> the
> > > > >> > >> added benefit of getting intermediary result at much lower
> > > latency.
> > > > >> > >>
> > > > >> > >> -hans
> > > > >> > >>
> > > > >> > >> /**
> > > > >> > >> * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
> > > > >> > >> * hans@confluent.io (650)924-2670
> > > > >> > >> */
> > > > >> > >>
> > > > >> > >>> On Sat, Mar 18, 2017 at 10:29 AM, Ali Akhtar <
> > > > ali.rac200@gmail.com>
> > > > >> > wrote:
> > > > >> > >>>
> > > > >> > >>> Is it possible to have Kafka Streams order messages
> correctly
> > by
> > > > >> their
> > > > >> > >>> timestamps, even if they arrived out of order?
> > > > >> > >>>
> > > > >> > >>> E.g, say Message A with a timestamp of 5:00 PM and Message B
> > > with
> > > > a
> > > > >> > >>> timestamp of 5:15 PM, are sent.
> > > > >> > >>>
> > > > >> > >>> Message B arrives sooner than Message A, due to network
> > issues.
> > > > >> > >>>
> > > > >> > >>> Is it possible to make sure that, across all consumers of
> > Kafka
> > > > >> Streams
> > > > >> > >>> (even if they are across different servers, but have the
> same
> > > > >> consumer
> > > > >> > >>> group), Message A is consumed first, before Message B?
> > > > >> > >>>
> > > > >> > >>> Thanks.
> > > > >> > >>>
> > > > >> > >>
> > > > >> >
> > > > >>
> > > > >
> > > > >
> > > > >
> > > >
> > >
> >
>

Re: Out of order message processing with Kafka Streams

Posted by Ali Akhtar <al...@gmail.com>.
It would be helpful to know the 'start' and 'end' of the current metadata,
so if an out of order message arrives late, and is being processed in
foreach(), you'd know which window / bucket it belongs to, and can handle
it accordingly.

I'm guessing that's not possible at the moment.

(My use case is, i receive a stream of messages. Messages need to be stored
and sorted into 'buckets', to indicate 'sessions'. Each time there's a gap
of 30 mins or more since the last message (under a key), a new 'session'
(bucket) should be started, and future messages should belong to that
'session', until the next 30+ min gap).

On Mon, Mar 20, 2017 at 11:44 PM, Michael Noll <mi...@confluent.io> wrote:

> > Can windows only be used for aggregations, or can they also be used for
> foreach(),
> and such?
>
> As of today, you can use windows only in aggregations.
>
> > And is it possible to get metadata on the message, such as whether or
> not its
> late, its index/position within the other messages, etc?
>
> If you use the Processor API of Kafka Streams, you can have access to an
> incoming record's topic, partition, offset, etc. via the so-called
> ProcessorContext (which is updated for every new incoming record):
>
> http://docs.confluent.io/current/streams/javadocs/org/
> apache/kafka/streams/processor/Processor.html
> - You can get/store a reference to the ProcessorContext from
> `Processor#init()`.
>
> http://docs.confluent.io/current/streams/javadocs/org/
> apache/kafka/streams/processor/ProcessorContext.html
> - The context can then be used within `Processor#process()` when you
> process a new record.  As I said, the context is updated behind the scenes
> to match the record that is currently being processed.
>
>
> Best,
> Michael
>
>
>
>
> On Mon, Mar 20, 2017 at 5:59 PM, Ali Akhtar <al...@gmail.com> wrote:
>
> > Can windows only be used for aggregations, or can they also be used for
> > foreach(), and such?
> >
> > And is it possible to get metadata on the message, such as whether or not
> > its late, its index/position within the other messages, etc?
> >
> > On Mon, Mar 20, 2017 at 9:44 PM, Michael Noll <mi...@confluent.io>
> > wrote:
> >
> > > And since you asked for a pointer, Ali:
> > > http://docs.confluent.io/current/streams/concepts.html#windowing
> > >
> > >
> > > On Mon, Mar 20, 2017 at 5:43 PM, Michael Noll <mi...@confluent.io>
> > > wrote:
> > >
> > > > Late-arriving and out-of-order data is only treated specially for
> > > windowed
> > > > aggregations.
> > > >
> > > > For stateless operations such as `KStream#foreach()` or
> > `KStream#map()`,
> > > > records are processed in the order they arrive (per partition).
> > > >
> > > > -Michael
> > > >
> > > >
> > > >
> > > >
> > > > On Sat, Mar 18, 2017 at 10:47 PM, Ali Akhtar <al...@gmail.com>
> > > wrote:
> > > >
> > > >> > later when message A arrives it will put that message back into
> > > >> > the right temporal context and publish an amended result for the
> > > proper
> > > >> > time/session window as if message B were consumed in the timestamp
> > > order
> > > >> > before message A.
> > > >>
> > > >> Does this apply to the aggregation Kafka stream methods then, and
> not
> > to
> > > >> e.g foreach?
> > > >>
> > > >> On Sun, Mar 19, 2017 at 2:40 AM, Hans Jespersen <ha...@confluent.io>
> > > >> wrote:
> > > >>
> > > >> > Yes stream processing and CEP are subtlety different things.
> > > >> >
> > > >> > Kafka Streams helps you write stateful apps and allows that state
> to
> > > be
> > > >> > preserved on disk (a local State store) as well as distributed for
> > HA
> > > or
> > > >> > for parallel partitioned processing (via Kafka topic partitions
> and
> > > >> > consumer groups) as well as in memory (as a performance
> > enhancement).
> > > >> >
> > > >> > However a classical CEP engine with a pre-modeled state machine
> and
> > > >> > pattern matching rules is something different from stream
> > processing.
> > > >> >
> > > >> > It is on course possible to build a CEP system on top on Kafka
> > Streams
> > > >> and
> > > >> > get the best of both worlds.
> > > >> >
> > > >> > -hans
> > > >> >
> > > >> > > On Mar 18, 2017, at 11:36 AM, Sabarish Sasidharan <
> > > >> > sabarish.spk@gmail.com> wrote:
> > > >> > >
> > > >> > > Hans
> > > >> > >
> > > >> > > What you state would work for aggregations, but not for state
> > > machines
> > > >> > and
> > > >> > > CEP.
> > > >> > >
> > > >> > > Regards
> > > >> > > Sab
> > > >> > >
> > > >> > >> On 19 Mar 2017 12:01 a.m., "Hans Jespersen" <hans@confluent.io
> >
> > > >> wrote:
> > > >> > >>
> > > >> > >> The only way to make sure A is consumed first would be to delay
> > the
> > > >> > >> consumption of message B for at least 15 minutes which would
> fly
> > in
> > > >> the
> > > >> > >> face of the principals of a true streaming platform so the
> short
> > > >> answer
> > > >> > to
> > > >> > >> your question is "no" because that would be batch processing
> not
> > > >> stream
> > > >> > >> processing.
> > > >> > >>
> > > >> > >> However, Kafka Streams does handle late arriving data. So if
> you
> > > had
> > > >> > some
> > > >> > >> analytics that computes results on a time window or a session
> > > window
> > > >> > then
> > > >> > >> Kafka streams will compute on the stream in real time
> (processing
> > > >> > message
> > > >> > >> B) and then later when message A arrives it will put that
> message
> > > >> back
> > > >> > into
> > > >> > >> the right temporal context and publish an amended result for
> the
> > > >> proper
> > > >> > >> time/session window as if message B were consumed in the
> > timestamp
> > > >> order
> > > >> > >> before message A. The end result of this flow is that you
> > > eventually
> > > >> get
> > > >> > >> the same results you would get in a batch processing system but
> > > with
> > > >> the
> > > >> > >> added benefit of getting intermediary result at much lower
> > latency.
> > > >> > >>
> > > >> > >> -hans
> > > >> > >>
> > > >> > >> /**
> > > >> > >> * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
> > > >> > >> * hans@confluent.io (650)924-2670
> > > >> > >> */
> > > >> > >>
> > > >> > >>> On Sat, Mar 18, 2017 at 10:29 AM, Ali Akhtar <
> > > ali.rac200@gmail.com>
> > > >> > wrote:
> > > >> > >>>
> > > >> > >>> Is it possible to have Kafka Streams order messages correctly
> by
> > > >> their
> > > >> > >>> timestamps, even if they arrived out of order?
> > > >> > >>>
> > > >> > >>> E.g, say Message A with a timestamp of 5:00 PM and Message B
> > with
> > > a
> > > >> > >>> timestamp of 5:15 PM, are sent.
> > > >> > >>>
> > > >> > >>> Message B arrives sooner than Message A, due to network
> issues.
> > > >> > >>>
> > > >> > >>> Is it possible to make sure that, across all consumers of
> Kafka
> > > >> Streams
> > > >> > >>> (even if they are across different servers, but have the same
> > > >> consumer
> > > >> > >>> group), Message A is consumed first, before Message B?
> > > >> > >>>
> > > >> > >>> Thanks.
> > > >> > >>>
> > > >> > >>
> > > >> >
> > > >>
> > > >
> > > >
> > > >
> > >
> >
>

Re: Out of order message processing with Kafka Streams

Posted by Michael Noll <mi...@confluent.io>.
> Can windows only be used for aggregations, or can they also be used for foreach(),
and such?

As of today, you can use windows only in aggregations.

> And is it possible to get metadata on the message, such as whether or not its
late, its index/position within the other messages, etc?

If you use the Processor API of Kafka Streams, you can have access to an
incoming record's topic, partition, offset, etc. via the so-called
ProcessorContext (which is updated for every new incoming record):

http://docs.confluent.io/current/streams/javadocs/org/apache/kafka/streams/processor/Processor.html
- You can get/store a reference to the ProcessorContext from
`Processor#init()`.

http://docs.confluent.io/current/streams/javadocs/org/apache/kafka/streams/processor/ProcessorContext.html
- The context can then be used within `Processor#process()` when you
process a new record.  As I said, the context is updated behind the scenes
to match the record that is currently being processed.


Best,
Michael




On Mon, Mar 20, 2017 at 5:59 PM, Ali Akhtar <al...@gmail.com> wrote:

> Can windows only be used for aggregations, or can they also be used for
> foreach(), and such?
>
> And is it possible to get metadata on the message, such as whether or not
> its late, its index/position within the other messages, etc?
>
> On Mon, Mar 20, 2017 at 9:44 PM, Michael Noll <mi...@confluent.io>
> wrote:
>
> > And since you asked for a pointer, Ali:
> > http://docs.confluent.io/current/streams/concepts.html#windowing
> >
> >
> > On Mon, Mar 20, 2017 at 5:43 PM, Michael Noll <mi...@confluent.io>
> > wrote:
> >
> > > Late-arriving and out-of-order data is only treated specially for
> > windowed
> > > aggregations.
> > >
> > > For stateless operations such as `KStream#foreach()` or
> `KStream#map()`,
> > > records are processed in the order they arrive (per partition).
> > >
> > > -Michael
> > >
> > >
> > >
> > >
> > > On Sat, Mar 18, 2017 at 10:47 PM, Ali Akhtar <al...@gmail.com>
> > wrote:
> > >
> > >> > later when message A arrives it will put that message back into
> > >> > the right temporal context and publish an amended result for the
> > proper
> > >> > time/session window as if message B were consumed in the timestamp
> > order
> > >> > before message A.
> > >>
> > >> Does this apply to the aggregation Kafka stream methods then, and not
> to
> > >> e.g foreach?
> > >>
> > >> On Sun, Mar 19, 2017 at 2:40 AM, Hans Jespersen <ha...@confluent.io>
> > >> wrote:
> > >>
> > >> > Yes stream processing and CEP are subtlety different things.
> > >> >
> > >> > Kafka Streams helps you write stateful apps and allows that state to
> > be
> > >> > preserved on disk (a local State store) as well as distributed for
> HA
> > or
> > >> > for parallel partitioned processing (via Kafka topic partitions and
> > >> > consumer groups) as well as in memory (as a performance
> enhancement).
> > >> >
> > >> > However a classical CEP engine with a pre-modeled state machine and
> > >> > pattern matching rules is something different from stream
> processing.
> > >> >
> > >> > It is on course possible to build a CEP system on top on Kafka
> Streams
> > >> and
> > >> > get the best of both worlds.
> > >> >
> > >> > -hans
> > >> >
> > >> > > On Mar 18, 2017, at 11:36 AM, Sabarish Sasidharan <
> > >> > sabarish.spk@gmail.com> wrote:
> > >> > >
> > >> > > Hans
> > >> > >
> > >> > > What you state would work for aggregations, but not for state
> > machines
> > >> > and
> > >> > > CEP.
> > >> > >
> > >> > > Regards
> > >> > > Sab
> > >> > >
> > >> > >> On 19 Mar 2017 12:01 a.m., "Hans Jespersen" <ha...@confluent.io>
> > >> wrote:
> > >> > >>
> > >> > >> The only way to make sure A is consumed first would be to delay
> the
> > >> > >> consumption of message B for at least 15 minutes which would fly
> in
> > >> the
> > >> > >> face of the principals of a true streaming platform so the short
> > >> answer
> > >> > to
> > >> > >> your question is "no" because that would be batch processing not
> > >> stream
> > >> > >> processing.
> > >> > >>
> > >> > >> However, Kafka Streams does handle late arriving data. So if you
> > had
> > >> > some
> > >> > >> analytics that computes results on a time window or a session
> > window
> > >> > then
> > >> > >> Kafka streams will compute on the stream in real time (processing
> > >> > message
> > >> > >> B) and then later when message A arrives it will put that message
> > >> back
> > >> > into
> > >> > >> the right temporal context and publish an amended result for the
> > >> proper
> > >> > >> time/session window as if message B were consumed in the
> timestamp
> > >> order
> > >> > >> before message A. The end result of this flow is that you
> > eventually
> > >> get
> > >> > >> the same results you would get in a batch processing system but
> > with
> > >> the
> > >> > >> added benefit of getting intermediary result at much lower
> latency.
> > >> > >>
> > >> > >> -hans
> > >> > >>
> > >> > >> /**
> > >> > >> * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
> > >> > >> * hans@confluent.io (650)924-2670
> > >> > >> */
> > >> > >>
> > >> > >>> On Sat, Mar 18, 2017 at 10:29 AM, Ali Akhtar <
> > ali.rac200@gmail.com>
> > >> > wrote:
> > >> > >>>
> > >> > >>> Is it possible to have Kafka Streams order messages correctly by
> > >> their
> > >> > >>> timestamps, even if they arrived out of order?
> > >> > >>>
> > >> > >>> E.g, say Message A with a timestamp of 5:00 PM and Message B
> with
> > a
> > >> > >>> timestamp of 5:15 PM, are sent.
> > >> > >>>
> > >> > >>> Message B arrives sooner than Message A, due to network issues.
> > >> > >>>
> > >> > >>> Is it possible to make sure that, across all consumers of Kafka
> > >> Streams
> > >> > >>> (even if they are across different servers, but have the same
> > >> consumer
> > >> > >>> group), Message A is consumed first, before Message B?
> > >> > >>>
> > >> > >>> Thanks.
> > >> > >>>
> > >> > >>
> > >> >
> > >>
> > >
> > >
> > >
> >
>

Re: Out of order message processing with Kafka Streams

Posted by Ali Akhtar <al...@gmail.com>.
Can windows only be used for aggregations, or can they also be used for
foreach(), and such?

And is it possible to get metadata on the message, such as whether or not
its late, its index/position within the other messages, etc?

On Mon, Mar 20, 2017 at 9:44 PM, Michael Noll <mi...@confluent.io> wrote:

> And since you asked for a pointer, Ali:
> http://docs.confluent.io/current/streams/concepts.html#windowing
>
>
> On Mon, Mar 20, 2017 at 5:43 PM, Michael Noll <mi...@confluent.io>
> wrote:
>
> > Late-arriving and out-of-order data is only treated specially for
> windowed
> > aggregations.
> >
> > For stateless operations such as `KStream#foreach()` or `KStream#map()`,
> > records are processed in the order they arrive (per partition).
> >
> > -Michael
> >
> >
> >
> >
> > On Sat, Mar 18, 2017 at 10:47 PM, Ali Akhtar <al...@gmail.com>
> wrote:
> >
> >> > later when message A arrives it will put that message back into
> >> > the right temporal context and publish an amended result for the
> proper
> >> > time/session window as if message B were consumed in the timestamp
> order
> >> > before message A.
> >>
> >> Does this apply to the aggregation Kafka stream methods then, and not to
> >> e.g foreach?
> >>
> >> On Sun, Mar 19, 2017 at 2:40 AM, Hans Jespersen <ha...@confluent.io>
> >> wrote:
> >>
> >> > Yes stream processing and CEP are subtlety different things.
> >> >
> >> > Kafka Streams helps you write stateful apps and allows that state to
> be
> >> > preserved on disk (a local State store) as well as distributed for HA
> or
> >> > for parallel partitioned processing (via Kafka topic partitions and
> >> > consumer groups) as well as in memory (as a performance enhancement).
> >> >
> >> > However a classical CEP engine with a pre-modeled state machine and
> >> > pattern matching rules is something different from stream processing.
> >> >
> >> > It is on course possible to build a CEP system on top on Kafka Streams
> >> and
> >> > get the best of both worlds.
> >> >
> >> > -hans
> >> >
> >> > > On Mar 18, 2017, at 11:36 AM, Sabarish Sasidharan <
> >> > sabarish.spk@gmail.com> wrote:
> >> > >
> >> > > Hans
> >> > >
> >> > > What you state would work for aggregations, but not for state
> machines
> >> > and
> >> > > CEP.
> >> > >
> >> > > Regards
> >> > > Sab
> >> > >
> >> > >> On 19 Mar 2017 12:01 a.m., "Hans Jespersen" <ha...@confluent.io>
> >> wrote:
> >> > >>
> >> > >> The only way to make sure A is consumed first would be to delay the
> >> > >> consumption of message B for at least 15 minutes which would fly in
> >> the
> >> > >> face of the principals of a true streaming platform so the short
> >> answer
> >> > to
> >> > >> your question is "no" because that would be batch processing not
> >> stream
> >> > >> processing.
> >> > >>
> >> > >> However, Kafka Streams does handle late arriving data. So if you
> had
> >> > some
> >> > >> analytics that computes results on a time window or a session
> window
> >> > then
> >> > >> Kafka streams will compute on the stream in real time (processing
> >> > message
> >> > >> B) and then later when message A arrives it will put that message
> >> back
> >> > into
> >> > >> the right temporal context and publish an amended result for the
> >> proper
> >> > >> time/session window as if message B were consumed in the timestamp
> >> order
> >> > >> before message A. The end result of this flow is that you
> eventually
> >> get
> >> > >> the same results you would get in a batch processing system but
> with
> >> the
> >> > >> added benefit of getting intermediary result at much lower latency.
> >> > >>
> >> > >> -hans
> >> > >>
> >> > >> /**
> >> > >> * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
> >> > >> * hans@confluent.io (650)924-2670
> >> > >> */
> >> > >>
> >> > >>> On Sat, Mar 18, 2017 at 10:29 AM, Ali Akhtar <
> ali.rac200@gmail.com>
> >> > wrote:
> >> > >>>
> >> > >>> Is it possible to have Kafka Streams order messages correctly by
> >> their
> >> > >>> timestamps, even if they arrived out of order?
> >> > >>>
> >> > >>> E.g, say Message A with a timestamp of 5:00 PM and Message B with
> a
> >> > >>> timestamp of 5:15 PM, are sent.
> >> > >>>
> >> > >>> Message B arrives sooner than Message A, due to network issues.
> >> > >>>
> >> > >>> Is it possible to make sure that, across all consumers of Kafka
> >> Streams
> >> > >>> (even if they are across different servers, but have the same
> >> consumer
> >> > >>> group), Message A is consumed first, before Message B?
> >> > >>>
> >> > >>> Thanks.
> >> > >>>
> >> > >>
> >> >
> >>
> >
> >
> >
>

Re: Out of order message processing with Kafka Streams

Posted by Michael Noll <mi...@confluent.io>.
And since you asked for a pointer, Ali:
http://docs.confluent.io/current/streams/concepts.html#windowing


On Mon, Mar 20, 2017 at 5:43 PM, Michael Noll <mi...@confluent.io> wrote:

> Late-arriving and out-of-order data is only treated specially for windowed
> aggregations.
>
> For stateless operations such as `KStream#foreach()` or `KStream#map()`,
> records are processed in the order they arrive (per partition).
>
> -Michael
>
>
>
>
> On Sat, Mar 18, 2017 at 10:47 PM, Ali Akhtar <al...@gmail.com> wrote:
>
>> > later when message A arrives it will put that message back into
>> > the right temporal context and publish an amended result for the proper
>> > time/session window as if message B were consumed in the timestamp order
>> > before message A.
>>
>> Does this apply to the aggregation Kafka stream methods then, and not to
>> e.g foreach?
>>
>> On Sun, Mar 19, 2017 at 2:40 AM, Hans Jespersen <ha...@confluent.io>
>> wrote:
>>
>> > Yes stream processing and CEP are subtlety different things.
>> >
>> > Kafka Streams helps you write stateful apps and allows that state to be
>> > preserved on disk (a local State store) as well as distributed for HA or
>> > for parallel partitioned processing (via Kafka topic partitions and
>> > consumer groups) as well as in memory (as a performance enhancement).
>> >
>> > However a classical CEP engine with a pre-modeled state machine and
>> > pattern matching rules is something different from stream processing.
>> >
>> > It is on course possible to build a CEP system on top on Kafka Streams
>> and
>> > get the best of both worlds.
>> >
>> > -hans
>> >
>> > > On Mar 18, 2017, at 11:36 AM, Sabarish Sasidharan <
>> > sabarish.spk@gmail.com> wrote:
>> > >
>> > > Hans
>> > >
>> > > What you state would work for aggregations, but not for state machines
>> > and
>> > > CEP.
>> > >
>> > > Regards
>> > > Sab
>> > >
>> > >> On 19 Mar 2017 12:01 a.m., "Hans Jespersen" <ha...@confluent.io>
>> wrote:
>> > >>
>> > >> The only way to make sure A is consumed first would be to delay the
>> > >> consumption of message B for at least 15 minutes which would fly in
>> the
>> > >> face of the principals of a true streaming platform so the short
>> answer
>> > to
>> > >> your question is "no" because that would be batch processing not
>> stream
>> > >> processing.
>> > >>
>> > >> However, Kafka Streams does handle late arriving data. So if you had
>> > some
>> > >> analytics that computes results on a time window or a session window
>> > then
>> > >> Kafka streams will compute on the stream in real time (processing
>> > message
>> > >> B) and then later when message A arrives it will put that message
>> back
>> > into
>> > >> the right temporal context and publish an amended result for the
>> proper
>> > >> time/session window as if message B were consumed in the timestamp
>> order
>> > >> before message A. The end result of this flow is that you eventually
>> get
>> > >> the same results you would get in a batch processing system but with
>> the
>> > >> added benefit of getting intermediary result at much lower latency.
>> > >>
>> > >> -hans
>> > >>
>> > >> /**
>> > >> * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
>> > >> * hans@confluent.io (650)924-2670
>> > >> */
>> > >>
>> > >>> On Sat, Mar 18, 2017 at 10:29 AM, Ali Akhtar <al...@gmail.com>
>> > wrote:
>> > >>>
>> > >>> Is it possible to have Kafka Streams order messages correctly by
>> their
>> > >>> timestamps, even if they arrived out of order?
>> > >>>
>> > >>> E.g, say Message A with a timestamp of 5:00 PM and Message B with a
>> > >>> timestamp of 5:15 PM, are sent.
>> > >>>
>> > >>> Message B arrives sooner than Message A, due to network issues.
>> > >>>
>> > >>> Is it possible to make sure that, across all consumers of Kafka
>> Streams
>> > >>> (even if they are across different servers, but have the same
>> consumer
>> > >>> group), Message A is consumed first, before Message B?
>> > >>>
>> > >>> Thanks.
>> > >>>
>> > >>
>> >
>>
>
>
>

Re: Out of order message processing with Kafka Streams

Posted by Michael Noll <mi...@confluent.io>.
Late-arriving and out-of-order data is only treated specially for windowed
aggregations.

For stateless operations such as `KStream#foreach()` or `KStream#map()`,
records are processed in the order they arrive (per partition).

-Michael




On Sat, Mar 18, 2017 at 10:47 PM, Ali Akhtar <al...@gmail.com> wrote:

> > later when message A arrives it will put that message back into
> > the right temporal context and publish an amended result for the proper
> > time/session window as if message B were consumed in the timestamp order
> > before message A.
>
> Does this apply to the aggregation Kafka stream methods then, and not to
> e.g foreach?
>
> On Sun, Mar 19, 2017 at 2:40 AM, Hans Jespersen <ha...@confluent.io> wrote:
>
> > Yes stream processing and CEP are subtlety different things.
> >
> > Kafka Streams helps you write stateful apps and allows that state to be
> > preserved on disk (a local State store) as well as distributed for HA or
> > for parallel partitioned processing (via Kafka topic partitions and
> > consumer groups) as well as in memory (as a performance enhancement).
> >
> > However a classical CEP engine with a pre-modeled state machine and
> > pattern matching rules is something different from stream processing.
> >
> > It is on course possible to build a CEP system on top on Kafka Streams
> and
> > get the best of both worlds.
> >
> > -hans
> >
> > > On Mar 18, 2017, at 11:36 AM, Sabarish Sasidharan <
> > sabarish.spk@gmail.com> wrote:
> > >
> > > Hans
> > >
> > > What you state would work for aggregations, but not for state machines
> > and
> > > CEP.
> > >
> > > Regards
> > > Sab
> > >
> > >> On 19 Mar 2017 12:01 a.m., "Hans Jespersen" <ha...@confluent.io>
> wrote:
> > >>
> > >> The only way to make sure A is consumed first would be to delay the
> > >> consumption of message B for at least 15 minutes which would fly in
> the
> > >> face of the principals of a true streaming platform so the short
> answer
> > to
> > >> your question is "no" because that would be batch processing not
> stream
> > >> processing.
> > >>
> > >> However, Kafka Streams does handle late arriving data. So if you had
> > some
> > >> analytics that computes results on a time window or a session window
> > then
> > >> Kafka streams will compute on the stream in real time (processing
> > message
> > >> B) and then later when message A arrives it will put that message back
> > into
> > >> the right temporal context and publish an amended result for the
> proper
> > >> time/session window as if message B were consumed in the timestamp
> order
> > >> before message A. The end result of this flow is that you eventually
> get
> > >> the same results you would get in a batch processing system but with
> the
> > >> added benefit of getting intermediary result at much lower latency.
> > >>
> > >> -hans
> > >>
> > >> /**
> > >> * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
> > >> * hans@confluent.io (650)924-2670
> > >> */
> > >>
> > >>> On Sat, Mar 18, 2017 at 10:29 AM, Ali Akhtar <al...@gmail.com>
> > wrote:
> > >>>
> > >>> Is it possible to have Kafka Streams order messages correctly by
> their
> > >>> timestamps, even if they arrived out of order?
> > >>>
> > >>> E.g, say Message A with a timestamp of 5:00 PM and Message B with a
> > >>> timestamp of 5:15 PM, are sent.
> > >>>
> > >>> Message B arrives sooner than Message A, due to network issues.
> > >>>
> > >>> Is it possible to make sure that, across all consumers of Kafka
> Streams
> > >>> (even if they are across different servers, but have the same
> consumer
> > >>> group), Message A is consumed first, before Message B?
> > >>>
> > >>> Thanks.
> > >>>
> > >>
> >
>

Re: Out of order message processing with Kafka Streams

Posted by Ali Akhtar <al...@gmail.com>.
> later when message A arrives it will put that message back into
> the right temporal context and publish an amended result for the proper
> time/session window as if message B were consumed in the timestamp order
> before message A.

Does this apply to the aggregation Kafka stream methods then, and not to
e.g foreach?

On Sun, Mar 19, 2017 at 2:40 AM, Hans Jespersen <ha...@confluent.io> wrote:

> Yes stream processing and CEP are subtlety different things.
>
> Kafka Streams helps you write stateful apps and allows that state to be
> preserved on disk (a local State store) as well as distributed for HA or
> for parallel partitioned processing (via Kafka topic partitions and
> consumer groups) as well as in memory (as a performance enhancement).
>
> However a classical CEP engine with a pre-modeled state machine and
> pattern matching rules is something different from stream processing.
>
> It is on course possible to build a CEP system on top on Kafka Streams and
> get the best of both worlds.
>
> -hans
>
> > On Mar 18, 2017, at 11:36 AM, Sabarish Sasidharan <
> sabarish.spk@gmail.com> wrote:
> >
> > Hans
> >
> > What you state would work for aggregations, but not for state machines
> and
> > CEP.
> >
> > Regards
> > Sab
> >
> >> On 19 Mar 2017 12:01 a.m., "Hans Jespersen" <ha...@confluent.io> wrote:
> >>
> >> The only way to make sure A is consumed first would be to delay the
> >> consumption of message B for at least 15 minutes which would fly in the
> >> face of the principals of a true streaming platform so the short answer
> to
> >> your question is "no" because that would be batch processing not stream
> >> processing.
> >>
> >> However, Kafka Streams does handle late arriving data. So if you had
> some
> >> analytics that computes results on a time window or a session window
> then
> >> Kafka streams will compute on the stream in real time (processing
> message
> >> B) and then later when message A arrives it will put that message back
> into
> >> the right temporal context and publish an amended result for the proper
> >> time/session window as if message B were consumed in the timestamp order
> >> before message A. The end result of this flow is that you eventually get
> >> the same results you would get in a batch processing system but with the
> >> added benefit of getting intermediary result at much lower latency.
> >>
> >> -hans
> >>
> >> /**
> >> * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
> >> * hans@confluent.io (650)924-2670
> >> */
> >>
> >>> On Sat, Mar 18, 2017 at 10:29 AM, Ali Akhtar <al...@gmail.com>
> wrote:
> >>>
> >>> Is it possible to have Kafka Streams order messages correctly by their
> >>> timestamps, even if they arrived out of order?
> >>>
> >>> E.g, say Message A with a timestamp of 5:00 PM and Message B with a
> >>> timestamp of 5:15 PM, are sent.
> >>>
> >>> Message B arrives sooner than Message A, due to network issues.
> >>>
> >>> Is it possible to make sure that, across all consumers of Kafka Streams
> >>> (even if they are across different servers, but have the same consumer
> >>> group), Message A is consumed first, before Message B?
> >>>
> >>> Thanks.
> >>>
> >>
>

Re: Out of order message processing with Kafka Streams

Posted by Hans Jespersen <ha...@confluent.io>.
Yes stream processing and CEP are subtlety different things. 

Kafka Streams helps you write stateful apps and allows that state to be preserved on disk (a local State store) as well as distributed for HA or for parallel partitioned processing (via Kafka topic partitions and consumer groups) as well as in memory (as a performance enhancement).

However a classical CEP engine with a pre-modeled state machine and pattern matching rules is something different from stream processing.

It is on course possible to build a CEP system on top on Kafka Streams and get the best of both worlds.

-hans

> On Mar 18, 2017, at 11:36 AM, Sabarish Sasidharan <sa...@gmail.com> wrote:
> 
> Hans
> 
> What you state would work for aggregations, but not for state machines and
> CEP.
> 
> Regards
> Sab
> 
>> On 19 Mar 2017 12:01 a.m., "Hans Jespersen" <ha...@confluent.io> wrote:
>> 
>> The only way to make sure A is consumed first would be to delay the
>> consumption of message B for at least 15 minutes which would fly in the
>> face of the principals of a true streaming platform so the short answer to
>> your question is "no" because that would be batch processing not stream
>> processing.
>> 
>> However, Kafka Streams does handle late arriving data. So if you had some
>> analytics that computes results on a time window or a session window then
>> Kafka streams will compute on the stream in real time (processing message
>> B) and then later when message A arrives it will put that message back into
>> the right temporal context and publish an amended result for the proper
>> time/session window as if message B were consumed in the timestamp order
>> before message A. The end result of this flow is that you eventually get
>> the same results you would get in a batch processing system but with the
>> added benefit of getting intermediary result at much lower latency.
>> 
>> -hans
>> 
>> /**
>> * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
>> * hans@confluent.io (650)924-2670
>> */
>> 
>>> On Sat, Mar 18, 2017 at 10:29 AM, Ali Akhtar <al...@gmail.com> wrote:
>>> 
>>> Is it possible to have Kafka Streams order messages correctly by their
>>> timestamps, even if they arrived out of order?
>>> 
>>> E.g, say Message A with a timestamp of 5:00 PM and Message B with a
>>> timestamp of 5:15 PM, are sent.
>>> 
>>> Message B arrives sooner than Message A, due to network issues.
>>> 
>>> Is it possible to make sure that, across all consumers of Kafka Streams
>>> (even if they are across different servers, but have the same consumer
>>> group), Message A is consumed first, before Message B?
>>> 
>>> Thanks.
>>> 
>> 

Re: Out of order message processing with Kafka Streams

Posted by Sabarish Sasidharan <sa...@gmail.com>.
Hans

What you state would work for aggregations, but not for state machines and
CEP.

Regards
Sab

On 19 Mar 2017 12:01 a.m., "Hans Jespersen" <ha...@confluent.io> wrote:

> The only way to make sure A is consumed first would be to delay the
> consumption of message B for at least 15 minutes which would fly in the
> face of the principals of a true streaming platform so the short answer to
> your question is "no" because that would be batch processing not stream
> processing.
>
> However, Kafka Streams does handle late arriving data. So if you had some
> analytics that computes results on a time window or a session window then
> Kafka streams will compute on the stream in real time (processing message
> B) and then later when message A arrives it will put that message back into
> the right temporal context and publish an amended result for the proper
> time/session window as if message B were consumed in the timestamp order
> before message A. The end result of this flow is that you eventually get
> the same results you would get in a batch processing system but with the
> added benefit of getting intermediary result at much lower latency.
>
> -hans
>
> /**
>  * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
>  * hans@confluent.io (650)924-2670
>  */
>
> On Sat, Mar 18, 2017 at 10:29 AM, Ali Akhtar <al...@gmail.com> wrote:
>
> > Is it possible to have Kafka Streams order messages correctly by their
> > timestamps, even if they arrived out of order?
> >
> > E.g, say Message A with a timestamp of 5:00 PM and Message B with a
> > timestamp of 5:15 PM, are sent.
> >
> > Message B arrives sooner than Message A, due to network issues.
> >
> > Is it possible to make sure that, across all consumers of Kafka Streams
> > (even if they are across different servers, but have the same consumer
> > group), Message A is consumed first, before Message B?
> >
> > Thanks.
> >
>

Re: Out of order message processing with Kafka Streams

Posted by Ali Akhtar <al...@gmail.com>.
> However, Kafka Streams does handle late arriving data. So if you had some
> analytics that computes results on a time window or a session window then
> Kafka streams will compute on the stream in real time (processing message
> B) and then later when message A arrives it will put that message back
into
> the right temporal context and publish an amended result for the proper
> time/session window as if message B were consumed in the timestamp order
> before message A.

Can you link me to the javadocs or documentation for where I can read more
into how this is done / how to use this? Thanks.



On Sat, Mar 18, 2017 at 11:11 PM, Hans Jespersen <ha...@confluent.io> wrote:

> sorry I mixed up Message A and B wrt the to question but the answer is
> still valid.
>
> -hans
>
> /**
>  * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
>  * hans@confluent.io (650)924-2670
>  */
>
> On Sat, Mar 18, 2017 at 11:07 AM, Hans Jespersen <ha...@confluent.io>
> wrote:
>
> > The only way to make sure A is consumed first would be to delay the
> > consumption of message B for at least 15 minutes which would fly in the
> > face of the principals of a true streaming platform so the short answer
> to
> > your question is "no" because that would be batch processing not stream
> > processing.
> >
> > However, Kafka Streams does handle late arriving data. So if you had some
> > analytics that computes results on a time window or a session window then
> > Kafka streams will compute on the stream in real time (processing message
> > B) and then later when message A arrives it will put that message back
> into
> > the right temporal context and publish an amended result for the proper
> > time/session window as if message B were consumed in the timestamp order
> > before message A. The end result of this flow is that you eventually get
> > the same results you would get in a batch processing system but with the
> > added benefit of getting intermediary result at much lower latency.
> >
> > -hans
> >
> > /**
> >  * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
> >  * hans@confluent.io (650)924-2670 <(650)%20924-2670>
> >  */
> >
> > On Sat, Mar 18, 2017 at 10:29 AM, Ali Akhtar <al...@gmail.com>
> wrote:
> >
> >> Is it possible to have Kafka Streams order messages correctly by their
> >> timestamps, even if they arrived out of order?
> >>
> >> E.g, say Message A with a timestamp of 5:00 PM and Message B with a
> >> timestamp of 5:15 PM, are sent.
> >>
> >> Message B arrives sooner than Message A, due to network issues.
> >>
> >> Is it possible to make sure that, across all consumers of Kafka Streams
> >> (even if they are across different servers, but have the same consumer
> >> group), Message A is consumed first, before Message B?
> >>
> >> Thanks.
> >>
> >
> >
>

Re: Out of order message processing with Kafka Streams

Posted by Hans Jespersen <ha...@confluent.io>.
sorry I mixed up Message A and B wrt the to question but the answer is
still valid.

-hans

/**
 * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
 * hans@confluent.io (650)924-2670
 */

On Sat, Mar 18, 2017 at 11:07 AM, Hans Jespersen <ha...@confluent.io> wrote:

> The only way to make sure A is consumed first would be to delay the
> consumption of message B for at least 15 minutes which would fly in the
> face of the principals of a true streaming platform so the short answer to
> your question is "no" because that would be batch processing not stream
> processing.
>
> However, Kafka Streams does handle late arriving data. So if you had some
> analytics that computes results on a time window or a session window then
> Kafka streams will compute on the stream in real time (processing message
> B) and then later when message A arrives it will put that message back into
> the right temporal context and publish an amended result for the proper
> time/session window as if message B were consumed in the timestamp order
> before message A. The end result of this flow is that you eventually get
> the same results you would get in a batch processing system but with the
> added benefit of getting intermediary result at much lower latency.
>
> -hans
>
> /**
>  * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
>  * hans@confluent.io (650)924-2670 <(650)%20924-2670>
>  */
>
> On Sat, Mar 18, 2017 at 10:29 AM, Ali Akhtar <al...@gmail.com> wrote:
>
>> Is it possible to have Kafka Streams order messages correctly by their
>> timestamps, even if they arrived out of order?
>>
>> E.g, say Message A with a timestamp of 5:00 PM and Message B with a
>> timestamp of 5:15 PM, are sent.
>>
>> Message B arrives sooner than Message A, due to network issues.
>>
>> Is it possible to make sure that, across all consumers of Kafka Streams
>> (even if they are across different servers, but have the same consumer
>> group), Message A is consumed first, before Message B?
>>
>> Thanks.
>>
>
>

Re: Out of order message processing with Kafka Streams

Posted by Hans Jespersen <ha...@confluent.io>.
The only way to make sure A is consumed first would be to delay the
consumption of message B for at least 15 minutes which would fly in the
face of the principals of a true streaming platform so the short answer to
your question is "no" because that would be batch processing not stream
processing.

However, Kafka Streams does handle late arriving data. So if you had some
analytics that computes results on a time window or a session window then
Kafka streams will compute on the stream in real time (processing message
B) and then later when message A arrives it will put that message back into
the right temporal context and publish an amended result for the proper
time/session window as if message B were consumed in the timestamp order
before message A. The end result of this flow is that you eventually get
the same results you would get in a batch processing system but with the
added benefit of getting intermediary result at much lower latency.

-hans

/**
 * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
 * hans@confluent.io (650)924-2670
 */

On Sat, Mar 18, 2017 at 10:29 AM, Ali Akhtar <al...@gmail.com> wrote:

> Is it possible to have Kafka Streams order messages correctly by their
> timestamps, even if they arrived out of order?
>
> E.g, say Message A with a timestamp of 5:00 PM and Message B with a
> timestamp of 5:15 PM, are sent.
>
> Message B arrives sooner than Message A, due to network issues.
>
> Is it possible to make sure that, across all consumers of Kafka Streams
> (even if they are across different servers, but have the same consumer
> group), Message A is consumed first, before Message B?
>
> Thanks.
>