You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Nicolas Fouché <nf...@onfocus.io> on 2017/01/18 13:47:50 UTC

Kafka Streams: how can I get the name of the Processor when calling `KStream.process`

Hi,

as far as I understand, calling `KStream.process` prevents the developer
from adding further operations to a `KStreamBuilder` [1], because its
return type is `void`. Good.

But it also prevents the developer from adding operations to its superclass
`TopologyBuilder`. In my case I wanted to add a sink, and the parent of
this sink would be the name of the Processor that is created by
`KStream.process`. Is there any reason why this method does not return the
processor name [2] ? Is it because it would be a bad idea continuing
building my topology with the low-level API ?

[1]
https://github.com/confluentinc/examples/blob/3.1.x/kafka-streams/src/test/java/io/confluent/examples/streams/MixAndMatchLambdaIntegrationTest.java%23L56
[2]
https://github.com/apache/kafka/blob/b6011918fbc36bfaa465bdcc750e2435985d9101/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java#L391


Thanks.
Nicolas.

Re: Kafka Streams: how can I get the name of the Processor when calling `KStream.process`

Posted by Zac Harvey <za...@welltok.com>.
Sorry that last response was for a different thread - please ignore (and sorry!)

________________________________
From: Michael Noll <mi...@confluent.io>
Sent: Wednesday, January 18, 2017 9:56:35 AM
To: users@kafka.apache.org
Subject: Re: Kafka Streams: how can I get the name of the Processor when calling `KStream.process`

Nicolas,

if I understand your question correctly you'd like to add further
operations after having called `KStream#process()`, which -- as you report
-- doesn't work because `process()` returns void.

If that's indeed the case, +1 to Damian's suggest to use
`KStream.transform()` instead of `KStream.process()`.

-Michael




On Wed, Jan 18, 2017 at 3:31 PM, Damian Guy <da...@gmail.com> wrote:

> You could possibly also use KStream.transform(...)
>
> On Wed, 18 Jan 2017 at 14:22 Damian Guy <da...@gmail.com> wrote:
>
> > Hi Nicolas,
> >
> > Good question! I'm not sure why it is a terminal operation, maybe one of
> > the original authors can chip in. However, you could probably work around
> > it by using TopologyBuilder.addProcessor(...) rather then
> KStream.process
> >
> > Thanks,
> > Damian
> >
> > On Wed, 18 Jan 2017 at 13:48 Nicolas Fouché <nf...@onfocus.io> wrote:
> >
> > Hi,
> >
> > as far as I understand, calling `KStream.process` prevents the developer
> > from adding further operations to a `KStreamBuilder` [1], because its
> > return type is `void`. Good.
> >
> > But it also prevents the developer from adding operations to its
> superclass
> > `TopologyBuilder`. In my case I wanted to add a sink, and the parent of
> > this sink would be the name of the Processor that is created by
> > `KStream.process`. Is there any reason why this method does not return
> the
> > processor name [2] ? Is it because it would be a bad idea continuing
> > building my topology with the low-level API ?
> >
> > [1]
> >
> > https://github.com/confluentinc/examples/blob/3.
> 1.x/kafka-streams/src/test/java/io/confluent/examples/streams/
> MixAndMatchLambdaIntegrationTest.java%23L56
> > [2]
> >
> > https://github.com/apache/kafka/blob/b6011918fbc36bfaa465bdcc750e24
> 35985d9101/streams/src/main/java/org/apache/kafka/streams/
> kstream/internals/KStreamImpl.java#L391
> >
> >
> > Thanks.
> > Nicolas.
> >
> >
>

Re: Kafka Streams: how can I get the name of the Processor when calling `KStream.process`

Posted by Zac Harvey <za...@welltok.com>.
Anybody ever seen this before? Anybody have any ideas as to where I can start troubleshooting?

________________________________
From: Michael Noll <mi...@confluent.io>
Sent: Wednesday, January 18, 2017 9:56:35 AM
To: users@kafka.apache.org
Subject: Re: Kafka Streams: how can I get the name of the Processor when calling `KStream.process`

Nicolas,

if I understand your question correctly you'd like to add further
operations after having called `KStream#process()`, which -- as you report
-- doesn't work because `process()` returns void.

If that's indeed the case, +1 to Damian's suggest to use
`KStream.transform()` instead of `KStream.process()`.

-Michael




On Wed, Jan 18, 2017 at 3:31 PM, Damian Guy <da...@gmail.com> wrote:

> You could possibly also use KStream.transform(...)
>
> On Wed, 18 Jan 2017 at 14:22 Damian Guy <da...@gmail.com> wrote:
>
> > Hi Nicolas,
> >
> > Good question! I'm not sure why it is a terminal operation, maybe one of
> > the original authors can chip in. However, you could probably work around
> > it by using TopologyBuilder.addProcessor(...) rather then
> KStream.process
> >
> > Thanks,
> > Damian
> >
> > On Wed, 18 Jan 2017 at 13:48 Nicolas Fouché <nf...@onfocus.io> wrote:
> >
> > Hi,
> >
> > as far as I understand, calling `KStream.process` prevents the developer
> > from adding further operations to a `KStreamBuilder` [1], because its
> > return type is `void`. Good.
> >
> > But it also prevents the developer from adding operations to its
> superclass
> > `TopologyBuilder`. In my case I wanted to add a sink, and the parent of
> > this sink would be the name of the Processor that is created by
> > `KStream.process`. Is there any reason why this method does not return
> the
> > processor name [2] ? Is it because it would be a bad idea continuing
> > building my topology with the low-level API ?
> >
> > [1]
> >
> > https://github.com/confluentinc/examples/blob/3.
> 1.x/kafka-streams/src/test/java/io/confluent/examples/streams/
> MixAndMatchLambdaIntegrationTest.java%23L56
> > [2]
> >
> > https://github.com/apache/kafka/blob/b6011918fbc36bfaa465bdcc750e24
> 35985d9101/streams/src/main/java/org/apache/kafka/streams/
> kstream/internals/KStreamImpl.java#L391
> >
> >
> > Thanks.
> > Nicolas.
> >
> >
>

Re: Kafka Streams: how can I get the name of the Processor when calling `KStream.process`

Posted by Nicolas Fouché <nf...@onfocus.io>.
No problem with that. It's perfectly explained in
https://github.com/confluentinc/examples/blob/3.1.x/kafka-streams/src/test/java/io/confluent/examples/streams/StateStoresInTheDSLIntegrationTest.java
.

2017-01-18 19:41 GMT+01:00 Michael Noll <mi...@confluent.io>:

> Nicolas,
>
> here's some information I shared on StackOverflow (perhaps a bit outdated
> by now, was back in Aug 2016) about how you can add a state store when
> using KStreamBuilder: http://stackoverflow.com/a/39086805/1743580
>
> -Michael
>
>
>
>
> On Wed, Jan 18, 2017 at 5:18 PM, Nicolas Fouché <nf...@onfocus.io>
> wrote:
>
> > The reason I would not use `KStream.transform()` is that I want to call
> > `ProcessorContext.forward()` several times, to different children. These
> > children are sinks.
> > My use case: I need to route my beacons to different topics. Right now, I
> > use a series of `KStream.branch()` calls [1]. But would it be more
> > "elegant" to be able to add 5 sinks to a topology, and forward my records
> > to them in a custom processor ?
> >
> > Damian: About `TopologyBuilder.addProcessor(...)`, as far as I know, I
> > have
> > to give a parent processor. But the parent processor was generated by a
> > high-level topologies. And names of processors created by
> `KStreamBuilder`
> > are not accessible. (unless by inspecting the topology nodes I guess)
> >
> > [1] https://gist.github.com/nfo/c4936a24601352db23b18653a8ccc352
> >
> > Thanks.
> > Nicolas
> >
> >
> > 2017-01-18 15:56 GMT+01:00 Michael Noll <mi...@confluent.io>:
> >
> > > Nicolas,
> > >
> > > if I understand your question correctly you'd like to add further
> > > operations after having called `KStream#process()`, which -- as you
> > report
> > > -- doesn't work because `process()` returns void.
> > >
> > > If that's indeed the case, +1 to Damian's suggest to use
> > > `KStream.transform()` instead of `KStream.process()`.
> > >
> > > -Michael
> > >
> > >
> > >
> > >
> > > On Wed, Jan 18, 2017 at 3:31 PM, Damian Guy <da...@gmail.com>
> > wrote:
> > >
> > > > You could possibly also use KStream.transform(...)
> > > >
> > > > On Wed, 18 Jan 2017 at 14:22 Damian Guy <da...@gmail.com>
> wrote:
> > > >
> > > > > Hi Nicolas,
> > > > >
> > > > > Good question! I'm not sure why it is a terminal operation, maybe
> one
> > > of
> > > > > the original authors can chip in. However, you could probably work
> > > around
> > > > > it by using TopologyBuilder.addProcessor(...) rather then
> > > > KStream.process
> > > > >
> > > > > Thanks,
> > > > > Damian
> > > > >
> > > > > On Wed, 18 Jan 2017 at 13:48 Nicolas Fouché <nf...@onfocus.io>
> > > wrote:
> > > > >
> > > > > Hi,
> > > > >
> > > > > as far as I understand, calling `KStream.process` prevents the
> > > developer
> > > > > from adding further operations to a `KStreamBuilder` [1], because
> its
> > > > > return type is `void`. Good.
> > > > >
> > > > > But it also prevents the developer from adding operations to its
> > > > superclass
> > > > > `TopologyBuilder`. In my case I wanted to add a sink, and the
> parent
> > of
> > > > > this sink would be the name of the Processor that is created by
> > > > > `KStream.process`. Is there any reason why this method does not
> > return
> > > > the
> > > > > processor name [2] ? Is it because it would be a bad idea
> continuing
> > > > > building my topology with the low-level API ?
> > > > >
> > > > > [1]
> > > > >
> > > > > https://github.com/confluentinc/examples/blob/3.
> > > > 1.x/kafka-streams/src/test/java/io/confluent/examples/streams/
> > > > MixAndMatchLambdaIntegrationTest.java%23L56
> > > > > [2]
> > > > >
> > > > > https://github.com/apache/kafka/blob/
> b6011918fbc36bfaa465bdcc750e24
> > > > 35985d9101/streams/src/main/java/org/apache/kafka/streams/
> > > > kstream/internals/KStreamImpl.java#L391
> > > > >
> > > > >
> > > > > Thanks.
> > > > > Nicolas.
> > > > >
> > > > >
> > > >
> > >
> >
>

Re: Kafka Streams: how can I get the name of the Processor when calling `KStream.process`

Posted by Michael Noll <mi...@confluent.io>.
Nicolas,

here's some information I shared on StackOverflow (perhaps a bit outdated
by now, was back in Aug 2016) about how you can add a state store when
using KStreamBuilder: http://stackoverflow.com/a/39086805/1743580

-Michael




On Wed, Jan 18, 2017 at 5:18 PM, Nicolas Fouché <nf...@onfocus.io> wrote:

> The reason I would not use `KStream.transform()` is that I want to call
> `ProcessorContext.forward()` several times, to different children. These
> children are sinks.
> My use case: I need to route my beacons to different topics. Right now, I
> use a series of `KStream.branch()` calls [1]. But would it be more
> "elegant" to be able to add 5 sinks to a topology, and forward my records
> to them in a custom processor ?
>
> Damian: About `TopologyBuilder.addProcessor(...)`, as far as I know, I
> have
> to give a parent processor. But the parent processor was generated by a
> high-level topologies. And names of processors created by `KStreamBuilder`
> are not accessible. (unless by inspecting the topology nodes I guess)
>
> [1] https://gist.github.com/nfo/c4936a24601352db23b18653a8ccc352
>
> Thanks.
> Nicolas
>
>
> 2017-01-18 15:56 GMT+01:00 Michael Noll <mi...@confluent.io>:
>
> > Nicolas,
> >
> > if I understand your question correctly you'd like to add further
> > operations after having called `KStream#process()`, which -- as you
> report
> > -- doesn't work because `process()` returns void.
> >
> > If that's indeed the case, +1 to Damian's suggest to use
> > `KStream.transform()` instead of `KStream.process()`.
> >
> > -Michael
> >
> >
> >
> >
> > On Wed, Jan 18, 2017 at 3:31 PM, Damian Guy <da...@gmail.com>
> wrote:
> >
> > > You could possibly also use KStream.transform(...)
> > >
> > > On Wed, 18 Jan 2017 at 14:22 Damian Guy <da...@gmail.com> wrote:
> > >
> > > > Hi Nicolas,
> > > >
> > > > Good question! I'm not sure why it is a terminal operation, maybe one
> > of
> > > > the original authors can chip in. However, you could probably work
> > around
> > > > it by using TopologyBuilder.addProcessor(...) rather then
> > > KStream.process
> > > >
> > > > Thanks,
> > > > Damian
> > > >
> > > > On Wed, 18 Jan 2017 at 13:48 Nicolas Fouché <nf...@onfocus.io>
> > wrote:
> > > >
> > > > Hi,
> > > >
> > > > as far as I understand, calling `KStream.process` prevents the
> > developer
> > > > from adding further operations to a `KStreamBuilder` [1], because its
> > > > return type is `void`. Good.
> > > >
> > > > But it also prevents the developer from adding operations to its
> > > superclass
> > > > `TopologyBuilder`. In my case I wanted to add a sink, and the parent
> of
> > > > this sink would be the name of the Processor that is created by
> > > > `KStream.process`. Is there any reason why this method does not
> return
> > > the
> > > > processor name [2] ? Is it because it would be a bad idea continuing
> > > > building my topology with the low-level API ?
> > > >
> > > > [1]
> > > >
> > > > https://github.com/confluentinc/examples/blob/3.
> > > 1.x/kafka-streams/src/test/java/io/confluent/examples/streams/
> > > MixAndMatchLambdaIntegrationTest.java%23L56
> > > > [2]
> > > >
> > > > https://github.com/apache/kafka/blob/b6011918fbc36bfaa465bdcc750e24
> > > 35985d9101/streams/src/main/java/org/apache/kafka/streams/
> > > kstream/internals/KStreamImpl.java#L391
> > > >
> > > >
> > > > Thanks.
> > > > Nicolas.
> > > >
> > > >
> > >
> >
>

Re: Kafka Streams: how can I get the name of the Processor when calling `KStream.process`

Posted by Nicolas Fouché <nf...@onfocus.io>.
Ho my, I'm dumb. One can give multiple predicates to `KStream.branch()`.

2017-01-18 17:18 GMT+01:00 Nicolas Fouché <nf...@onfocus.io>:

> The reason I would not use `KStream.transform()` is that I want to call
> `ProcessorContext.forward()` several times, to different children. These
> children are sinks.
> My use case: I need to route my beacons to different topics. Right now, I
> use a series of `KStream.branch()` calls [1]. But would it be more
> "elegant" to be able to add 5 sinks to a topology, and forward my records
> to them in a custom processor ?
>
> Damian: About `TopologyBuilder.addProcessor(...)`, as far as I know, I
> have to give a parent processor. But the parent processor was generated by
> a high-level topologies. And names of processors created by
> `KStreamBuilder` are not accessible. (unless by inspecting the topology
> nodes I guess)
>
> [1] https://gist.github.com/nfo/c4936a24601352db23b18653a8ccc352
>
> Thanks.
> Nicolas
>
>
> 2017-01-18 15:56 GMT+01:00 Michael Noll <mi...@confluent.io>:
>
>> Nicolas,
>>
>> if I understand your question correctly you'd like to add further
>> operations after having called `KStream#process()`, which -- as you report
>> -- doesn't work because `process()` returns void.
>>
>> If that's indeed the case, +1 to Damian's suggest to use
>> `KStream.transform()` instead of `KStream.process()`.
>>
>> -Michael
>>
>>
>>
>>
>> On Wed, Jan 18, 2017 at 3:31 PM, Damian Guy <da...@gmail.com> wrote:
>>
>> > You could possibly also use KStream.transform(...)
>> >
>> > On Wed, 18 Jan 2017 at 14:22 Damian Guy <da...@gmail.com> wrote:
>> >
>> > > Hi Nicolas,
>> > >
>> > > Good question! I'm not sure why it is a terminal operation, maybe one
>> of
>> > > the original authors can chip in. However, you could probably work
>> around
>> > > it by using TopologyBuilder.addProcessor(...) rather then
>> > KStream.process
>> > >
>> > > Thanks,
>> > > Damian
>> > >
>> > > On Wed, 18 Jan 2017 at 13:48 Nicolas Fouché <nf...@onfocus.io>
>> wrote:
>> > >
>> > > Hi,
>> > >
>> > > as far as I understand, calling `KStream.process` prevents the
>> developer
>> > > from adding further operations to a `KStreamBuilder` [1], because its
>> > > return type is `void`. Good.
>> > >
>> > > But it also prevents the developer from adding operations to its
>> > superclass
>> > > `TopologyBuilder`. In my case I wanted to add a sink, and the parent
>> of
>> > > this sink would be the name of the Processor that is created by
>> > > `KStream.process`. Is there any reason why this method does not return
>> > the
>> > > processor name [2] ? Is it because it would be a bad idea continuing
>> > > building my topology with the low-level API ?
>> > >
>> > > [1]
>> > >
>> > > https://github.com/confluentinc/examples/blob/3.
>> > 1.x/kafka-streams/src/test/java/io/confluent/examples/streams/
>> > MixAndMatchLambdaIntegrationTest.java%23L56
>> > > [2]
>> > >
>> > > https://github.com/apache/kafka/blob/b6011918fbc36bfaa465bdcc750e24
>> > 35985d9101/streams/src/main/java/org/apache/kafka/streams/
>> > kstream/internals/KStreamImpl.java#L391
>> > >
>> > >
>> > > Thanks.
>> > > Nicolas.
>> > >
>> > >
>> >
>>
>
>

Re: Kafka Streams: how can I get the name of the Processor when calling `KStream.process`

Posted by Nicolas Fouché <nf...@onfocus.io>.
The reason I would not use `KStream.transform()` is that I want to call
`ProcessorContext.forward()` several times, to different children. These
children are sinks.
My use case: I need to route my beacons to different topics. Right now, I
use a series of `KStream.branch()` calls [1]. But would it be more
"elegant" to be able to add 5 sinks to a topology, and forward my records
to them in a custom processor ?

Damian: About `TopologyBuilder.addProcessor(...)`, as far as I know, I have
to give a parent processor. But the parent processor was generated by a
high-level topologies. And names of processors created by `KStreamBuilder`
are not accessible. (unless by inspecting the topology nodes I guess)

[1] https://gist.github.com/nfo/c4936a24601352db23b18653a8ccc352

Thanks.
Nicolas


2017-01-18 15:56 GMT+01:00 Michael Noll <mi...@confluent.io>:

> Nicolas,
>
> if I understand your question correctly you'd like to add further
> operations after having called `KStream#process()`, which -- as you report
> -- doesn't work because `process()` returns void.
>
> If that's indeed the case, +1 to Damian's suggest to use
> `KStream.transform()` instead of `KStream.process()`.
>
> -Michael
>
>
>
>
> On Wed, Jan 18, 2017 at 3:31 PM, Damian Guy <da...@gmail.com> wrote:
>
> > You could possibly also use KStream.transform(...)
> >
> > On Wed, 18 Jan 2017 at 14:22 Damian Guy <da...@gmail.com> wrote:
> >
> > > Hi Nicolas,
> > >
> > > Good question! I'm not sure why it is a terminal operation, maybe one
> of
> > > the original authors can chip in. However, you could probably work
> around
> > > it by using TopologyBuilder.addProcessor(...) rather then
> > KStream.process
> > >
> > > Thanks,
> > > Damian
> > >
> > > On Wed, 18 Jan 2017 at 13:48 Nicolas Fouché <nf...@onfocus.io>
> wrote:
> > >
> > > Hi,
> > >
> > > as far as I understand, calling `KStream.process` prevents the
> developer
> > > from adding further operations to a `KStreamBuilder` [1], because its
> > > return type is `void`. Good.
> > >
> > > But it also prevents the developer from adding operations to its
> > superclass
> > > `TopologyBuilder`. In my case I wanted to add a sink, and the parent of
> > > this sink would be the name of the Processor that is created by
> > > `KStream.process`. Is there any reason why this method does not return
> > the
> > > processor name [2] ? Is it because it would be a bad idea continuing
> > > building my topology with the low-level API ?
> > >
> > > [1]
> > >
> > > https://github.com/confluentinc/examples/blob/3.
> > 1.x/kafka-streams/src/test/java/io/confluent/examples/streams/
> > MixAndMatchLambdaIntegrationTest.java%23L56
> > > [2]
> > >
> > > https://github.com/apache/kafka/blob/b6011918fbc36bfaa465bdcc750e24
> > 35985d9101/streams/src/main/java/org/apache/kafka/streams/
> > kstream/internals/KStreamImpl.java#L391
> > >
> > >
> > > Thanks.
> > > Nicolas.
> > >
> > >
> >
>

Re: Kafka Streams: how can I get the name of the Processor when calling `KStream.process`

Posted by Michael Noll <mi...@confluent.io>.
Nicolas,

if I understand your question correctly you'd like to add further
operations after having called `KStream#process()`, which -- as you report
-- doesn't work because `process()` returns void.

If that's indeed the case, +1 to Damian's suggest to use
`KStream.transform()` instead of `KStream.process()`.

-Michael




On Wed, Jan 18, 2017 at 3:31 PM, Damian Guy <da...@gmail.com> wrote:

> You could possibly also use KStream.transform(...)
>
> On Wed, 18 Jan 2017 at 14:22 Damian Guy <da...@gmail.com> wrote:
>
> > Hi Nicolas,
> >
> > Good question! I'm not sure why it is a terminal operation, maybe one of
> > the original authors can chip in. However, you could probably work around
> > it by using TopologyBuilder.addProcessor(...) rather then
> KStream.process
> >
> > Thanks,
> > Damian
> >
> > On Wed, 18 Jan 2017 at 13:48 Nicolas Fouché <nf...@onfocus.io> wrote:
> >
> > Hi,
> >
> > as far as I understand, calling `KStream.process` prevents the developer
> > from adding further operations to a `KStreamBuilder` [1], because its
> > return type is `void`. Good.
> >
> > But it also prevents the developer from adding operations to its
> superclass
> > `TopologyBuilder`. In my case I wanted to add a sink, and the parent of
> > this sink would be the name of the Processor that is created by
> > `KStream.process`. Is there any reason why this method does not return
> the
> > processor name [2] ? Is it because it would be a bad idea continuing
> > building my topology with the low-level API ?
> >
> > [1]
> >
> > https://github.com/confluentinc/examples/blob/3.
> 1.x/kafka-streams/src/test/java/io/confluent/examples/streams/
> MixAndMatchLambdaIntegrationTest.java%23L56
> > [2]
> >
> > https://github.com/apache/kafka/blob/b6011918fbc36bfaa465bdcc750e24
> 35985d9101/streams/src/main/java/org/apache/kafka/streams/
> kstream/internals/KStreamImpl.java#L391
> >
> >
> > Thanks.
> > Nicolas.
> >
> >
>

Re: Kafka Streams: how can I get the name of the Processor when calling `KStream.process`

Posted by Damian Guy <da...@gmail.com>.
You could possibly also use KStream.transform(...)

On Wed, 18 Jan 2017 at 14:22 Damian Guy <da...@gmail.com> wrote:

> Hi Nicolas,
>
> Good question! I'm not sure why it is a terminal operation, maybe one of
> the original authors can chip in. However, you could probably work around
> it by using TopologyBuilder.addProcessor(...) rather then KStream.process
>
> Thanks,
> Damian
>
> On Wed, 18 Jan 2017 at 13:48 Nicolas Fouché <nf...@onfocus.io> wrote:
>
> Hi,
>
> as far as I understand, calling `KStream.process` prevents the developer
> from adding further operations to a `KStreamBuilder` [1], because its
> return type is `void`. Good.
>
> But it also prevents the developer from adding operations to its superclass
> `TopologyBuilder`. In my case I wanted to add a sink, and the parent of
> this sink would be the name of the Processor that is created by
> `KStream.process`. Is there any reason why this method does not return the
> processor name [2] ? Is it because it would be a bad idea continuing
> building my topology with the low-level API ?
>
> [1]
>
> https://github.com/confluentinc/examples/blob/3.1.x/kafka-streams/src/test/java/io/confluent/examples/streams/MixAndMatchLambdaIntegrationTest.java%23L56
> [2]
>
> https://github.com/apache/kafka/blob/b6011918fbc36bfaa465bdcc750e2435985d9101/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java#L391
>
>
> Thanks.
> Nicolas.
>
>

Re: Kafka Streams: how can I get the name of the Processor when calling `KStream.process`

Posted by Damian Guy <da...@gmail.com>.
Hi Nicolas,

Good question! I'm not sure why it is a terminal operation, maybe one of
the original authors can chip in. However, you could probably work around
it by using TopologyBuilder.addProcessor(...) rather then KStream.process

Thanks,
Damian

On Wed, 18 Jan 2017 at 13:48 Nicolas Fouché <nf...@onfocus.io> wrote:

> Hi,
>
> as far as I understand, calling `KStream.process` prevents the developer
> from adding further operations to a `KStreamBuilder` [1], because its
> return type is `void`. Good.
>
> But it also prevents the developer from adding operations to its superclass
> `TopologyBuilder`. In my case I wanted to add a sink, and the parent of
> this sink would be the name of the Processor that is created by
> `KStream.process`. Is there any reason why this method does not return the
> processor name [2] ? Is it because it would be a bad idea continuing
> building my topology with the low-level API ?
>
> [1]
>
> https://github.com/confluentinc/examples/blob/3.1.x/kafka-streams/src/test/java/io/confluent/examples/streams/MixAndMatchLambdaIntegrationTest.java%23L56
> [2]
>
> https://github.com/apache/kafka/blob/b6011918fbc36bfaa465bdcc750e2435985d9101/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java#L391
>
>
> Thanks.
> Nicolas.
>