Posted to users@kafka.apache.org by Nicolas Fouché <nf...@onfocus.io> on 2017/01/16 11:17:58 UTC

Kafka Streams: got bit by WindowedSerializer (only window.start is serialized)

Hi,

In the same topology, I generate aggregates with 1-day windows and 1-week
windows and write them to a single topic. On Mondays, these windows have
the same start time. The effect: these aggregates override each other.

That happens because WindowedSerializer [1] only serializes the window
start time. I'm a bit surprised: a window has, by definition, a start and
an end. I suppose this was done to save on key sizes? And/or the
assumption is that topics should not contain aggregates with different
granularities?
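To make the collision concrete, here is a minimal standalone sketch (it
mimics what I understand the 0.10.1 byte layout to be, i.e. the inner key
bytes followed by an 8-byte start timestamp; the key and timestamp are
made up):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class WindowedKeyCollision {

    // Mimics the WindowedSerializer layout: serialized inner key followed
    // by the 8-byte window start. The window end is never written, so it
    // cannot disambiguate the two windows below.
    static byte[] toBinary(String key, long windowStartMs) {
        byte[] inner = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(inner.length + 8)
                .put(inner)
                .putLong(windowStartMs)
                .array();
    }

    public static void main(String[] args) {
        long mondayMidnightUtc = 1484524800000L; // 2017-01-16 00:00 UTC, a Monday
        byte[] dailyKey = toBinary("page-views", mondayMidnightUtc);  // 1-day window
        byte[] weeklyKey = toBinary("page-views", mondayMidnightUtc); // 1-week window
        // Same topic, identical keys: the later aggregate overrides the other.
        System.out.println(Arrays.equals(dailyKey, weeklyKey)); // prints "true"
    }
}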

I have two choices then: either create as many output topics as I have
granularities, or write my own serializer which also includes the window
end time. What would the community recommend?

Getting back to the core problem:
I could understand that it's not "right" to store different granularities
in one topic; I thought it would save resources (fewer topics for Kafka
to manage). But I'm really not sure about this default serializer: it
does not serialize all instance variables of the `Window` class, and more
generally does not comply with the definition of a window.

[1]
https://github.com/apache/kafka/blob/0.10.1/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java

Thanks.
Nicolas

Re: Kafka Streams: got bit by WindowedSerializer (only window.start is serialized)

Posted by Guozhang Wang <wa...@gmail.com>.
Regarding KAFKA-4468: as discussed on the JIRA, we intentionally did not
write the end-timestamp to RocksDB as a storage optimization, i.e. we
still write only the combo of window-start-time and key. For a TimeWindow
the window length is fixed and accessible in the Windows object, so we
can read the start-timestamp and key, and then use the Windows' length
value to calculate the end-timestamp.

The problem is that we currently do not set the end-timestamp after
reading it back from RocksDB, since WindowedSerde was considered an
internal class not used by users directly. This should be fixed, but I
think we can still keep the current storage format and not write the
end-timestamp, as it does not seem necessary.
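To spell that out, here is a minimal sketch of the read path (illustrative
only, not the actual internal code; `windowSizeMs` stands in for the fixed
length taken from the Windows object):

import java.nio.ByteBuffer;

public class WindowBounds {
    // The store holds <serialized inner key><8-byte start>; the
    // end-timestamp is derived from the fixed window length rather than
    // being stored.
    static long[] boundsOf(byte[] binaryKey, long windowSizeMs) {
        long start = ByteBuffer.wrap(binaryKey, binaryKey.length - 8, 8).getLong();
        long end = start + windowSizeMs; // safe because the length is fixed
        return new long[] {start, end};
    }
}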


Guozhang


Re: Kafka Streams: got bit by WindowedSerializer (only window.start is serialized)

Posted by "Matthias J. Sax" <ma...@confluent.io>.
With regard to the JIRA: I guess we do not want to put the end timestamp
into the key. For general usage, windows of different types are written
into different topics.
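For illustration, here is a sketch of that convention against the 0.10.1
DSL (a hedged sketch: the stream, store, and topic names are made up, and
a Serde<Windowed<String>> is assumed to be available):

import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

public class SeparateTopicsExample {
    static void build(KStreamBuilder builder,
                      Serde<Windowed<String>> windowedSerde) {
        KStream<String, Long> views =
                builder.stream(Serdes.String(), Serdes.Long(), "page-views");

        // One output topic per granularity, so start-only keys can never
        // collide across window sizes.
        views.groupByKey(Serdes.String(), Serdes.Long())
             .count(TimeWindows.of(TimeUnit.DAYS.toMillis(1)), "daily-store")
             .to(windowedSerde, Serdes.Long(), "aggregates-daily");

        views.groupByKey(Serdes.String(), Serdes.Long())
             .count(TimeWindows.of(TimeUnit.DAYS.toMillis(7)), "weekly-store")
             .to(windowedSerde, Serdes.Long(), "aggregates-weekly");
    }
}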

Thus, Nicolas' use case is quite special, and using a custom Serde is the
better approach to handle it, instead of changing Kafka Streams.

Nicolas, of course you are still welcome to work on
https://issues.apache.org/jira/browse/KAFKA-4468, but the patch should
not change the key format, only compute the correct window end timestamp
when a window gets deserialized.

@Guozhang: please correct me if I am wrong and we want to follow Eno's
suggestion.


-Matthias



Re: Kafka Streams: got bit by WindowedSerializer (only window.start is serialized)

Posted by Eno Thereska <en...@gmail.com>.
For changes that may be backwards incompatible or change the APIs we usually do a short KIP first (e.g., I just did one yesterday: https://cwiki.apache.org/confluence/display/KAFKA/KIP-114%3A+KTable+materialization+and+improved+semantics). It's not meant to be overly burdensome, and it encourages the community to participate in the design. In this case I suspect the KIP can be very short, a paragraph or so.

Thanks
Eno



Re: Kafka Streams: got bit by WindowedSerializer (only window.start is serialized)

Posted by Nicolas Fouché <nf...@onfocus.io>.
In the case of KAFKA-4468, it's more about state stores. But still, keys
would not be backward compatible. What is the "official" policy about
this kind of change?


Re: Kafka Streams: got bit by WindowedSerializer (only window.start is serialized)

Posted by Nicolas Fouché <nf...@onfocus.io>.
Hi Eno,
I thought it would be impossible to put this in Kafka because of backward
incompatibility with the existing windowed keys, no?
In my case, I had to create a new output topic, reset the topology, and
reprocess all my data.


Re: Kafka Streams: got bit by WindowedSerializer (only window.start is serialized)

Posted by Eno Thereska <en...@gmail.com>.
Nicolas,

I'm checking with Bill, who was originally interested in KAFKA-4468. If he isn't actively working on it, why don't you give it a go and create a pull request (PR) for it? That way your contribution is properly acknowledged. We can help you through the process.

Thanks
Eno


Re: Kafka Streams: got bit by WindowedSerializer (only window.start is serialized)

Posted by Nicolas Fouché <nf...@onfocus.io>.
My current implementation:
https://gist.github.com/nfo/eaf350afb5667a3516593da4d48e757a (I just
appended the window `end` at the end of the byte array).
Comments and suggestions are welcome!
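Here is a minimal sketch of the idea (an illustrative reconstruction for
this message, not the gist itself), assuming a generic inner key
serializer:

import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.Windowed;

// Writes both window bounds: <inner key bytes><8-byte start><8-byte end>,
// so windows of different granularities with the same start no longer
// collide in a shared topic.
public class FullWindowedSerializer<T> implements Serializer<Windowed<T>> {
    private final Serializer<T> inner;

    public FullWindowedSerializer(Serializer<T> inner) {
        this.inner = inner;
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        inner.configure(configs, isKey);
    }

    @Override
    public byte[] serialize(String topic, Windowed<T> data) {
        byte[] key = inner.serialize(topic, data.key());
        return ByteBuffer.allocate(key.length + 16)
                .put(key)
                .putLong(data.window().start())
                .putLong(data.window().end())
                .array();
    }

    @Override
    public void close() {
        inner.close();
    }
}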



Re: Kafka Streams: got bit by WindowedSerializer (only window.start is serialized)

Posted by Nicolas Fouché <nf...@onfocus.io>.
Hi Damian,

I recall now that I copied the `WindowedSerde` class [1] from the
Confluent examples, which uses the internal `WindowedSerializer` class.
Better to write my own Serde then. You're right, I should not rely on
internal classes, especially for data written outside Kafka Streams
topologies.

Thanks for the insights on KAFKA-4468.

[1]
https://github.com/confluentinc/examples/blob/89db45c6890cf757b8e18565bdf7bc23f119a2ff/kafka-streams/src/main/java/io/confluent/examples/streams/utils/WindowedSerde.java

Nicolas.


Re: Kafka Streams: got bit by WindowedSerializer (only window.start is serialized)

Posted by Damian Guy <da...@gmail.com>.
Hi Nicolas,

I guess you are using the Processor API for your topology? The
WindowedSerializer is an internal class that is used as part of the DSL.
In the DSL a topic will be created for each window operation, so we don't
need the end time, as it can be calculated from the window size.
However, there is an open JIRA for this:
https://issues.apache.org/jira/browse/KAFKA-4468

Thanks,
Damian
