Posted to users@kafka.apache.org by Sicheng Liu <lz...@gmail.com> on 2018/06/21 17:59:46 UTC

Kafka Streams - Expiring Records By Process Time

Hi All,

We have a use case where we aggregate metrics by their event-time (the
timestamp on the metric itself) using a simple tumbling window. The window
can be configured with a retention period, but since we aggregate by
event-time, the retention is based on event-time as well. However, in our
scenario we receive late-arriving metrics (up to one year late), and we
would like the window retention to be based on processing-time instead, so
that we can hold late-arriving metrics for some time and expire them after
a few hours even if no new metrics with the same aggregation key arrive.
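
To make this concrete, here is a minimal sketch of the topology, using the
DSL as of 1.1 (until() was the retention knob at the time); the "metrics"
topic, the String types, and the 6-hour retention are illustrative
placeholders:

import java.util.concurrent.TimeUnit;

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

public class HourlyMetricCounts {

    static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        // Record timestamps carry the event-time taken from the metric
        // itself (via the configured TimestampExtractor).
        KStream<String, String> metrics = builder.stream("metrics");

        KTable<Windowed<String>, Long> counts = metrics
                .groupByKey()
                // One-hour tumbling windows, assigned by event-time.
                .windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1))
                        // Retention is event-time based too: windows are
                        // kept for at least 6 hours of stream-time, not
                        // wall-clock time; this is the limitation we hit.
                        .until(TimeUnit.HOURS.toMillis(6)))
                .count();

        // (Downstream output omitted for brevity.)
        return builder.build();
    }
}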

We have tried:
1. Setting a TTL on RocksDB, but TTL is disabled in Kafka Streams.
2. Using the low-level Processor API, but scanning the state store and
deleting records one by one significantly hurts performance (roughly as in
the sketch below).
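
Roughly what attempt 2 looks like; the "agg-store" name, the AggState
value class, the 10-minute punctuation interval, and the 6-hour TTL are
all hypothetical placeholders, and serdes/topology wiring are omitted:

import java.util.concurrent.TimeUnit;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

// Hypothetical aggregate value: the running count plus the wall-clock
// time we last touched it (a serde for this class is assumed elsewhere).
class AggState {
    long count;
    long lastSeenMs;
}

public class TtlAggregator extends AbstractProcessor<String, Long> {

    private static final long TTL_MS = TimeUnit.HOURS.toMillis(6);

    private KeyValueStore<String, AggState> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        super.init(context);
        store = (KeyValueStore<String, AggState>) context.getStateStore("agg-store");

        // Every 10 minutes of wall-clock time, scan the whole store and
        // delete entries not updated within the TTL. This full scan plus
        // one-by-one deletion is the part that hurts performance.
        context.schedule(TimeUnit.MINUTES.toMillis(10),
                PunctuationType.WALL_CLOCK_TIME, now -> {
                    try (KeyValueIterator<String, AggState> it = store.all()) {
                        while (it.hasNext()) {
                            KeyValue<String, AggState> entry = it.next();
                            if (now - entry.value.lastSeenMs > TTL_MS) {
                                store.delete(entry.key);
                            }
                        }
                    }
                });
    }

    @Override
    public void process(String key, Long increment) {
        AggState state = store.get(key);
        if (state == null) {
            state = new AggState();
        }
        state.count += increment;
        state.lastSeenMs = System.currentTimeMillis(); // processing-time stamp
        store.put(key, state);
    }
}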

Please let us know if it is possible to aggregate by event-time while
basing the window retention on processing-time.

Thanks,
Sicheng

Re: Kafka Streams - Expiring Records By Process Time

Posted by Sicheng Liu <lz...@gmail.com>.
Hi John,

I think the situation you describe is real, but we can still work around
it. For example, we can filter out aggregated results whose "count" (the
total number of metrics in the aggregation) falls below some threshold, as
sketched below. To keep things simple, assume the late data comes from a
backfill, and that we always backfill whole hours of metrics while doing
hourly aggregation. Retention based on processing-time would work fine in
that case.
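
A sketch of that filter, reusing the counts table from the sketch in my
first message (the threshold value is illustrative):

// A single late event that re-opens an expired window produces a count
// of 1, far below the threshold, so it gets screened out; a whole-hour
// backfill re-produces the full count for that hour and passes.
final long MIN_COUNT = 100L;

KStream<Windowed<String>, Long> reliableCounts = counts.toStream()
        .filter((windowedKey, count) -> count != null && count >= MIN_COUNT);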

Thanks,
Sicheng


Re: Kafka Streams - Expiring Records By Process Time

Posted by John Roesler <jo...@confluent.io>.
Hi Sicheng,

I'm also curious about the details.

Let's say you are doing a simple count aggregation with 24-hour windows.
You got three events with key "A" on 2017-06-21, one year ago, so the
windowed key (A,2017-06-21) has a value of 3.

Fast-forward one year. We get one late event, also for 2017-06-21. The
correct value of (A,2017-06-21) is now 4.

If you set your retention time to account for such lateness, say 2 years,
there's no problem. Streams still has the state "(A,2017-06-21): 3" in
storage, so it can update the value and emit "(A,2017-06-21): 4".

But if your retention is shorter, say 1 month, then Streams deleted that
state long ago and no longer knows about those three prior events. If it
processes the late event, it will incorrectly report "(A,2017-06-21): 1"
as the value for that windowed key.

So, to preserve correctness, you must either discard new events for expired
windows or set the retention higher than any lateness you'll observe.

On the other hand, you can use processing time, instead of event time, in
which case our new event actually belongs to today's window, 2018-06-21,
which is still retained.
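
For example, a minimal sketch of that switch, assuming you set the
built-in wall-clock extractor as the application-wide default:

import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

Properties props = new Properties();
// Every record is stamped with wall-clock time on arrival, so window
// assignment and window expiration both run on processing-time.
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
        WallclockTimestampExtractor.class);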

But this whole thing only works out if you use the same notion of time,
event or processing, for *both* window assignment and expiration;
otherwise, you get incorrect results. Specifically, mixing them can
"trick" Streams into processing that event against the expired window and
producing the incorrect result "(A,2017-06-21): 1".

Is that an accurate depiction of the situation, or have I missed something?

Thanks,
-John


Re: Kafka Streams - Expiring Records By Process Time

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Can't you increase retention time accordingly to make sure that "old"
metrics are not dropped?
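
For example (a sketch; the 400-day retention is illustrative, just longer
than the worst expected lateness):

// One-hour event-time windows, retained long enough to cover metrics
// arriving up to a year late.
TimeWindows.of(TimeUnit.HOURS.toMillis(1))
           .until(TimeUnit.DAYS.toMillis(400));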

-Matthias



Re: Kafka Streams - Expiring Records By Process Time

Posted by Sicheng Liu <lz...@gmail.com>.
Because we might get very "old" metrics: the timestamp on the metric is
very old even though the metric has only just been delivered, for example
during a backfill. If we use event-time for retention, these old metrics
could be dropped and never aggregated. If we use processing-time, they
will at least stay in the state store for some time so they can be
aggregated.


Re: Kafka Streams - Expiring Records By Process Time

Posted by "Matthias J. Sax" <ma...@confluent.io>.
I don't understand why event-time retention cannot be used. Can you
elaborate?

-Matthias
