Posted to users@kafka.apache.org by Ali Akhtar <al...@gmail.com> on 2016/10/12 03:56:12 UTC

Understanding out of order message processing w/ Streaming

Heya,

Say I'm building a live auction site, with different products. Different
users will bid on different products. And each time they do, I want to
update the product's price, so it should always have the latest price in
place.

Example: Person 1 bids $3 on Product A, and Person 2 bids $5 on the same
product 100 ms later.

The second bid arrives first and the price is updated to $5. Then the first
bid arrives. I want the price to not be updated in this case, as this bid
is older than the one I've already processed.

Here's my understanding of how I can achieve this with Kafka Streaming - is
my understanding correct?

- I have a topic for receiving bids. The topic has N partitions, and I have
N replicas of my application which hooks up w/ Kafka Streaming, up and
running.

- I assume each replica of my app will listen to a different partition of
the topic.

- A user makes a bid on product A.

- This is pushed to the topic with the key bid_a

- Another user makes a bid. This is also pushed with the same key (bid_a)

- The 2nd bid arrives first, and gets processed. Then the first (older) bid
arrives.

- Because I'm using a KTable, the timestamp of the messages is extracted,
and I'm not shown the older bid because I've already processed the later
bid. The older bid is ignored.

- All bids on product A go to the same topic partition, and hence the same
replica of my app, because they all have the key bid_a.

- Because of this, the replica already knows which timestamps it has
processed, and is able to ignore the older messages.

Is the above understanding correct?

Also, what will happen if bid 2 arrived and got processed, and then the
particular replica crashed, and was restarted. The restarted replica won't
have any memory of which timestamps it has previously processed.

So if bid 2 got processed, replica crashed and restarted, and then bid 1
arrived, what would happen in that case?

Thanks.

Re: Understanding out of order message processing w/ Streaming

Posted by Ali Akhtar <al...@gmail.com>.
I am probably being too OCD anyway. It will almost never happen that
messages from another VM in the same network on EC2 arrive out of order.
Right?

Re: Understanding out of order message processing w/ Streaming

Posted by Ali Akhtar <al...@gmail.com>.
Makes sense. Thanks

Re: Understanding out of order message processing w/ Streaming

Posted by Michael Noll <mi...@confluent.io>.
> But if they arrive out of order, I have to detect / process that myself in
> the processor logic.

Yes -- if your processing logic depends on the specific ordering of
messages (which is the case for you), then you must manually implement this
ordering-specific logic at the moment.

Other use cases may not need to do that and "just work" even with
out-of-order data. If, for example, you are counting objects or are
computing the sum of numbers, then you do not need to do anything special.
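
To illustrate the difference, here is a minimal sketch in plain Python (not
the Kafka Streams API). The bid tuples and function names are made up for
illustration. Sums and counts come out the same regardless of arrival order,
while a "keep whatever arrived last" price does not.

```python
# Hypothetical bids as (event_time_ms, amount); the values are illustrative only.
in_order = [(1000, 3), (1100, 5)]
out_of_order = [(1100, 5), (1000, 3)]  # the later bid arrives first


def total(bids):
    """Sum of amounts: addition is commutative, so arrival order is irrelevant."""
    return sum(amount for _, amount in bids)


def count(bids):
    """Number of bids: also independent of arrival order."""
    return len(bids)


def latest_by_arrival(bids):
    """Naive 'current price': keeps whatever value arrived last."""
    price = None
    for _, amount in bids:
        price = amount
    return price


print(total(in_order), total(out_of_order))
print(latest_by_arrival(in_order), latest_by_arrival(out_of_order))
```

The first print shows identical totals for both arrival orders; the second
shows that the naive price depends on which bid happened to arrive last.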





Re: Understanding out of order message processing w/ Streaming

Posted by Ali Akhtar <al...@gmail.com>.
Thanks Matthias.

So, if I'm understanding this right, Kafka will not discard messages
which arrive out of order.

What it will do is show messages in the order in which they arrive.

But if they arrive out of order, I have to detect / process that myself in
the processor logic.

Is that correct?

Thanks.

Re: Understanding out of order message processing w/ Streaming

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Last question first: A KTable is basically an infinite window over the
whole stream, providing a single result (that gets updated when new
data arrives). If you use windows, you cut the overall stream into
finite subsets and get a result per window. Thus, I guess you do not
need windows (if I understood your use case correctly).
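
To make the difference concrete, here is a minimal plain-Java model (it
does not use the Kafka Streams API; the class name, the 1-second window
size, and the "key@windowStart" naming are assumptions made up for this
illustration). The table-style view keeps one running result per key,
while the windowed view keeps one result per key and window:

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model only; none of these names come from Kafka Streams.
final class TableVersusWindows {

    // Hypothetical window size chosen for the illustration.
    static final long WINDOW_MS = 1_000L;

    // Table-style view: a single result per key, updated on every record.
    final Map<String, Integer> bidCountPerKey = new HashMap<>();

    // Windowed view: one result per key and window start.
    final Map<String, Integer> bidCountPerKeyAndWindow = new HashMap<>();

    void onRecord(String key, long timestampMs) {
        bidCountPerKey.merge(key, 1, Integer::sum);

        // Align the timestamp to the start of its window.
        long windowStart = (timestampMs / WINDOW_MS) * WINDOW_MS;
        bidCountPerKeyAndWindow.merge(key + "@" + windowStart, 1, Integer::sum);
    }

    public static void main(String[] args) {
        TableVersusWindows model = new TableVersusWindows();
        model.onRecord("bid_a", 100L);
        model.onRecord("bid_a", 200L);
        model.onRecord("bid_a", 1_500L);

        // Table view: one entry for bid_a covering all three records.
        System.out.println(model.bidCountPerKey);
        // Windowed view: two entries, one for window 0 and one for window 1000.
        System.out.println(model.bidCountPerKeyAndWindow);
    }
}
```

For the auction case only the single per-key result is of interest,
which is why the table-style view seems to be enough.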

However, in the current state of the Kafka Streams DSL, you will not be
able to use a KTable (directly -- see the suggestion to fix this below)
because it does (currently) not allow you to access the timestamp of the
current record (thus, you cannot know whether a record is late or not).
You will need to use the Processor API, which allows you to access the
current record's timestamp via the Context object given in init().

Your reasoning about partitions and Streams instances is correct.
However, the following two are not

> - Because I'm using a KTable, the timestamp of the messages is
> extracted, and I'm not shown the older bid because I've already
> processed the later bid. The older bid is ignored.

and

> - Because of this, the replica already knows which timestamps it
> has processed, and is able to ignore the older messages.

Late-arriving records are not dropped but processed regularly. Thus,
your KTable aggregate function will be called for the late-arriving
record, too (but as described above, you currently have no way to know
that it is a late record).
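
As a rough illustration of the consequence, the following plain-Java
sketch (again not the Kafka Streams API; the class and method names are
hypothetical) applies every record in arrival order without looking at
timestamps, so the late $3 bid overwrites the $5 bid:

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java stand-in for a "latest value wins" table; not a Kafka Streams store.
final class NaiveLatestPrice {

    private final Map<String, Integer> priceByKey = new HashMap<>();

    // Called for every record in arrival order, whether it is late or not.
    void onRecord(String key, int bid) {
        priceByKey.put(key, bid);
    }

    Integer price(String key) {
        return priceByKey.get(key);
    }

    public static void main(String[] args) {
        NaiveLatestPrice table = new NaiveLatestPrice();
        table.onRecord("bid_a", 5); // bid 2 arrives first
        table.onRecord("bid_a", 3); // bid 1 arrives late and is still applied
        System.out.println(table.price("bid_a")); // prints 3
    }
}
```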


Last but not least, your last statement is a valid concern:

> Also, what will happen if bid 2 arrived and got processed, and then
> the particular replica crashed, and was restarted. The restarted
> replica won't have any memory of which timestamps it has previously
> processed.
> 
> So if bid 2 got processed, replica crashed and restarted, and then
> bid 1 arrived, what would happen in that case?

In order to make this work, you would need to store the timestamp in
your store next to the actual data. Thus, you can compare the timestamp
of the latest result (safely stored in operator state) with the
timestamp of the current record.
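
A minimal sketch of that idea in plain Java, under the assumption that
the store survives a restart (here a plain map that outlives the
processor instance stands in for the operator state; all class and
method names are made up for this example, and dropping records with an
equal timestamp is just one possible tie-breaking choice):

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model; the map stands in for durable operator state.
final class TimestampedPriceProcessor {

    // Value kept in the store: the bid plus the timestamp of the record that set it.
    static final class StoredBid {
        final int bid;
        final long timestampMs;

        StoredBid(int bid, long timestampMs) {
            this.bid = bid;
            this.timestampMs = timestampMs;
        }
    }

    private final Map<String, StoredBid> store;

    TimestampedPriceProcessor(Map<String, StoredBid> store) {
        this.store = store;
    }

    // Returns true if the record updated the price, false if it was not newer
    // than the stored one and was therefore ignored.
    boolean onRecord(String key, int bid, long timestampMs) {
        StoredBid current = store.get(key);
        if (current != null && current.timestampMs >= timestampMs) {
            return false;
        }
        store.put(key, new StoredBid(bid, timestampMs));
        return true;
    }

    int price(String key) {
        return store.get(key).bid;
    }

    // Bid 2 is processed, the instance "crashes", then bid 1 arrives.
    static int crashScenario() {
        Map<String, StoredBid> durableStore = new HashMap<>();

        TimestampedPriceProcessor beforeCrash = new TimestampedPriceProcessor(durableStore);
        beforeCrash.onRecord("bid_a", 5, 1_100L);

        // The old instance is discarded; only the store contents remain.
        TimestampedPriceProcessor afterRestart = new TimestampedPriceProcessor(durableStore);
        afterRestart.onRecord("bid_a", 3, 1_000L);

        return afterRestart.price("bid_a");
    }
}
```

Because the timestamp lives in the store and not in the memory of the
instance, the restarted instance can still tell that bid 1 is older.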

Does this make sense?

To fix your issue, you could add a .transformValues() before your KTable,
which allows you to access the timestamp of a record. If you add this
timestamp to your value and pass it to the KTable afterwards, you can
access it, and it also gets stored reliably.

<bid_id : bid_value> => transformValues => <bid_id : {bid_value,
timestamp}> => aggregate
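
The shape of that pipeline can be modeled in plain Java as two small
functions (this is only a sketch of the data flow, not the Kafka
Streams DSL; attachTimestamp, keepLatest and the class names are
hypothetical stand-ins for the two steps):

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of the two steps; no Kafka Streams classes are used.
final class TimestampedAggregateSketch {

    // The enriched value: {bid_value, timestamp}.
    static final class TimestampedBid {
        final int bid;
        final long timestampMs;

        TimestampedBid(int bid, long timestampMs) {
            this.bid = bid;
            this.timestampMs = timestampMs;
        }
    }

    // Step 1: attach the record timestamp to the value.
    static TimestampedBid attachTimestamp(int bid, long recordTimestampMs) {
        return new TimestampedBid(bid, recordTimestampMs);
    }

    // Step 2: the aggregate keeps whichever value carries the later timestamp.
    static TimestampedBid keepLatest(TimestampedBid aggregate, TimestampedBid incoming) {
        return incoming.timestampMs > aggregate.timestampMs ? incoming : aggregate;
    }

    // Feeds bid 2 and then the late bid 1 through both steps.
    static int run() {
        Map<String, TimestampedBid> table = new HashMap<>();
        table.merge("bid_a", attachTimestamp(5, 1_100L), TimestampedAggregateSketch::keepLatest);
        table.merge("bid_a", attachTimestamp(3, 1_000L), TimestampedAggregateSketch::keepLatest);
        return table.get("bid_a").bid;
    }
}
```

Since the timestamp travels inside the value, the aggregate needs no
access to record metadata to decide which bid wins.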

Hope this helps.

-Matthias


On 10/11/16 9:12 PM, Ali Akhtar wrote:
> P.S, does my scenario require using windows, or can it be achieved
> using just KTable?

Re: Understanding out of order message processing w/ Streaming

Posted by Ali Akhtar <al...@gmail.com>.
P.S, does my scenario require using windows, or can it be achieved using
just KTable?