Posted to users@kafka.apache.org by Alessandro Tagliapietra <ta...@gmail.com> on 2019/05/03 02:58:24 UTC

Re: Using processor API via DSL

Hi Bruno,

thank you for your help, glad to hear that those are only bugs and not a
problem with my implementation.
I'm currently using the Confluent Docker images. I've checked their master
branch, which seems to use the SNAPSHOT version, but those images/packages
aren't publicly available. Are there any snapshot builds available?
In the meantime I'm trying to build a custom Docker image from the Kafka
source.

Thanks

--
Alessandro Tagliapietra

On Tue, Apr 23, 2019 at 8:52 AM Bruno Cadonna <br...@confluent.io> wrote:

> Hi Alessandro,
>
> It seems that the behaviour you described regarding the window aggregation
> is due to bugs. The good news is that the bugs have been already fixed.
>
> The relevant bug reports are
> https://issues.apache.org/jira/browse/KAFKA-7895
> https://issues.apache.org/jira/browse/KAFKA-8204
>
> The fixes for both bugs have been already merged to the 2.2 branch.
>
> Could you please build from the 2.2 branch and confirm that the fixes solve
> your problem?
>
> Best,
> Bruno
>
>
> On Sat, Apr 20, 2019 at 2:16 PM Alessandro Tagliapietra <
> tagliapietra.alessandro@gmail.com> wrote:
>
> > Thanks Matthias, one less thing to worry about in the future :)
> >
> > --
> > Alessandro Tagliapietra
> >
> >
> > On Sat, Apr 20, 2019 at 11:23 AM Matthias J. Sax <ma...@confluent.io>
> > wrote:
> >
> > > Just a side note. There is currently work in progress on
> > > https://issues.apache.org/jira/browse/KAFKA-3729 that should fix the
> > > configuration problem for Serdes.
> > >
> > > -Matthias
> > >
> > > On 4/19/19 9:12 PM, Alessandro Tagliapietra wrote:
> > > > Hi Bruno,
> > > > > thanks a lot for checking the code, regarding the SpecificAvroSerde I've
> > > > > found that using
> > > > >
> > > > > final Serde<InputList> valueSpecificAvroSerde = new SpecificAvroSerde<>();
> > > > > final Map<String, String> serdeConfig =
> > > > >     Collections.singletonMap("schema.registry.url", "http://localhost:8081");
> > > > > valueSpecificAvroSerde.configure(serdeConfig, false);
> > > >
> > > > and then in aggregate()
> > > >
> > > > Materialized.with(Serdes.String(), valueSpecificAvroSerde)
> > > >
> > > > fixed the issue.
> > > >
> > > > Thanks in advance for the windowing help, very appreciated.
> > > > In the meantime I'll try to make some progress on the rest.
> > > >
> > > > Have a great weekend
> > > >
> > > > --
> > > > Alessandro Tagliapietra
> > > >
> > > >
> > > > On Fri, Apr 19, 2019 at 2:09 PM Bruno Cadonna <br...@confluent.io>
> > > wrote:
> > > >
> > > >> Hi Alessandro,
> > > >>
> > > > >> I had a look at your code. Regarding your question whether you use the
> > > > >> SpecificAvroSerde correctly, take a look at the following documentation:
> > > > >>
> > > > >> https://docs.confluent.io/current/streams/developer-guide/datatypes.html
> > > > >>
> > > > >> I haven't had the time yet to take a closer look at your problems with the
> > > > >> aggregation. I will have a look next week.
> > > >>
> > > >> Have a nice weekend,
> > > >> Bruno
> > > >>
> > > >> On Wed, Apr 17, 2019 at 4:43 PM Alessandro Tagliapietra <
> > > >> tagliapietra.alessandro@gmail.com> wrote:
> > > >>
> > > > >>> So I've started with a new app with the archetype:generate as in
> > > > >>> https://kafka.apache.org/22/documentation/streams/tutorial
> > > > >>>
> > > > >>> I've pushed a sample repo here: https://github.com/alex88/kafka-test
> > > > >>> The avro schemas are a Metric with 2 fields: timestamp and production and a
> > > > >>> MetricList with a list of records (Metric) to be able to manually do the
> > > > >>> aggregation.
> > > > >>> Right now the aggregation is simple just for the purpose of the sample repo
> > > > >>> and to easily see if we're getting wrong values.
> > > >>>
> > > > >>> What I wanted to achieve is:
> > > > >>>  - have a custom generator that generates 1 message per second with
> > > > >>> production = 1 with 1 or more separate message keys, which in my case are
> > > > >>> the sensor IDs generating the data
> > > > >>>  - a filter that removes out of order messages by having a state that
> > > > >>> stores key (sensorID) -> last timestamp
> > > > >>>  - a window operation that for this example just sums the values in each
> > > > >>> 10 second window
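
A minimal sketch of the window operation in the list above, in plain Java
with no Kafka dependency. It assumes 10-second tumbling windows aligned to
the epoch and non-negative timestamps, so a record's window start is its
timestamp rounded down to a multiple of the window size. The class
TumblingWindowSum and its methods are hypothetical names used only for
illustration; they are not part of Kafka Streams or of the sample repo.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration: sums "production" values per tumbling window.
// This only models the window arithmetic; it is not Kafka Streams code.
class TumblingWindowSum {
    private final long windowSizeMs;
    // window start timestamp -> running sum, sorted by window start
    private final Map<Long, Long> sums = new TreeMap<>();

    TumblingWindowSum(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Start of the epoch-aligned window containing the given timestamp
    // (assumes a non-negative timestamp).
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    // Adds one record to its window and returns that window's updated sum.
    long add(long timestampMs, long production) {
        long start = windowStart(timestampMs, windowSizeMs);
        return sums.merge(start, production, Long::sum);
    }

    Map<Long, Long> sums() {
        return sums;
    }

    public static void main(String[] args) {
        TumblingWindowSum windows = new TumblingWindowSum(10_000L);
        // One record per second with production = 1, as the generator does.
        for (long ts = 160_000L; ts < 180_000L; ts += 1_000L) {
            windows.add(ts, 1L);
        }
        System.out.println(windows.sums());
    }
}
```

With one record per second from 160000 to 179000, each of the two windows
(starting at 160000 and 170000) holds ten records, so each sum is 10.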
> > > >>>
> > > > >>> To show where I'm having issues I've set up multiple branches for the repo:
> > > > >>>  - *issue-01 <https://github.com/alex88/kafka-test/tree/issue-01>* is the
> > > > >>> one I had initially "Failed to flush state store
> > > > >>> KSTREAM-AGGREGATE-STATE-STORE-0000000003" that I tried to solve using
> > > > >>> https://stackoverflow.com/questions/55186727/kafka-streams-2-1-1-class-cast-while-flushing-timed-aggregation-to-store
> > > > >>>  - *issue-02 <https://github.com/alex88/kafka-test/tree/issue-02>* is the
> > > > >>> one after I've tried to solve the above problem with the materializer (maybe
> > > > >>> the SpecificAvroSerde is wrong?)
> > > > >>>  - *issue-03 <https://github.com/alex88/kafka-test/tree/issue-03>* after
> > > > >>> fixing issue-02 (by using groupByKey(Grouped.with(Serdes.String(), new
> > > > >>> SpecificAvroSerde<>()))) everything seems to be working. If you let both
> > > > >>> the producer and stream run, you'll see that the stream receives 10
> > > > >>> messages (with the timestamp incrementing 1 second for each message) like
> > > > >>> this:
> > > >>>
> > > >>> S1 with filtered metric{"timestamp": 160000, "production": 1}
> > > >>> S1 with filtered metric{"timestamp": 161000, "production": 1}
> > > >>> S1 with filtered metric{"timestamp": 162000, "production": 1}
> > > >>> S1 with filtered metric{"timestamp": 163000, "production": 1}
> > > >>> S1 with filtered metric{"timestamp": 164000, "production": 1}
> > > >>> S1 with filtered metric{"timestamp": 165000, "production": 1}
> > > >>> S1 with filtered metric{"timestamp": 166000, "production": 1}
> > > >>> S1 with filtered metric{"timestamp": 167000, "production": 1}
> > > >>> S1 with filtered metric{"timestamp": 168000, "production": 1}
> > > >>> S1 with filtered metric{"timestamp": 169000, "production": 1}
> > > >>>
> > > >>> and at the 10 seconds interval something like:
> > > >>>
> > > >>> S1 with computed metric {"timestamp": 160000, "production": 10}
> > > >>> S1 with computed metric {"timestamp": 170000, "production": 10}
> > > >>> S1 with computed metric {"timestamp": 180000, "production": 10}
> > > >>>
> > > >>> and so on...
> > > > >>> Now there are two problems. After stopping and restarting the stream
> > > > >>> processor (by sending SIGINT via IntelliJ since I start the class main with
> > > > >>> it) this happens:
> > > > >>>  - sometimes the aggregated count is wrong: if I have it start windowing
> > > > >>> for 7 seconds (e.g. seconds 11-17) and restart the stream, after the restart
> > > > >>> it might just emit a value for the 3 remaining seconds (seconds 18-20) and
> > > > >>> the aggregated value is 3, not 10
> > > > >>>  - sometimes the window outputs twice; in the example where I restart the
> > > > >>> stream processor I might get as output
> > > >>>
> > > >>> S1 with filtered metric{"timestamp": 154000, "production": 1}
> > > >>> S1 with computed metric {"timestamp": 160000, "production": 5}
> > > >>> S1 with filtered metric{"timestamp": 155000, "production": 1}
> > > >>> S1 with filtered metric{"timestamp": 156000, "production": 1}
> > > >>> S1 with filtered metric{"timestamp": 157000, "production": 1}
> > > >>> S1 with filtered metric{"timestamp": 158000, "production": 1}
> > > >>> S1 with filtered metric{"timestamp": 159000, "production": 1}
> > > >>> S1 with filtered metric{"timestamp": 160000, "production": 1}
> > > >>> S1 with filtered metric{"timestamp": 161000, "production": 1}
> > > >>> S1 with computed metric {"timestamp": 160000, "production": 10}
> > > >>> S1 with filtered metric{"timestamp": 162000, "production": 1}
> > > >>>
> > > >>> as you can see, window for timestamp 160000 is duplicated
> > > >>>
> > > > >>> Is this because the window state isn't persisted across restarts?
> > > > >>> My ultimate goal is to have the window part emit only once and resume
> > > > >>> processing across restarts, while avoiding processing out of order data
> > > > >>> (that's the purpose of the TimestampIncrementalFilter)
> > > >>>
> > > >>> Thank you in advance
> > > >>>
> > > >>> --
> > > >>> Alessandro Tagliapietra
> > > >>>
> > > >>>
> > > >>> On Tue, Apr 16, 2019 at 9:48 PM Alessandro Tagliapietra <
> > > >>> tagliapietra.alessandro@gmail.com> wrote:
> > > >>>
> > > >>>> Hi Bruno,
> > > >>>>
> > > > >>>> I'm using the confluent docker images 5.2.1, so kafka 2.2.
> > > > >>>> Anyway I'll try to make a small reproduction repo with all the different
> > > > >>>> cases soon.
> > > >>>>
> > > >>>> Thank you
> > > >>>>
> > > >>>> --
> > > >>>> Alessandro Tagliapietra
> > > >>>>
> > > >>>>
> > > >>>> On Tue, Apr 16, 2019 at 1:02 PM Bruno Cadonna <bruno@confluent.io
> >
> > > >>> wrote:
> > > >>>>
> > > >>>>> Hi Alessandro,
> > > >>>>>
> > > >>>>> What version of Kafka do you use?
> > > >>>>>
> > > > >>>>> Could you please give a more detailed example for the issues with the two
> > > > >>>>> keys you see?
> > > > >>>>>
> > > > >>>>> Could the following bug be related to the duplicates you see?
> > > > >>>>>
> > > > >>>>> https://issues.apache.org/jira/browse/KAFKA-7895?jql=project%20%3D%20KAFKA%20AND%20issuetype%20%3D%20Bug%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened)%20AND%20component%20%3D%20streams%20AND%20text%20~%20%22duplicate%22
> > > >>>>>
> > > >>>>> How do you restart the processor?
> > > >>>>>
> > > >>>>> Best,
> > > >>>>> Bruno
> > > >>>>>
> > > >>>>> On Mon, Apr 15, 2019 at 11:02 PM Alessandro Tagliapietra <
> > > >>>>> tagliapietra.alessandro@gmail.com> wrote:
> > > >>>>>
> > > >>>>>> Thank you Bruno,
> > > >>>>>>
> > > > >>>>>> I'll look into those, however average is just a simple thing I'm trying
> > > > >>>>>> right now just to get an initial windowing flow working.
> > > > >>>>>> In the future I'll probably still need the actual values for other
> > > > >>>>>> calculations. We won't have more than 60 elements per window for sure.
> > > > >>>>>>
> > > > >>>>>> So far to not manually serialize/deserialize the array list I've created an
> > > > >>>>>> Avro model with an array field containing the values.
> > > > >>>>>> I had issues with suppress as explained here
> > > > >>>>>>
> > > > >>>>>> https://stackoverflow.com/questions/55699096/kafka-aggregate-with-materialized-with-specific-avro-serve-gives-nullpointerexce/55699198#55699198
> > > > >>>>>>
> > > > >>>>>> but I got that working.
> > > > >>>>>> So far everything seems to be working, except a couple things:
> > > > >>>>>>  - if I generate data with 1 key, I correctly get a value each 10 seconds,
> > > > >>>>>> if I later start generating data with another key (while key 1 is still
> > > > >>>>>> generating) the windowing emits a value only after the timestamp of key 2
> > > > >>>>>> reaches the last generated window
> > > > >>>>>>  - while generating data, if I restart the processor as soon as it starts
> > > > >>>>>> it sometimes generates 2 aggregates for the same window even if I'm using
> > > > >>>>>> the suppress
> > > > >>>>>>
> > > > >>>>>> Anyway, I'll look into your link and try to find out the cause of these
> > > > >>>>>> issues, probably starting from scratch with a simpler example
> > > >>>>>>
> > > >>>>>> Thank you for your help!
> > > >>>>>>
> > > >>>>>> --
> > > >>>>>> Alessandro Tagliapietra
> > > >>>>>>
> > > >>>>>> On Mon, Apr 15, 2019 at 10:08 PM Bruno Cadonna <
> > bruno@confluent.io>
> > > >>>>> wrote:
> > > >>>>>>
> > > >>>>>>> Hi Alessandro,
> > > >>>>>>>
> > > > >>>>>>> Have a look at this Kafka Usage Pattern for computing averages without
> > > > >>>>>>> using an ArrayList.
> > > > >>>>>>>
> > > > >>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns#KafkaStreamUsagePatterns-Howtocomputean(windowed)average?
> > > > >>>>>>>
> > > > >>>>>>> The advantage of this pattern over the ArrayList approach is the reduced
> > > > >>>>>>> space needed to compute the aggregate. Note that you will still need to
> > > > >>>>>>> implement a SerDe. However, the SerDe should be a bit easier to implement
> > > > >>>>>>> than a SerDe for an ArrayList.
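
One way to get the reduced space mentioned above is to keep only a running
sum and a count per window instead of every value. The sketch below shows
that idea in plain Java; it is an illustration under that assumption, not
the code from the linked wiki page, and AverageAccumulator is a hypothetical
name. A SerDe for such a type only has to write two numbers.

```java
// Hypothetical accumulator: holds a running sum and count, not the values.
final class AverageAccumulator {
    private final long sum;
    private final long count;

    AverageAccumulator(long sum, long count) {
        this.sum = sum;
        this.count = count;
    }

    // Starting value, playing the role of an aggregation initializer.
    static AverageAccumulator empty() {
        return new AverageAccumulator(0L, 0L);
    }

    // Returns a new accumulator that includes one more value.
    AverageAccumulator add(long value) {
        return new AverageAccumulator(sum + value, count + 1);
    }

    // Average of all added values; 0.0 when nothing has been added yet.
    double average() {
        return count == 0 ? 0.0 : (double) sum / count;
    }

    public static void main(String[] args) {
        AverageAccumulator acc = AverageAccumulator.empty().add(10).add(20).add(60);
        System.out.println(acc.average());
    }
}
```

The trade-off is that the individual values are gone, so this only fits
aggregates that can be updated incrementally (sum, count, average).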
> > > >>>>>>>
> > > >>>>>>> Hope that helps.
> > > >>>>>>>
> > > >>>>>>> Best,
> > > >>>>>>> Bruno
> > > >>>>>>>
> > > >>>>>>> On Mon, Apr 15, 2019 at 4:57 PM Alessandro Tagliapietra <
> > > >>>>>>> tagliapietra.alessandro@gmail.com> wrote:
> > > >>>>>>>
> > > >>>>>>>> Sorry but it seemed harder than I thought,
> > > >>>>>>>>
> > > > >>>>>>>> to have the custom aggregation working I need to get an ArrayList of all
> > > > >>>>>>>> the values in the window, so far my aggregate DSL method creates an
> > > > >>>>>>>> ArrayList on the initializer and adds each value to the list in the
> > > > >>>>>>>> aggregator.
> > > > >>>>>>>> Then I think I'll have to provide a serde to change the output type of
> > > > >>>>>>>> that method.
> > > > >>>>>>>> I was looking at
> > > > >>>>>>>>
> > > > >>>>>>>> https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api
> > > > >>>>>>>>
> > > > >>>>>>>> but that seems more towards a list of longs and already uses longSerde.
> > > > >>>>>>>> I'm currently trying to implement another avro model that has a field of
> > > > >>>>>>>> type array so I can use the regular avro serializer to implement this.
> > > > >>>>>>>> Should I create my own serdes instead or is this the right way?
> > > >>>>>>>>
> > > >>>>>>>> Thank you in advance
> > > >>>>>>>>
> > > >>>>>>>> --
> > > >>>>>>>> Alessandro Tagliapietra
> > > >>>>>>>>
> > > >>>>>>>> On Mon, Apr 15, 2019 at 3:42 PM Alessandro Tagliapietra <
> > > >>>>>>>> tagliapietra.alessandro@gmail.com> wrote:
> > > >>>>>>>>
> > > >>>>>>>>> Thank you Bruno and Matthias,
> > > >>>>>>>>>
> > > > >>>>>>>>> I've modified the transformer to implement the ValueTransformerWithKey
> > > > >>>>>>>>> interface and everything is working fine.
> > > > >>>>>>>>> I now have to window the data and manually aggregate each window's data
> > > > >>>>>>>>> since I have to do some averages and sums of differences.
> > > > >>>>>>>>> So far I'm just having some issues with message types since I'm changing
> > > > >>>>>>>>> the data type when aggregating the window, but I think it's an easy
> > > > >>>>>>>>> problem.
> > > >>>>>>>>>
> > > >>>>>>>>> Thank you again
> > > >>>>>>>>> Best
> > > >>>>>>>>>
> > > >>>>>>>>> --
> > > >>>>>>>>> Alessandro Tagliapietra
> > > >>>>>>>>>
> > > >>>>>>>>> On Sun, Apr 14, 2019 at 11:26 AM Bruno Cadonna <
> > > >>>>> bruno@confluent.io>
> > > >>>>>>>> wrote:
> > > >>>>>>>>>
> > > >>>>>>>>>> Hi Alessandro,
> > > >>>>>>>>>>
> > > > >>>>>>>>>> the `TransformerSupplier` is internally wrapped with a `ProcessorSupplier`,
> > > > >>>>>>>>>> so the statement
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> `transform` is essentially equivalent to adding the Transformer via
> > > > >>>>>>>>>> Topology#addProcessor() to your processor topology
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> is correct.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> If you do not change the key, you should definitely use one of the
> > > > >>>>>>>>>> overloads of `transformValues` to avoid internal data redistribution. In
> > > > >>>>>>>>>> your case the overload with `ValueTransformerWithKeySupplier` as suggested
> > > > >>>>>>>>>> by Matthias would fit.
> > > >>>>>>>>>>
> > > >>>>>>>>>> Best,
> > > >>>>>>>>>> Bruno
> > > >>>>>>>>>>
> > > >>>>>>>>>> On Sat, Apr 13, 2019 at 12:51 PM Matthias J. Sax <
> > > >>>>>>> matthias@confluent.io
> > > >>>>>>>>>
> > > >>>>>>>>>> wrote:
> > > >>>>>>>>>>
> > > > >>>>>>>>>>> There is also `ValueTransformerWithKey` that gives you read-only access
> > > > >>>>>>>>>>> to the key.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> -Matthias
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> On 4/12/19 5:34 PM, Alessandro Tagliapietra wrote:
> > > >>>>>>>>>>>> Hi Bruno,
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Thank you for the quick answer.
> > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> I'm actually trying to do that since it seems there is really no way to
> > > > >>>>>>>>>>>> have it use `Processor<K, V>`.
> > > > >>>>>>>>>>>> I just wanted (if that would've made any sense) to use the Processor in
> > > > >>>>>>>>>>>> both DSL and non-DSL pipelines.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Anyway, regarding `transformValues()` I don't think I can use it as I need
> > > > >>>>>>>>>>>> the message key since that is the discriminating value for the filter (I
> > > > >>>>>>>>>>>> want to exclude old values per sensor ID so per message key)
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Right now I've this
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfiltertransformer-java
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> and I'm using it with `transform()`.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> One thing I've found confusing is this
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-process
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>> transform is essentially equivalent to adding the Transformer via
> > > > >>>>>>>>>>>>> Topology#addProcessor() to your processor topology
> > > > >>>>>>>>>>>>> <https://docs.confluent.io/current/streams/concepts.html#streams-concepts-processor-topology>.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> is it? Doesn't `transform` need a TransformerSupplier while `addProcessor`
> > > > >>>>>>>>>>>> uses a ProcessorSupplier?
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Thank you again for your help
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> --
> > > >>>>>>>>>>>> Alessandro Tagliapietra
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> On Fri, Apr 12, 2019 at 5:04 PM Bruno Cadonna <
> > > >>>>>> bruno@confluent.io
> > > >>>>>>>>
> > > >>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>> Hi Alessandro,
> > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Have you considered using `transform()` (actually in your case you should
> > > > >>>>>>>>>>>>> use `transformValues()`) instead of `.process()`? `transform()` and
> > > > >>>>>>>>>>>>> `transformValues()` are stateful operations similar to `.process` but they
> > > > >>>>>>>>>>>>> return a `KStream`. On a `KStream` you can then apply a windowed
> > > > >>>>>>>>>>>>> aggregation.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Hope that helps.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>> Bruno
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> On Fri, Apr 12, 2019 at 4:31 PM Alessandro Tagliapietra
> > > >> <
> > > >>>>>>>>>>>>> tagliapietra.alessandro@gmail.com> wrote:
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Hi there,
> > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> I'm just starting with Kafka and I'm trying to create a stream processor
> > > > >>>>>>>>>>>>>> that in multiple stages:
> > > > >>>>>>>>>>>>>>  - filters messages using a kv store so that only messages with a higher
> > > > >>>>>>>>>>>>>> timestamp get processed
> > > > >>>>>>>>>>>>>>  - aggregates the message metrics by minute giving e.g. the avg of those
> > > > >>>>>>>>>>>>>> metrics in that minute
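
A minimal sketch of the filtering stage described in the list above, in
plain Java. A HashMap stands in for the kv store, and a record is accepted
only when its timestamp is strictly higher than the last accepted one for
the same key. LatestTimestampFilter and accept are hypothetical names for
illustration; this is not the code from the linked gist.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the per-key timestamp filter.
// A HashMap plays the role of the key-value state store.
class LatestTimestampFilter {
    // sensor ID -> timestamp of the last accepted record
    private final Map<String, Long> lastSeen = new HashMap<>();

    // Returns true and remembers the timestamp when the record is newer than
    // the last accepted record for this key; returns false otherwise.
    boolean accept(String sensorId, long timestampMs) {
        Long previous = lastSeen.get(sensorId);
        if (previous != null && timestampMs <= previous) {
            return false; // out of order or duplicate timestamp
        }
        lastSeen.put(sensorId, timestampMs);
        return true;
    }

    public static void main(String[] args) {
        LatestTimestampFilter filter = new LatestTimestampFilter();
        System.out.println(filter.accept("S1", 160_000L));
        System.out.println(filter.accept("S1", 159_000L));
        System.out.println(filter.accept("S1", 161_000L));
    }
}
```

Because the state is keyed by sensor ID, an old record for one sensor never
affects records from another sensor.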
> > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> The message is simple, the key is the sensor ID and the value is e.g. {
> > > > >>>>>>>>>>>>>> timestamp: UNIX_TIMESTAMP, speed: INT }.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> I've started by creating a processor to use the kv store and filter old
> > > > >>>>>>>>>>>>>> messages:
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfilter-java
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Then I was trying to implement windowing, I saw very nice windowing
> > > > >>>>>>>>>>>>>> examples for the DSL but none for the Processor API (only a small reference
> > > > >>>>>>>>>>>>>> to the windowed store), can someone point me in the right direction?
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Now, since I wasn't able to find any example I tried to use the DSL but
> > > > >>>>>>>>>>>>>> haven't found a way to use my processor with it, I saw this
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#applying-processors-and-transformers-processor-api-integration
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> but it explains mostly transformers not processors. I also saw after that the
> > > > >>>>>>>>>>>>>> example usage of the processor but `.process(...)` returns void, so I
> > > > >>>>>>>>>>>>>> cannot have a KStream from a processor?
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Thank you all in advance
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> --
> > > >>>>>>>>>>>>>> Alessandro Tagliapietra
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>>
> > > >>>
> > > >>
> > > >
> > >
> > >
> >
>

Re: Using processor API via DSL

Posted by Alessandro Tagliapietra <ta...@gmail.com>.
Hi Bruno,

no worries.
No, that was an old problem; the latest code is in the gist from the last
email.
Anyway, I've pushed the master branch with the same code. I'm not sure I've
done the right thing with the jars, but the code should be there.
The gist https://gist.github.com/alex88/43b72e23bda9e15657b008855e1904db is
the one with the most information and the logs of what I was seeing.

Thank you for your help

--
Alessandro Tagliapietra


On Wed, May 8, 2019 at 2:18 AM Bruno Cadonna <br...@confluent.io> wrote:

> Hi Alessandro,
>
> Apologies for the late reply.
>
> I tried the code from your repository under
> https://github.com/alex88/kafka-test/tree/master and I ran into a
> `ClassCastException`. I think this is a bug that is described here
> https://issues.apache.org/jira/browse/KAFKA-8317 .
>
> Should I have tried one of the other branches?
>
> Best regards,
> Bruno
>
> On Fri, May 3, 2019 at 9:33 AM Alessandro Tagliapietra <
> tagliapietra.alessandro@gmail.com> wrote:
>
> > Ok so I'm not sure if I did this correctly,
> >
> > I've upgraded both the server (by replacing the JARs in the confluent
> > docker image with those built from kafka source) and the client (by using
> > the built JARs as local file dependencies).
> > I've used this as source: https://github.com/apache/kafka/archive/2.2.zip
> > When the server runs it prints:
> >
> > INFO Kafka version: 2.2.1-SNAPSHOT
> > (org.apache.kafka.common.utils.AppInfoParser).
> >
> > and regarding the client I don't see any kafka jars in the "External
> > libraries" of the IntelliJ project tab so I think it's using the local JARs
> > (2.2.1-SNAPSHOT).
> >
> > The problem is that the window isn't keeping the old values and still emits
> > values with partially processed intervals.
> >
> > Just to summarize:
> > https://gist.github.com/alex88/43b72e23bda9e15657b008855e1904db
> >
> >  - consumer emits one message per second with production = 1
> >  - windowing stream should emit one message each 10 seconds with the sum of
> > productions (so production = 10)
> >
> > If I restart the stream processor, it emits window functions with partial
> > data (production < 10) as you can see from the logs.
> > I've checked the JAR file and it seems to include changes from
> > https://github.com/apache/kafka/pull/6623 (it has the newly
> > added FixedOrderMap class)
> >
> > Even after removing the suppress() the error seems to persist (look at
> > consumer_nosuppress), here it seems it loses track of the contents of the
> > window:
> >
> > S1 with computed metric {"timestamp": 50000, "production": 10}
> > S1 with computed metric {"timestamp": 60000, "production": 1}
> > S1 with computed metric {"timestamp": 60000, "production": 2}
> > S1 with computed metric {"timestamp": 60000, "production": 3}
> > S1 with computed metric {"timestamp": 60000, "production": 4}
> > -- RESTART --
> > S1 with computed metric {"timestamp": 60000, "production": 1}
> > S1 with computed metric {"timestamp": 60000, "production": 2}
> > S1 with computed metric {"timestamp": 60000, "production": 3}
> > S1 with computed metric {"timestamp": 60000, "production": 4}
> > S1 with computed metric {"timestamp": 60000, "production": 5}
> > S1 with computed metric {"timestamp": 60000, "production": 6}
> > S1 with computed metric {"timestamp": 70000, "production": 1}
> >
> > after restart during the 60 seconds window the sum restarts.
> >
> > Is it something wrong with my implementation?
> >
> > --
> > Alessandro Tagliapietra
> >
> > On Thu, May 2, 2019 at 7:58 PM Alessandro Tagliapietra <
> > tagliapietra.alessandro@gmail.com> wrote:
> >
> > > Hi Bruno,
> > >
> > > thank you for your help, glad to hear that those are only bugs and not a
> > > problem with my implementation.
> > > I'm currently using the Confluent Docker images. I've checked their master
> > > branch, which seems to use the SNAPSHOT version, but those images/packages
> > > aren't publicly available. Are there any snapshot builds available?
> > > In the meantime I'm trying to build a custom Docker image from the Kafka
> > > source.
> > >
> > > Thanks
> > >
> > > --
> > > Alessandro Tagliapietra
> > >
> > > On Tue, Apr 23, 2019 at 8:52 AM Bruno Cadonna <br...@confluent.io>
> > wrote:
> > >
> > >> Hi Alessandro,
> > >>
> > >> It seems that the behaviour you described regarding the window aggregation
> > >> is due to bugs. The good news is that the bugs have been already fixed.
> > >>
> > >> The relevant bug reports are
> > >> https://issues.apache.org/jira/browse/KAFKA-7895
> > >> https://issues.apache.org/jira/browse/KAFKA-8204
> > >>
> > >> The fixes for both bugs have been already merged to the 2.2 branch.
> > >>
> > >> Could you please build from the 2.2 branch and confirm that the fixes solve
> > >> your problem?
> > >>
> > >> Best,
> > >> Bruno
> > >>
> > >>
> > >> On Sat, Apr 20, 2019 at 2:16 PM Alessandro Tagliapietra <
> > >> tagliapietra.alessandro@gmail.com> wrote:
> > >>
> > >> > Thanks Matthias, one less thing to worry about in the future :)
> > >> >
> > >> > --
> > >> > Alessandro Tagliapietra
> > >> >
> > >> >
> > >> > On Sat, Apr 20, 2019 at 11:23 AM Matthias J. Sax <
> > matthias@confluent.io
> > >> >
> > >> > wrote:
> > >> >
> > >> > > Just a side note. There is currently work in progress on
> > >> > > https://issues.apache.org/jira/browse/KAFKA-3729 that should fix the
> > >> > > configuration problem for Serdes.
> > >> > >
> > >> > > -Matthias
> > >> > >
> > >> > > On 4/19/19 9:12 PM, Alessandro Tagliapietra wrote:
> > >> > > > Hi Bruno,
> > >> > > > thanks a lot for checking the code, regarding the SpecificAvroSerde I've
> > >> > > > found that using
> > >> > > >
> > >> > > > final Serde<InputList> valueSpecificAvroSerde = new SpecificAvroSerde<>();
> > >> > > > final Map<String, String> serdeConfig =
> > >> > > >     Collections.singletonMap("schema.registry.url", "http://localhost:8081");
> > >> > > > valueSpecificAvroSerde.configure(serdeConfig, false);
> > >> > > >
> > >> > > > and then in aggregate()
> > >> > > >
> > >> > > > Materialized.with(Serdes.String(), valueSpecificAvroSerde)
> > >> > > >
> > >> > > > fixed the issue.
> > >> > > >
> > >> > > > Thanks in advance for the windowing help, very appreciated.
> > >> > > > In the meantime I'll try to make some progress on the rest.
> > >> > > >
> > >> > > > Have a great weekend
> > >> > > >
> > >> > > > --
> > >> > > > Alessandro Tagliapietra
> > >> > > >
> > >> > > >
> > >> > > > On Fri, Apr 19, 2019 at 2:09 PM Bruno Cadonna <
> bruno@confluent.io
> > >
> > >> > > wrote:
> > >> > > >
> > >> > > >> Hi Alessandro,
> > >> > > >>
> > >> > > >> I had a look at your code. Regarding your question whether you use the
> > >> > > >> SpecificAvroSerde correctly, take a look at the following documentation:
> > >> > > >>
> > >> > > >> https://docs.confluent.io/current/streams/developer-guide/datatypes.html
> > >> > > >>
> > >> > > >> I haven't had the time yet to take a closer look at your problems with the
> > >> > > >> aggregation. I will have a look next week.
> > >> > > >>
> > >> > > >> Have a nice weekend,
> > >> > > >> Bruno
> > >> > > >>
> > >> > > >> On Wed, Apr 17, 2019 at 4:43 PM Alessandro Tagliapietra <
> > >> > > >> tagliapietra.alessandro@gmail.com> wrote:
> > >> > > >>
> > >> > > >>> So I've started with a new app with the archetype:generate as in
> > >> > > >>> https://kafka.apache.org/22/documentation/streams/tutorial
> > >> > > >>>
> > >> > > >>> I've pushed a sample repo here: https://github.com/alex88/kafka-test
> > >> > > >>> The avro schemas are a Metric with 2 fields: timestamp and production and a
> > >> > > >>> MetricList with a list of records (Metric) to be able to manually do the
> > >> > > >>> aggregation.
> > >> > > >>> Right now the aggregation is simple just for the purpose of the sample repo
> > >> > > >>> and to easily see if we're getting wrong values.
> > >> > > >>>
> > >> > > >>> What I wanted to achieve is:
> > >> > > >>>  - have a custom generator that generates 1 message per second with
> > >> > > >>> production = 1 with 1 or more separate message keys, which in my case are
> > >> > > >>> the sensor IDs generating the data
> > >> > > >>>  - a filter that removes out of order messages by having a state that
> > >> > > >>> stores key (sensorID) -> last timestamp
> > >> > > >>>  - a window operation that for this example just sums the values in each
> > >> > > >>> 10 second window
> > >> > > >>>
> > >> > > >>> To show where I'm having issues I've setup multiple branches
> for
> > >> the
> > >> > > >> repo:
> > >> > > >>>  - *issue-01 <
> > https://github.com/alex88/kafka-test/tree/issue-01
> > >> >*
> > >> > is
> > >> > > >> the
> > >> > > >>> one I had initially "Failed to flush state store
> > >> > > >>> KSTREAM-AGGREGATE-STATE-STORE-0000000003" that I tried to
> solve
> > >> using
> > >> > > >>>
> > >> > > >>>
> > >> > > >>
> > >> > >
> > >> >
> > >>
> >
> https://stackoverflow.com/questions/55186727/kafka-streams-2-1-1-class-cast-while-flushing-timed-aggregation-to-store
> > >> > > >>>  - *issue-02 <
> > https://github.com/alex88/kafka-test/tree/issue-02
> > >> >*
> > >> > is
> > >> > > >> the
> > >> > > >>> one after I've tried to solve above problem with the
> > materializer
> > >> > > (maybe
> > >> > > >>> the SpecificAvroSerde is wrong?)
> > >> > > >>>  - *issue-03 <
> > https://github.com/alex88/kafka-test/tree/issue-03
> > >> >*
> > >> > > after
> > >> > > >>> fixing issue-02 (by using
> > groupByKey(Grouped.with(Serdes.String(),
> > >> > new
> > >> > > >>> SpecificAvroSerde<>()))) everything seems to be working, if
> you
> > >> let
> > >> > > both
> > >> > > >>> the producer and stream running, you'll see that the stream
> > >> receives
> > >> > 10
> > >> > > >>> messages (with the timestamp incrementing 1 second for each
> > >> message)
> > >> > > like
> > >> > > >>> this:
> > >> > > >>>
> > >> > > >>> S1 with filtered metric{"timestamp": 160000, "production": 1}
> > >> > > >>> S1 with filtered metric{"timestamp": 161000, "production": 1}
> > >> > > >>> S1 with filtered metric{"timestamp": 162000, "production": 1}
> > >> > > >>> S1 with filtered metric{"timestamp": 163000, "production": 1}
> > >> > > >>> S1 with filtered metric{"timestamp": 164000, "production": 1}
> > >> > > >>> S1 with filtered metric{"timestamp": 165000, "production": 1}
> > >> > > >>> S1 with filtered metric{"timestamp": 166000, "production": 1}
> > >> > > >>> S1 with filtered metric{"timestamp": 167000, "production": 1}
> > >> > > >>> S1 with filtered metric{"timestamp": 168000, "production": 1}
> > >> > > >>> S1 with filtered metric{"timestamp": 169000, "production": 1}
> > >> > > >>>
> > >> > > >>> and at the 10 seconds interval something like:
> > >> > > >>>
> > >> > > >>> S1 with computed metric {"timestamp": 160000, "production":
> 10}
> > >> > > >>> S1 with computed metric {"timestamp": 170000, "production":
> 10}
> > >> > > >>> S1 with computed metric {"timestamp": 180000, "production":
> 10}
> > >> > > >>>
> > >> > > >>> and so on...
> > >> > > >>> Now there are two problems, after stopping and restarting the
> > >> stream
> > >> > > >>> processor (by sending SIGINT via IntelliJ since I start the
> > class
> > >> > main
> > >> > > >> with
> > >> > > >>> it) it happens:
> > >> > > >>>  - sometimes the aggregated count is wrong, if I have it start
> > >> > > windowing
> > >> > > >>> for 7 seconds (e.g. seconds 11-17), restart the stream, after
> > >> restart
> > >> > > it
> > >> > > >>> might just emit a value for the new 3 missing seconds (seconds
> > >> 18-20)
> > >> > > and
> > >> > > >>> the aggregated value is 3 not 10
> > >> > > >>>  - sometimes the window outputs twice, in the example where I
> > >> restart
> > >> > > the
> > >> > > >>> stream processor I might get as output
> > >> > > >>>
> > >> > > >>> S1 with filtered metric{"timestamp": 154000, "production": 1}
> > >> > > >>> S1 with computed metric {"timestamp": 160000, "production": 5}
> > >> > > >>> S1 with filtered metric{"timestamp": 155000, "production": 1}
> > >> > > >>> S1 with filtered metric{"timestamp": 156000, "production": 1}
> > >> > > >>> S1 with filtered metric{"timestamp": 157000, "production": 1}
> > >> > > >>> S1 with filtered metric{"timestamp": 158000, "production": 1}
> > >> > > >>> S1 with filtered metric{"timestamp": 159000, "production": 1}
> > >> > > >>> S1 with filtered metric{"timestamp": 160000, "production": 1}
> > >> > > >>> S1 with filtered metric{"timestamp": 161000, "production": 1}
> > >> > > >>> S1 with computed metric {"timestamp": 160000, "production":
> 10}
> > >> > > >>> S1 with filtered metric{"timestamp": 162000, "production": 1}
> > >> > > >>>
> > >> > > >>> as you can see, window for timestamp 160000 is duplicated
> > >> > > >>>
> > >> > > >>> Is this because the window state isn't persisted across
> > restarts?
> > >> > > >>> My ultimate goal is to have the window part emit only once and
> > >> resume
> > >> > > >>> processing across restarts, while avoiding processing out of
> > order
> > >> > data
> > >> > > >>> (that's the purpose of the TimestampIncrementalFilter)
> > >> > > >>>
> > >> > > >>> Thank you in advance
> > >> > > >>>
> > >> > > >>> --
> > >> > > >>> Alessandro Tagliapietra
> > >> > > >>>
> > >> > > >>>
> > >> > > >>> On Tue, Apr 16, 2019 at 9:48 PM Alessandro Tagliapietra <
> > >> > > >>> tagliapietra.alessandro@gmail.com> wrote:
> > >> > > >>>
> > >> > > >>>> Hi Bruno,
> > >> > > >>>>
> > >> > > >>>> I'm using the confluent docker images 5.2.1, so kafka 2.2.
> > >> > > >>>> Anyway I'll try to make a small reproduction repo with all
> the
> > >> > > >> different
> > >> > > >>>> cases soon.
> > >> > > >>>>
> > >> > > >>>> Thank you
> > >> > > >>>>
> > >> > > >>>> --
> > >> > > >>>> Alessandro Tagliapietra
> > >> > > >>>>
> > >> > > >>>>
> > >> > > >>>> On Tue, Apr 16, 2019 at 1:02 PM Bruno Cadonna <
> > >> bruno@confluent.io>
> > >> > > >>> wrote:
> > >> > > >>>>
> > >> > > >>>>> Hi Alessandro,
> > >> > > >>>>>
> > >> > > >>>>> What version of Kafka do you use?
> > >> > > >>>>>
> > >> > > >>>>> Could you please give a more detailed example for the issues
> > >> with
> > >> > the
> > >> > > >>> two
> > >> > > >>>>> keys you see?
> > >> > > >>>>>
> > >> > > >>>>> Could the following bug be related to the duplicates you
> see?
> > >> > > >>>>>
> > >> > > >>>>>
> > >> > > >>>>>
> > >> > > >>>
> > >> > > >>
> > >> > >
> > >> >
> > >>
> >
> https://issues.apache.org/jira/browse/KAFKA-7895?jql=project%20%3D%20KAFKA%20AND%20issuetype%20%3D%20Bug%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened)%20AND%20component%20%3D%20streams%20AND%20text%20~%20%22duplicate%22
> > >> > > >>>>>
> > >> > > >>>>> How do you restart the processor?
> > >> > > >>>>>
> > >> > > >>>>> Best,
> > >> > > >>>>> Bruno
> > >> > > >>>>>
> > >> > > >>>>> On Mon, Apr 15, 2019 at 11:02 PM Alessandro Tagliapietra <
> > >> > > >>>>> tagliapietra.alessandro@gmail.com> wrote:
> > >> > > >>>>>
> > >> > > >>>>>> Thank you Bruno,
> > >> > > >>>>>>
> > >> > > >>>>>> I'll look into those, however average is just a simple
> thing
> > >> I'm
> > >> > > >>> trying
> > >> > > >>>>>> right now just to get an initial windowing flow working.
> > >> > > >>>>>> In the future I'll probably still need the actual values
> for
> > >> other
> > >> > > >>>>>> calculations. We won't have more than 60 elements per
> window
> > >> for
> > >> > > >> sure.
> > >> > > >>>>>>
> > >> > > >>>>>> So far to not manually serialize/deserialize the array list
> > >> I've
> > >> > > >>>>> created an
> > >> > > >>>>>> Avro model with an array field containing the values.
> > >> > > >>>>>> I had issues with suppress as explained here
> > >> > > >>>>>>
> > >> > > >>>>>>
> > >> > > >>>>>>
> > >> > > >>>>>
> > >> > > >>>
> > >> > > >>
> > >> > >
> > >> >
> > >>
> >
> https://stackoverflow.com/questions/55699096/kafka-aggregate-with-materialized-with-specific-avro-serve-gives-nullpointerexce/55699198#55699198
> > >> > > >>>>>>
> > >> > > >>>>>> but I got that working.
> > >> > > >>>>>> So far everything seems to be working, except a couple
> > things:
> > >> > > >>>>>>  - if I generate data with 1 key, I correctly get a value
> > each
> > >> 10
> > >> > > >>>>> seconds,
> > >> > > >>>>>> if I later start generating data with another key (while
> key
> > 1
> > >> is
> > >> > > >>> still
> > >> > > >>>>>> generating) the windowing emits a value only after the
> > >> timestamp
> > >> > of
> > >> > > >>> key
> > >> > > >>>>> 2
> > >> > > >>>>>> reaches the last generated window
> > >> > > >>>>>>  - while generating data, if I restart the processor as
> soon
> > >> as it
> > >> > > >>>>> starts
> > >> > > >>>>>> it sometimes generates 2 aggregates for the same window
> even
> > if
> > >> > I'm
> > >> > > >>>>> using
> > >> > > >>>>>> the suppress
> > >> > > >>>>>>
> > >> > > >>>>>> Anyway, I'll look into your link and try to find out the
> > cause
> > >> of
> > >> > > >>> these
> > >> > > >>>>>> issues, probably starting from scratch with a simpler
> example
> > >> > > >>>>>>
> > >> > > >>>>>> Thank you for your help!
> > >> > > >>>>>>
> > >> > > >>>>>> --
> > >> > > >>>>>> Alessandro Tagliapietra
> > >> > > >>>>>>
> > >> > > >>>>>> On Mon, Apr 15, 2019 at 10:08 PM Bruno Cadonna <
> > >> > bruno@confluent.io>
> > >> > > >>>>> wrote:
> > >> > > >>>>>>
> > >> > > >>>>>>> Hi Alessandro,
> > >> > > >>>>>>>
> > >> > > >>>>>>> Have a look at this Kafka Usage Pattern for computing
> > averages
> > >> > > >>> without
> > >> > > >>>>>>> using an ArrayList.
> > >> > > >>>>>>>
> > >> > > >>>>>>>
> > >> > > >>>>>>>
> > >> > > >>>>>>
> > >> > > >>>>>
> > >> > > >>>
> > >> > > >>
> > >> > >
> > >> >
> > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns#KafkaStreamUsagePatterns-Howtocomputean(windowed)average
> > >> > > >>>>>>> ?
> > >> > > >>>>>>>
> > >> > > >>>>>>> The advantage of this pattern over the ArrayList approach is
> > >> > > >>>>>>> the reduced
> > >> > > >>>>>>> space needed to compute the aggregate. Note that you will
> > >> still
> > >> > > >> need
> > >> > > >>>>> to
> > >> > > >>>>>>> implement a SerDe. However, the SerDe should be a bit
> easier
> > >> to
> > >> > > >>>>> implement
> > >> > > >>>>>>> than a SerDe for an ArrayList.
> > >> > > >>>>>>>
> > >> > > >>>>>>> Hope that helps.
> > >> > > >>>>>>>
> > >> > > >>>>>>> Best,
> > >> > > >>>>>>> Bruno
> > >> > > >>>>>>>
> > >> > > >>>>>>> On Mon, Apr 15, 2019 at 4:57 PM Alessandro Tagliapietra <
> > >> > > >>>>>>> tagliapietra.alessandro@gmail.com> wrote:
> > >> > > >>>>>>>
> > >> > > >>>>>>>> Sorry but it seemed harder than I thought,
> > >> > > >>>>>>>>
> > >> > > >>>>>>>> to have the custom aggregation working I need to get an
> > >> > > >> ArrayList
> > >> > > >>> of
> > >> > > >>>>>> all
> > >> > > >>>>>>>> the values in the window, so far my aggregate DSL method
> > >> creates
> > >> > > >>> an
> > >> > > >>>>>>>> ArrayList on the initializer and adds each value to the
> > list
> > >> in
> > >> > > >>> the
> > >> > > >>>>>>>> aggregator.
> > >> > > >>>>>>>> Then I think I'll have to provide a serde to change the
> > >> output
> > >> > > >>>>> type of
> > >> > > >>>>>>>> that method.
> > >> > > >>>>>>>> I was looking at
> > >> > > >>>>>>>>
> > >> > > >>>>>>>>
> > >> > > >>>>>>>
> > >> > > >>>>>>
> > >> > > >>>>>
> > >> > > >>>
> > >> > > >>
> > >> > >
> > >> >
> > >>
> >
> https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api
> > >> > > >>>>>>>> but
> > >> > > >>>>>>>> that seems more towards a list of longs and already uses
> > >> > > >>> longSerde.
> > >> > > >>>>>>>> I'm currently trying to implement another avro model that
> > >> has a
> > >> > > >>>>> field
> > >> > > >>>>>> of
> > >> > > >>>>>>>> type array so I can use the regular avro serializer to
> > >> implement
> > >> > > >>>>> this.
> > >> > > >>>>>>>> Should I create my own serdes instead or is this the
> right
> > >> way?
> > >> > > >>>>>>>>
> > >> > > >>>>>>>> Thank you in advance
> > >> > > >>>>>>>>
> > >> > > >>>>>>>> --
> > >> > > >>>>>>>> Alessandro Tagliapietra
> > >> > > >>>>>>>>
> > >> > > >>>>>>>> On Mon, Apr 15, 2019 at 3:42 PM Alessandro Tagliapietra <
> > >> > > >>>>>>>> tagliapietra.alessandro@gmail.com> wrote:
> > >> > > >>>>>>>>
> > >> > > >>>>>>>>> Thank you Bruno and Matthias,
> > >> > > >>>>>>>>>
> > >> > > >>>>>>>>> I've modified the transformer to implement the
> > >> > > >>>>>> ValueTransformerWithKey
> > >> > > >>>>>>>>> interface and everything is working fine.
> > >> > > >>>>>>>>> I've now to window the data and manually aggregate each
> > >> window
> > >> > > >>>>> data
> > >> > > >>>>>>> since
> > >> > > >>>>>>>>> I've to do some averages and sum of differences.
> > >> > > >>>>>>>>> So far I'm just having some issues with message types
> > since
> > >> > > >> I'm
> > >> > > >>>>>>> changing
> > >> > > >>>>>>>>> the data type when aggregating the window but I think
> it's
> > >> an
> > >> > > >>> easy
> > >> > > >>>>>>>> problem.
> > >> > > >>>>>>>>>
> > >> > > >>>>>>>>> Thank you again
> > >> > > >>>>>>>>> Best
> > >> > > >>>>>>>>>
> > >> > > >>>>>>>>> --
> > >> > > >>>>>>>>> Alessandro Tagliapietra
> > >> > > >>>>>>>>>
> > >> > > >>>>>>>>> On Sun, Apr 14, 2019 at 11:26 AM Bruno Cadonna <
> > >> > > >>>>> bruno@confluent.io>
> > >> > > >>>>>>>> wrote:
> > >> > > >>>>>>>>>
> > >> > > >>>>>>>>>> Hi Alessandro,
> > >> > > >>>>>>>>>>
> > >> > > >>>>>>>>>> the `TransformSupplier` is internally wrapped with a
> > >> > > >>>>>>>> `ProcessorSupplier`,
> > >> > > >>>>>>>>>> so the statement
> > >> > > >>>>>>>>>>
> > >> > > >>>>>>>>>> `transform` is essentially equivalent to adding the
> > >> > > >> Transformer
> > >> > > >>>>> via
> > >> > > >>>>>>>>>> Topology#addProcessor() to your processor topology
> > >> > > >>>>>>>>>>
> > >> > > >>>>>>>>>> is correct.
> > >> > > >>>>>>>>>>
> > >> > > >>>>>>>>>> If you do not change the key, you should definitely use
> > one
> > >> > > >> of
> > >> > > >>>>> the
> > >> > > >>>>>>>>>> overloads of `transformValues` to avoid internal data
> > >> > > >>>>>> redistribution.
> > >> > > >>>>>>> In
> > >> > > >>>>>>>>>> your case the overload with
> > >> `ValueTransformerWithKeySupplier`
> > >> > > >>> as
> > >> > > >>>>>>>> suggested
> > >> > > >>>>>>>>>> by Matthias would fit.
> > >> > > >>>>>>>>>>
> > >> > > >>>>>>>>>> Best,
> > >> > > >>>>>>>>>> Bruno
> > >> > > >>>>>>>>>>
> > >> > > >>>>>>>>>> On Sat, Apr 13, 2019 at 12:51 PM Matthias J. Sax <
> > >> > > >>>>>>> matthias@confluent.io
> > >> > > >>>>>>>>>
> > >> > > >>>>>>>>>> wrote:
> > >> > > >>>>>>>>>>
> > >> > > >>>>>>>>>>> There is also `ValueTransformerWithKey` that gives you
> > >> > > >>>>>>>>>>> read-only access to the key.
> > >> > > >>>>>>>>>>>
> > >> > > >>>>>>>>>>> -Matthias
> > >> > > >>>>>>>>>>>
> > >> > > >>>>>>>>>>> On 4/12/19 5:34 PM, Alessandro Tagliapietra wrote:
> > >> > > >>>>>>>>>>>> Hi Bruno,
> > >> > > >>>>>>>>>>>>
> > >> > > >>>>>>>>>>>> Thank you for the quick answer.
> > >> > > >>>>>>>>>>>>
> > >> > > >>>>>>>>>>>> I'm actually trying to do that since it seems there
> is
> > >> > > >>>>> really no
> > >> > > >>>>>>> way
> > >> > > >>>>>>>>>> to
> > >> > > >>>>>>>>>>>> have it use `Processor<K, V>`.
> > >> > > >>>>>>>>>>>> I just wanted (if that would've made any sense) to
> use
> > >> > > >> the
> > >> > > >>>>>>> Processor
> > >> > > >>>>>>>>>> in
> > >> > > >>>>>>>>>>>> both DSL and non-DSL pipelines.
> > >> > > >>>>>>>>>>>>
> > >> > > >>>>>>>>>>>> Anyway, regarding `transformValues()` I don't think I
> > can
> > >> > > >>>>> use it
> > >> > > >>>>>>> as
> > >> > > >>>>>>>> I
> > >> > > >>>>>>>>>>> need
> > >> > > >>>>>>>>>>>> the message key since that is the discriminating
> value
> > >> > > >> for
> > >> > > >>>>> the
> > >> > > >>>>>>>> filter
> > >> > > >>>>>>>>>> (I
> > >> > > >>>>>>>>>>>> want to exclude old values per sensor ID so per
> message
> > >> > > >>> key)
> > >> > > >>>>>>>>>>>>
> > >> > > >>>>>>>>>>>> Right now I've this
> > >> > > >>>>>>>>>>>>
> > >> > > >>>>>>>>>>>
> > >> > > >>>>>>>>>>
> > >> > > >>>>>>>>
> > >> > > >>>>>>>
> > >> > > >>>>>>
> > >> > > >>>>>
> > >> > > >>>
> > >> > > >>
> > >> > >
> > >> >
> > >>
> >
> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfiltertransformer-java
> > >> > > >>>>>>>>>>>> and
> > >> > > >>>>>>>>>>>> I'm using it with `transform()`.
> > >> > > >>>>>>>>>>>>
> > >> > > >>>>>>>>>>>> One thing I've found confusing is this
> > >> > > >>>>>>>>>>>>
> > >> > > >>>>>>>>>>>
> > >> > > >>>>>>>>>>
> > >> > > >>>>>>>>
> > >> > > >>>>>>>
> > >> > > >>>>>>
> > >> > > >>>>>
> > >> > > >>>
> > >> > > >>
> > >> > >
> > >> >
> > >>
> >
> https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-process
> > >> > > >>>>>>>>>>>>
> > >> > > >>>>>>>>>>>> transform is essentially equivalent to adding the
> > >> > > >>> Transformer
> > >> > > >>>>>> via
> > >> > > >>>>>>>>>>>>> Topology#addProcessor() to your processor topology
> > >> > > >>>>>>>>>>>>> <
> > >> > > >>>>>>>>>>>
> > >> > > >>>>>>>>>>
> > >> > > >>>>>>>>
> > >> > > >>>>>>>
> > >> > > >>>>>>
> > >> > > >>>>>
> > >> > > >>>
> > >> > > >>
> > >> > >
> > >> >
> > >>
> >
> https://docs.confluent.io/current/streams/concepts.html#streams-concepts-processor-topology
> > >> > > >>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>> .
> > >> > > >>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>
> > >> > > >>>>>>>>>>>> is it? Doesn't `transform` need a TransformSupplier
> > while
> > >> > > >>>>>>>>>> `addProcessor`
> > >> > > >>>>>>>>>>>> uses a ProcessorSupplier?
> > >> > > >>>>>>>>>>>>
> > >> > > >>>>>>>>>>>> Thank you again for your help
> > >> > > >>>>>>>>>>>>
> > >> > > >>>>>>>>>>>> --
> > >> > > >>>>>>>>>>>> Alessandro Tagliapietra
> > >> > > >>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>
> > >> > > >>>>>>>>>>>> On Fri, Apr 12, 2019 at 5:04 PM Bruno Cadonna <
> > >> > > >>>>>> bruno@confluent.io
> > >> > > >>>>>>>>
> > >> > > >>>>>>>>>>> wrote:
> > >> > > >>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>> Hi Alessandro,
> > >> > > >>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>> Have you considered using `transform()` (actually in
> > >> > > >> your
> > >> > > >>>>> case
> > >> > > >>>>>>> you
> > >> > > >>>>>>>>>>> should
> > >> > > >>>>>>>>>>>>> use `transformValues()`) instead of `.process()`?
> > >> > > >>>>> `transform()`
> > >> > > >>>>>>> and
> > >> > > >>>>>>>>>>>>> `transformValues()` are stateful operations similar
> to
> > >> > > >>>>>> `.process`
> > >> > > >>>>>>>> but
> > >> > > >>>>>>>>>>> they
> > >> > > >>>>>>>>>>>>> return a `KStream`. On a `KStream` you can then
> apply
> > a
> > >> > > >>>>>> windowed
> > >> > > >>>>>>>>>>>>> aggregation.
> > >> > > >>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>> Hope that helps.
> > >> > > >>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>> Best,
> > >> > > >>>>>>>>>>>>> Bruno
> > >> > > >>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>> On Fri, Apr 12, 2019 at 4:31 PM Alessandro
> > Tagliapietra
> > >> > > >> <
> > >> > > >>>>>>>>>>>>> tagliapietra.alessandro@gmail.com> wrote:
> > >> > > >>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>> Hi there,
> > >> > > >>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>> I'm just starting with Kafka and I'm trying to
> > create a
> > >> > > >>>>> stream
> > >> > > >>>>>>>>>>> processor
> > >> > > >>>>>>>>>>>>>> that in multiple stages:
> > >> > > >>>>>>>>>>>>>>  - filters messages using a kv store so that only messages
> > >> > > >>>>>>>>>>>>>> with a higher timestamp get processed
> > >> > > >>>>>>>>>>>>>>  - aggregates the message metrics by minute giving
> > e.g.
> > >> > > >>> the
> > >> > > >>>>>> avg
> > >> > > >>>>>>> of
> > >> > > >>>>>>>>>>> those
> > >> > > >>>>>>>>>>>>>> metrics in that minute
> > >> > > >>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>> The message is simple, the key is the sensor ID and
> > the
> > >> > > >>>>> value
> > >> > > >>>>>> is
> > >> > > >>>>>>>>>> e.g. {
> > >> > > >>>>>>>>>>>>>> timestamp: UNIX_TIMESTAMP, speed: INT }.
> > >> > > >>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>> I've started by creating a processor to use the kv
> > >> > > >> store
> > >> > > >>>>> and
> > >> > > >>>>>>>> filter
> > >> > > >>>>>>>>>> old
> > >> > > >>>>>>>>>>>>>> messages:
> > >> > > >>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>
> > >> > > >>>>>>>>>>
> > >> > > >>>>>>>>
> > >> > > >>>>>>>
> > >> > > >>>>>>
> > >> > > >>>>>
> > >> > > >>>
> > >> > > >>
> > >> > >
> > >> >
> > >>
> >
> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfilter-java
> > >> > > >>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>> Then I was trying to implement windowing, I saw
> very
> > >> > > >> nice
> > >> > > >>>>>>>> windowing
> > >> > > >>>>>>>>>>>>>> examples for the DSL but none for the Processor API
> > >> > > >>> (only a
> > >> > > >>>>>>> small
> > >> > > >>>>>>>>>>>>> reference
> > >> > > >>>>>>>>>>>>>> to the windowed store), can someone point me in the
> > >> > > >> right
> > >> > > >>>>>>>> direction?
> > >> > > >>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>> Now, since I wasn't able to find any example I
> tried
> > to
> > >> > > >>> use
> > >> > > >>>>>> the
> > >> > > >>>>>>>> DSL
> > >> > > >>>>>>>>>> but
> > >> > > >>>>>>>>>>>>>> haven't found a way to use my processor with it, I
> > saw
> > >> > > >>> this
> > >> > > >>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>
> > >> > > >>>>>>>>>>
> > >> > > >>>>>>>>
> > >> > > >>>>>>>
> > >> > > >>>>>>
> > >> > > >>>>>
> > >> > > >>>
> > >> > > >>
> > >> > >
> > >> >
> > >>
> >
> https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#applying-processors-and-transformers-processor-api-integration
> > >> > > >>>>>>>>>>>>>> but
> > >> > > >>>>>>>>>>>>>> it explains mostly transformers not processors. I
> > also
> > >> > > >>> saw
> > >> > > >>>>>> after
> > >> > > >>>>>>>>>> that
> > >> > > >>>>>>>>>>> the
> > >> > > >>>>>>>>>>>>>> example usage of the processor but `.process(...)`
> > >> > > >>> returns
> > >> > > >>>>>> void,
> > >> > > >>>>>>>> so
> > >> > > >>>>>>>>>> I
> > >> > > >>>>>>>>>>>>>> cannot have a KStream from a processor?
> > >> > > >>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>> Thank you all in advance
> > >> > > >>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>> --
> > >> > > >>>>>>>>>>>>>> Alessandro Tagliapietra
> > >> > > >>>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>>
> > >> > > >>>>>>>>>>>>
> > >> > > >>>>>>>>>>>
> > >> > > >>>>>>>>>>>
> > >> > > >>>>>>>>>>
> > >> > > >>>>>>>>>
> > >> > > >>>>>>>>
> > >> > > >>>>>>>
> > >> > > >>>>>>
> > >> > > >>>>>
> > >> > > >>>>
> > >> > > >>>
> > >> > > >>
> > >> > > >
> > >> > >
> > >> > >
> > >> >
> > >>
> > >
> >
>
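
The out-of-order filter described in the quoted thread above (a state that
stores sensor ID -> last timestamp, so that only messages with a higher
timestamp get processed) can be sketched in plain Java as follows. This is
only an illustration of the logic, not the code from the linked gist: the
class name LastTimestampFilter and the method accept are hypothetical, a
HashMap stands in for the Kafka Streams key-value store, and dropping
records whose timestamp equals the last one is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java sketch of a per-key "newest timestamp wins" filter (no Kafka dependency). */
final class LastTimestampFilter {
    // Stand-in for the key-value state store: sensor ID -> last accepted timestamp.
    private final Map<String, Long> lastSeen = new HashMap<>();

    /** Returns true and remembers the timestamp if it is newer than the last one for this key. */
    public boolean accept(String sensorId, long timestamp) {
        Long previous = lastSeen.get(sensorId);
        if (previous != null && timestamp <= previous) {
            return false; // out of order (or same timestamp): drop the record
        }
        lastSeen.put(sensorId, timestamp);
        return true;
    }

    public static void main(String[] args) {
        LastTimestampFilter filter = new LastTimestampFilter();
        System.out.println(filter.accept("S1", 160000L)); // first record for S1
        System.out.println(filter.accept("S1", 159000L)); // older than the last accepted record
        System.out.println(filter.accept("S2", 159000L)); // other key, tracked separately
    }
}
```

In a real topology the map would be a state store attached to a
ValueTransformerWithKey, as suggested in the thread, so that the last
timestamps survive a restart.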

Re: Using processor API via DSL

Posted by Bruno Cadonna <br...@confluent.io>.
Hi Alessandro,

Apologies for the late reply.

I tried the code from your repository under
https://github.com/alex88/kafka-test/tree/master and I ran into a
`ClassCastException`. I think this is a bug that is described here:
https://issues.apache.org/jira/browse/KAFKA-8317 .

Should I have tried one of the other branches?

Best regards,
Bruno
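
For reference, the behaviour expected in this thread (one message per second
with production = 1, summed per 10-second window) can be sketched in plain
Java. This is a rough illustration with no Kafka dependency: the class
TumblingWindowSum and its methods are hypothetical names, a TreeMap stands in
for the window store, and timestamps are assumed to be non-negative
milliseconds with windows aligned to multiples of the window size.

```java
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java sketch of a per-window running sum (no Kafka dependency). */
final class TumblingWindowSum {
    private final long windowSizeMs;
    // Stand-in for the window store: window start -> running sum.
    private final Map<Long, Integer> sums = new TreeMap<>();

    public TumblingWindowSum(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    /** Start of the window a timestamp falls into, e.g. 167000 -> 160000 for 10 s windows. */
    public static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    /** Adds one value to the running sum of its window. */
    public void add(long timestampMs, int production) {
        sums.merge(windowStart(timestampMs, windowSizeMs), production, Integer::sum);
    }

    public Map<Long, Integer> sums() {
        return sums;
    }

    public static void main(String[] args) {
        TumblingWindowSum window = new TumblingWindowSum(10_000L);
        // One message per second with production = 1, for 20 seconds.
        for (long ts = 160_000L; ts < 180_000L; ts += 1_000L) {
            window.add(ts, 1);
        }
        System.out.println(window.sums()); // {160000=10, 170000=10}
    }
}
```

If the map were thrown away halfway through a window, the sum for that window
would start again from zero, which resembles the partial sums reported after a
restart in the message quoted below.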

On Fri, May 3, 2019 at 9:33 AM Alessandro Tagliapietra <
tagliapietra.alessandro@gmail.com> wrote:

> Ok so I'm not sure if I did this correctly,
>
> I've upgraded both the server (by replacing the JARs in the confluent
> docker image with those built from kafka source) and the client (by using
> the built JARs as local file dependencies).
> I've used this as source: https://github.com/apache/kafka/archive/2.2.zip
> When the server runs it prints:
>
> INFO Kafka version: 2.2.1-SNAPSHOT
> (org.apache.kafka.common.utils.AppInfoParser).
>
> and regarding the client I don't see any kafka jars in the "External
> libraries" of the IntelliJ project tab so I think it's using the local JARs
> (2.2.1-SNAPSHOT).
>
> The problem is that the window isn't keeping the old values and still emits
> values with partially processed intervals.
>
> Just to summarize:
> https://gist.github.com/alex88/43b72e23bda9e15657b008855e1904db
>
>  - consumer emits one message per second with production = 1
>  - windowing stream should emit one message each 10 seconds with the sum of
> productions (so production = 10)
>
> If I restart the stream processor, it emits window results with partial
> data (production < 10), as you can see from the logs.
> I've checked the JAR file and it seems to include changes from
> https://github.com/apache/kafka/pull/6623 (it has the newly
> added FixedOrderMap class)
>
> Even after removing the suppress() the error seems to persist (look at
> consumer_nosuppress), here it seems it loses track of the contents of the
> window:
>
> S1 with computed metric {"timestamp": 50000, "production": 10}
> S1 with computed metric {"timestamp": 60000, "production": 1}
> S1 with computed metric {"timestamp": 60000, "production": 2}
> S1 with computed metric {"timestamp": 60000, "production": 3}
> S1 with computed metric {"timestamp": 60000, "production": 4}
> -- RESTART --
> S1 with computed metric {"timestamp": 60000, "production": 1}
> S1 with computed metric {"timestamp": 60000, "production": 2}
> S1 with computed metric {"timestamp": 60000, "production": 3}
> S1 with computed metric {"timestamp": 60000, "production": 4}
> S1 with computed metric {"timestamp": 60000, "production": 5}
> S1 with computed metric {"timestamp": 60000, "production": 6}
> S1 with computed metric {"timestamp": 70000, "production": 1}
>
> After the restart, the sum for the window with timestamp 60000 starts over
> from 1.
>
> Is it something wrong with my implementation?
>
> --
> Alessandro Tagliapietra
>
> On Thu, May 2, 2019 at 7:58 PM Alessandro Tagliapietra <
> tagliapietra.alessandro@gmail.com> wrote:
>
> > Hi Bruno,
> >
> > thank you for your help, glad to hear that those are only bugs and not a
> > problem in my implementation.
> > I'm currently using confluent docker images, I've checked their master
> > branch which seems to use the SNAPSHOT version however those
> > images/packages aren't publicly available. Are there any snapshot builds
> > available?
> > In the meantime I'm trying to create a custom docker image from kafka
> > source.
> >
> > Thanks
> >
> > --
> > Alessandro Tagliapietra
> >
> > On Tue, Apr 23, 2019 at 8:52 AM Bruno Cadonna <br...@confluent.io>
> wrote:
> >
> >> Hi Alessandro,
> >>
> >> It seems that the behaviour you described regarding the window
> >> aggregation is due to bugs. The good news is that the bugs have
> >> already been fixed.
> >>
> >> The relevant bug reports are
> >> https://issues.apache.org/jira/browse/KAFKA-7895
> >> https://issues.apache.org/jira/browse/KAFKA-8204
> >>
> >> The fixes for both bugs have already been merged to the 2.2 branch.
> >>
> >> Could you please build from the 2.2 branch and confirm that the fixes
> >> solve
> >> your problem?
> >>
> >> Best,
> >> Bruno
> >>
> >>
> >> On Sat, Apr 20, 2019 at 2:16 PM Alessandro Tagliapietra <
> >> tagliapietra.alessandro@gmail.com> wrote:
> >>
> >> > Thanks Matthias, one less thing to worry about in the future :)
> >> >
> >> > --
> >> > Alessandro Tagliapietra
> >> >
> >> >
> >> > On Sat, Apr 20, 2019 at 11:23 AM Matthias J. Sax <
> matthias@confluent.io
> >> >
> >> > wrote:
> >> >
> >> > > Just a side note. There is currently work in progress on
> >> > > https://issues.apache.org/jira/browse/KAFKA-3729 that should fix
> the
> >> > > configuration problem for Serdes.
> >> > >
> >> > > -Matthias
> >> > >
> >> > > On 4/19/19 9:12 PM, Alessandro Tagliapietra wrote:
> >> > > > Hi Bruno,
> >> > > > thanks a lot for checking the code, regarding the
> SpecificAvroSerde
> >> > I've
> >> > > > found that using
> >> > > >
> >> > > > final Serde<InputList> valueSpecificAvroSerde = new
> >> > > SpecificAvroSerde<>();
> >> > > > final Map<String, String> serdeConfig =
> >> > > > Collections.singletonMap("schema.registry.url", "
> >> http://localhost:8081
> >> > > ");
> >> > > > valueSpecificAvroSerde.configure(serdeConfig, false);
> >> > > >
> >> > > > and then in aggregate()
> >> > > >
> >> > > > Materialized.with(Serdes.String(), valueSpecificAvroSerde)
> >> > > >
> >> > > > fixed the issue.
> >> > > >
> >> > > > Thanks in advance for the windowing help, very appreciated.
> >> > > > In the meantime I'll try to make some progress on the rest.
> >> > > >
> >> > > > Have a great weekend
> >> > > >
> >> > > > --
> >> > > > Alessandro Tagliapietra
> >> > > >
> >> > > >
> >> > > > On Fri, Apr 19, 2019 at 2:09 PM Bruno Cadonna <bruno@confluent.io
> >
> >> > > wrote:
> >> > > >
> >> > > >> Hi Alessandro,
> >> > > >>
> >> > > >> I had a look at your code. Regarding your question whether you
> use
> >> the
> >> > > >> SpecificAvroSerde correctly, take a look at the following
> >> > documentation:
> >> > > >>
> >> > > >>
> >> > >
> >>
> https://docs.confluent.io/current/streams/developer-guide/datatypes.html
> >> > > >>
> >> > > >> I haven't had the time yet to take a closer look at your problems
> >> with
> >> > > the
> >> > > >> aggregation. I will have a look next week.
> >> > > >>
> >> > > >> Have a nice weekend,
> >> > > >> Bruno
> >> > > >>
> >> > > >> On Wed, Apr 17, 2019 at 4:43 PM Alessandro Tagliapietra <
> >> > > >> tagliapietra.alessandro@gmail.com> wrote:
> >> > > >>
> >> > > >>> So I've started with a new app with the archetype:generate as in
> >> > > >>> https://kafka.apache.org/22/documentation/streams/tutorial
> >> > > >>>
> >> > > >>> I've pushed a sample repo here:
> >> https://github.com/alex88/kafka-test
> >> > > >>> The avro schemas are a Metric with 2 fields: timestamp and
> >> production
> >> > > >> and a
> >> > > >>> MetricList with a list of records (Metric) to be able to
> manually
> >> do
> >> > > the
> >> > > >>> aggregation.
> >> > > >>> Right now the aggregation is simple just for the purpose of the
> >> > sample
> >> > > >> repo
> >> > > >>> and to easily see if we're getting wrong values.
> >> > > >>>
> >> > > >>> What I wanted to achieve is:
> >> > > >>>  - have a custom generator that generates 1 message per second
> >> with
> >> > > >>> production = 1 with 1 or more separate message keys which in my
> >> case
> >> > > are
> >> > > >>> the sensor IDs generating the data
> >> > > >>>  - a filter that removes out of order messages by having a state
> >> that
> >> > > >>> stores key (sensorID) -> last timestamp
> >> > > >>>  - a window operation that for this example just sums the values
> >> in
> >> > > each
> >> > > >> 10
> >> > > >>> seconds windows
> >> > > >>>
> >> > > >>> To show where I'm having issues I've setup multiple branches for
> >> the
> >> > > >> repo:
> >> > > >>>  - *issue-01 <
> https://github.com/alex88/kafka-test/tree/issue-01
> >> >*
> >> > is
> >> > > >> the
> >> > > >>> one I had initially "Failed to flush state store
> >> > > >>> KSTREAM-AGGREGATE-STATE-STORE-0000000003" that I tried to solve
> >> using
> >> > > >>>
> >> > > >>>
> >> > > >>
> >> > >
> >> >
> >>
> https://stackoverflow.com/questions/55186727/kafka-streams-2-1-1-class-cast-while-flushing-timed-aggregation-to-store
> >> > > >>>  - *issue-02 <
> https://github.com/alex88/kafka-test/tree/issue-02
> >> >*
> >> > is
> >> > > >> the
> >> > > >>> one after I've tried to solve above problem with the
> materializer
> >> > > (maybe
> >> > > >>> the SpecificAvroSerde is wrong?)
> >> > > >>>  - *issue-03 <
> https://github.com/alex88/kafka-test/tree/issue-03
> >> >*
> >> > > after
> >> > > >>> fixing issue-02 (by using
> groupByKey(Grouped.with(Serdes.String(),
> >> > new
> >> > > >>> SpecificAvroSerde<>()))) everything seems to be working, if you
> >> let
> >> > > both
> >> > > >>> the producer and stream running, you'll see that the stream
> >> receives
> >> > 10
> >> > > >>> messages (with the timestamp incrementing 1 second for each
> >> message)
> >> > > like
> >> > > >>> this:
> >> > > >>>
> >> > > >>> S1 with filtered metric{"timestamp": 160000, "production": 1}
> >> > > >>> S1 with filtered metric{"timestamp": 161000, "production": 1}
> >> > > >>> S1 with filtered metric{"timestamp": 162000, "production": 1}
> >> > > >>> S1 with filtered metric{"timestamp": 163000, "production": 1}
> >> > > >>> S1 with filtered metric{"timestamp": 164000, "production": 1}
> >> > > >>> S1 with filtered metric{"timestamp": 165000, "production": 1}
> >> > > >>> S1 with filtered metric{"timestamp": 166000, "production": 1}
> >> > > >>> S1 with filtered metric{"timestamp": 167000, "production": 1}
> >> > > >>> S1 with filtered metric{"timestamp": 168000, "production": 1}
> >> > > >>> S1 with filtered metric{"timestamp": 169000, "production": 1}
> >> > > >>>
> >> > > >>> and at the 10 seconds interval something like:
> >> > > >>>
> >> > > >>> S1 with computed metric {"timestamp": 160000, "production": 10}
> >> > > >>> S1 with computed metric {"timestamp": 170000, "production": 10}
> >> > > >>> S1 with computed metric {"timestamp": 180000, "production": 10}
> >> > > >>>
> >> > > >>> and so on...
> >> > > >>> Now there are two problems, after stopping and restarting the
> >> stream
> >> > > >>> processor (by sending SIGINT via IntelliJ since I start the
> class
> >> > main
> >> > > >> with
> >> > > >>> it) it happens:
> >> > > >>>  - sometimes the aggregated count is wrong, if I have it start
> >> > > windowing
> >> > > >>> for 7 seconds (e.g. seconds 11-17), restart the stream, after
> >> restart
> >> > > it
> >> > > >>> might just emit a value for the new 3 missing seconds (seconds
> >> 18-20)
> >> > > and
> >> > > >>> the aggregated value is 3 not 10
> >> > > >>>  - sometimes the window outputs twice, in the example where I
> >> restart
> >> > > the
> >> > > >>> stream processor I might get as output
> >> > > >>>
> >> > > >>> S1 with filtered metric{"timestamp": 154000, "production": 1}
> >> > > >>> S1 with computed metric {"timestamp": 160000, "production": 5}
> >> > > >>> S1 with filtered metric{"timestamp": 155000, "production": 1}
> >> > > >>> S1 with filtered metric{"timestamp": 156000, "production": 1}
> >> > > >>> S1 with filtered metric{"timestamp": 157000, "production": 1}
> >> > > >>> S1 with filtered metric{"timestamp": 158000, "production": 1}
> >> > > >>> S1 with filtered metric{"timestamp": 159000, "production": 1}
> >> > > >>> S1 with filtered metric{"timestamp": 160000, "production": 1}
> >> > > >>> S1 with filtered metric{"timestamp": 161000, "production": 1}
> >> > > >>> S1 with computed metric {"timestamp": 160000, "production": 10}
> >> > > >>> S1 with filtered metric{"timestamp": 162000, "production": 1}
> >> > > >>>
> >> > > >>> as you can see, window for timestamp 160000 is duplicated
> >> > > >>>
> >> > > >>> Is this because the window state isn't persisted across
> restarts?
> >> > > >>> My ultimate goal is to have the window part emit only once and
> >> resume
> >> > > >>> processing across restarts, while avoiding processing out of
> order
> >> > data
> >> > > >>> (that's the purpose of the TimestampIncrementalFilter)
> >> > > >>>
> >> > > >>> Thank you in advance
> >> > > >>>
> >> > > >>> --
> >> > > >>> Alessandro Tagliapietra
> >> > > >>>
> >> > > >>>
> >> > > >>> On Tue, Apr 16, 2019 at 9:48 PM Alessandro Tagliapietra <
> >> > > >>> tagliapietra.alessandro@gmail.com> wrote:
> >> > > >>>
> >> > > >>>> Hi Bruno,
> >> > > >>>>
> >> > > >>>> I'm using the confluent docker images 5.2.1, so kafka 2.2.
> >> > > >>>> Anyway I'll try to make a small reproduction repo with all the
> >> > > >> different
> >> > > >>>> cases soon.
> >> > > >>>>
> >> > > >>>> Thank you
> >> > > >>>>
> >> > > >>>> --
> >> > > >>>> Alessandro Tagliapietra
> >> > > >>>>
> >> > > >>>>
> >> > > >>>> On Tue, Apr 16, 2019 at 1:02 PM Bruno Cadonna <
> >> bruno@confluent.io>
> >> > > >>> wrote:
> >> > > >>>>
> >> > > >>>>> Hi Alessandro,
> >> > > >>>>>
> >> > > >>>>> What version of Kafka do you use?
> >> > > >>>>>
> >> > > >>>>> Could you please give a more detailed example for the issues
> >> with
> >> > the
> >> > > >>> two
> >> > > >>>>> keys you see?
> >> > > >>>>>
> >> > > >>>>> Could the following bug be related to the duplicates you see?
> >> > > >>>>>
> >> > > >>>>>
> >> > > >>>>>
> >> > > >>>
> >> > > >>
> >> > >
> >> >
> >>
> https://issues.apache.org/jira/browse/KAFKA-7895?jql=project%20%3D%20KAFKA%20AND%20issuetype%20%3D%20Bug%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened)%20AND%20component%20%3D%20streams%20AND%20text%20~%20%22duplicate%22
> >> > > >>>>>
> >> > > >>>>> How do you restart the processor?
> >> > > >>>>>
> >> > > >>>>> Best,
> >> > > >>>>> Bruno
> >> > > >>>>>
> >> > > >>>>> On Mon, Apr 15, 2019 at 11:02 PM Alessandro Tagliapietra <
> >> > > >>>>> tagliapietra.alessandro@gmail.com> wrote:
> >> > > >>>>>
> >> > > >>>>>> Thank you Bruno,
> >> > > >>>>>>
> >> > > >>>>>> I'll look into those, however average is just a simple thing
> >> I'm
> >> > > >>> trying
> >> > > >>>>>> right now just to get an initial windowing flow working.
> >> > > >>>>>> In the future I'll probably still need the actual values for
> >> other
> >> > > >>>>>> calculations. We won't have more than 60 elements per window
> >> for
> >> > > >> sure.
> >> > > >>>>>>
> >> > > >>>>>> So far to not manually serialize/deserialize the array list
> >> I've
> >> > > >>>>> created an
> >> > > >>>>>> Avro model with an array field containing the values.
> >> > > >>>>>> I had issues with suppress as explained here
> >> > > >>>>>>
> >> > > >>>>>>
> >> > > >>>>>>
> >> > > >>>>>
> >> > > >>>
> >> > > >>
> >> > >
> >> >
> >>
> https://stackoverflow.com/questions/55699096/kafka-aggregate-with-materialized-with-specific-avro-serve-gives-nullpointerexce/55699198#55699198
> >> > > >>>>>>
> >> > > >>>>>> but I got that working.
> >> > > >>>>>> So far everything seems to be working, except a couple
> things:
> >> > > >>>>>>  - if I generate data with 1 key, I correctly get a value
> each
> >> 10
> >> > > >>>>> seconds,
> >> > > >>>>>> if I later start generating data with another key (while key
> 1
> >> is
> >> > > >>> still
> >> > > >>>>>> generating) the windowing emits a value only after the
> >> timestamp
> >> > of
> >> > > >>> key
> >> > > >>>>> 2
> >> > > >>>>>> reaches the last generated window
> >> > > >>>>>>  - while generating data, if I restart the processor as soon
> >> as it
> >> > > >>>>> starts
> >> > > >>>>>> it sometimes generates 2 aggregates for the same window even
> if
> >> > I'm
> >> > > >>>>> using
> >> > > >>>>>> the suppress
> >> > > >>>>>>
> >> > > >>>>>> Anyway, I'll look into your link and try to find out the
> cause
> >> of
> >> > > >>> these
> >> > > >>>>>> issues, probably starting from scratch with a simpler example
> >> > > >>>>>>
> >> > > >>>>>> Thank you for your help!
> >> > > >>>>>>
> >> > > >>>>>> --
> >> > > >>>>>> Alessandro Tagliapietra
> >> > > >>>>>>
> >> > > >>>>>> On Mon, Apr 15, 2019 at 10:08 PM Bruno Cadonna <
> >> > bruno@confluent.io>
> >> > > >>>>> wrote:
> >> > > >>>>>>
> >> > > >>>>>>> Hi Alessandro,
> >> > > >>>>>>>
> >> > > >>>>>>> Have a look at this Kafka Usage Pattern for computing
> averages
> >> > > >>> without
> >> > > >>>>>>> using an ArrayList.
> >> > > >>>>>>>
> >> > > >>>>>>>
> >> > > >>>>>>>
> >> > > >>>>>>
> >> > > >>>>>
> >> > > >>>
> >> > > >>
> >> > >
> >> >
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns#KafkaStreamUsagePatterns-Howtocomputean(windowed)average
> >> > > >>>>>>> ?
> >> > > >>>>>>>
> >> > > >>>>>>> The advantages of this pattern over the ArrayList approach
> is
> >> the
> >> > > >>>>> reduced
> >> > > >>>>>>> space needed to compute the aggregate. Note that you will
> >> still
> >> > > >> need
> >> > > >>>>> to
> >> > > >>>>>>> implement a SerDe. However, the SerDe should be a bit easier
> >> to
> >> > > >>>>> implement
> >> > > >>>>>>> than a SerDe for an ArrayList.
> >> > > >>>>>>>
> >> > > >>>>>>> Hope that helps.
> >> > > >>>>>>>
> >> > > >>>>>>> Best,
> >> > > >>>>>>> Bruno
> >> > > >>>>>>>
> >> > > >>>>>>> On Mon, Apr 15, 2019 at 4:57 PM Alessandro Tagliapietra <
> >> > > >>>>>>> tagliapietra.alessandro@gmail.com> wrote:
> >> > > >>>>>>>
> >> > > >>>>>>>> Sorry but it seemed harder than I thought,
> >> > > >>>>>>>>
> >> > > >>>>>>>> to have the custom aggregation working I need to get an
> >> > > >> ArrayList
> >> > > >>> of
> >> > > >>>>>> all
> >> > > >>>>>>>> the values in the window, so far my aggregate DSL method
> >> creates
> >> > > >>> an
> >> > > >>>>>>>> ArrayList on the initializer and adds each value to the
> list
> >> in
> >> > > >>> the
> >> > > >>>>>>>> aggregator.
> >> > > >>>>>>>> Then I think I'll have to provide a serde to change the
> >> > > >>>>>>>> output type of that method.
> >> > > >>>>>>>> I was looking at
> >> > > >>>>>>>>
> >> > > >>>>>>>>
> >> > > >>>>>>>
> >> > > >>>>>>
> >> > > >>>>>
> >> > > >>>
> >> > > >>
> >> > >
> >> >
> >>
> https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api
> >> > > >>>>>>>> but
> >> > > >>>>>>>> that seems more towards a list of longs and already uses
> >> > > >>> longSerde.
> >> > > >>>>>>>> I'm currently trying to implement another avro model that
> >> has a
> >> > > >>>>> field
> >> > > >>>>>> of
> >> > > >>>>>>>> type array so I can use the regular avro serializer to
> >> implement
> >> > > >>>>> this.
> >> > > >>>>>>>> Should I create my own serdes instead or is this the right
> >> way?
> >> > > >>>>>>>>
> >> > > >>>>>>>> Thank you in advance
> >> > > >>>>>>>>
> >> > > >>>>>>>> --
> >> > > >>>>>>>> Alessandro Tagliapietra
> >> > > >>>>>>>>
> >> > > >>>>>>>> On Mon, Apr 15, 2019 at 3:42 PM Alessandro Tagliapietra <
> >> > > >>>>>>>> tagliapietra.alessandro@gmail.com> wrote:
> >> > > >>>>>>>>
> >> > > >>>>>>>>> Thank you Bruno and Matthias,
> >> > > >>>>>>>>>
> >> > > >>>>>>>>> I've modified the transformer to implement the
> >> > > >>>>>> ValueTransformerWithKey
> >> > > >>>>>>>>> interface and everything is working fine.
> >> > > >>>>>>>>> I now have to window the data and manually aggregate each
> >> > > >>>>>>>>> window's data, since I have to compute some averages and
> >> > > >>>>>>>>> sums of differences.
> >> > > >>>>>>>>> So far I'm just having some issues with message types, since
> >> > > >>>>>>>>> I'm changing the data type when aggregating the window, but I
> >> > > >>>>>>>>> think it's an easy problem.
> >> > > >>>>>>>>>
> >> > > >>>>>>>>> Thank you again
> >> > > >>>>>>>>> Best
> >> > > >>>>>>>>>
> >> > > >>>>>>>>> --
> >> > > >>>>>>>>> Alessandro Tagliapietra
> >> > > >>>>>>>>>
> >> > > >>>>>>>>> On Sun, Apr 14, 2019 at 11:26 AM Bruno Cadonna <
> >> > > >>>>> bruno@confluent.io>
> >> > > >>>>>>>> wrote:
> >> > > >>>>>>>>>
> >> > > >>>>>>>>>> Hi Alessandro,
> >> > > >>>>>>>>>>
> >> > > >>>>>>>>>> the `TransformerSupplier` is internally wrapped with a
> >> > > >>>>>>>>>> `ProcessorSupplier`, so the statement
> >> > > >>>>>>>>>>
> >> > > >>>>>>>>>> `transform` is essentially equivalent to adding the
> >> > > >> Transformer
> >> > > >>>>> via
> >> > > >>>>>>>>>> Topology#addProcessor() to your processor topology
> >> > > >>>>>>>>>>
> >> > > >>>>>>>>>> is correct.
> >> > > >>>>>>>>>>
> >> > > >>>>>>>>>> If you do not change the key, you should definitely use one
> >> > > >>>>>>>>>> of the overloads of `transformValues` to avoid internal data
> >> > > >>>>>>>>>> redistribution. In your case the overload with
> >> > > >>>>>>>>>> `ValueTransformerWithKeySupplier` as suggested by Matthias
> >> > > >>>>>>>>>> would fit.
> >> > > >>>>>>>>>>
> >> > > >>>>>>>>>> Best,
> >> > > >>>>>>>>>> Bruno
> >> > > >>>>>>>>>>
> >> > > >>>>>>>>>> On Sat, Apr 13, 2019 at 12:51 PM Matthias J. Sax <
> >> > > >>>>>>> matthias@confluent.io
> >> > > >>>>>>>>>
> >> > > >>>>>>>>>> wrote:
> >> > > >>>>>>>>>>
> >> > > >>>>>>>>>>> There is also `ValueTransformerWithKey` that gives you
> >> > > >>>>>>>>>>> read-only access to the key.
> >> > > >>>>>>>>>>>
> >> > > >>>>>>>>>>> -Matthias
> >> > > >>>>>>>>>>>
> >> > > >>>>>>>>>>> On 4/12/19 5:34 PM, Alessandro Tagliapietra wrote:
> >> > > >>>>>>>>>>>> Hi Bruno,
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>> Thank you for the quick answer.
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>> I'm actually trying to do that since it seems there is
> >> > > >>>>> really no
> >> > > >>>>>>> way
> >> > > >>>>>>>>>> to
> >> > > >>>>>>>>>>>> have it use `Processor<K, V>`.
> >> > > >>>>>>>>>>>> I just wanted (if that would've made any sense) to use
> >> > > >> the
> >> > > >>>>>>> Processor
> >> > > >>>>>>>>>> in
> >> > > >>>>>>>>>>>> both DSL and non-DSL pipelines.
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>> Anyway, regarding `transformValues()` I don't think I
> can
> >> > > >>>>> use it
> >> > > >>>>>>> as
> >> > > >>>>>>>> I
> >> > > >>>>>>>>>>> need
> >> > > >>>>>>>>>>>> the message key since that is the discriminating value
> >> > > >> for
> >> > > >>>>> the
> >> > > >>>>>>>> filter
> >> > > >>>>>>>>>> (I
> >> > > >>>>>>>>>>>> want to exclude old values per sensor ID so per message
> >> > > >>> key)
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>> Right now I have this
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfiltertransformer-java
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>> and I'm using it with `transform()`.
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>> One thing I've found confusing is this
> >> > > >>>>>>>>>>>> https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-process
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>>> transform is essentially equivalent to adding the
> >> > > >>>>>>>>>>>>> Transformer via Topology#addProcessor() to your processor
> >> > > >>>>>>>>>>>>> topology
> >> > > >>>>>>>>>>>>> <https://docs.confluent.io/current/streams/concepts.html#streams-concepts-processor-topology>.
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>> Is it? Doesn't `transform` need a `TransformerSupplier`, while
> >> > > >>>>>>>>>>>> `addProcessor` uses a `ProcessorSupplier`?
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>> Thank you again for your help
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>> --
> >> > > >>>>>>>>>>>> Alessandro Tagliapietra
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>> On Fri, Apr 12, 2019 at 5:04 PM Bruno Cadonna <
> >> > > >>>>>> bruno@confluent.io
> >> > > >>>>>>>>
> >> > > >>>>>>>>>>> wrote:
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>>> Hi Alessandro,
> >> > > >>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>> Have you considered using `transform()` (actually in
> >> > > >> your
> >> > > >>>>> case
> >> > > >>>>>>> you
> >> > > >>>>>>>>>>> should
> >> > > >>>>>>>>>>>>> use `transformValues()`) instead of `.process()`?
> >> > > >>>>> `transform()`
> >> > > >>>>>>> and
> >> > > >>>>>>>>>>>>> `transformValues()` are stateful operations similar to
> >> > > >>>>>> `.process`
> >> > > >>>>>>>> but
> >> > > >>>>>>>>>>> they
> >> > > >>>>>>>>>>>>> return a `KStream`. On a `KStream` you can then apply
> a
> >> > > >>>>>> windowed
> >> > > >>>>>>>>>>>>> aggregation.
> >> > > >>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>> Hope that helps.
> >> > > >>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>> Best,
> >> > > >>>>>>>>>>>>> Bruno
> >> > > >>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>> On Fri, Apr 12, 2019 at 4:31 PM Alessandro
> Tagliapietra
> >> > > >> <
> >> > > >>>>>>>>>>>>> tagliapietra.alessandro@gmail.com> wrote:
> >> > > >>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>> Hi there,
> >> > > >>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>> I'm just starting with Kafka and I'm trying to create a
> >> > > >>>>>>>>>>>>>> stream processor that, in multiple stages:
> >> > > >>>>>>>>>>>>>>  - filters messages using a kv store so that only messages
> >> > > >>>>>>>>>>>>>> with a higher timestamp get processed
> >> > > >>>>>>>>>>>>>>  - aggregates the message metrics by minute, giving e.g.
> >> > > >>>>>>>>>>>>>> the avg of those metrics in that minute
> >> > > >>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>> The message is simple, the key is the sensor ID and
> the
> >> > > >>>>> value
> >> > > >>>>>> is
> >> > > >>>>>>>>>> e.g. {
> >> > > >>>>>>>>>>>>>> timestamp: UNIX_TIMESTAMP, speed: INT }.
> >> > > >>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>> I've started by creating a processor to use the kv
> >> > > >> store
> >> > > >>>>> and
> >> > > >>>>>>>> filter
> >> > > >>>>>>>>>> old
> >> > > >>>>>>>>>>>>>> messages:
> >> > > >>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>
> >> > > >>>>>>>>>>>
> >> > > >>>>>>>>>>
> >> > > >>>>>>>>
> >> > > >>>>>>>
> >> > > >>>>>>
> >> > > >>>>>
> >> > > >>>
> >> > > >>
> >> > >
> >> >
> >>
> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfilter-java
> >> > > >>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>> Then I was trying to implement windowing, I saw very
> >> > > >> nice
> >> > > >>>>>>>> windowing
> >> > > >>>>>>>>>>>>>> examples for the DSL but none for the Processor API
> >> > > >>> (only a
> >> > > >>>>>>> small
> >> > > >>>>>>>>>>>>> reference
> >> > > >>>>>>>>>>>>>> to the windowed store), can someone point me in the
> >> > > >> right
> >> > > >>>>>>>> direction?
> >> > > >>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>> Now, since I wasn't able to find any example I tried
> to
> >> > > >>> use
> >> > > >>>>>> the
> >> > > >>>>>>>> DSL
> >> > > >>>>>>>>>> but
> >> > > >>>>>>>>>>>>>> haven't found a way to use my processor with it, I
> saw
> >> > > >>> this
> >> > > >>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>
> >> > > >>>>>>>>>>>
> >> > > >>>>>>>>>>
> >> > > >>>>>>>>
> >> > > >>>>>>>
> >> > > >>>>>>
> >> > > >>>>>
> >> > > >>>
> >> > > >>
> >> > >
> >> >
> >>
> https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#applying-processors-and-transformers-processor-api-integration
> >> > > >>>>>>>>>>>>>> but
> >> > > >>>>>>>>>>>>>> it explains mostly transformers not processors. I
> also
> >> > > >>> saw
> >> > > >>>>>> after
> >> > > >>>>>>>>>> that
> >> > > >>>>>>>>>>> the
> >> > > >>>>>>>>>>>>>> example usage of the processor but `.process(...)`
> >> > > >>> returns
> >> > > >>>>>> void,
> >> > > >>>>>>>> so
> >> > > >>>>>>>>>> I
> >> > > >>>>>>>>>>>>>> cannot have a KStream from a processor?
> >> > > >>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>> Thank you all in advance
> >> > > >>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>> --
> >> > > >>>>>>>>>>>>>> Alessandro Tagliapietra
> >> > > >>>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>>
> >> > > >>>>>>>>>>>>
> >> > > >>>>>>>>>>>
> >> > > >>>>>>>>>>>
> >> > > >>>>>>>>>>
> >> > > >>>>>>>>>
> >> > > >>>>>>>>
> >> > > >>>>>>>
> >> > > >>>>>>
> >> > > >>>>>
> >> > > >>>>
> >> > > >>>
> >> > > >>
> >> > > >
> >> > >
> >> > >
> >> >
> >>
> >
>

Re: Using processor API via DSL

Posted by Alessandro Tagliapietra <ta...@gmail.com>.
Ok so I'm not sure if I did this correctly,

I've upgraded both the server (by replacing the JARs in the confluent
docker image with those built from kafka source) and the client (by using
the built JARs as local file dependencies).
I've used this as source: https://github.com/apache/kafka/archive/2.2.zip
When the server runs it prints:

INFO Kafka version: 2.2.1-SNAPSHOT
(org.apache.kafka.common.utils.AppInfoParser).

and regarding the client I don't see any kafka jars in the "External
libraries" of the IntelliJ project tab so I think it's using the local JARs
(2.2.1-SNAPSHOT).

The problem is that the window isn't keeping the old values and still emits
values with partially processed intervals.

Just to summarize:
https://gist.github.com/alex88/43b72e23bda9e15657b008855e1904db

 - consumer emits one message per second with production = 1
 - windowing stream should emit one message every 10 seconds with the sum of
productions (so production = 10)
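
The expected behaviour in the two points above can be sketched without Kafka at all. This is a plain-Java simulation, not Kafka Streams code: the names TumblingWindowSketch, windowStart and sumPerWindow are made up for illustration, and it assumes non-negative timestamps in milliseconds and windows aligned to multiples of the window size.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, Kafka-free sketch of 10-second tumbling-window sums.
class TumblingWindowSketch {

    // Start of the tumbling window that contains the given timestamp
    // (assumes a non-negative timestamp in milliseconds).
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Sums "production" per window start; both arrays must have the same length.
    static Map<Long, Integer> sumPerWindow(long[] timestampsMs, int[] productions, long sizeMs) {
        Map<Long, Integer> sums = new TreeMap<>();
        for (int i = 0; i < timestampsMs.length; i++) {
            sums.merge(windowStart(timestampsMs[i], sizeMs), productions[i], Integer::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        // One message per second with production = 1, for 20 seconds.
        int n = 20;
        long[] timestamps = new long[n];
        int[] productions = new int[n];
        for (int i = 0; i < n; i++) {
            timestamps[i] = 160_000L + i * 1_000L;
            productions[i] = 1;
        }
        System.out.println(sumPerWindow(timestamps, productions, 10_000L));
    }
}
```

With one message per second, every complete 10-second window in this sketch sums to 10, which is the value the stream is expected to emit.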

If I restart the stream processor, it emits window results computed from
partial data (production < 10), as you can see from the logs.
I've checked the JAR file and it seems to include changes from
https://github.com/apache/kafka/pull/6623 (it has the newly
added FixedOrderMap class)

Even after removing suppress() the problem seems to persist (look at
consumer_nosuppress); here it seems to lose track of the contents of the
window:

S1 with computed metric {"timestamp": 50000, "production": 10}
S1 with computed metric {"timestamp": 60000, "production": 1}
S1 with computed metric {"timestamp": 60000, "production": 2}
S1 with computed metric {"timestamp": 60000, "production": 3}
S1 with computed metric {"timestamp": 60000, "production": 4}
-- RESTART --
S1 with computed metric {"timestamp": 60000, "production": 1}
S1 with computed metric {"timestamp": 60000, "production": 2}
S1 with computed metric {"timestamp": 60000, "production": 3}
S1 with computed metric {"timestamp": 60000, "production": 4}
S1 with computed metric {"timestamp": 60000, "production": 5}
S1 with computed metric {"timestamp": 60000, "production": 6}
S1 with computed metric {"timestamp": 70000, "production": 1}

After a restart during the window starting at 60000, the sum starts over from
1 instead of continuing from 4.
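
To make the difference concrete, here is a small plain-Java sketch (again not Kafka Streams code; RestartSketch and runningSums are hypothetical names). It compares the running sums one would expect after the restart if the partial sum of 4 had been restored with the sums seen in the log above, where the window apparently starts again from 0. It assumes six more messages with production = 1 arrive for the 60000 window after the restart.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: running sums of one window after a restart.
class RestartSketch {

    // Running sums emitted for a single window, starting from the value
    // that was already accumulated before the restart.
    static List<Integer> runningSums(int restoredSum, int messagesAfterRestart) {
        List<Integer> emitted = new ArrayList<>();
        int sum = restoredSum;
        for (int i = 0; i < messagesAfterRestart; i++) {
            sum += 1; // each message carries production = 1
            emitted.add(sum);
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println("state restored: " + runningSums(4, 6));
        System.out.println("state lost:     " + runningSums(0, 6));
    }
}
```

The "state lost" sequence matches the log above (1 through 6), while a restored window would have continued from 5 up to 10.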

Is something wrong with my implementation?

--
Alessandro Tagliapietra

On Thu, May 2, 2019 at 7:58 PM Alessandro Tagliapietra <
tagliapietra.alessandro@gmail.com> wrote:

> Hi Bruno,
>
> thank you for your help, glad to hear that those are only bugs and not a
> problem in my implementation,
> I'm currently using confluent docker images, I've checked their master
> branch which seems to use the SNAPSHOT version however those
> images/packages aren't publicly available. Are there any snapshot builds
> available?
> In the meantime I'm trying to create a custom docker image from kafka
> source.
>
> Thanks
>
> --
> Alessandro Tagliapietra
>
> On Tue, Apr 23, 2019 at 8:52 AM Bruno Cadonna <br...@confluent.io> wrote:
>
>> Hi Alessandro,
>>
>> It seems that the behaviour you described regarding the window aggregation
>> is due to bugs. The good news is that the bugs have already been fixed.
>>
>> The relevant bug reports are
>> https://issues.apache.org/jira/browse/KAFKA-7895
>> https://issues.apache.org/jira/browse/KAFKA-8204
>>
>> The fixes for both bugs have already been merged to the 2.2 branch.
>>
>> Could you please build from the 2.2 branch and confirm that the fixes
>> solve your problem?
>>
>> Best,
>> Bruno
>>
>>
>> On Sat, Apr 20, 2019 at 2:16 PM Alessandro Tagliapietra <
>> tagliapietra.alessandro@gmail.com> wrote:
>>
>> > Thanks Matthias, one less thing to worry about in the future :)
>> >
>> > --
>> > Alessandro Tagliapietra
>> >
>> >
>> > On Sat, Apr 20, 2019 at 11:23 AM Matthias J. Sax <matthias@confluent.io
>> >
>> > wrote:
>> >
>> > > Just a side note. There is currently work in progress on
>> > > https://issues.apache.org/jira/browse/KAFKA-3729 that should fix the
>> > > configuration problem for Serdes.
>> > >
>> > > -Matthias
>> > >
>> > > On 4/19/19 9:12 PM, Alessandro Tagliapietra wrote:
>> > > > Hi Bruno,
>> > > > thanks a lot for checking the code, regarding the SpecificAvroSerde
>> > > > I've found that using
>> > > >
>> > > > final Serde<InputList> valueSpecificAvroSerde = new SpecificAvroSerde<>();
>> > > > final Map<String, String> serdeConfig =
>> > > >     Collections.singletonMap("schema.registry.url", "http://localhost:8081");
>> > > > valueSpecificAvroSerde.configure(serdeConfig, false);
>> > > >
>> > > > and then in aggregate()
>> > > >
>> > > > Materialized.with(Serdes.String(), valueSpecificAvroSerde)
>> > > >
>> > > > fixed the issue.
>> > > >
>> > > > Thanks in advance for the windowing help, very appreciated.
>> > > > In the meantime I'll try to make some progress on the rest.
>> > > >
>> > > > Have a great weekend
>> > > >
>> > > > --
>> > > > Alessandro Tagliapietra
>> > > >
>> > > >
>> > > > On Fri, Apr 19, 2019 at 2:09 PM Bruno Cadonna <br...@confluent.io>
>> > > wrote:
>> > > >
>> > > >> Hi Alessandro,
>> > > >>
>> > > >> I had a look at your code. Regarding your question whether you use
>> the
>> > > >> SpecificAvroSerde correctly, take a look at the following
>> > documentation:
>> > > >>
>> > > >>
>> > >
>> https://docs.confluent.io/current/streams/developer-guide/datatypes.html
>> > > >>
>> > > >> I haven't had the time yet to take a closer look at your problems
>> with
>> > > the
>> > > >> aggregation. I will have a look next week.
>> > > >>
>> > > >> Have a nice weekend,
>> > > >> Bruno
>> > > >>
>> > > >> On Wed, Apr 17, 2019 at 4:43 PM Alessandro Tagliapietra <
>> > > >> tagliapietra.alessandro@gmail.com> wrote:
>> > > >>
>> > > >>> So I've started with a new app with the archetype:generate as in
>> > > >>> https://kafka.apache.org/22/documentation/streams/tutorial
>> > > >>>
>> > > >>> I've pushed a sample repo here:
>> https://github.com/alex88/kafka-test
>> > > >>> The avro schemas are a Metric with 2 fields: timestamp and
>> production
>> > > >> and a
>> > > >>> MetricList with a list of records (Metric) to be able to manually
>> do
>> > > the
>> > > >>> aggregation.
>> > > >>> Right now the aggregation is simple just for the purpose of the
>> > sample
>> > > >> repo
>> > > >>> and to easily see if we're getting wrong values.
>> > > >>>
>> > > >>> What I wanted to achieve is:
>> > > >>>  - have a custom generator that generates 1 message per second with
>> > > >>> production = 1 with 1 or more separate message keys which in my case are
>> > > >>> the sensor IDs generating the data
>> > > >>>  - a filter that removes out of order messages by having a state that
>> > > >>> stores key (sensorID) -> last timestamp
>> > > >>>  - a window operation that for this example just sums the values in each
>> > > >>> 10 seconds windows
>> > > >>>
>> > > >>> To show where I'm having issues I've setup multiple branches for
>> the
>> > > >> repo:
>> > > >>>  - *issue-01 <https://github.com/alex88/kafka-test/tree/issue-01>* is
>> > > >>> the one I had initially "Failed to flush state store
>> > > >>> KSTREAM-AGGREGATE-STATE-STORE-0000000003" that I tried to solve using
>> > > >>> https://stackoverflow.com/questions/55186727/kafka-streams-2-1-1-class-cast-while-flushing-timed-aggregation-to-store
>> > > >>>  - *issue-02 <https://github.com/alex88/kafka-test/tree/issue-02>* is
>> > > >>> the one after I've tried to solve above problem with the materializer
>> > > >>> (maybe the SpecificAvroSerde is wrong?)
>> > > >>>  - *issue-03 <https://github.com/alex88/kafka-test/tree/issue-03>* after
>> > > >>> fixing issue-02 (by using groupByKey(Grouped.with(Serdes.String(), new
>> > > >>> SpecificAvroSerde<>()))) everything seems to be working, if you let
>> > > >>> both the producer and stream running, you'll see that the stream
>> > > >>> receives 10 messages (with the timestamp incrementing 1 second for
>> > > >>> each message) like this:
>> > > >>>
>> > > >>> S1 with filtered metric{"timestamp": 160000, "production": 1}
>> > > >>> S1 with filtered metric{"timestamp": 161000, "production": 1}
>> > > >>> S1 with filtered metric{"timestamp": 162000, "production": 1}
>> > > >>> S1 with filtered metric{"timestamp": 163000, "production": 1}
>> > > >>> S1 with filtered metric{"timestamp": 164000, "production": 1}
>> > > >>> S1 with filtered metric{"timestamp": 165000, "production": 1}
>> > > >>> S1 with filtered metric{"timestamp": 166000, "production": 1}
>> > > >>> S1 with filtered metric{"timestamp": 167000, "production": 1}
>> > > >>> S1 with filtered metric{"timestamp": 168000, "production": 1}
>> > > >>> S1 with filtered metric{"timestamp": 169000, "production": 1}
>> > > >>>
>> > > >>> and at the 10 seconds interval something like:
>> > > >>>
>> > > >>> S1 with computed metric {"timestamp": 160000, "production": 10}
>> > > >>> S1 with computed metric {"timestamp": 170000, "production": 10}
>> > > >>> S1 with computed metric {"timestamp": 180000, "production": 10}
>> > > >>>
>> > > >>> and so on...
>> > > >>> Now there are two problems, after stopping and restarting the stream
>> > > >>> processor (by sending SIGINT via IntelliJ since I start the class main
>> > > >>> with it) it happens:
>> > > >>>  - sometimes the aggregated count is wrong, if I have it start windowing
>> > > >>> for 7 seconds (e.g. seconds 11-17), restart the stream, after restart it
>> > > >>> might just emit a value for the new 3 missing seconds (seconds 18-20) and
>> > > >>> the aggregated value is 3 not 10
>> > > >>>  - sometimes the window outputs twice, in the example where I restart the
>> > > >>> stream processor I might get as output
>> > > >>>
>> > > >>> S1 with filtered metric{"timestamp": 154000, "production": 1}
>> > > >>> S1 with computed metric {"timestamp": 160000, "production": 5}
>> > > >>> S1 with filtered metric{"timestamp": 155000, "production": 1}
>> > > >>> S1 with filtered metric{"timestamp": 156000, "production": 1}
>> > > >>> S1 with filtered metric{"timestamp": 157000, "production": 1}
>> > > >>> S1 with filtered metric{"timestamp": 158000, "production": 1}
>> > > >>> S1 with filtered metric{"timestamp": 159000, "production": 1}
>> > > >>> S1 with filtered metric{"timestamp": 160000, "production": 1}
>> > > >>> S1 with filtered metric{"timestamp": 161000, "production": 1}
>> > > >>> S1 with computed metric {"timestamp": 160000, "production": 10}
>> > > >>> S1 with filtered metric{"timestamp": 162000, "production": 1}
>> > > >>>
>> > > >>> as you can see, window for timestamp 160000 is duplicated
>> > > >>>
>> > > >>> Is this because the window state isn't persisted across restarts?
>> > > >>> My ultimate goal is to have the window part emit only once and resume
>> > > >>> processing across restarts, while avoiding processing out of order data
>> > > >>> (that's the purpose of the TimestampIncrementalFilter)
>> > > >>>
>> > > >>> Thank you in advance
>> > > >>>
>> > > >>> --
>> > > >>> Alessandro Tagliapietra
>> > > >>>
>> > > >>>
>> > > >>> On Tue, Apr 16, 2019 at 9:48 PM Alessandro Tagliapietra <
>> > > >>> tagliapietra.alessandro@gmail.com> wrote:
>> > > >>>
>> > > >>>> Hi Bruno,
>> > > >>>>
>> > > >>>> I'm using the confluent docker images 5.2.1, so kafka 2.2.
>> > > >>>> Anyway I'll try to make a small reproduction repo with all the
>> > > >> different
>> > > >>>> cases soon.
>> > > >>>>
>> > > >>>> Thank you
>> > > >>>>
>> > > >>>> --
>> > > >>>> Alessandro Tagliapietra
>> > > >>>>
>> > > >>>>
>> > > >>>> On Tue, Apr 16, 2019 at 1:02 PM Bruno Cadonna <
>> bruno@confluent.io>
>> > > >>> wrote:
>> > > >>>>
>> > > >>>>> Hi Alessandro,
>> > > >>>>>
>> > > >>>>> What version of Kafka do you use?
>> > > >>>>>
>> > > >>>>> Could you please give a more detailed example for the issues
>> with
>> > the
>> > > >>> two
>> > > >>>>> keys you see?
>> > > >>>>>
>> > > >>>>> Could the following bug be related to the duplicates you see?
>> > > >>>>>
>> > > >>>>>
>> > > >>>>>
>> > > >>>
>> > > >>
>> > >
>> >
>> https://issues.apache.org/jira/browse/KAFKA-7895?jql=project%20%3D%20KAFKA%20AND%20issuetype%20%3D%20Bug%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened)%20AND%20component%20%3D%20streams%20AND%20text%20~%20%22duplicate%22
>> > > >>>>>
>> > > >>>>> How do you restart the processor?
>> > > >>>>>
>> > > >>>>> Best,
>> > > >>>>> Bruno
>> > > >>>>>
>> > > >>>>> On Mon, Apr 15, 2019 at 11:02 PM Alessandro Tagliapietra <
>> > > >>>>> tagliapietra.alessandro@gmail.com> wrote:
>> > > >>>>>
>> > > >>>>>> Thank you Bruno,
>> > > >>>>>>
>> > > >>>>>> I'll look into those, however average is just a simple thing
>> I'm
>> > > >>> trying
>> > > >>>>>> right now just to get an initial windowing flow working.
>> > > >>>>>> In the future I'll probably still need the actual values for
>> other
>> > > >>>>>> calculations. We won't have more than 60 elements per window
>> for
>> > > >> sure.
>> > > >>>>>>
>> > > >>>>>> So far to not manually serialize/deserialize the array list
>> I've
>> > > >>>>> created an
>> > > >>>>>> Avro model with an array field containing the values.
>> > > >>>>>> I had issues with suppress as explained here
>> > > >>>>>>
>> > > >>>>>>
>> > > >>>>>>
>> > > >>>>>
>> > > >>>
>> > > >>
>> > >
>> >
>> https://stackoverflow.com/questions/55699096/kafka-aggregate-with-materialized-with-specific-avro-serve-gives-nullpointerexce/55699198#55699198
>> > > >>>>>>
>> > > >>>>>> but I got that working.
>> > > >>>>>> So far everything seems to be working, except a couple things:
>> > > >>>>>>  - if I generate data with 1 key, I correctly get a value every
>> 10
>> > > >>>>> seconds,
>> > > >>>>>> if I later start generating data with another key (while key 1
>> is
>> > > >>> still
>> > > >>>>>> generating) the windowing emits a value only after the
>> timestamp
>> > of
>> > > >>> key
>> > > >>>>> 2
>> > > >>>>>> reaches the last generated window
>> > > >>>>>>  - while generating data, if I restart the processor as soon
>> as it
>> > > >>>>> starts
>> > > >>>>>> it sometimes generates 2 aggregates for the same window even if
>> > I'm
>> > > >>>>> using
>> > > >>>>>> the suppress
>> > > >>>>>>
>> > > >>>>>> Anyway, I'll look into your link and try to find out the cause
>> of
>> > > >>> these
>> > > >>>>>> issues, probably starting from scratch with a simpler example
>> > > >>>>>>
>> > > >>>>>> Thank you for your help!
>> > > >>>>>>
>> > > >>>>>> --
>> > > >>>>>> Alessandro Tagliapietra
>> > > >>>>>>
>> > > >>>>>> On Mon, Apr 15, 2019 at 10:08 PM Bruno Cadonna <
>> > bruno@confluent.io>
>> > > >>>>> wrote:
>> > > >>>>>>
>> > > >>>>>>> Hi Alessandro,
>> > > >>>>>>>
>> > > >>>>>>> Have a look at this Kafka Usage Pattern for computing averages
>> > > >>> without
>> > > >>>>>>> using an ArrayList.
>> > > >>>>>>>
>> > > >>>>>>>
>> > > >>>>>>>
>> > > >>>>>>
>> > > >>>>>
>> > > >>>
>> > > >>
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns#KafkaStreamUsagePatterns-Howtocomputean(windowed)average
>> > > >>>>>>> ?
>> > > >>>>>>>
>> > > >>>>>>> The advantage of this pattern over the ArrayList approach is
>> the
>> > > >>>>> reduced
>> > > >>>>>>> space needed to compute the aggregate. Note that you will
>> still
>> > > >> need
>> > > >>>>> to
>> > > >>>>>>> implement a SerDe. However, the SerDe should be a bit easier
>> to
>> > > >>>>> implement
>> > > >>>>>>> than a SerDe for an ArrayList.
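
One way to avoid the list is to keep only a running count and sum as the aggregate. The following is a minimal plain-Java sketch of that idea, not the code from the linked page: the class names `CountAndSum` and `WindowedAverageSketch` are hypothetical, and wiring it into a Kafka Streams `aggregate()` call with a matching SerDe is left out.

```java
import java.util.Arrays;
import java.util.List;

/** Immutable aggregate that stores only a count and a sum instead of every value. */
final class CountAndSum {
    private final long count;
    private final double sum;

    CountAndSum(long count, double sum) {
        this.count = count;
        this.sum = sum;
    }

    /** Initial aggregate, the role an initializer would play. */
    static CountAndSum empty() {
        return new CountAndSum(0L, 0.0);
    }

    /** Folds one more value into the aggregate, the role an aggregator would play. */
    CountAndSum add(double value) {
        return new CountAndSum(count + 1, sum + value);
    }

    long count() {
        return count;
    }

    /** Average of all values seen so far; 0.0 for an empty aggregate (an arbitrary choice). */
    double average() {
        return count == 0 ? 0.0 : sum / count;
    }
}

final class WindowedAverageSketch {
    /** Averages the values of one window without retaining them. */
    static double averageOf(List<Double> values) {
        CountAndSum aggregate = CountAndSum.empty();
        for (double value : values) {
            aggregate = aggregate.add(value);
        }
        return aggregate.average();
    }

    public static void main(String[] args) {
        System.out.println(averageOf(Arrays.asList(10.0, 20.0, 30.0)));
    }
}
```

The state per window is two numbers regardless of how many records arrive, which is also why a SerDe for it is simpler than one for a list.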
>> > > >>>>>>>
>> > > >>>>>>> Hope that helps.
>> > > >>>>>>>
>> > > >>>>>>> Best,
>> > > >>>>>>> Bruno
>> > > >>>>>>>
>> > > >>>>>>> On Mon, Apr 15, 2019 at 4:57 PM Alessandro Tagliapietra <
>> > > >>>>>>> tagliapietra.alessandro@gmail.com> wrote:
>> > > >>>>>>>
>> > > >>>>>>>> Sorry but it seemed harder than I thought,
>> > > >>>>>>>>
>> > > >>>>>>>> to have the custom aggregation working I need to get an
>> > > >> ArrayList
>> > > >>> of
>> > > >>>>>> all
>> > > >>>>>>>> the values in the window, so far my aggregate DSL method
>> creates
>> > > >>> an
>> > > >>>>>>>> ArrayList on the initializer and adds each value to the list
>> in
>> > > >>> the
>> > > >>>>>>>> aggregator.
>> > > >>>>>>>> Then I think I'll have to provide a serde to change the
>> output
>> > > >>>>> type of
>> > > >>>>>>>> that method.
>> > > >>>>>>>> I was looking at
>> > > >>>>>>>>
>> > > >>>>>>>>
>> > > >>>>>>>
>> > > >>>>>>
>> > > >>>>>
>> > > >>>
>> > > >>
>> > >
>> >
>> https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api
>> > > >>>>>>>> but
>> > > >>>>>>>> that seems more towards a list of longs and already uses
>> > > >>> longSerde.
>> > > >>>>>>>> I'm currently trying to implement another avro model that
>> has a
>> > > >>>>> field
>> > > >>>>>> of
>> > > >>>>>>>> type array so I can use the regular avro serializer to
>> implement
>> > > >>>>> this.
>> > > >>>>>>>> Should I create my own serdes instead or is this the right
>> way?
>> > > >>>>>>>>
>> > > >>>>>>>> Thank you in advance
>> > > >>>>>>>>
>> > > >>>>>>>> --
>> > > >>>>>>>> Alessandro Tagliapietra
>> > > >>>>>>>>
>> > > >>>>>>>> On Mon, Apr 15, 2019 at 3:42 PM Alessandro Tagliapietra <
>> > > >>>>>>>> tagliapietra.alessandro@gmail.com> wrote:
>> > > >>>>>>>>
>> > > >>>>>>>>> Thank you Bruno and Matthias,
>> > > >>>>>>>>>
>> > > >>>>>>>>> I've modified the transformer to implement the
>> > > >>>>>> ValueTransformerWithKey
>> > > >>>>>>>>> interface and everything is working fine.
>> > > >>>>>>>>> I now have to window the data and manually aggregate each
>> window
>> > > >>>>> data
>> > > >>>>>>> since
>> > > >>>>>>>>> I've to do some averages and sum of differences.
>> > > >>>>>>>>> So far I'm just having some issues with message types since
>> > > >> I'm
>> > > >>>>>>> changing
>> > > >>>>>>>>> the data type when aggregating the window but I think it's
>> an
>> > > >>> easy
>> > > >>>>>>>> problem.
>> > > >>>>>>>>>
>> > > >>>>>>>>> Thank you again
>> > > >>>>>>>>> Best
>> > > >>>>>>>>>
>> > > >>>>>>>>> --
>> > > >>>>>>>>> Alessandro Tagliapietra
>> > > >>>>>>>>>
>> > > >>>>>>>>> On Sun, Apr 14, 2019 at 11:26 AM Bruno Cadonna <
>> > > >>>>> bruno@confluent.io>
>> > > >>>>>>>> wrote:
>> > > >>>>>>>>>
>> > > >>>>>>>>>> Hi Alessandro,
>> > > >>>>>>>>>>
>> > > >>>>>>>>>> the `TransformerSupplier` is internally wrapped with a
>> > > >>>>>>>> `ProcessorSupplier`,
>> > > >>>>>>>>>> so the statement
>> > > >>>>>>>>>>
>> > > >>>>>>>>>> `transform` is essentially equivalent to adding the
>> > > >> Transformer
>> > > >>>>> via
>> > > >>>>>>>>>> Topology#addProcessor() to your processor topology
>> > > >>>>>>>>>>
>> > > >>>>>>>>>> is correct.
>> > > >>>>>>>>>>
>> > > >>>>>>>>>> If you do not change the key, you should definitely use one
>> > > >> of
>> > > >>>>> the
>> > > >>>>>>>>>> overloads of `transformValues` to avoid internal data
>> > > >>>>>> redistribution.
>> > > >>>>>>> In
>> > > >>>>>>>>>> your case the overload with
>> `ValueTransformerWithKeySupplier`
>> > > >>> as
>> > > >>>>>>>> suggested
>> > > >>>>>>>>>> by Matthias would fit.
>> > > >>>>>>>>>>
>> > > >>>>>>>>>> Best,
>> > > >>>>>>>>>> Bruno
>> > > >>>>>>>>>>
>> > > >>>>>>>>>> On Sat, Apr 13, 2019 at 12:51 PM Matthias J. Sax <
>> > > >>>>>>> matthias@confluent.io
>> > > >>>>>>>>>
>> > > >>>>>>>>>> wrote:
>> > > >>>>>>>>>>
>> > > >>>>>>>>>>> There is also `ValueTransformerWithKey` that gives you
>> > > >>>>> read-only
>> > > >>>>>>> access
>> > > >>>>>>>>>>> to the key.
>> > > >>>>>>>>>>>
>> > > >>>>>>>>>>> -Matthias
>> > > >>>>>>>>>>>
>> > > >>>>>>>>>>> On 4/12/19 5:34 PM, Alessandro Tagliapietra wrote:
>> > > >>>>>>>>>>>> Hi Bruno,
>> > > >>>>>>>>>>>>
>> > > >>>>>>>>>>>> Thank you for the quick answer.
>> > > >>>>>>>>>>>>
>> > > >>>>>>>>>>>> I'm actually trying to do that since it seems there is
>> > > >>>>> really no
>> > > >>>>>>> way
>> > > >>>>>>>>>> to
>> > > >>>>>>>>>>>> have it use `Processor<K, V>`.
>> > > >>>>>>>>>>>> I just wanted (if that would've made any sense) to use
>> > > >> the
>> > > >>>>>>> Processor
>> > > >>>>>>>>>> in
>> > > >>>>>>>>>>>> both DSL and non-DSL pipelines.
>> > > >>>>>>>>>>>>
>> > > >>>>>>>>>>>> Anyway, regarding `transformValues()` I don't think I can
>> > > >>>>> use it
>> > > >>>>>>> as
>> > > >>>>>>>> I
>> > > >>>>>>>>>>> need
>> > > >>>>>>>>>>>> the message key since that is the discriminating value
>> > > >> for
>> > > >>>>> the
>> > > >>>>>>>> filter
>> > > >>>>>>>>>> (I
>> > > >>>>>>>>>>>> want to exclude old values per sensor ID so per message
>> > > >>> key)
>> > > >>>>>>>>>>>>
>> > > >>>>>>>>>>>> Right now I've this
>> > > >>>>>>>>>>>>
>> > > >>>>>>>>>>>
>> > > >>>>>>>>>>
>> > > >>>>>>>>
>> > > >>>>>>>
>> > > >>>>>>
>> > > >>>>>
>> > > >>>
>> > > >>
>> > >
>> >
>> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfiltertransformer-java
>> > > >>>>>>>>>>>> and
>> > > >>>>>>>>>>>> I'm using it with `transform()`.
>> > > >>>>>>>>>>>>
>> > > >>>>>>>>>>>> One thing I've found confusing is this
>> > > >>>>>>>>>>>>
>> > > >>>>>>>>>>>
>> > > >>>>>>>>>>
>> > > >>>>>>>>
>> > > >>>>>>>
>> > > >>>>>>
>> > > >>>>>
>> > > >>>
>> > > >>
>> > >
>> >
>> https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-process
>> > > >>>>>>>>>>>>
>> > > >>>>>>>>>>>> transform is essentially equivalent to adding the
>> > > >>> Transformer
>> > > >>>>>> via
>> > > >>>>>>>>>>>>> Topology#addProcessor() to your processor topology
>> > > >>>>>>>>>>>>> <
>> > > >>>>>>>>>>>
>> > > >>>>>>>>>>
>> > > >>>>>>>>
>> > > >>>>>>>
>> > > >>>>>>
>> > > >>>>>
>> > > >>>
>> > > >>
>> > >
>> >
>> https://docs.confluent.io/current/streams/concepts.html#streams-concepts-processor-topology
>> > > >>>>>>>>>>>>
>> > > >>>>>>>>>>>>> .
>> > > >>>>>>>>>>>>
>> > > >>>>>>>>>>>>
>> > > >>>>>>>>>>>> is it? Doesn't `transform` need a TransformerSupplier while
>> > > >>>>>>>>>> `addProcessor`
>> > > >>>>>>>>>>>> uses a ProcessorSupplier?
>> > > >>>>>>>>>>>>
>> > > >>>>>>>>>>>> Thank you again for your help
>> > > >>>>>>>>>>>>
>> > > >>>>>>>>>>>> --
>> > > >>>>>>>>>>>> Alessandro Tagliapietra
>> > > >>>>>>>>>>>>
>> > > >>>>>>>>>>>>
>> > > >>>>>>>>>>>> On Fri, Apr 12, 2019 at 5:04 PM Bruno Cadonna <
>> > > >>>>>> bruno@confluent.io
>> > > >>>>>>>>
>> > > >>>>>>>>>>> wrote:
>> > > >>>>>>>>>>>>
>> > > >>>>>>>>>>>>> Hi Alessandro,
>> > > >>>>>>>>>>>>>
>> > > >>>>>>>>>>>>> Have you considered using `transform()` (actually in
>> > > >> your
>> > > >>>>> case
>> > > >>>>>>> you
>> > > >>>>>>>>>>> should
>> > > >>>>>>>>>>>>> use `transformValues()`) instead of `.process()`?
>> > > >>>>> `transform()`
>> > > >>>>>>> and
>> > > >>>>>>>>>>>>> `transformValues()` are stateful operations similar to
>> > > >>>>>> `.process`
>> > > >>>>>>>> but
>> > > >>>>>>>>>>> they
>> > > >>>>>>>>>>>>> return a `KStream`. On a `KStream` you can then apply a
>> > > >>>>>> windowed
>> > > >>>>>>>>>>>>> aggregation.
>> > > >>>>>>>>>>>>>
>> > > >>>>>>>>>>>>> Hope that helps.
>> > > >>>>>>>>>>>>>
>> > > >>>>>>>>>>>>> Best,
>> > > >>>>>>>>>>>>> Bruno
>> > > >>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>
>> > > >>>>>>>>>>>>> On Fri, Apr 12, 2019 at 4:31 PM Alessandro Tagliapietra
>> > > >> <
>> > > >>>>>>>>>>>>> tagliapietra.alessandro@gmail.com> wrote:
>> > > >>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>> Hi there,
>> > > >>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>> I'm just starting with Kafka and I'm trying to create a
>> > > >>>>> stream
>> > > >>>>>>>>>>> processor
>> > > >>>>>>>>>>>>>> that in multiple stages:
>> > > >>>>>>>>>>>>>>  - filters messages using a kv store so that only
>> > > >>> messages
>> > > >>>>>> with
>> > > >>>>>>>>>> higher
>> > > >>>>>>>>>>>>>> timestamp get processed
>> > > >>>>>>>>>>>>>>  - aggregates the message metrics by minute giving e.g.
>> > > >>> the
>> > > >>>>>> avg
>> > > >>>>>>> of
>> > > >>>>>>>>>>> those
>> > > >>>>>>>>>>>>>> metrics in that minute
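
The first stage above, passing only messages whose timestamp is higher than the last one accepted for the same key, can be sketched in plain Java. This is only an illustration under stated assumptions: the class name `TimestampFilterSketch` is hypothetical, and a `HashMap` stands in for the Kafka Streams key-value store, so nothing here survives a restart the way a real state store would.

```java
import java.util.HashMap;
import java.util.Map;

final class TimestampFilterSketch {
    // Stand-in for the key-value state store: last accepted timestamp per sensor ID.
    private final Map<String, Long> lastAccepted = new HashMap<>();

    /**
     * Returns true and records the timestamp when it is strictly newer than the
     * last accepted timestamp for this sensor; returns false for old or equal ones.
     */
    boolean accept(String sensorId, long timestamp) {
        Long previous = lastAccepted.get(sensorId);
        if (previous != null && timestamp <= previous) {
            return false; // out-of-order or duplicate record for this key
        }
        lastAccepted.put(sensorId, timestamp);
        return true;
    }

    public static void main(String[] args) {
        TimestampFilterSketch filter = new TimestampFilterSketch();
        System.out.println(filter.accept("sensor-1", 100L)); // first record for the key
        System.out.println(filter.accept("sensor-1", 90L));  // older than the last accepted one
        System.out.println(filter.accept("sensor-2", 50L));  // different key, tracked separately
    }
}
```

Because the decision depends on the message key, the same logic inside Kafka Streams needs access to the key as well as the value.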
>> > > >>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>> The message is simple, the key is the sensor ID and the
>> > > >>>>> value
>> > > >>>>>> is
>> > > >>>>>>>>>> e.g. {
>> > > >>>>>>>>>>>>>> timestamp: UNIX_TIMESTAMP, speed: INT }.
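
For the per-minute aggregation, the core arithmetic is assigning each timestamp to the start of its window. The sketch below is plain Java rather than Kafka Streams code and rests on assumptions not stated in the thread: timestamps are Unix seconds, windows are 60-second tumbling windows aligned to the epoch, and the names `MinuteWindowSketch`, `windowStart` and `averageSpeedPerMinute` are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

final class MinuteWindowSketch {
    static final long WINDOW_SIZE_SECONDS = 60L;

    /** Start of the tumbling window that contains the given timestamp. */
    static long windowStart(long timestampSeconds) {
        return timestampSeconds - Math.floorMod(timestampSeconds, WINDOW_SIZE_SECONDS);
    }

    /**
     * Average speed per window for a single sensor.
     * timestamps[i] pairs with speeds[i]; both arrays must have the same length.
     */
    static Map<Long, Double> averageSpeedPerMinute(long[] timestamps, int[] speeds) {
        // windowStart -> {count, sum}
        Map<Long, long[]> countAndSum = new TreeMap<>();
        for (int i = 0; i < timestamps.length; i++) {
            long[] cs = countAndSum.computeIfAbsent(windowStart(timestamps[i]), k -> new long[2]);
            cs[0] += 1;
            cs[1] += speeds[i];
        }
        Map<Long, Double> averages = new TreeMap<>();
        for (Map.Entry<Long, long[]> entry : countAndSum.entrySet()) {
            averages.put(entry.getKey(), (double) entry.getValue()[1] / entry.getValue()[0]);
        }
        return averages;
    }

    public static void main(String[] args) {
        long[] timestamps = {0L, 30L, 61L};
        int[] speeds = {10, 20, 40};
        System.out.println(averageSpeedPerMinute(timestamps, speeds));
    }
}
```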
>> > > >>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>> I've started by creating a processor to use the kv
>> > > >> store
>> > > >>>>> and
>> > > >>>>>>>> filter
>> > > >>>>>>>>>> old
>> > > >>>>>>>>>>>>>> messages:
>> > > >>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>
>> > > >>>>>>>>>>>
>> > > >>>>>>>>>>
>> > > >>>>>>>>
>> > > >>>>>>>
>> > > >>>>>>
>> > > >>>>>
>> > > >>>
>> > > >>
>> > >
>> >
>> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfilter-java
>> > > >>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>> Then I was trying to implement windowing, I saw very
>> > > >> nice
>> > > >>>>>>>> windowing
>> > > >>>>>>>>>>>>>> examples for the DSL but none for the Processor API
>> > > >>> (only a
>> > > >>>>>>> small
>> > > >>>>>>>>>>>>> reference
>> > > >>>>>>>>>>>>>> to the windowed store), can someone point me in the
>> > > >> right
>> > > >>>>>>>> direction?
>> > > >>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>> Now, since I wasn't able to find any example I tried to
>> > > >>> use
>> > > >>>>>> the
>> > > >>>>>>>> DSL
>> > > >>>>>>>>>> but
>> > > >>>>>>>>>>>>>> haven't found a way to use my processor with it, I saw
>> > > >>> this
>> > > >>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>
>> > > >>>>>>>>>>>
>> > > >>>>>>>>>>
>> > > >>>>>>>>
>> > > >>>>>>>
>> > > >>>>>>
>> > > >>>>>
>> > > >>>
>> > > >>
>> > >
>> >
>> https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#applying-processors-and-transformers-processor-api-integration
>> > > >>>>>>>>>>>>>> but
>> > > >>>>>>>>>>>>>> it explains mostly transformers not processors. I also
>> > > >>> saw
>> > > >>>>>> after
>> > > >>>>>>>>>> that
>> > > >>>>>>>>>>> the
>> > > >>>>>>>>>>>>>> example usage of the processor but `.process(...)`
>> > > >>> returns
>> > > >>>>>> void,
>> > > >>>>>>>> so
>> > > >>>>>>>>>> I
>> > > >>>>>>>>>>>>>> cannot have a KStream from a processor?
>> > > >>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>> Thank you all in advance
>> > > >>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>> --
>> > > >>>>>>>>>>>>>> Alessandro Tagliapietra
>> > > >>>>>>>>>>>>>>
>> > > >>>>>>>>>>>>>
>> > > >>>>>>>>>>>>
>> > > >>>>>>>>>>>
>> > > >>>>>>>>>>>
>> > > >>>>>>>>>>
>> > > >>>>>>>>>
>> > > >>>>>>>>
>> > > >>>>>>>
>> > > >>>>>>
>> > > >>>>>
>> > > >>>>
>> > > >>>
>> > > >>
>> > > >
>> > >
>> > >
>> >
>>
>