You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Mikael Högqvist <ho...@gmail.com> on 2016/11/22 09:59:18 UTC

KafkaStreams KTable#through not creating changelog topic

Hi,

in the documentation for KTable#through, it is stated that a new changelog
topic will be created for the table. It also states that calling through is
equivalent to calling #to followed by KStreamBuilder#table.

http://kafka.apache.org/0101/javadoc/org/apache/kafka/streams/kstream/KTable.html#through(org.apache.kafka.common.serialization.Serde,%20org.apache.kafka.common.serialization.Serde,%20java.lang.String,%20java.lang.String)

In the docs for KStreamBuilder#table it is stated that no new changelog
topic will be created since the underlying topic acts as the changelog.
I've verified that this is the case.

Is there another API method to materialize the results of a KTable
including a changelog, i.e. such that kafka streams creates the topic and
uses the naming schema for changelog topics? The use case I have in mind is
aggregate followed by mapValues.

Best,
Mikael

Re: KafkaStreams KTable#through not creating changelog topic

Posted by Eno Thereska <en...@gmail.com>.
Hi Mikael,

If you perform an aggregate and thus create another KTable, that KTable will have a changelog topic (and a state store that you can query with Interactive Queris - but this is tangential). It is true that source KTables don't need to create another topic, since they already have one.

I'm not 100% sure if I understood your question correctly. Are you observing something that does not match your expectations?

Thanks
Eno

> On 22 Nov 2016, at 09:59, Mikael Högqvist <ho...@gmail.com> wrote:
> 
> Hi,
> 
> in the documentation for KTable#through, it is stated that a new changelog
> topic will be created for the table. It also states that calling through is
> equivalent to calling #to followed by KStreamBuilder#table.
> 
> http://kafka.apache.org/0101/javadoc/org/apache/kafka/streams/kstream/KTable.html#through(org.apache.kafka.common.serialization.Serde,%20org.apache.kafka.common.serialization.Serde,%20java.lang.String,%20java.lang.String)
> 
> In the docs for KStreamBuilder#table it is stated that no new changelog
> topic will be created since the underlying topic acts as the changelog.
> I've verified that this is the case.
> 
> Is there another API method to materialize the results of a KTable
> including a changelog, i.e. such that kafka streams creates the topic and
> uses the naming schema for changelog topics? The use case I have in mind is
> aggregate followed by mapValues.
> 
> Best,
> Mikael


Re: KafkaStreams KTable#through not creating changelog topic

Posted by Mikael Högqvist <ho...@gmail.com>.
Thanks, based on this we will re-evaluate the use of internal topics. The
main motivation for using the internal changelog topics was to avoid
duplication of data and have an easy way to access the update stream of any
state store.

Best,
Mikael

On Fri, Nov 25, 2016 at 9:52 AM Michael Noll <mi...@confluent.io> wrote:

> Mikael,
>
> > Sure, I guess the topic is auto-created the first time I start the
> topology
> > and the second time its there already. It could be possible to create
> > topics up front for us, or even use an admin call from inside the code.
>
> Yes, that (i.e. you are running with auto-topic creation enabled) was what
> I implicitly understood.  As covered in [1] we strongly recommend to
> manually pre-create/manage user topics though.  User topics include the
> source topics that you are reading from (cf. `stream()`, `table()`) but
> also include the topics you use in `through()` and `to()`.
>
>
> > Today there is .through(topic, store) and
> > .to(topic), maybe it would be possible to have something like
> > .materialize(store) which takes care of topic creation? Would adding
> > something like this require a KIP?
>
> There is already work being done in the Admin API (KIP-4), and part of this
> functionality was released in the latest Kafka versions.  You can use this
> to programmatically create topics, for example.  Note though that the work
> on KIP-4 is not fully completed yet.
>
> -Michael
>
>
>
>
> [1]
>
> http://docs.confluent.io/current/streams/developer-guide.html#managing-topics-of-a-kafka-streams-application
>
>
>
> On Thu, Nov 24, 2016 at 3:55 PM, Mikael Högqvist <ho...@gmail.com>
> wrote:
>
> > Sure, I guess the topic is auto-created the first time I start the
> topology
> > and the second time its there already. It could be possible to create
> > topics up front for us, or even use an admin call from inside the code.
> >
> > That said, as a user, I think it would be great with a function in the
> > Kafka Streams DSL that would allow me to materialize a KTable without
> > pre-creating the topic. Today there is .through(topic, store) and
> > .to(topic), maybe it would be possible to have something like
> > .materialize(store) which takes care of topic creation? Would adding
> > something like this require a KIP?
> >
> > Best,
> > Mikael
> >
> > On Thu, Nov 24, 2016 at 1:44 PM Damian Guy <da...@gmail.com> wrote:
> >
> > Mikeal,
> >
> > When you use `through(..)` topics are not created by KafkaStreams. You
> need
> > to create them yourself before you run the application.
> >
> > Thanks,
> > Damian
> >
> > On Thu, 24 Nov 2016 at 11:27 Mikael Högqvist <ho...@gmail.com>
> wrote:
> >
> > > Yes, the naming is not an issue.
> > >
> > > I've tested this with the topology described earlier. Every time I
> start
> > > the topology with a call to .through() that references a topic that
> does
> > > not exist, I get an exception from the UncaughtExceptionHandler:
> > >
> > > Uncaught exception org.apache.kafka.streams.errors.StreamsException:
> > Topic
> > > not found during partition assignment: words-count-changelog
> > >
> > > This happens when .through("words-count-changelog", "count") is part of
> > the
> > > topology. The topology is also not forwarding anything to that
> > topic/store.
> > > After restarting the application it works fine.
> > >
> > > Are the changelog topics created via, for example, .aggregate()
> different
> > > to topics auto created via .through()?
> > >
> > > Thanks,
> > > Mikael
> > >
> > > On Wed, Nov 23, 2016 at 7:57 PM Matthias J. Sax <matthias@confluent.io
> >
> > > wrote:
> > >
> > > > > 1) Create a state store AND the changelog
> > > > > topic 2) follow the Kafka Streams naming convention for changelog
> > > topics.
> > > > > Basically, I want to have a method that does what .through() is
> > > supposed
> > > > to
> > > > > do according to the documentation, but without the "topic"
> parameter.
> > > >
> > > > I understand what you are saying, but you can get this done right
> now,
> > > > too. If you use through(...) you will get the store. And you can just
> > > > specify the topic name as "applicationId-storeName-changelog" to
> > follow
> > > > the naming convention Streams used internally. What is the problem
> > using
> > > > this approach (besides that you have to provide the topic name which
> > > > seems not to be a big burden to me?)
> > > >
> > > >
> > > > -Matthias
> > > >
> > > >
> > > > On 11/23/16 8:59 AM, Mikael Högqvist wrote:
> > > > > Hi Michael,
> > > > >
> > > > > thanks for the extensive explanation, and yes it definitely helps
> > with
> > > my
> > > > > understanding of through(). :)
> > > > >
> > > > > You guessed correctly that I'm doing some "shenanings" where I'm
> > trying
> > > > to
> > > > > derive the changelog of a state store from the state store name.
> This
> > > > works
> > > > > perfectly fine with with a naming convention for the topics and by
> > > > creating
> > > > > them in Kafka upfront.
> > > > >
> > > > > My point is that it would help me (and maybe others), if the API of
> > > > KTable
> > > > > was extended to have a new method that does two things that is not
> > part
> > > > of
> > > > > the implementation of .through(). 1) Create a state store AND the
> > > > changelog
> > > > > topic 2) follow the Kafka Streams naming convention for changelog
> > > topics.
> > > > > Basically, I want to have a method that does what .through() is
> > > supposed
> > > > to
> > > > > do according to the documentation, but without the "topic"
> parameter.
> > > > >
> > > > > What do you think, would it be possible to extend the API with a
> > method
> > > > > like that?
> > > > >
> > > > > Thanks,
> > > > > Mikael
> > > > >
> > > > > On Wed, Nov 23, 2016 at 4:16 PM Michael Noll <michael@confluent.io
> >
> > > > wrote:
> > > > >
> > > > >> Mikael,
> > > > >>
> > > > >> regarding your second question:
> > > > >>
> > > > >>> 2) Regarding the use case, the topology looks like this:
> > > > >>>
> > > > >>> .stream(...)
> > > > >>> .aggregate(..., "store-1")
> > > > >>> .mapValues(...)
> > > > >>> .through(..., "store-2")
> > > > >>
> > > > >> The last operator above would, without "..." ellipsis, be sth like
> > > > >> `KTable#through("through-topic", "store-2")`.  Here,
> > "through-topic"
> > > is
> > > > the
> > > > >> changelog topic for both the KTable and the state store "store-2".
> > So
> > > > this
> > > > >> is the changelog topic name that you want to know.
> > > > >>
> > > > >> - If you want the "through" topic to have a `-changelog` suffix,
> > then
> > > > you'd
> > > > >> need to add that yourself in the call to `through(...)`.
> > > > >>
> > > > >> - If you wonder why `through()` doesn't add a `-changelog` suffix
> > > > >> automatically:  That's because `through()` -- like `to()` or
> > > `stream()`,
> > > > >> `table()` -- require you to explicitly provide a topic name, and
> of
> > > > course
> > > > >> Kafka will use exactly this name.  (FWIW, the `-changelog` suffix
> is
> > > > only
> > > > >> added when Kafka creates internal changelog topics behind the
> scenes
> > > for
> > > > >> you.)   Unfortunately, the javadocs of `KTable#through()` is
> > incorrect
> > > > >> because it refers to `-changelog`;  we'll fix that as mentioned
> > above.
> > > > >>
> > > > >> - Also, in case you want to do some shenanigans (like for some
> > tooling
> > > > >> you're building around state stores/changelogs/interactive
> queries)
> > > such
> > > > >> detecting all state store changelogs by doing the equivalent of
> `ls
> > > > >> *-changelog`, then this will miss changelogs of KTables that are
> > > > created by
> > > > >> `through()` and `to()` (unless you come up with a naming
> convention
> > > that
> > > > >> your tooling can assume to be in place, e.g. by always adding
> > > > `-changelog`
> > > > >> to topic names when you call `through()`).
> > > > >>
> > > > >> I hope this helps!
> > > > >> Michael
> > > > >>
> > > > >>
> > > > >>
> > > > >>
> > > > >> On Wed, Nov 23, 2016 at 7:39 AM, Mikael Högqvist <
> > hoegqvist@gmail.com
> > > >
> > > > >> wrote:
> > > > >>
> > > > >>> Hi Eno,
> > > > >>>
> > > > >>> 1) Great :)
> > > > >>>
> > > > >>> 2) Yes, we are using the Interactive Queries to access the state
> > > > stores.
> > > > >> In
> > > > >>> addition, we access the changelogs to subscribe to updates. For
> > this
> > > > >> reason
> > > > >>> we need to know the changelog topic name.
> > > > >>>
> > > > >>> Thanks,
> > > > >>> Mikael
> > > > >>>
> > > > >>> On Tue, Nov 22, 2016 at 8:43 PM Eno Thereska <
> > eno.thereska@gmail.com
> > > >
> > > > >>> wrote:
> > > > >>>
> > > > >>>> HI Mikael,
> > > > >>>>
> > > > >>>> 1) The JavaDoc looks incorrect, thanks for reporting. Matthias
> is
> > > > >> looking
> > > > >>>> into fixing it. I agree that it can be confusing to have topic
> > names
> > > > >> that
> > > > >>>> are not what one would expect.
> > > > >>>>
> > > > >>>> 2) If your goal is to query/read from the state stores, you can
> > use
> > > > >>>> Interactive Queries to do that (you don't need to worry about
> the
> > > > >>> changelog
> > > > >>>> topic name and such). Interactive Queries is a new feature in
> > 0.10.1
> > > > >>> (blog
> > > > >>>> here:
> > > > >>>> https://www.confluent.io/blog/unifying-stream-processing-
> > > > >>> and-interactive-queries-in-apache-kafka/
> > > > >>>> <
> > > > >>>> https://www.confluent.io/blog/unifying-stream-processing-
> > > > >>> and-interactive-queries-in-apache-kafka/
> > > > >>>>> ).
> > > > >>>>
> > > > >>>> Thanks
> > > > >>>> Eno
> > > > >>>>
> > > > >>>>
> > > > >>>>> On 22 Nov 2016, at 19:27, Mikael Högqvist <hoegqvist@gmail.com
> >
> > > > >> wrote:
> > > > >>>>>
> > > > >>>>> Sorry for being unclear, i'll try again :)
> > > > >>>>>
> > > > >>>>> 1) The JavaDoc for through is not correct, it states that a
> > > changelog
> > > > >>>> topic
> > > > >>>>> will be created for the state store. That is, if I would call
> it
> > > with
> > > > >>>>> through("topic", "a-store"), I would expect a kafka topic
> > > > >>>>> "my-app-id-a-store-changelog" to be created.
> > > > >>>>>
> > > > >>>>> 2) Regarding the use case, the topology looks like this:
> > > > >>>>>
> > > > >>>>> .stream(...)
> > > > >>>>> .aggregate(..., "store-1")
> > > > >>>>> .mapValues(...)
> > > > >>>>> .through(..., "store-2")
> > > > >>>>>
> > > > >>>>> Basically, I want to materialize both the result from the
> > aggregate
> > > > >>>> method
> > > > >>>>> and the result from mapValues, which is materialized using
> > > > >> .through().
> > > > >>>>> Later, I will access both the tables (store-1 and store-2) to
> a)
> > > get
> > > > >>> the
> > > > >>>>> current state of the aggregate, b) subscribe to future updates.
> > > This
> > > > >>>> works
> > > > >>>>> just fine. The only issue is that I assumed to have a changelog
> > > topic
> > > > >>> for
> > > > >>>>> store-2 created automatically, which didnt happen.
> > > > >>>>>
> > > > >>>>> Since I want to access the changelog topic, it helps if the
> > naming
> > > is
> > > > >>>>> consistent. So either we enforce the same naming pattern as
> kafka
> > > > >> when
> > > > >>>>> calling .through() or alternatively the Kafka Streams API can
> > > > >> provide a
> > > > >>>>> method to materialize tables which creates a topic name
> according
> > > to
> > > > >>> the
> > > > >>>>> naming pattern. E.g. .through() without the topic parameter.
> > > > >>>>>
> > > > >>>>> What do you think?
> > > > >>>>>
> > > > >>>>> Best,
> > > > >>>>> Mikael
> > > > >>>>>
> > > > >>>>> On Tue, Nov 22, 2016 at 7:21 PM Matthias J. Sax <
> > > > >> matthias@confluent.io
> > > > >>>>
> > > > >>>>> wrote:
> > > > >>>>>
> > > > >>>>>> I cannot completely follow what want to achieve.
> > > > >>>>>>
> > > > >>>>>> However, the JavaDoc for through() seems not to be correct to
> > me.
> > > > >>> Using
> > > > >>>>>> through() will not create an extra internal changelog topic
> with
> > > the
> > > > >>>>>> described naming schema, because the topic specified in
> > through()
> > > > >> can
> > > > >>> be
> > > > >>>>>> used for this (there is no point in duplicating the data).
> > > > >>>>>>
> > > > >>>>>> If you have a KTable and apply a mapValues(), this will not
> > write
> > > > >> data
> > > > >>>>>> to any topic. The derived KTable is in-memory because you can
> > > easily
> > > > >>>>>> recreate it from its base KTable.
> > > > >>>>>>
> > > > >>>>>> What is the missing part you want to get?
> > > > >>>>>>
> > > > >>>>>> Btw: the internally created changelog topics are only used for
> > > > >>> recovery
> > > > >>>>>> in case of failure. Streams does not consumer from those topic
> > > > >> during
> > > > >>>>>> "normal operation".
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>> -Matthias
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>> On 11/22/16 1:59 AM, Mikael Högqvist wrote:
> > > > >>>>>>> Hi,
> > > > >>>>>>>
> > > > >>>>>>> in the documentation for KTable#through, it is stated that a
> > new
> > > > >>>>>> changelog
> > > > >>>>>>> topic will be created for the table. It also states that
> > calling
> > > > >>>> through
> > > > >>>>>> is
> > > > >>>>>>> equivalent to calling #to followed by KStreamBuilder#table.
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>
> > > > >>>> http://kafka.apache.org/0101/javadoc/org/apache/kafka/
> > > > >>> streams/kstream/KTable.html#through(org.apache.kafka.
> > > > >>> common.serialization.Serde,%20org.apache.kafka.common.
> > > > >>> serialization.Serde,%20java.lang.String,%20java.lang.String)
> > > > >>>>>>>
> > > > >>>>>>> In the docs for KStreamBuilder#table it is stated that no new
> > > > >>> changelog
> > > > >>>>>>> topic will be created since the underlying topic acts as the
> > > > >>> changelog.
> > > > >>>>>>> I've verified that this is the case.
> > > > >>>>>>>
> > > > >>>>>>> Is there another API method to materialize the results of a
> > > KTable
> > > > >>>>>>> including a changelog, i.e. such that kafka streams creates
> the
> > > > >> topic
> > > > >>>> and
> > > > >>>>>>> uses the naming schema for changelog topics? The use case I
> > have
> > > in
> > > > >>>> mind
> > > > >>>>>> is
> > > > >>>>>>> aggregate followed by mapValues.
> > > > >>>>>>>
> > > > >>>>>>> Best,
> > > > >>>>>>> Mikael
> > > > >>>>>>>
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>
> > > > >>>>
> > > > >>>
> > > > >>
> > > > >
> > > >
> > > >
> > >
> >
>

Re: KafkaStreams KTable#through not creating changelog topic

Posted by Michael Noll <mi...@confluent.io>.
Mikael,

> Sure, I guess the topic is auto-created the first time I start the
topology
> and the second time its there already. It could be possible to create
> topics up front for us, or even use an admin call from inside the code.

Yes, that (i.e. you are running with auto-topic creation enabled) was what
I implicitly understood.  As covered in [1] we strongly recommend to
manually pre-create/manage user topics though.  User topics include the
source topics that you are reading from (cf. `stream()`, `table()`) but
also include the topics you use in `through()` and `to()`.


> Today there is .through(topic, store) and
> .to(topic), maybe it would be possible to have something like
> .materialize(store) which takes care of topic creation? Would adding
> something like this require a KIP?

There is already work being done in the Admin API (KIP-4), and part of this
functionality was released in the latest Kafka versions.  You can use this
to programmatically create topics, for example.  Note though that the work
on KIP-4 is not fully completed yet.

-Michael




[1]
http://docs.confluent.io/current/streams/developer-guide.html#managing-topics-of-a-kafka-streams-application



On Thu, Nov 24, 2016 at 3:55 PM, Mikael Högqvist <ho...@gmail.com>
wrote:

> Sure, I guess the topic is auto-created the first time I start the topology
> and the second time its there already. It could be possible to create
> topics up front for us, or even use an admin call from inside the code.
>
> That said, as a user, I think it would be great with a function in the
> Kafka Streams DSL that would allow me to materialize a KTable without
> pre-creating the topic. Today there is .through(topic, store) and
> .to(topic), maybe it would be possible to have something like
> .materialize(store) which takes care of topic creation? Would adding
> something like this require a KIP?
>
> Best,
> Mikael
>
> On Thu, Nov 24, 2016 at 1:44 PM Damian Guy <da...@gmail.com> wrote:
>
> Mikeal,
>
> When you use `through(..)` topics are not created by KafkaStreams. You need
> to create them yourself before you run the application.
>
> Thanks,
> Damian
>
> On Thu, 24 Nov 2016 at 11:27 Mikael Högqvist <ho...@gmail.com> wrote:
>
> > Yes, the naming is not an issue.
> >
> > I've tested this with the topology described earlier. Every time I start
> > the topology with a call to .through() that references a topic that does
> > not exist, I get an exception from the UncaughtExceptionHandler:
> >
> > Uncaught exception org.apache.kafka.streams.errors.StreamsException:
> Topic
> > not found during partition assignment: words-count-changelog
> >
> > This happens when .through("words-count-changelog", "count") is part of
> the
> > topology. The topology is also not forwarding anything to that
> topic/store.
> > After restarting the application it works fine.
> >
> > Are the changelog topics created via, for example, .aggregate() different
> > to topics auto created via .through()?
> >
> > Thanks,
> > Mikael
> >
> > On Wed, Nov 23, 2016 at 7:57 PM Matthias J. Sax <ma...@confluent.io>
> > wrote:
> >
> > > > 1) Create a state store AND the changelog
> > > > topic 2) follow the Kafka Streams naming convention for changelog
> > topics.
> > > > Basically, I want to have a method that does what .through() is
> > supposed
> > > to
> > > > do according to the documentation, but without the "topic" parameter.
> > >
> > > I understand what you are saying, but you can get this done right now,
> > > too. If you use through(...) you will get the store. And you can just
> > > specify the topic name as "applicationId-storeName-changelog" to
> follow
> > > the naming convention Streams used internally. What is the problem
> using
> > > this approach (besides that you have to provide the topic name which
> > > seems not to be a big burden to me?)
> > >
> > >
> > > -Matthias
> > >
> > >
> > > On 11/23/16 8:59 AM, Mikael Högqvist wrote:
> > > > Hi Michael,
> > > >
> > > > thanks for the extensive explanation, and yes it definitely helps
> with
> > my
> > > > understanding of through(). :)
> > > >
> > > > You guessed correctly that I'm doing some "shenanings" where I'm
> trying
> > > to
> > > > derive the changelog of a state store from the state store name. This
> > > works
> > > > perfectly fine with with a naming convention for the topics and by
> > > creating
> > > > them in Kafka upfront.
> > > >
> > > > My point is that it would help me (and maybe others), if the API of
> > > KTable
> > > > was extended to have a new method that does two things that is not
> part
> > > of
> > > > the implementation of .through(). 1) Create a state store AND the
> > > changelog
> > > > topic 2) follow the Kafka Streams naming convention for changelog
> > topics.
> > > > Basically, I want to have a method that does what .through() is
> > supposed
> > > to
> > > > do according to the documentation, but without the "topic" parameter.
> > > >
> > > > What do you think, would it be possible to extend the API with a
> method
> > > > like that?
> > > >
> > > > Thanks,
> > > > Mikael
> > > >
> > > > On Wed, Nov 23, 2016 at 4:16 PM Michael Noll <mi...@confluent.io>
> > > wrote:
> > > >
> > > >> Mikael,
> > > >>
> > > >> regarding your second question:
> > > >>
> > > >>> 2) Regarding the use case, the topology looks like this:
> > > >>>
> > > >>> .stream(...)
> > > >>> .aggregate(..., "store-1")
> > > >>> .mapValues(...)
> > > >>> .through(..., "store-2")
> > > >>
> > > >> The last operator above would, without "..." ellipsis, be sth like
> > > >> `KTable#through("through-topic", "store-2")`.  Here,
> "through-topic"
> > is
> > > the
> > > >> changelog topic for both the KTable and the state store "store-2".
> So
> > > this
> > > >> is the changelog topic name that you want to know.
> > > >>
> > > >> - If you want the "through" topic to have a `-changelog` suffix,
> then
> > > you'd
> > > >> need to add that yourself in the call to `through(...)`.
> > > >>
> > > >> - If you wonder why `through()` doesn't add a `-changelog` suffix
> > > >> automatically:  That's because `through()` -- like `to()` or
> > `stream()`,
> > > >> `table()` -- require you to explicitly provide a topic name, and of
> > > course
> > > >> Kafka will use exactly this name.  (FWIW, the `-changelog` suffix is
> > > only
> > > >> added when Kafka creates internal changelog topics behind the scenes
> > for
> > > >> you.)   Unfortunately, the javadocs of `KTable#through()` is
> incorrect
> > > >> because it refers to `-changelog`;  we'll fix that as mentioned
> above.
> > > >>
> > > >> - Also, in case you want to do some shenanigans (like for some
> tooling
> > > >> you're building around state stores/changelogs/interactive queries)
> > such
> > > >> detecting all state store changelogs by doing the equivalent of `ls
> > > >> *-changelog`, then this will miss changelogs of KTables that are
> > > created by
> > > >> `through()` and `to()` (unless you come up with a naming convention
> > that
> > > >> your tooling can assume to be in place, e.g. by always adding
> > > `-changelog`
> > > >> to topic names when you call `through()`).
> > > >>
> > > >> I hope this helps!
> > > >> Michael
> > > >>
> > > >>
> > > >>
> > > >>
> > > >> On Wed, Nov 23, 2016 at 7:39 AM, Mikael Högqvist <
> hoegqvist@gmail.com
> > >
> > > >> wrote:
> > > >>
> > > >>> Hi Eno,
> > > >>>
> > > >>> 1) Great :)
> > > >>>
> > > >>> 2) Yes, we are using the Interactive Queries to access the state
> > > stores.
> > > >> In
> > > >>> addition, we access the changelogs to subscribe to updates. For
> this
> > > >> reason
> > > >>> we need to know the changelog topic name.
> > > >>>
> > > >>> Thanks,
> > > >>> Mikael
> > > >>>
> > > >>> On Tue, Nov 22, 2016 at 8:43 PM Eno Thereska <
> eno.thereska@gmail.com
> > >
> > > >>> wrote:
> > > >>>
> > > >>>> HI Mikael,
> > > >>>>
> > > >>>> 1) The JavaDoc looks incorrect, thanks for reporting. Matthias is
> > > >> looking
> > > >>>> into fixing it. I agree that it can be confusing to have topic
> names
> > > >> that
> > > >>>> are not what one would expect.
> > > >>>>
> > > >>>> 2) If your goal is to query/read from the state stores, you can
> use
> > > >>>> Interactive Queries to do that (you don't need to worry about the
> > > >>> changelog
> > > >>>> topic name and such). Interactive Queries is a new feature in
> 0.10.1
> > > >>> (blog
> > > >>>> here:
> > > >>>> https://www.confluent.io/blog/unifying-stream-processing-
> > > >>> and-interactive-queries-in-apache-kafka/
> > > >>>> <
> > > >>>> https://www.confluent.io/blog/unifying-stream-processing-
> > > >>> and-interactive-queries-in-apache-kafka/
> > > >>>>> ).
> > > >>>>
> > > >>>> Thanks
> > > >>>> Eno
> > > >>>>
> > > >>>>
> > > >>>>> On 22 Nov 2016, at 19:27, Mikael Högqvist <ho...@gmail.com>
> > > >> wrote:
> > > >>>>>
> > > >>>>> Sorry for being unclear, i'll try again :)
> > > >>>>>
> > > >>>>> 1) The JavaDoc for through is not correct, it states that a
> > changelog
> > > >>>> topic
> > > >>>>> will be created for the state store. That is, if I would call it
> > with
> > > >>>>> through("topic", "a-store"), I would expect a kafka topic
> > > >>>>> "my-app-id-a-store-changelog" to be created.
> > > >>>>>
> > > >>>>> 2) Regarding the use case, the topology looks like this:
> > > >>>>>
> > > >>>>> .stream(...)
> > > >>>>> .aggregate(..., "store-1")
> > > >>>>> .mapValues(...)
> > > >>>>> .through(..., "store-2")
> > > >>>>>
> > > >>>>> Basically, I want to materialize both the result from the
> aggregate
> > > >>>> method
> > > >>>>> and the result from mapValues, which is materialized using
> > > >> .through().
> > > >>>>> Later, I will access both the tables (store-1 and store-2) to a)
> > get
> > > >>> the
> > > >>>>> current state of the aggregate, b) subscribe to future updates.
> > This
> > > >>>> works
> > > >>>>> just fine. The only issue is that I assumed to have a changelog
> > topic
> > > >>> for
> > > >>>>> store-2 created automatically, which didnt happen.
> > > >>>>>
> > > >>>>> Since I want to access the changelog topic, it helps if the
> naming
> > is
> > > >>>>> consistent. So either we enforce the same naming pattern as kafka
> > > >> when
> > > >>>>> calling .through() or alternatively the Kafka Streams API can
> > > >> provide a
> > > >>>>> method to materialize tables which creates a topic name according
> > to
> > > >>> the
> > > >>>>> naming pattern. E.g. .through() without the topic parameter.
> > > >>>>>
> > > >>>>> What do you think?
> > > >>>>>
> > > >>>>> Best,
> > > >>>>> Mikael
> > > >>>>>
> > > >>>>> On Tue, Nov 22, 2016 at 7:21 PM Matthias J. Sax <
> > > >> matthias@confluent.io
> > > >>>>
> > > >>>>> wrote:
> > > >>>>>
> > > >>>>>> I cannot completely follow what want to achieve.
> > > >>>>>>
> > > >>>>>> However, the JavaDoc for through() seems not to be correct to
> me.
> > > >>> Using
> > > >>>>>> through() will not create an extra internal changelog topic with
> > the
> > > >>>>>> described naming schema, because the topic specified in
> through()
> > > >> can
> > > >>> be
> > > >>>>>> used for this (there is no point in duplicating the data).
> > > >>>>>>
> > > >>>>>> If you have a KTable and apply a mapValues(), this will not
> write
> > > >> data
> > > >>>>>> to any topic. The derived KTable is in-memory because you can
> > easily
> > > >>>>>> recreate it from its base KTable.
> > > >>>>>>
> > > >>>>>> What is the missing part you want to get?
> > > >>>>>>
> > > >>>>>> Btw: the internally created changelog topics are only used for
> > > >>> recovery
> > > >>>>>> in case of failure. Streams does not consumer from those topic
> > > >> during
> > > >>>>>> "normal operation".
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> -Matthias
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> On 11/22/16 1:59 AM, Mikael Högqvist wrote:
> > > >>>>>>> Hi,
> > > >>>>>>>
> > > >>>>>>> in the documentation for KTable#through, it is stated that a
> new
> > > >>>>>> changelog
> > > >>>>>>> topic will be created for the table. It also states that
> calling
> > > >>>> through
> > > >>>>>> is
> > > >>>>>>> equivalent to calling #to followed by KStreamBuilder#table.
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>
> > > >>>> http://kafka.apache.org/0101/javadoc/org/apache/kafka/
> > > >>> streams/kstream/KTable.html#through(org.apache.kafka.
> > > >>> common.serialization.Serde,%20org.apache.kafka.common.
> > > >>> serialization.Serde,%20java.lang.String,%20java.lang.String)
> > > >>>>>>>
> > > >>>>>>> In the docs for KStreamBuilder#table it is stated that no new
> > > >>> changelog
> > > >>>>>>> topic will be created since the underlying topic acts as the
> > > >>> changelog.
> > > >>>>>>> I've verified that this is the case.
> > > >>>>>>>
> > > >>>>>>> Is there another API method to materialize the results of a
> > KTable
> > > >>>>>>> including a changelog, i.e. such that kafka streams creates the
> > > >> topic
> > > >>>> and
> > > >>>>>>> uses the naming schema for changelog topics? The use case I
> have
> > in
> > > >>>> mind
> > > >>>>>> is
> > > >>>>>>> aggregate followed by mapValues.
> > > >>>>>>>
> > > >>>>>>> Best,
> > > >>>>>>> Mikael
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>
> > > >>>>
> > > >>>
> > > >>
> > > >
> > >
> > >
> >
>

Re: KafkaStreams KTable#through not creating changelog topic

Posted by Mikael Högqvist <ho...@gmail.com>.
Sure, I guess the topic is auto-created the first time I start the topology
and the second time its there already. It could be possible to create
topics up front for us, or even use an admin call from inside the code.

That said, as a user, I think it would be great with a function in the
Kafka Streams DSL that would allow me to materialize a KTable without
pre-creating the topic. Today there is .through(topic, store) and
.to(topic), maybe it would be possible to have something like
.materialize(store) which takes care of topic creation? Would adding
something like this require a KIP?

Best,
Mikael

On Thu, Nov 24, 2016 at 1:44 PM Damian Guy <da...@gmail.com> wrote:

Mikeal,

When you use `through(..)` topics are not created by KafkaStreams. You need
to create them yourself before you run the application.

Thanks,
Damian

On Thu, 24 Nov 2016 at 11:27 Mikael Högqvist <ho...@gmail.com> wrote:

> Yes, the naming is not an issue.
>
> I've tested this with the topology described earlier. Every time I start
> the topology with a call to .through() that references a topic that does
> not exist, I get an exception from the UncaughtExceptionHandler:
>
> Uncaught exception org.apache.kafka.streams.errors.StreamsException: Topic
> not found during partition assignment: words-count-changelog
>
> This happens when .through("words-count-changelog", "count") is part of
the
> topology. The topology is also not forwarding anything to that
topic/store.
> After restarting the application it works fine.
>
> Are the changelog topics created via, for example, .aggregate() different
> to topics auto created via .through()?
>
> Thanks,
> Mikael
>
> On Wed, Nov 23, 2016 at 7:57 PM Matthias J. Sax <ma...@confluent.io>
> wrote:
>
> > > 1) Create a state store AND the changelog
> > > topic 2) follow the Kafka Streams naming convention for changelog
> topics.
> > > Basically, I want to have a method that does what .through() is
> supposed
> > to
> > > do according to the documentation, but without the "topic" parameter.
> >
> > I understand what you are saying, but you can get this done right now,
> > too. If you use through(...) you will get the store. And you can just
> > specify the topic name as "applicationId-storeName-changelog" to follow
> > the naming convention Streams used internally. What is the problem using
> > this approach (besides that you have to provide the topic name which
> > seems not to be a big burden to me?)
> >
> >
> > -Matthias
> >
> >
> > On 11/23/16 8:59 AM, Mikael Högqvist wrote:
> > > Hi Michael,
> > >
> > > thanks for the extensive explanation, and yes it definitely helps with
> my
> > > understanding of through(). :)
> > >
> > > You guessed correctly that I'm doing some "shenanings" where I'm
trying
> > to
> > > derive the changelog of a state store from the state store name. This
> > works
> > > perfectly fine with with a naming convention for the topics and by
> > creating
> > > them in Kafka upfront.
> > >
> > > My point is that it would help me (and maybe others), if the API of
> > KTable
> > > was extended to have a new method that does two things that is not
part
> > of
> > > the implementation of .through(). 1) Create a state store AND the
> > changelog
> > > topic 2) follow the Kafka Streams naming convention for changelog
> topics.
> > > Basically, I want to have a method that does what .through() is
> supposed
> > to
> > > do according to the documentation, but without the "topic" parameter.
> > >
> > > What do you think, would it be possible to extend the API with a
method
> > > like that?
> > >
> > > Thanks,
> > > Mikael
> > >
> > > On Wed, Nov 23, 2016 at 4:16 PM Michael Noll <mi...@confluent.io>
> > wrote:
> > >
> > >> Mikael,
> > >>
> > >> regarding your second question:
> > >>
> > >>> 2) Regarding the use case, the topology looks like this:
> > >>>
> > >>> .stream(...)
> > >>> .aggregate(..., "store-1")
> > >>> .mapValues(...)
> > >>> .through(..., "store-2")
> > >>
> > >> The last operator above would, without "..." ellipsis, be sth like
> > >> `KTable#through("through-topic", "store-2")`.  Here, "through-topic"
> is
> > the
> > >> changelog topic for both the KTable and the state store "store-2".
So
> > this
> > >> is the changelog topic name that you want to know.
> > >>
> > >> - If you want the "through" topic to have a `-changelog` suffix, then
> > you'd
> > >> need to add that yourself in the call to `through(...)`.
> > >>
> > >> - If you wonder why `through()` doesn't add a `-changelog` suffix
> > >> automatically:  That's because `through()` -- like `to()` or
> `stream()`,
> > >> `table()` -- require you to explicitly provide a topic name, and of
> > course
> > >> Kafka will use exactly this name.  (FWIW, the `-changelog` suffix is
> > only
> > >> added when Kafka creates internal changelog topics behind the scenes
> for
> > >> you.)   Unfortunately, the javadocs of `KTable#through()` is
incorrect
> > >> because it refers to `-changelog`;  we'll fix that as mentioned
above.
> > >>
> > >> - Also, in case you want to do some shenanigans (like for some
tooling
> > >> you're building around state stores/changelogs/interactive queries)
> such
> > >> detecting all state store changelogs by doing the equivalent of `ls
> > >> *-changelog`, then this will miss changelogs of KTables that are
> > created by
> > >> `through()` and `to()` (unless you come up with a naming convention
> that
> > >> your tooling can assume to be in place, e.g. by always adding
> > `-changelog`
> > >> to topic names when you call `through()`).
> > >>
> > >> I hope this helps!
> > >> Michael
> > >>
> > >>
> > >>
> > >>
> > >> On Wed, Nov 23, 2016 at 7:39 AM, Mikael Högqvist <hoegqvist@gmail.com
> >
> > >> wrote:
> > >>
> > >>> Hi Eno,
> > >>>
> > >>> 1) Great :)
> > >>>
> > >>> 2) Yes, we are using the Interactive Queries to access the state
> > stores.
> > >> In
> > >>> addition, we access the changelogs to subscribe to updates. For this
> > >> reason
> > >>> we need to know the changelog topic name.
> > >>>
> > >>> Thanks,
> > >>> Mikael
> > >>>
> > >>> On Tue, Nov 22, 2016 at 8:43 PM Eno Thereska <eno.thereska@gmail.com
> >
> > >>> wrote:
> > >>>
> > >>>> HI Mikael,
> > >>>>
> > >>>> 1) The JavaDoc looks incorrect, thanks for reporting. Matthias is
> > >> looking
> > >>>> into fixing it. I agree that it can be confusing to have topic
names
> > >> that
> > >>>> are not what one would expect.
> > >>>>
> > >>>> 2) If your goal is to query/read from the state stores, you can use
> > >>>> Interactive Queries to do that (you don't need to worry about the
> > >>> changelog
> > >>>> topic name and such). Interactive Queries is a new feature in
0.10.1
> > >>> (blog
> > >>>> here:
> > >>>> https://www.confluent.io/blog/unifying-stream-processing-
> > >>> and-interactive-queries-in-apache-kafka/
> > >>>> <
> > >>>> https://www.confluent.io/blog/unifying-stream-processing-
> > >>> and-interactive-queries-in-apache-kafka/
> > >>>>> ).
> > >>>>
> > >>>> Thanks
> > >>>> Eno
> > >>>>
> > >>>>
> > >>>>> On 22 Nov 2016, at 19:27, Mikael Högqvist <ho...@gmail.com>
> > >> wrote:
> > >>>>>
> > >>>>> Sorry for being unclear, i'll try again :)
> > >>>>>
> > >>>>> 1) The JavaDoc for through is not correct, it states that a
> changelog
> > >>>> topic
> > >>>>> will be created for the state store. That is, if I would call it
> with
> > >>>>> through("topic", "a-store"), I would expect a kafka topic
> > >>>>> "my-app-id-a-store-changelog" to be created.
> > >>>>>
> > >>>>> 2) Regarding the use case, the topology looks like this:
> > >>>>>
> > >>>>> .stream(...)
> > >>>>> .aggregate(..., "store-1")
> > >>>>> .mapValues(...)
> > >>>>> .through(..., "store-2")
> > >>>>>
> > >>>>> Basically, I want to materialize both the result from the
aggregate
> > >>>> method
> > >>>>> and the result from mapValues, which is materialized using
> > >> .through().
> > >>>>> Later, I will access both the tables (store-1 and store-2) to a)
> get
> > >>> the
> > >>>>> current state of the aggregate, b) subscribe to future updates.
> This
> > >>>> works
> > >>>>> just fine. The only issue is that I assumed to have a changelog
> topic
> > >>> for
> > >>>>> store-2 created automatically, which didnt happen.
> > >>>>>
> > >>>>> Since I want to access the changelog topic, it helps if the naming
> is
> > >>>>> consistent. So either we enforce the same naming pattern as kafka
> > >> when
> > >>>>> calling .through() or alternatively the Kafka Streams API can
> > >> provide a
> > >>>>> method to materialize tables which creates a topic name according
> to
> > >>> the
> > >>>>> naming pattern. E.g. .through() without the topic parameter.
> > >>>>>
> > >>>>> What do you think?
> > >>>>>
> > >>>>> Best,
> > >>>>> Mikael
> > >>>>>
> > >>>>> On Tue, Nov 22, 2016 at 7:21 PM Matthias J. Sax <
> > >> matthias@confluent.io
> > >>>>
> > >>>>> wrote:
> > >>>>>
> > >>>>>> I cannot completely follow what want to achieve.
> > >>>>>>
> > >>>>>> However, the JavaDoc for through() seems not to be correct to me.
> > >>> Using
> > >>>>>> through() will not create an extra internal changelog topic with
> the
> > >>>>>> described naming schema, because the topic specified in through()
> > >> can
> > >>> be
> > >>>>>> used for this (there is no point in duplicating the data).
> > >>>>>>
> > >>>>>> If you have a KTable and apply a mapValues(), this will not write
> > >> data
> > >>>>>> to any topic. The derived KTable is in-memory because you can
> easily
> > >>>>>> recreate it from its base KTable.
> > >>>>>>
> > >>>>>> What is the missing part you want to get?
> > >>>>>>
> > >>>>>> Btw: the internally created changelog topics are only used for
> > >>> recovery
> > >>>>>> in case of failure. Streams does not consumer from those topic
> > >> during
> > >>>>>> "normal operation".
> > >>>>>>
> > >>>>>>
> > >>>>>> -Matthias
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>> On 11/22/16 1:59 AM, Mikael Högqvist wrote:
> > >>>>>>> Hi,
> > >>>>>>>
> > >>>>>>> in the documentation for KTable#through, it is stated that a new
> > >>>>>> changelog
> > >>>>>>> topic will be created for the table. It also states that calling
> > >>>> through
> > >>>>>> is
> > >>>>>>> equivalent to calling #to followed by KStreamBuilder#table.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>> http://kafka.apache.org/0101/javadoc/org/apache/kafka/
> > >>> streams/kstream/KTable.html#through(org.apache.kafka.
> > >>> common.serialization.Serde,%20org.apache.kafka.common.
> > >>> serialization.Serde,%20java.lang.String,%20java.lang.String)
> > >>>>>>>
> > >>>>>>> In the docs for KStreamBuilder#table it is stated that no new
> > >>> changelog
> > >>>>>>> topic will be created since the underlying topic acts as the
> > >>> changelog.
> > >>>>>>> I've verified that this is the case.
> > >>>>>>>
> > >>>>>>> Is there another API method to materialize the results of a
> KTable
> > >>>>>>> including a changelog, i.e. such that kafka streams creates the
> > >> topic
> > >>>> and
> > >>>>>>> uses the naming schema for changelog topics? The use case I have
> in
> > >>>> mind
> > >>>>>> is
> > >>>>>>> aggregate followed by mapValues.
> > >>>>>>>
> > >>>>>>> Best,
> > >>>>>>> Mikael
> > >>>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>
> > >>>>
> > >>>
> > >>
> > >
> >
> >
>

Re: KafkaStreams KTable#through not creating changelog topic

Posted by Damian Guy <da...@gmail.com>.
Mikeal,

When you use `through(..)` topics are not created by KafkaStreams. You need
to create them yourself before you run the application.

Thanks,
Damian

On Thu, 24 Nov 2016 at 11:27 Mikael Högqvist <ho...@gmail.com> wrote:

> Yes, the naming is not an issue.
>
> I've tested this with the topology described earlier. Every time I start
> the topology with a call to .through() that references a topic that does
> not exist, I get an exception from the UncaughtExceptionHandler:
>
> Uncaught exception org.apache.kafka.streams.errors.StreamsException: Topic
> not found during partition assignment: words-count-changelog
>
> This happens when .through("words-count-changelog", "count") is part of the
> topology. The topology is also not forwarding anything to that topic/store.
> After restarting the application it works fine.
>
> Are the changelog topics created via, for example, .aggregate() different
> to topics auto created via .through()?
>
> Thanks,
> Mikael
>
> On Wed, Nov 23, 2016 at 7:57 PM Matthias J. Sax <ma...@confluent.io>
> wrote:
>
> > > 1) Create a state store AND the changelog
> > > topic 2) follow the Kafka Streams naming convention for changelog
> topics.
> > > Basically, I want to have a method that does what .through() is
> supposed
> > to
> > > do according to the documentation, but without the "topic" parameter.
> >
> > I understand what you are saying, but you can get this done right now,
> > too. If you use through(...) you will get the store. And you can just
> > specify the topic name as "applicationId-storeName-changelog" to follow
> > the naming convention Streams used internally. What is the problem using
> > this approach (besides that you have to provide the topic name which
> > seems not to be a big burden to me?)
> >
> >
> > -Matthias
> >
> >
> > On 11/23/16 8:59 AM, Mikael Högqvist wrote:
> > > Hi Michael,
> > >
> > > thanks for the extensive explanation, and yes it definitely helps with
> my
> > > understanding of through(). :)
> > >
> > > You guessed correctly that I'm doing some "shenanings" where I'm trying
> > to
> > > derive the changelog of a state store from the state store name. This
> > works
> > > perfectly fine with with a naming convention for the topics and by
> > creating
> > > them in Kafka upfront.
> > >
> > > My point is that it would help me (and maybe others), if the API of
> > KTable
> > > was extended to have a new method that does two things that is not part
> > of
> > > the implementation of .through(). 1) Create a state store AND the
> > changelog
> > > topic 2) follow the Kafka Streams naming convention for changelog
> topics.
> > > Basically, I want to have a method that does what .through() is
> supposed
> > to
> > > do according to the documentation, but without the "topic" parameter.
> > >
> > > What do you think, would it be possible to extend the API with a method
> > > like that?
> > >
> > > Thanks,
> > > Mikael
> > >
> > > On Wed, Nov 23, 2016 at 4:16 PM Michael Noll <mi...@confluent.io>
> > wrote:
> > >
> > >> Mikael,
> > >>
> > >> regarding your second question:
> > >>
> > >>> 2) Regarding the use case, the topology looks like this:
> > >>>
> > >>> .stream(...)
> > >>> .aggregate(..., "store-1")
> > >>> .mapValues(...)
> > >>> .through(..., "store-2")
> > >>
> > >> The last operator above would, without "..." ellipsis, be sth like
> > >> `KTable#through("through-topic", "store-2")`.  Here, "through-topic"
> is
> > the
> > >> changelog topic for both the KTable and the state store "store-2".  So
> > this
> > >> is the changelog topic name that you want to know.
> > >>
> > >> - If you want the "through" topic to have a `-changelog` suffix, then
> > you'd
> > >> need to add that yourself in the call to `through(...)`.
> > >>
> > >> - If you wonder why `through()` doesn't add a `-changelog` suffix
> > >> automatically:  That's because `through()` -- like `to()` or
> `stream()`,
> > >> `table()` -- require you to explicitly provide a topic name, and of
> > course
> > >> Kafka will use exactly this name.  (FWIW, the `-changelog` suffix is
> > only
> > >> added when Kafka creates internal changelog topics behind the scenes
> for
> > >> you.)   Unfortunately, the javadocs of `KTable#through()` is incorrect
> > >> because it refers to `-changelog`;  we'll fix that as mentioned above.
> > >>
> > >> - Also, in case you want to do some shenanigans (like for some tooling
> > >> you're building around state stores/changelogs/interactive queries)
> such
> > >> detecting all state store changelogs by doing the equivalent of `ls
> > >> *-changelog`, then this will miss changelogs of KTables that are
> > created by
> > >> `through()` and `to()` (unless you come up with a naming convention
> that
> > >> your tooling can assume to be in place, e.g. by always adding
> > `-changelog`
> > >> to topic names when you call `through()`).
> > >>
> > >> I hope this helps!
> > >> Michael
> > >>
> > >>
> > >>
> > >>
> > >> On Wed, Nov 23, 2016 at 7:39 AM, Mikael Högqvist <hoegqvist@gmail.com
> >
> > >> wrote:
> > >>
> > >>> Hi Eno,
> > >>>
> > >>> 1) Great :)
> > >>>
> > >>> 2) Yes, we are using the Interactive Queries to access the state
> > stores.
> > >> In
> > >>> addition, we access the changelogs to subscribe to updates. For this
> > >> reason
> > >>> we need to know the changelog topic name.
> > >>>
> > >>> Thanks,
> > >>> Mikael
> > >>>
> > >>> On Tue, Nov 22, 2016 at 8:43 PM Eno Thereska <eno.thereska@gmail.com
> >
> > >>> wrote:
> > >>>
> > >>>> HI Mikael,
> > >>>>
> > >>>> 1) The JavaDoc looks incorrect, thanks for reporting. Matthias is
> > >> looking
> > >>>> into fixing it. I agree that it can be confusing to have topic names
> > >> that
> > >>>> are not what one would expect.
> > >>>>
> > >>>> 2) If your goal is to query/read from the state stores, you can use
> > >>>> Interactive Queries to do that (you don't need to worry about the
> > >>> changelog
> > >>>> topic name and such). Interactive Queries is a new feature in 0.10.1
> > >>> (blog
> > >>>> here:
> > >>>> https://www.confluent.io/blog/unifying-stream-processing-
> > >>> and-interactive-queries-in-apache-kafka/
> > >>>> <
> > >>>> https://www.confluent.io/blog/unifying-stream-processing-
> > >>> and-interactive-queries-in-apache-kafka/
> > >>>>> ).
> > >>>>
> > >>>> Thanks
> > >>>> Eno
> > >>>>
> > >>>>
> > >>>>> On 22 Nov 2016, at 19:27, Mikael Högqvist <ho...@gmail.com>
> > >> wrote:
> > >>>>>
> > >>>>> Sorry for being unclear, i'll try again :)
> > >>>>>
> > >>>>> 1) The JavaDoc for through is not correct, it states that a
> changelog
> > >>>> topic
> > >>>>> will be created for the state store. That is, if I would call it
> with
> > >>>>> through("topic", "a-store"), I would expect a kafka topic
> > >>>>> "my-app-id-a-store-changelog" to be created.
> > >>>>>
> > >>>>> 2) Regarding the use case, the topology looks like this:
> > >>>>>
> > >>>>> .stream(...)
> > >>>>> .aggregate(..., "store-1")
> > >>>>> .mapValues(...)
> > >>>>> .through(..., "store-2")
> > >>>>>
> > >>>>> Basically, I want to materialize both the result from the aggregate
> > >>>> method
> > >>>>> and the result from mapValues, which is materialized using
> > >> .through().
> > >>>>> Later, I will access both the tables (store-1 and store-2) to a)
> get
> > >>> the
> > >>>>> current state of the aggregate, b) subscribe to future updates.
> This
> > >>>> works
> > >>>>> just fine. The only issue is that I assumed to have a changelog
> topic
> > >>> for
> > >>>>> store-2 created automatically, which didnt happen.
> > >>>>>
> > >>>>> Since I want to access the changelog topic, it helps if the naming
> is
> > >>>>> consistent. So either we enforce the same naming pattern as kafka
> > >> when
> > >>>>> calling .through() or alternatively the Kafka Streams API can
> > >> provide a
> > >>>>> method to materialize tables which creates a topic name according
> to
> > >>> the
> > >>>>> naming pattern. E.g. .through() without the topic parameter.
> > >>>>>
> > >>>>> What do you think?
> > >>>>>
> > >>>>> Best,
> > >>>>> Mikael
> > >>>>>
> > >>>>> On Tue, Nov 22, 2016 at 7:21 PM Matthias J. Sax <
> > >> matthias@confluent.io
> > >>>>
> > >>>>> wrote:
> > >>>>>
> > >>>>>> I cannot completely follow what want to achieve.
> > >>>>>>
> > >>>>>> However, the JavaDoc for through() seems not to be correct to me.
> > >>> Using
> > >>>>>> through() will not create an extra internal changelog topic with
> the
> > >>>>>> described naming schema, because the topic specified in through()
> > >> can
> > >>> be
> > >>>>>> used for this (there is no point in duplicating the data).
> > >>>>>>
> > >>>>>> If you have a KTable and apply a mapValues(), this will not write
> > >> data
> > >>>>>> to any topic. The derived KTable is in-memory because you can
> easily
> > >>>>>> recreate it from its base KTable.
> > >>>>>>
> > >>>>>> What is the missing part you want to get?
> > >>>>>>
> > >>>>>> Btw: the internally created changelog topics are only used for
> > >>> recovery
> > >>>>>> in case of failure. Streams does not consumer from those topic
> > >> during
> > >>>>>> "normal operation".
> > >>>>>>
> > >>>>>>
> > >>>>>> -Matthias
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>> On 11/22/16 1:59 AM, Mikael Högqvist wrote:
> > >>>>>>> Hi,
> > >>>>>>>
> > >>>>>>> in the documentation for KTable#through, it is stated that a new
> > >>>>>> changelog
> > >>>>>>> topic will be created for the table. It also states that calling
> > >>>> through
> > >>>>>> is
> > >>>>>>> equivalent to calling #to followed by KStreamBuilder#table.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>> http://kafka.apache.org/0101/javadoc/org/apache/kafka/
> > >>> streams/kstream/KTable.html#through(org.apache.kafka.
> > >>> common.serialization.Serde,%20org.apache.kafka.common.
> > >>> serialization.Serde,%20java.lang.String,%20java.lang.String)
> > >>>>>>>
> > >>>>>>> In the docs for KStreamBuilder#table it is stated that no new
> > >>> changelog
> > >>>>>>> topic will be created since the underlying topic acts as the
> > >>> changelog.
> > >>>>>>> I've verified that this is the case.
> > >>>>>>>
> > >>>>>>> Is there another API method to materialize the results of a
> KTable
> > >>>>>>> including a changelog, i.e. such that kafka streams creates the
> > >> topic
> > >>>> and
> > >>>>>>> uses the naming schema for changelog topics? The use case I have
> in
> > >>>> mind
> > >>>>>> is
> > >>>>>>> aggregate followed by mapValues.
> > >>>>>>>
> > >>>>>>> Best,
> > >>>>>>> Mikael
> > >>>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>
> > >>>>
> > >>>
> > >>
> > >
> >
> >
>

Re: KafkaStreams KTable#through not creating changelog topic

Posted by Mikael Högqvist <ho...@gmail.com>.
Yes, the naming is not an issue.

I've tested this with the topology described earlier. Every time I start
the topology with a call to .through() that references a topic that does
not exist, I get an exception from the UncaughtExceptionHandler:

Uncaught exception org.apache.kafka.streams.errors.StreamsException: Topic
not found during partition assignment: words-count-changelog

This happens when .through("words-count-changelog", "count") is part of the
topology. The topology is also not forwarding anything to that topic/store.
After restarting the application it works fine.

Are the changelog topics created via, for example, .aggregate() different
to topics auto created via .through()?

Thanks,
Mikael

On Wed, Nov 23, 2016 at 7:57 PM Matthias J. Sax <ma...@confluent.io>
wrote:

> > 1) Create a state store AND the changelog
> > topic 2) follow the Kafka Streams naming convention for changelog topics.
> > Basically, I want to have a method that does what .through() is supposed
> to
> > do according to the documentation, but without the "topic" parameter.
>
> I understand what you are saying, but you can get this done right now,
> too. If you use through(...) you will get the store. And you can just
> specify the topic name as "applicationId-storeName-changelog" to follow
> the naming convention Streams used internally. What is the problem using
> this approach (besides that you have to provide the topic name which
> seems not to be a big burden to me?)
>
>
> -Matthias
>
>
> On 11/23/16 8:59 AM, Mikael Högqvist wrote:
> > Hi Michael,
> >
> > thanks for the extensive explanation, and yes it definitely helps with my
> > understanding of through(). :)
> >
> > You guessed correctly that I'm doing some "shenanings" where I'm trying
> to
> > derive the changelog of a state store from the state store name. This
> works
> > perfectly fine with with a naming convention for the topics and by
> creating
> > them in Kafka upfront.
> >
> > My point is that it would help me (and maybe others), if the API of
> KTable
> > was extended to have a new method that does two things that is not part
> of
> > the implementation of .through(). 1) Create a state store AND the
> changelog
> > topic 2) follow the Kafka Streams naming convention for changelog topics.
> > Basically, I want to have a method that does what .through() is supposed
> to
> > do according to the documentation, but without the "topic" parameter.
> >
> > What do you think, would it be possible to extend the API with a method
> > like that?
> >
> > Thanks,
> > Mikael
> >
> > On Wed, Nov 23, 2016 at 4:16 PM Michael Noll <mi...@confluent.io>
> wrote:
> >
> >> Mikael,
> >>
> >> regarding your second question:
> >>
> >>> 2) Regarding the use case, the topology looks like this:
> >>>
> >>> .stream(...)
> >>> .aggregate(..., "store-1")
> >>> .mapValues(...)
> >>> .through(..., "store-2")
> >>
> >> The last operator above would, without "..." ellipsis, be sth like
> >> `KTable#through("through-topic", "store-2")`.  Here, "through-topic" is
> the
> >> changelog topic for both the KTable and the state store "store-2".  So
> this
> >> is the changelog topic name that you want to know.
> >>
> >> - If you want the "through" topic to have a `-changelog` suffix, then
> you'd
> >> need to add that yourself in the call to `through(...)`.
> >>
> >> - If you wonder why `through()` doesn't add a `-changelog` suffix
> >> automatically:  That's because `through()` -- like `to()` or `stream()`,
> >> `table()` -- require you to explicitly provide a topic name, and of
> course
> >> Kafka will use exactly this name.  (FWIW, the `-changelog` suffix is
> only
> >> added when Kafka creates internal changelog topics behind the scenes for
> >> you.)   Unfortunately, the javadocs of `KTable#through()` is incorrect
> >> because it refers to `-changelog`;  we'll fix that as mentioned above.
> >>
> >> - Also, in case you want to do some shenanigans (like for some tooling
> >> you're building around state stores/changelogs/interactive queries) such
> >> detecting all state store changelogs by doing the equivalent of `ls
> >> *-changelog`, then this will miss changelogs of KTables that are
> created by
> >> `through()` and `to()` (unless you come up with a naming convention that
> >> your tooling can assume to be in place, e.g. by always adding
> `-changelog`
> >> to topic names when you call `through()`).
> >>
> >> I hope this helps!
> >> Michael
> >>
> >>
> >>
> >>
> >> On Wed, Nov 23, 2016 at 7:39 AM, Mikael Högqvist <ho...@gmail.com>
> >> wrote:
> >>
> >>> Hi Eno,
> >>>
> >>> 1) Great :)
> >>>
> >>> 2) Yes, we are using the Interactive Queries to access the state
> stores.
> >> In
> >>> addition, we access the changelogs to subscribe to updates. For this
> >> reason
> >>> we need to know the changelog topic name.
> >>>
> >>> Thanks,
> >>> Mikael
> >>>
> >>> On Tue, Nov 22, 2016 at 8:43 PM Eno Thereska <en...@gmail.com>
> >>> wrote:
> >>>
> >>>> HI Mikael,
> >>>>
> >>>> 1) The JavaDoc looks incorrect, thanks for reporting. Matthias is
> >> looking
> >>>> into fixing it. I agree that it can be confusing to have topic names
> >> that
> >>>> are not what one would expect.
> >>>>
> >>>> 2) If your goal is to query/read from the state stores, you can use
> >>>> Interactive Queries to do that (you don't need to worry about the
> >>> changelog
> >>>> topic name and such). Interactive Queries is a new feature in 0.10.1
> >>> (blog
> >>>> here:
> >>>> https://www.confluent.io/blog/unifying-stream-processing-
> >>> and-interactive-queries-in-apache-kafka/
> >>>> <
> >>>> https://www.confluent.io/blog/unifying-stream-processing-
> >>> and-interactive-queries-in-apache-kafka/
> >>>>> ).
> >>>>
> >>>> Thanks
> >>>> Eno
> >>>>
> >>>>
> >>>>> On 22 Nov 2016, at 19:27, Mikael Högqvist <ho...@gmail.com>
> >> wrote:
> >>>>>
> >>>>> Sorry for being unclear, i'll try again :)
> >>>>>
> >>>>> 1) The JavaDoc for through is not correct, it states that a changelog
> >>>> topic
> >>>>> will be created for the state store. That is, if I would call it with
> >>>>> through("topic", "a-store"), I would expect a kafka topic
> >>>>> "my-app-id-a-store-changelog" to be created.
> >>>>>
> >>>>> 2) Regarding the use case, the topology looks like this:
> >>>>>
> >>>>> .stream(...)
> >>>>> .aggregate(..., "store-1")
> >>>>> .mapValues(...)
> >>>>> .through(..., "store-2")
> >>>>>
> >>>>> Basically, I want to materialize both the result from the aggregate
> >>>> method
> >>>>> and the result from mapValues, which is materialized using
> >> .through().
> >>>>> Later, I will access both the tables (store-1 and store-2) to a) get
> >>> the
> >>>>> current state of the aggregate, b) subscribe to future updates. This
> >>>> works
> >>>>> just fine. The only issue is that I assumed to have a changelog topic
> >>> for
> >>>>> store-2 created automatically, which didnt happen.
> >>>>>
> >>>>> Since I want to access the changelog topic, it helps if the naming is
> >>>>> consistent. So either we enforce the same naming pattern as kafka
> >> when
> >>>>> calling .through() or alternatively the Kafka Streams API can
> >> provide a
> >>>>> method to materialize tables which creates a topic name according to
> >>> the
> >>>>> naming pattern. E.g. .through() without the topic parameter.
> >>>>>
> >>>>> What do you think?
> >>>>>
> >>>>> Best,
> >>>>> Mikael
> >>>>>
> >>>>> On Tue, Nov 22, 2016 at 7:21 PM Matthias J. Sax <
> >> matthias@confluent.io
> >>>>
> >>>>> wrote:
> >>>>>
> >>>>>> I cannot completely follow what want to achieve.
> >>>>>>
> >>>>>> However, the JavaDoc for through() seems not to be correct to me.
> >>> Using
> >>>>>> through() will not create an extra internal changelog topic with the
> >>>>>> described naming schema, because the topic specified in through()
> >> can
> >>> be
> >>>>>> used for this (there is no point in duplicating the data).
> >>>>>>
> >>>>>> If you have a KTable and apply a mapValues(), this will not write
> >> data
> >>>>>> to any topic. The derived KTable is in-memory because you can easily
> >>>>>> recreate it from its base KTable.
> >>>>>>
> >>>>>> What is the missing part you want to get?
> >>>>>>
> >>>>>> Btw: the internally created changelog topics are only used for
> >>> recovery
> >>>>>> in case of failure. Streams does not consumer from those topic
> >> during
> >>>>>> "normal operation".
> >>>>>>
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On 11/22/16 1:59 AM, Mikael Högqvist wrote:
> >>>>>>> Hi,
> >>>>>>>
> >>>>>>> in the documentation for KTable#through, it is stated that a new
> >>>>>> changelog
> >>>>>>> topic will be created for the table. It also states that calling
> >>>> through
> >>>>>> is
> >>>>>>> equivalent to calling #to followed by KStreamBuilder#table.
> >>>>>>>
> >>>>>>>
> >>>>>>
> >>>> http://kafka.apache.org/0101/javadoc/org/apache/kafka/
> >>> streams/kstream/KTable.html#through(org.apache.kafka.
> >>> common.serialization.Serde,%20org.apache.kafka.common.
> >>> serialization.Serde,%20java.lang.String,%20java.lang.String)
> >>>>>>>
> >>>>>>> In the docs for KStreamBuilder#table it is stated that no new
> >>> changelog
> >>>>>>> topic will be created since the underlying topic acts as the
> >>> changelog.
> >>>>>>> I've verified that this is the case.
> >>>>>>>
> >>>>>>> Is there another API method to materialize the results of a KTable
> >>>>>>> including a changelog, i.e. such that kafka streams creates the
> >> topic
> >>>> and
> >>>>>>> uses the naming schema for changelog topics? The use case I have in
> >>>> mind
> >>>>>> is
> >>>>>>> aggregate followed by mapValues.
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Mikael
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>
> >>>>
> >>>
> >>
> >
>
>

Re: KafkaStreams KTable#through not creating changelog topic

Posted by "Matthias J. Sax" <ma...@confluent.io>.
> 1) Create a state store AND the changelog
> topic 2) follow the Kafka Streams naming convention for changelog topics.
> Basically, I want to have a method that does what .through() is supposed to
> do according to the documentation, but without the "topic" parameter.

I understand what you are saying, but you can get this done right now,
too. If you use through(...) you will get the store. And you can just
specify the topic name as "applicationId-storeName-changelog" to follow
the naming convention Streams used internally. What is the problem using
this approach (besides that you have to provide the topic name which
seems not to be a big burden to me?)


-Matthias


On 11/23/16 8:59 AM, Mikael Högqvist wrote:
> Hi Michael,
> 
> thanks for the extensive explanation, and yes it definitely helps with my
> understanding of through(). :)
> 
> You guessed correctly that I'm doing some "shenanings" where I'm trying to
> derive the changelog of a state store from the state store name. This works
> perfectly fine with with a naming convention for the topics and by creating
> them in Kafka upfront.
> 
> My point is that it would help me (and maybe others), if the API of KTable
> was extended to have a new method that does two things that is not part of
> the implementation of .through(). 1) Create a state store AND the changelog
> topic 2) follow the Kafka Streams naming convention for changelog topics.
> Basically, I want to have a method that does what .through() is supposed to
> do according to the documentation, but without the "topic" parameter.
> 
> What do you think, would it be possible to extend the API with a method
> like that?
> 
> Thanks,
> Mikael
> 
> On Wed, Nov 23, 2016 at 4:16 PM Michael Noll <mi...@confluent.io> wrote:
> 
>> Mikael,
>>
>> regarding your second question:
>>
>>> 2) Regarding the use case, the topology looks like this:
>>>
>>> .stream(...)
>>> .aggregate(..., "store-1")
>>> .mapValues(...)
>>> .through(..., "store-2")
>>
>> The last operator above would, without "..." ellipsis, be sth like
>> `KTable#through("through-topic", "store-2")`.  Here, "through-topic" is the
>> changelog topic for both the KTable and the state store "store-2".  So this
>> is the changelog topic name that you want to know.
>>
>> - If you want the "through" topic to have a `-changelog` suffix, then you'd
>> need to add that yourself in the call to `through(...)`.
>>
>> - If you wonder why `through()` doesn't add a `-changelog` suffix
>> automatically:  That's because `through()` -- like `to()` or `stream()`,
>> `table()` -- require you to explicitly provide a topic name, and of course
>> Kafka will use exactly this name.  (FWIW, the `-changelog` suffix is only
>> added when Kafka creates internal changelog topics behind the scenes for
>> you.)   Unfortunately, the javadocs of `KTable#through()` is incorrect
>> because it refers to `-changelog`;  we'll fix that as mentioned above.
>>
>> - Also, in case you want to do some shenanigans (like for some tooling
>> you're building around state stores/changelogs/interactive queries) such
>> detecting all state store changelogs by doing the equivalent of `ls
>> *-changelog`, then this will miss changelogs of KTables that are created by
>> `through()` and `to()` (unless you come up with a naming convention that
>> your tooling can assume to be in place, e.g. by always adding `-changelog`
>> to topic names when you call `through()`).
>>
>> I hope this helps!
>> Michael
>>
>>
>>
>>
>> On Wed, Nov 23, 2016 at 7:39 AM, Mikael Högqvist <ho...@gmail.com>
>> wrote:
>>
>>> Hi Eno,
>>>
>>> 1) Great :)
>>>
>>> 2) Yes, we are using the Interactive Queries to access the state stores.
>> In
>>> addition, we access the changelogs to subscribe to updates. For this
>> reason
>>> we need to know the changelog topic name.
>>>
>>> Thanks,
>>> Mikael
>>>
>>> On Tue, Nov 22, 2016 at 8:43 PM Eno Thereska <en...@gmail.com>
>>> wrote:
>>>
>>>> HI Mikael,
>>>>
>>>> 1) The JavaDoc looks incorrect, thanks for reporting. Matthias is
>> looking
>>>> into fixing it. I agree that it can be confusing to have topic names
>> that
>>>> are not what one would expect.
>>>>
>>>> 2) If your goal is to query/read from the state stores, you can use
>>>> Interactive Queries to do that (you don't need to worry about the
>>> changelog
>>>> topic name and such). Interactive Queries is a new feature in 0.10.1
>>> (blog
>>>> here:
>>>> https://www.confluent.io/blog/unifying-stream-processing-
>>> and-interactive-queries-in-apache-kafka/
>>>> <
>>>> https://www.confluent.io/blog/unifying-stream-processing-
>>> and-interactive-queries-in-apache-kafka/
>>>>> ).
>>>>
>>>> Thanks
>>>> Eno
>>>>
>>>>
>>>>> On 22 Nov 2016, at 19:27, Mikael Högqvist <ho...@gmail.com>
>> wrote:
>>>>>
>>>>> Sorry for being unclear, i'll try again :)
>>>>>
>>>>> 1) The JavaDoc for through is not correct, it states that a changelog
>>>> topic
>>>>> will be created for the state store. That is, if I would call it with
>>>>> through("topic", "a-store"), I would expect a kafka topic
>>>>> "my-app-id-a-store-changelog" to be created.
>>>>>
>>>>> 2) Regarding the use case, the topology looks like this:
>>>>>
>>>>> .stream(...)
>>>>> .aggregate(..., "store-1")
>>>>> .mapValues(...)
>>>>> .through(..., "store-2")
>>>>>
>>>>> Basically, I want to materialize both the result from the aggregate
>>>> method
>>>>> and the result from mapValues, which is materialized using
>> .through().
>>>>> Later, I will access both the tables (store-1 and store-2) to a) get
>>> the
>>>>> current state of the aggregate, b) subscribe to future updates. This
>>>> works
>>>>> just fine. The only issue is that I assumed to have a changelog topic
>>> for
>>>>> store-2 created automatically, which didnt happen.
>>>>>
>>>>> Since I want to access the changelog topic, it helps if the naming is
>>>>> consistent. So either we enforce the same naming pattern as kafka
>> when
>>>>> calling .through() or alternatively the Kafka Streams API can
>> provide a
>>>>> method to materialize tables which creates a topic name according to
>>> the
>>>>> naming pattern. E.g. .through() without the topic parameter.
>>>>>
>>>>> What do you think?
>>>>>
>>>>> Best,
>>>>> Mikael
>>>>>
>>>>> On Tue, Nov 22, 2016 at 7:21 PM Matthias J. Sax <
>> matthias@confluent.io
>>>>
>>>>> wrote:
>>>>>
>>>>>> I cannot completely follow what want to achieve.
>>>>>>
>>>>>> However, the JavaDoc for through() seems not to be correct to me.
>>> Using
>>>>>> through() will not create an extra internal changelog topic with the
>>>>>> described naming schema, because the topic specified in through()
>> can
>>> be
>>>>>> used for this (there is no point in duplicating the data).
>>>>>>
>>>>>> If you have a KTable and apply a mapValues(), this will not write
>> data
>>>>>> to any topic. The derived KTable is in-memory because you can easily
>>>>>> recreate it from its base KTable.
>>>>>>
>>>>>> What is the missing part you want to get?
>>>>>>
>>>>>> Btw: the internally created changelog topics are only used for
>>> recovery
>>>>>> in case of failure. Streams does not consumer from those topic
>> during
>>>>>> "normal operation".
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 11/22/16 1:59 AM, Mikael Högqvist wrote:
>>>>>>> Hi,
>>>>>>>
>>>>>>> in the documentation for KTable#through, it is stated that a new
>>>>>> changelog
>>>>>>> topic will be created for the table. It also states that calling
>>>> through
>>>>>> is
>>>>>>> equivalent to calling #to followed by KStreamBuilder#table.
>>>>>>>
>>>>>>>
>>>>>>
>>>> http://kafka.apache.org/0101/javadoc/org/apache/kafka/
>>> streams/kstream/KTable.html#through(org.apache.kafka.
>>> common.serialization.Serde,%20org.apache.kafka.common.
>>> serialization.Serde,%20java.lang.String,%20java.lang.String)
>>>>>>>
>>>>>>> In the docs for KStreamBuilder#table it is stated that no new
>>> changelog
>>>>>>> topic will be created since the underlying topic acts as the
>>> changelog.
>>>>>>> I've verified that this is the case.
>>>>>>>
>>>>>>> Is there another API method to materialize the results of a KTable
>>>>>>> including a changelog, i.e. such that kafka streams creates the
>> topic
>>>> and
>>>>>>> uses the naming schema for changelog topics? The use case I have in
>>>> mind
>>>>>> is
>>>>>>> aggregate followed by mapValues.
>>>>>>>
>>>>>>> Best,
>>>>>>> Mikael
>>>>>>>
>>>>>>
>>>>>>
>>>>
>>>>
>>>
>>
> 


Re: KafkaStreams KTable#through not creating changelog topic

Posted by Mikael Högqvist <ho...@gmail.com>.
Hi Michael,

thanks for the extensive explanation, and yes it definitely helps with my
understanding of through(). :)

You guessed correctly that I'm doing some "shenanings" where I'm trying to
derive the changelog of a state store from the state store name. This works
perfectly fine with with a naming convention for the topics and by creating
them in Kafka upfront.

My point is that it would help me (and maybe others), if the API of KTable
was extended to have a new method that does two things that is not part of
the implementation of .through(). 1) Create a state store AND the changelog
topic 2) follow the Kafka Streams naming convention for changelog topics.
Basically, I want to have a method that does what .through() is supposed to
do according to the documentation, but without the "topic" parameter.

What do you think, would it be possible to extend the API with a method
like that?

Thanks,
Mikael

On Wed, Nov 23, 2016 at 4:16 PM Michael Noll <mi...@confluent.io> wrote:

> Mikael,
>
> regarding your second question:
>
> > 2) Regarding the use case, the topology looks like this:
> >
> > .stream(...)
> > .aggregate(..., "store-1")
> > .mapValues(...)
> > .through(..., "store-2")
>
> The last operator above would, without "..." ellipsis, be sth like
> `KTable#through("through-topic", "store-2")`.  Here, "through-topic" is the
> changelog topic for both the KTable and the state store "store-2".  So this
> is the changelog topic name that you want to know.
>
> - If you want the "through" topic to have a `-changelog` suffix, then you'd
> need to add that yourself in the call to `through(...)`.
>
> - If you wonder why `through()` doesn't add a `-changelog` suffix
> automatically:  That's because `through()` -- like `to()` or `stream()`,
> `table()` -- require you to explicitly provide a topic name, and of course
> Kafka will use exactly this name.  (FWIW, the `-changelog` suffix is only
> added when Kafka creates internal changelog topics behind the scenes for
> you.)   Unfortunately, the javadocs of `KTable#through()` is incorrect
> because it refers to `-changelog`;  we'll fix that as mentioned above.
>
> - Also, in case you want to do some shenanigans (like for some tooling
> you're building around state stores/changelogs/interactive queries) such
> detecting all state store changelogs by doing the equivalent of `ls
> *-changelog`, then this will miss changelogs of KTables that are created by
> `through()` and `to()` (unless you come up with a naming convention that
> your tooling can assume to be in place, e.g. by always adding `-changelog`
> to topic names when you call `through()`).
>
> I hope this helps!
> Michael
>
>
>
>
> On Wed, Nov 23, 2016 at 7:39 AM, Mikael Högqvist <ho...@gmail.com>
> wrote:
>
> > Hi Eno,
> >
> > 1) Great :)
> >
> > 2) Yes, we are using the Interactive Queries to access the state stores.
> In
> > addition, we access the changelogs to subscribe to updates. For this
> reason
> > we need to know the changelog topic name.
> >
> > Thanks,
> > Mikael
> >
> > On Tue, Nov 22, 2016 at 8:43 PM Eno Thereska <en...@gmail.com>
> > wrote:
> >
> > > HI Mikael,
> > >
> > > 1) The JavaDoc looks incorrect, thanks for reporting. Matthias is
> looking
> > > into fixing it. I agree that it can be confusing to have topic names
> that
> > > are not what one would expect.
> > >
> > > 2) If your goal is to query/read from the state stores, you can use
> > > Interactive Queries to do that (you don't need to worry about the
> > changelog
> > > topic name and such). Interactive Queries is a new feature in 0.10.1
> > (blog
> > > here:
> > > https://www.confluent.io/blog/unifying-stream-processing-
> > and-interactive-queries-in-apache-kafka/
> > > <
> > > https://www.confluent.io/blog/unifying-stream-processing-
> > and-interactive-queries-in-apache-kafka/
> > > >).
> > >
> > > Thanks
> > > Eno
> > >
> > >
> > > > On 22 Nov 2016, at 19:27, Mikael Högqvist <ho...@gmail.com>
> wrote:
> > > >
> > > > Sorry for being unclear, i'll try again :)
> > > >
> > > > 1) The JavaDoc for through is not correct, it states that a changelog
> > > topic
> > > > will be created for the state store. That is, if I would call it with
> > > > through("topic", "a-store"), I would expect a kafka topic
> > > > "my-app-id-a-store-changelog" to be created.
> > > >
> > > > 2) Regarding the use case, the topology looks like this:
> > > >
> > > > .stream(...)
> > > > .aggregate(..., "store-1")
> > > > .mapValues(...)
> > > > .through(..., "store-2")
> > > >
> > > > Basically, I want to materialize both the result from the aggregate
> > > method
> > > > and the result from mapValues, which is materialized using
> .through().
> > > > Later, I will access both the tables (store-1 and store-2) to a) get
> > the
> > > > current state of the aggregate, b) subscribe to future updates. This
> > > works
> > > > just fine. The only issue is that I assumed to have a changelog topic
> > for
> > > > store-2 created automatically, which didnt happen.
> > > >
> > > > Since I want to access the changelog topic, it helps if the naming is
> > > > consistent. So either we enforce the same naming pattern as kafka
> when
> > > > calling .through() or alternatively the Kafka Streams API can
> provide a
> > > > method to materialize tables which creates a topic name according to
> > the
> > > > naming pattern. E.g. .through() without the topic parameter.
> > > >
> > > > What do you think?
> > > >
> > > > Best,
> > > > Mikael
> > > >
> > > > On Tue, Nov 22, 2016 at 7:21 PM Matthias J. Sax <
> matthias@confluent.io
> > >
> > > > wrote:
> > > >
> > > >> I cannot completely follow what want to achieve.
> > > >>
> > > >> However, the JavaDoc for through() seems not to be correct to me.
> > Using
> > > >> through() will not create an extra internal changelog topic with the
> > > >> described naming schema, because the topic specified in through()
> can
> > be
> > > >> used for this (there is no point in duplicating the data).
> > > >>
> > > >> If you have a KTable and apply a mapValues(), this will not write
> data
> > > >> to any topic. The derived KTable is in-memory because you can easily
> > > >> recreate it from its base KTable.
> > > >>
> > > >> What is the missing part you want to get?
> > > >>
> > > >> Btw: the internally created changelog topics are only used for
> > recovery
> > > >> in case of failure. Streams does not consumer from those topic
> during
> > > >> "normal operation".
> > > >>
> > > >>
> > > >> -Matthias
> > > >>
> > > >>
> > > >>
> > > >> On 11/22/16 1:59 AM, Mikael Högqvist wrote:
> > > >>> Hi,
> > > >>>
> > > >>> in the documentation for KTable#through, it is stated that a new
> > > >> changelog
> > > >>> topic will be created for the table. It also states that calling
> > > through
> > > >> is
> > > >>> equivalent to calling #to followed by KStreamBuilder#table.
> > > >>>
> > > >>>
> > > >>
> > > http://kafka.apache.org/0101/javadoc/org/apache/kafka/
> > streams/kstream/KTable.html#through(org.apache.kafka.
> > common.serialization.Serde,%20org.apache.kafka.common.
> > serialization.Serde,%20java.lang.String,%20java.lang.String)
> > > >>>
> > > >>> In the docs for KStreamBuilder#table it is stated that no new
> > changelog
> > > >>> topic will be created since the underlying topic acts as the
> > changelog.
> > > >>> I've verified that this is the case.
> > > >>>
> > > >>> Is there another API method to materialize the results of a KTable
> > > >>> including a changelog, i.e. such that kafka streams creates the
> topic
> > > and
> > > >>> uses the naming schema for changelog topics? The use case I have in
> > > mind
> > > >> is
> > > >>> aggregate followed by mapValues.
> > > >>>
> > > >>> Best,
> > > >>> Mikael
> > > >>>
> > > >>
> > > >>
> > >
> > >
> >
>

Re: KafkaStreams KTable#through not creating changelog topic

Posted by Michael Noll <mi...@confluent.io>.
> - Also, in case you want to do some shenanigans (like for some tooling
you're building
> around state stores/changelogs/interactive queries) such detecting all
state store changelogs
> by doing the equivalent of `ls *-changelog`, then this will miss
changelogs of KTables that are
> created by `through()` and `to()` [...]

Addendum: And that's because the topic that is created by
`KTable#through()` and `KTable#to()` is, by definition, a changelog of that
KTable and the associated state store.



On Wed, Nov 23, 2016 at 4:15 PM, Michael Noll <mi...@confluent.io> wrote:

> Mikael,
>
> regarding your second question:
>
> > 2) Regarding the use case, the topology looks like this:
> >
> > .stream(...)
> > .aggregate(..., "store-1")
> > .mapValues(...)
> > .through(..., "store-2")
>
> The last operator above would, without "..." ellipsis, be sth like
> `KTable#through("through-topic", "store-2")`.  Here, "through-topic" is
> the changelog topic for both the KTable and the state store "store-2".  So
> this is the changelog topic name that you want to know.
>
> - If you want the "through" topic to have a `-changelog` suffix, then
> you'd need to add that yourself in the call to `through(...)`.
>
> - If you wonder why `through()` doesn't add a `-changelog` suffix
> automatically:  That's because `through()` -- like `to()` or `stream()`,
> `table()` -- require you to explicitly provide a topic name, and of course
> Kafka will use exactly this name.  (FWIW, the `-changelog` suffix is only
> added when Kafka creates internal changelog topics behind the scenes for
> you.)   Unfortunately, the javadocs of `KTable#through()` is incorrect
> because it refers to `-changelog`;  we'll fix that as mentioned above.
>
> - Also, in case you want to do some shenanigans (like for some tooling
> you're building around state stores/changelogs/interactive queries) such
> detecting all state store changelogs by doing the equivalent of `ls
> *-changelog`, then this will miss changelogs of KTables that are created by
> `through()` and `to()` (unless you come up with a naming convention that
> your tooling can assume to be in place, e.g. by always adding `-changelog`
> to topic names when you call `through()`).
>
> I hope this helps!
> Michael
>
>
>
>
> On Wed, Nov 23, 2016 at 7:39 AM, Mikael Högqvist <ho...@gmail.com>
> wrote:
>
>> Hi Eno,
>>
>> 1) Great :)
>>
>> 2) Yes, we are using the Interactive Queries to access the state stores.
>> In
>> addition, we access the changelogs to subscribe to updates. For this
>> reason
>> we need to know the changelog topic name.
>>
>> Thanks,
>> Mikael
>>
>> On Tue, Nov 22, 2016 at 8:43 PM Eno Thereska <en...@gmail.com>
>> wrote:
>>
>> > HI Mikael,
>> >
>> > 1) The JavaDoc looks incorrect, thanks for reporting. Matthias is
>> looking
>> > into fixing it. I agree that it can be confusing to have topic names
>> that
>> > are not what one would expect.
>> >
>> > 2) If your goal is to query/read from the state stores, you can use
>> > Interactive Queries to do that (you don't need to worry about the
>> changelog
>> > topic name and such). Interactive Queries is a new feature in 0.10.1
>> (blog
>> > here:
>> > https://www.confluent.io/blog/unifying-stream-processing-and
>> -interactive-queries-in-apache-kafka/
>> > <
>> > https://www.confluent.io/blog/unifying-stream-processing-and
>> -interactive-queries-in-apache-kafka/
>> > >).
>> >
>> > Thanks
>> > Eno
>> >
>> >
>> > > On 22 Nov 2016, at 19:27, Mikael Högqvist <ho...@gmail.com>
>> wrote:
>> > >
>> > > Sorry for being unclear, i'll try again :)
>> > >
>> > > 1) The JavaDoc for through is not correct, it states that a changelog
>> > topic
>> > > will be created for the state store. That is, if I would call it with
>> > > through("topic", "a-store"), I would expect a kafka topic
>> > > "my-app-id-a-store-changelog" to be created.
>> > >
>> > > 2) Regarding the use case, the topology looks like this:
>> > >
>> > > .stream(...)
>> > > .aggregate(..., "store-1")
>> > > .mapValues(...)
>> > > .through(..., "store-2")
>> > >
>> > > Basically, I want to materialize both the result from the aggregate
>> > method
>> > > and the result from mapValues, which is materialized using .through().
>> > > Later, I will access both the tables (store-1 and store-2) to a) get
>> the
>> > > current state of the aggregate, b) subscribe to future updates. This
>> > works
>> > > just fine. The only issue is that I assumed to have a changelog topic
>> for
>> > > store-2 created automatically, which didnt happen.
>> > >
>> > > Since I want to access the changelog topic, it helps if the naming is
>> > > consistent. So either we enforce the same naming pattern as kafka when
>> > > calling .through() or alternatively the Kafka Streams API can provide
>> a
>> > > method to materialize tables which creates a topic name according to
>> the
>> > > naming pattern. E.g. .through() without the topic parameter.
>> > >
>> > > What do you think?
>> > >
>> > > Best,
>> > > Mikael
>> > >
>> > > On Tue, Nov 22, 2016 at 7:21 PM Matthias J. Sax <
>> matthias@confluent.io>
>> > > wrote:
>> > >
>> > >> I cannot completely follow what want to achieve.
>> > >>
>> > >> However, the JavaDoc for through() seems not to be correct to me.
>> Using
>> > >> through() will not create an extra internal changelog topic with the
>> > >> described naming schema, because the topic specified in through()
>> can be
>> > >> used for this (there is no point in duplicating the data).
>> > >>
>> > >> If you have a KTable and apply a mapValues(), this will not write
>> data
>> > >> to any topic. The derived KTable is in-memory because you can easily
>> > >> recreate it from its base KTable.
>> > >>
>> > >> What is the missing part you want to get?
>> > >>
>> > >> Btw: the internally created changelog topics are only used for
>> recovery
>> > >> in case of failure. Streams does not consumer from those topic during
>> > >> "normal operation".
>> > >>
>> > >>
>> > >> -Matthias
>> > >>
>> > >>
>> > >>
>> > >> On 11/22/16 1:59 AM, Mikael Högqvist wrote:
>> > >>> Hi,
>> > >>>
>> > >>> in the documentation for KTable#through, it is stated that a new
>> > >> changelog
>> > >>> topic will be created for the table. It also states that calling
>> > through
>> > >> is
>> > >>> equivalent to calling #to followed by KStreamBuilder#table.
>> > >>>
>> > >>>
>> > >>
>> > http://kafka.apache.org/0101/javadoc/org/apache/kafka/stream
>> s/kstream/KTable.html#through(org.apache.kafka.common.
>> serialization.Serde,%20org.apache.kafka.common.serializat
>> ion.Serde,%20java.lang.String,%20java.lang.String)
>> > >>>
>> > >>> In the docs for KStreamBuilder#table it is stated that no new
>> changelog
>> > >>> topic will be created since the underlying topic acts as the
>> changelog.
>> > >>> I've verified that this is the case.
>> > >>>
>> > >>> Is there another API method to materialize the results of a KTable
>> > >>> including a changelog, i.e. such that kafka streams creates the
>> topic
>> > and
>> > >>> uses the naming schema for changelog topics? The use case I have in
>> > mind
>> > >> is
>> > >>> aggregate followed by mapValues.
>> > >>>
>> > >>> Best,
>> > >>> Mikael
>> > >>>
>> > >>
>> > >>
>> >
>> >
>>
>
>
>

Re: KafkaStreams KTable#through not creating changelog topic

Posted by Michael Noll <mi...@confluent.io>.
Mikael,

regarding your second question:

> 2) Regarding the use case, the topology looks like this:
>
> .stream(...)
> .aggregate(..., "store-1")
> .mapValues(...)
> .through(..., "store-2")

The last operator above would, without "..." ellipsis, be sth like
`KTable#through("through-topic", "store-2")`.  Here, "through-topic" is the
changelog topic for both the KTable and the state store "store-2".  So this
is the changelog topic name that you want to know.

- If you want the "through" topic to have a `-changelog` suffix, then you'd
need to add that yourself in the call to `through(...)`.

- If you wonder why `through()` doesn't add a `-changelog` suffix
automatically:  That's because `through()` -- like `to()` or `stream()`,
`table()` -- require you to explicitly provide a topic name, and of course
Kafka will use exactly this name.  (FWIW, the `-changelog` suffix is only
added when Kafka creates internal changelog topics behind the scenes for
you.)   Unfortunately, the javadocs of `KTable#through()` is incorrect
because it refers to `-changelog`;  we'll fix that as mentioned above.

- Also, in case you want to do some shenanigans (like for some tooling
you're building around state stores/changelogs/interactive queries) such
detecting all state store changelogs by doing the equivalent of `ls
*-changelog`, then this will miss changelogs of KTables that are created by
`through()` and `to()` (unless you come up with a naming convention that
your tooling can assume to be in place, e.g. by always adding `-changelog`
to topic names when you call `through()`).

I hope this helps!
Michael




On Wed, Nov 23, 2016 at 7:39 AM, Mikael Högqvist <ho...@gmail.com>
wrote:

> Hi Eno,
>
> 1) Great :)
>
> 2) Yes, we are using the Interactive Queries to access the state stores. In
> addition, we access the changelogs to subscribe to updates. For this reason
> we need to know the changelog topic name.
>
> Thanks,
> Mikael
>
> On Tue, Nov 22, 2016 at 8:43 PM Eno Thereska <en...@gmail.com>
> wrote:
>
> > HI Mikael,
> >
> > 1) The JavaDoc looks incorrect, thanks for reporting. Matthias is looking
> > into fixing it. I agree that it can be confusing to have topic names that
> > are not what one would expect.
> >
> > 2) If your goal is to query/read from the state stores, you can use
> > Interactive Queries to do that (you don't need to worry about the
> changelog
> > topic name and such). Interactive Queries is a new feature in 0.10.1
> (blog
> > here:
> > https://www.confluent.io/blog/unifying-stream-processing-
> and-interactive-queries-in-apache-kafka/
> > <
> > https://www.confluent.io/blog/unifying-stream-processing-
> and-interactive-queries-in-apache-kafka/
> > >).
> >
> > Thanks
> > Eno
> >
> >
> > > On 22 Nov 2016, at 19:27, Mikael Högqvist <ho...@gmail.com> wrote:
> > >
> > > Sorry for being unclear, i'll try again :)
> > >
> > > 1) The JavaDoc for through is not correct, it states that a changelog
> > topic
> > > will be created for the state store. That is, if I would call it with
> > > through("topic", "a-store"), I would expect a kafka topic
> > > "my-app-id-a-store-changelog" to be created.
> > >
> > > 2) Regarding the use case, the topology looks like this:
> > >
> > > .stream(...)
> > > .aggregate(..., "store-1")
> > > .mapValues(...)
> > > .through(..., "store-2")
> > >
> > > Basically, I want to materialize both the result from the aggregate
> > method
> > > and the result from mapValues, which is materialized using .through().
> > > Later, I will access both the tables (store-1 and store-2) to a) get
> the
> > > current state of the aggregate, b) subscribe to future updates. This
> > works
> > > just fine. The only issue is that I assumed to have a changelog topic
> for
> > > store-2 created automatically, which didnt happen.
> > >
> > > Since I want to access the changelog topic, it helps if the naming is
> > > consistent. So either we enforce the same naming pattern as kafka when
> > > calling .through() or alternatively the Kafka Streams API can provide a
> > > method to materialize tables which creates a topic name according to
> the
> > > naming pattern. E.g. .through() without the topic parameter.
> > >
> > > What do you think?
> > >
> > > Best,
> > > Mikael
> > >
> > > On Tue, Nov 22, 2016 at 7:21 PM Matthias J. Sax <matthias@confluent.io
> >
> > > wrote:
> > >
> > >> I cannot completely follow what want to achieve.
> > >>
> > >> However, the JavaDoc for through() seems not to be correct to me.
> Using
> > >> through() will not create an extra internal changelog topic with the
> > >> described naming schema, because the topic specified in through() can
> be
> > >> used for this (there is no point in duplicating the data).
> > >>
> > >> If you have a KTable and apply a mapValues(), this will not write data
> > >> to any topic. The derived KTable is in-memory because you can easily
> > >> recreate it from its base KTable.
> > >>
> > >> What is the missing part you want to get?
> > >>
> > >> Btw: the internally created changelog topics are only used for
> recovery
> > >> in case of failure. Streams does not consumer from those topic during
> > >> "normal operation".
> > >>
> > >>
> > >> -Matthias
> > >>
> > >>
> > >>
> > >> On 11/22/16 1:59 AM, Mikael Högqvist wrote:
> > >>> Hi,
> > >>>
> > >>> in the documentation for KTable#through, it is stated that a new
> > >> changelog
> > >>> topic will be created for the table. It also states that calling
> > through
> > >> is
> > >>> equivalent to calling #to followed by KStreamBuilder#table.
> > >>>
> > >>>
> > >>
> > http://kafka.apache.org/0101/javadoc/org/apache/kafka/
> streams/kstream/KTable.html#through(org.apache.kafka.
> common.serialization.Serde,%20org.apache.kafka.common.
> serialization.Serde,%20java.lang.String,%20java.lang.String)
> > >>>
> > >>> In the docs for KStreamBuilder#table it is stated that no new
> changelog
> > >>> topic will be created since the underlying topic acts as the
> changelog.
> > >>> I've verified that this is the case.
> > >>>
> > >>> Is there another API method to materialize the results of a KTable
> > >>> including a changelog, i.e. such that kafka streams creates the topic
> > and
> > >>> uses the naming schema for changelog topics? The use case I have in
> > mind
> > >> is
> > >>> aggregate followed by mapValues.
> > >>>
> > >>> Best,
> > >>> Mikael
> > >>>
> > >>
> > >>
> >
> >
>

Re: KafkaStreams KTable#through not creating changelog topic

Posted by Mikael Högqvist <ho...@gmail.com>.
Hi Eno,

1) Great :)

2) Yes, we are using the Interactive Queries to access the state stores. In
addition, we access the changelogs to subscribe to updates. For this reason
we need to know the changelog topic name.

Thanks,
Mikael

On Tue, Nov 22, 2016 at 8:43 PM Eno Thereska <en...@gmail.com> wrote:

> HI Mikael,
>
> 1) The JavaDoc looks incorrect, thanks for reporting. Matthias is looking
> into fixing it. I agree that it can be confusing to have topic names that
> are not what one would expect.
>
> 2) If your goal is to query/read from the state stores, you can use
> Interactive Queries to do that (you don't need to worry about the changelog
> topic name and such). Interactive Queries is a new feature in 0.10.1 (blog
> here:
> https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/
> <
> https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/
> >).
>
> Thanks
> Eno
>
>
> > On 22 Nov 2016, at 19:27, Mikael Högqvist <ho...@gmail.com> wrote:
> >
> > Sorry for being unclear, i'll try again :)
> >
> > 1) The JavaDoc for through is not correct, it states that a changelog
> topic
> > will be created for the state store. That is, if I would call it with
> > through("topic", "a-store"), I would expect a kafka topic
> > "my-app-id-a-store-changelog" to be created.
> >
> > 2) Regarding the use case, the topology looks like this:
> >
> > .stream(...)
> > .aggregate(..., "store-1")
> > .mapValues(...)
> > .through(..., "store-2")
> >
> > Basically, I want to materialize both the result from the aggregate
> method
> > and the result from mapValues, which is materialized using .through().
> > Later, I will access both the tables (store-1 and store-2) to a) get the
> > current state of the aggregate, b) subscribe to future updates. This
> works
> > just fine. The only issue is that I assumed to have a changelog topic for
> > store-2 created automatically, which didnt happen.
> >
> > Since I want to access the changelog topic, it helps if the naming is
> > consistent. So either we enforce the same naming pattern as kafka when
> > calling .through() or alternatively the Kafka Streams API can provide a
> > method to materialize tables which creates a topic name according to the
> > naming pattern. E.g. .through() without the topic parameter.
> >
> > What do you think?
> >
> > Best,
> > Mikael
> >
> > On Tue, Nov 22, 2016 at 7:21 PM Matthias J. Sax <ma...@confluent.io>
> > wrote:
> >
> >> I cannot completely follow what want to achieve.
> >>
> >> However, the JavaDoc for through() seems not to be correct to me. Using
> >> through() will not create an extra internal changelog topic with the
> >> described naming schema, because the topic specified in through() can be
> >> used for this (there is no point in duplicating the data).
> >>
> >> If you have a KTable and apply a mapValues(), this will not write data
> >> to any topic. The derived KTable is in-memory because you can easily
> >> recreate it from its base KTable.
> >>
> >> What is the missing part you want to get?
> >>
> >> Btw: the internally created changelog topics are only used for recovery
> >> in case of failure. Streams does not consumer from those topic during
> >> "normal operation".
> >>
> >>
> >> -Matthias
> >>
> >>
> >>
> >> On 11/22/16 1:59 AM, Mikael Högqvist wrote:
> >>> Hi,
> >>>
> >>> in the documentation for KTable#through, it is stated that a new
> >> changelog
> >>> topic will be created for the table. It also states that calling
> through
> >> is
> >>> equivalent to calling #to followed by KStreamBuilder#table.
> >>>
> >>>
> >>
> http://kafka.apache.org/0101/javadoc/org/apache/kafka/streams/kstream/KTable.html#through(org.apache.kafka.common.serialization.Serde,%20org.apache.kafka.common.serialization.Serde,%20java.lang.String,%20java.lang.String)
> >>>
> >>> In the docs for KStreamBuilder#table it is stated that no new changelog
> >>> topic will be created since the underlying topic acts as the changelog.
> >>> I've verified that this is the case.
> >>>
> >>> Is there another API method to materialize the results of a KTable
> >>> including a changelog, i.e. such that kafka streams creates the topic
> and
> >>> uses the naming schema for changelog topics? The use case I have in
> mind
> >> is
> >>> aggregate followed by mapValues.
> >>>
> >>> Best,
> >>> Mikael
> >>>
> >>
> >>
>
>

Re: KafkaStreams KTable#through not creating changelog topic

Posted by Eno Thereska <en...@gmail.com>.
HI Mikael,

1) The JavaDoc looks incorrect, thanks for reporting. Matthias is looking into fixing it. I agree that it can be confusing to have topic names that are not what one would expect.

2) If your goal is to query/read from the state stores, you can use Interactive Queries to do that (you don't need to worry about the changelog topic name and such). Interactive Queries is a new feature in 0.10.1 (blog here: https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/ <https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/>). 

Thanks
Eno


> On 22 Nov 2016, at 19:27, Mikael Högqvist <ho...@gmail.com> wrote:
> 
> Sorry for being unclear, i'll try again :)
> 
> 1) The JavaDoc for through is not correct, it states that a changelog topic
> will be created for the state store. That is, if I would call it with
> through("topic", "a-store"), I would expect a kafka topic
> "my-app-id-a-store-changelog" to be created.
> 
> 2) Regarding the use case, the topology looks like this:
> 
> .stream(...)
> .aggregate(..., "store-1")
> .mapValues(...)
> .through(..., "store-2")
> 
> Basically, I want to materialize both the result from the aggregate method
> and the result from mapValues, which is materialized using .through().
> Later, I will access both the tables (store-1 and store-2) to a) get the
> current state of the aggregate, b) subscribe to future updates. This works
> just fine. The only issue is that I assumed to have a changelog topic for
> store-2 created automatically, which didnt happen.
> 
> Since I want to access the changelog topic, it helps if the naming is
> consistent. So either we enforce the same naming pattern as kafka when
> calling .through() or alternatively the Kafka Streams API can provide a
> method to materialize tables which creates a topic name according to the
> naming pattern. E.g. .through() without the topic parameter.
> 
> What do you think?
> 
> Best,
> Mikael
> 
> On Tue, Nov 22, 2016 at 7:21 PM Matthias J. Sax <ma...@confluent.io>
> wrote:
> 
>> I cannot completely follow what want to achieve.
>> 
>> However, the JavaDoc for through() seems not to be correct to me. Using
>> through() will not create an extra internal changelog topic with the
>> described naming schema, because the topic specified in through() can be
>> used for this (there is no point in duplicating the data).
>> 
>> If you have a KTable and apply a mapValues(), this will not write data
>> to any topic. The derived KTable is in-memory because you can easily
>> recreate it from its base KTable.
>> 
>> What is the missing part you want to get?
>> 
>> Btw: the internally created changelog topics are only used for recovery
>> in case of failure. Streams does not consumer from those topic during
>> "normal operation".
>> 
>> 
>> -Matthias
>> 
>> 
>> 
>> On 11/22/16 1:59 AM, Mikael Högqvist wrote:
>>> Hi,
>>> 
>>> in the documentation for KTable#through, it is stated that a new
>> changelog
>>> topic will be created for the table. It also states that calling through
>> is
>>> equivalent to calling #to followed by KStreamBuilder#table.
>>> 
>>> 
>> http://kafka.apache.org/0101/javadoc/org/apache/kafka/streams/kstream/KTable.html#through(org.apache.kafka.common.serialization.Serde,%20org.apache.kafka.common.serialization.Serde,%20java.lang.String,%20java.lang.String)
>>> 
>>> In the docs for KStreamBuilder#table it is stated that no new changelog
>>> topic will be created since the underlying topic acts as the changelog.
>>> I've verified that this is the case.
>>> 
>>> Is there another API method to materialize the results of a KTable
>>> including a changelog, i.e. such that kafka streams creates the topic and
>>> uses the naming schema for changelog topics? The use case I have in mind
>> is
>>> aggregate followed by mapValues.
>>> 
>>> Best,
>>> Mikael
>>> 
>> 
>> 


Re: KafkaStreams KTable#through not creating changelog topic

Posted by Mikael Högqvist <ho...@gmail.com>.
Sorry for being unclear, i'll try again :)

1) The JavaDoc for through is not correct, it states that a changelog topic
will be created for the state store. That is, if I would call it with
through("topic", "a-store"), I would expect a kafka topic
"my-app-id-a-store-changelog" to be created.

2) Regarding the use case, the topology looks like this:

.stream(...)
.aggregate(..., "store-1")
.mapValues(...)
.through(..., "store-2")

Basically, I want to materialize both the result from the aggregate method
and the result from mapValues, which is materialized using .through().
Later, I will access both the tables (store-1 and store-2) to a) get the
current state of the aggregate, b) subscribe to future updates. This works
just fine. The only issue is that I assumed to have a changelog topic for
store-2 created automatically, which didnt happen.

Since I want to access the changelog topic, it helps if the naming is
consistent. So either we enforce the same naming pattern as kafka when
calling .through() or alternatively the Kafka Streams API can provide a
method to materialize tables which creates a topic name according to the
naming pattern. E.g. .through() without the topic parameter.

What do you think?

Best,
Mikael

On Tue, Nov 22, 2016 at 7:21 PM Matthias J. Sax <ma...@confluent.io>
wrote:

> I cannot completely follow what want to achieve.
>
> However, the JavaDoc for through() seems not to be correct to me. Using
> through() will not create an extra internal changelog topic with the
> described naming schema, because the topic specified in through() can be
> used for this (there is no point in duplicating the data).
>
> If you have a KTable and apply a mapValues(), this will not write data
> to any topic. The derived KTable is in-memory because you can easily
> recreate it from its base KTable.
>
> What is the missing part you want to get?
>
> Btw: the internally created changelog topics are only used for recovery
> in case of failure. Streams does not consumer from those topic during
> "normal operation".
>
>
> -Matthias
>
>
>
> On 11/22/16 1:59 AM, Mikael Högqvist wrote:
> > Hi,
> >
> > in the documentation for KTable#through, it is stated that a new
> changelog
> > topic will be created for the table. It also states that calling through
> is
> > equivalent to calling #to followed by KStreamBuilder#table.
> >
> >
> http://kafka.apache.org/0101/javadoc/org/apache/kafka/streams/kstream/KTable.html#through(org.apache.kafka.common.serialization.Serde,%20org.apache.kafka.common.serialization.Serde,%20java.lang.String,%20java.lang.String)
> >
> > In the docs for KStreamBuilder#table it is stated that no new changelog
> > topic will be created since the underlying topic acts as the changelog.
> > I've verified that this is the case.
> >
> > Is there another API method to materialize the results of a KTable
> > including a changelog, i.e. such that kafka streams creates the topic and
> > uses the naming schema for changelog topics? The use case I have in mind
> is
> > aggregate followed by mapValues.
> >
> > Best,
> > Mikael
> >
>
>

Re: KafkaStreams KTable#through not creating changelog topic

Posted by "Matthias J. Sax" <ma...@confluent.io>.
I cannot completely follow what want to achieve.

However, the JavaDoc for through() seems not to be correct to me. Using
through() will not create an extra internal changelog topic with the
described naming schema, because the topic specified in through() can be
used for this (there is no point in duplicating the data).

If you have a KTable and apply a mapValues(), this will not write data
to any topic. The derived KTable is in-memory because you can easily
recreate it from its base KTable.

What is the missing part you want to get?

Btw: the internally created changelog topics are only used for recovery
in case of failure. Streams does not consumer from those topic during
"normal operation".


-Matthias



On 11/22/16 1:59 AM, Mikael Högqvist wrote:
> Hi,
> 
> in the documentation for KTable#through, it is stated that a new changelog
> topic will be created for the table. It also states that calling through is
> equivalent to calling #to followed by KStreamBuilder#table.
> 
> http://kafka.apache.org/0101/javadoc/org/apache/kafka/streams/kstream/KTable.html#through(org.apache.kafka.common.serialization.Serde,%20org.apache.kafka.common.serialization.Serde,%20java.lang.String,%20java.lang.String)
> 
> In the docs for KStreamBuilder#table it is stated that no new changelog
> topic will be created since the underlying topic acts as the changelog.
> I've verified that this is the case.
> 
> Is there another API method to materialize the results of a KTable
> including a changelog, i.e. such that kafka streams creates the topic and
> uses the naming schema for changelog topics? The use case I have in mind is
> aggregate followed by mapValues.
> 
> Best,
> Mikael
>