Posted to jira@kafka.apache.org by "Evgeny Veretennikov (JIRA)" <ji...@apache.org> on 2017/07/06 13:14:00 UTC

[jira] [Comment Edited] (KAFKA-4468) Correctly calculate the window end timestamp after read from state stores

    [ https://issues.apache.org/jira/browse/KAFKA-4468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16076472#comment-16076472 ] 

Evgeny Veretennikov edited comment on KAFKA-4468 at 7/6/17 1:13 PM:
--------------------------------------------------------------------

Here is an example:

{code:java}
final Serde<Windowed<String>> windowedSerde = new WrapperSerde(
    new WindowedSerializer<>(new StringSerializer()),
    new WindowedDeserializer<>(new StringDeserializer())
);
final String topic = "name";
final RocksDBStore<Windowed<String>, String> store = new RocksDBStore<>(topic, windowedSerde, Serdes.String());
final MockProcessorContext context = ...;
context.setRecordContext(...);
store.init(context, store);
store.put(new Windowed<>("key1", new TimeWindow(100, 123)), "value1");
store.put(new Windowed<>("key2", new TimeWindow(101, 456)), "value2");
final KeyValueIterator<Windowed<String>, String> all = store.all();
all.next(); // KeyValue([key1@100/9223372036854775807], value1)
all.next(); // KeyValue([key2@101/9223372036854775807], value2)
{code}

We can put two time windows with different window sizes into the store. When we read them back, both windows have the correct start, but a broken end ({{Long.MAX_VALUE}}, as set in {{WindowedDeserializer}}). So we cannot calculate the window end unless {{WindowedSerializer}} saves it.

It now seems that [~bbejeck] was correct about this:

{noformat}
Unless I'm missing something, this task implies we'll need to include the window_size (and forgo the 8 bytes per key storage savings)
on serialization with WindowedSerializer. As after we've read it via the WindowedDeserializer we only have the key and the
start timestamp and don't have access to the original window_size to do the calculation.
{noformat}
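
To make the trade-off concrete, here is a minimal, self-contained sketch of the two key layouts. It does not use the actual Kafka Streams classes: {{WindowKeySketch}} and its methods are hypothetical names, and the byte layout (key bytes followed by 8-byte timestamps) is an assumption made only for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class WindowKeySketch {

    // Hypothetical layout A: key bytes followed by the 8-byte window start only.
    static byte[] encodeStartOnly(String key, long start) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(keyBytes.length + Long.BYTES)
                .put(keyBytes)
                .putLong(start)
                .array();
    }

    // Hypothetical layout B: key bytes, then window start, then window end.
    static byte[] encodeWithEnd(String key, long start, long end) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(keyBytes.length + 2 * Long.BYTES)
                .put(keyBytes)
                .putLong(start)
                .putLong(end)
                .array();
    }

    // Reads the trailing {start, end} pair back from layout B.
    static long[] decodeStartAndEnd(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        int keyLength = bytes.length - 2 * Long.BYTES;
        return new long[] {buf.getLong(keyLength), buf.getLong(keyLength + Long.BYTES)};
    }

    public static void main(String[] args) {
        byte[] startOnly = encodeStartOnly("key1", 100L);
        byte[] withEnd = encodeWithEnd("key1", 100L, 123L);
        long[] window = decodeStartAndEnd(withEnd);
        System.out.println("start-only bytes: " + startOnly.length);
        System.out.println("with-end bytes: " + withEnd.length);
        System.out.println("decoded window: " + window[0] + "/" + window[1]);
    }
}
```

In this sketch layout B costs exactly 8 more bytes per key than layout A, but it is the only one of the two from which the end can be read back without outside knowledge of the window size.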


> Correctly calculate the window end timestamp after read from state stores
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-4468
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4468
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>              Labels: architecture
>
> When storing the WindowedStore on the persistent KV store, we use only the start timestamp of the window as part of the combo-key (start-timestamp, key). We do not add the end timestamp as well because we can always calculate it as start timestamp + window_length, which saves 8 bytes per key on the persistent KV store.
> However, after reading it back (via {{WindowedDeserializer}}) we do not set its end timestamp correctly but just read it as an {{UnlimitedWindow}}. We should fix this by calculating its end timestamp as mentioned above.
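
The calculation described above (end = start timestamp + window_length) could look roughly like the following sketch. It is not the Kafka Streams implementation: {{FixedSizeWindowDecoder}} is a hypothetical class, the key layout (key bytes followed by an 8-byte start timestamp) is an assumption, and the window size is assumed to be handed to the decoder from outside.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class FixedSizeWindowDecoder {

    // Assumption: every window in the store has this one size.
    private final long windowSize;

    FixedSizeWindowDecoder(long windowSize) {
        this.windowSize = windowSize;
    }

    // Hypothetical stored layout: key bytes followed by the 8-byte window start.
    static byte[] encode(String key, long start) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(keyBytes.length + Long.BYTES)
                .put(keyBytes)
                .putLong(start)
                .array();
    }

    // Recovers the key from everything before the trailing timestamp.
    String decodeKey(byte[] bytes) {
        return new String(bytes, 0, bytes.length - Long.BYTES, StandardCharsets.UTF_8);
    }

    // Returns {start, end}, with end computed as start + windowSize.
    long[] decodeWindow(byte[] bytes) {
        long start = ByteBuffer.wrap(bytes).getLong(bytes.length - Long.BYTES);
        return new long[] {start, start + windowSize};
    }

    public static void main(String[] args) {
        FixedSizeWindowDecoder decoder = new FixedSizeWindowDecoder(23L);
        byte[] stored = encode("key1", 100L);
        long[] window = decoder.decodeWindow(stored);
        System.out.println(decoder.decodeKey(stored) + "@" + window[0] + "/" + window[1]);
    }
}
```

This only works when all windows in a store share one size. A single configured size cannot reproduce both windows from the example in the comment, whose sizes are 23 and 355.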


