You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Srikanth <sr...@gmail.com> on 2016/07/17 21:56:44 UTC
Chaining custom processors with DSL
Hello,
Using the low level API its possible to chain processors like this
builder.addSource("Source", ...);
builder.addProcessor("Process1", new MyProcessorSupplier(), "Source");
builder.addProcessor("Process2", new MyProcessorSupplier2(), "Process1");
Can I do something similar with DSL?
myStream.map( ...)
.process(new MyProcesserSupplier(), "StoreName")
.map(...).filter(...)
.process(new MyProcesserSupplier2(), "StoreName")
.to(topicName)
Each process() will do some transformation and do context.forward(...)
Srikanth
Re: Chaining custom processors with DSL
Posted by "Matthias J. Sax" <ma...@confluent.io>.
Sure.
You can use process(), transform(), or transformValues() on a KStream
for general UDFs.
See
http://docs.confluent.io/3.0.0/streams/developer-guide.html#stateful-transformations
-Matthias
On 07/17/2016 11:56 PM, Srikanth wrote:
> Hello,
>
> Using the low level API its possible to chain processors like this
>
> builder.addSource("Source", ...);
> builder.addProcessor("Process1", new MyProcessorSupplier(), "Source");
> builder.addProcessor("Process2", new MyProcessorSupplier2(), "Process1");
>
> Can I do something similar with DSL?
>
> myStream.map( ...)
> .process(new MyProcesserSupplier(), "StoreName")
> .map(...).filter(...)
> .process(new MyProcesserSupplier2(), "StoreName")
> .to(topicName)
>
> Each process() will do some transformation and do context.forward(...)
>
> Srikanth
>