Posted to users@kafka.apache.org by Srikanth <sr...@gmail.com> on 2016/07/17 21:56:44 UTC

Chaining custom processors with DSL

Hello,

Using the low-level API, it's possible to chain processors like this:

  builder.addSource("Source", ...);
  builder.addProcessor("Process1", new MyProcessorSupplier(), "Source");
  builder.addProcessor("Process2", new MyProcessorSupplier2(), "Process1");

Can I do something similar with the DSL?

  myStream.map(...)
          .process(new MyProcessorSupplier(), "StoreName")
          .map(...).filter(...)
          .process(new MyProcessorSupplier2(), "StoreName")
          .to(topicName);

Each process() would apply some transformation and then call context.forward(...).

Srikanth

Re: Chaining custom processors with DSL

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Sure.

You can use process(), transform(), or transformValues() on a KStream
for general UDFs.

See
http://docs.confluent.io/3.0.0/streams/developer-guide.html#stateful-transformations

-Matthias
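
One detail worth spelling out for the chain in the question: as far as I understand the 0.10.0 DSL, process() is a terminal operation (it returns void), so a custom step that sits in the middle of a chain would normally be written with transform() or transformValues(), which return a new KStream. The sketch below is a dependency-free model of that difference in plain Java. It is not the Kafka Streams API: MiniStream, ChainSketch, and the forward callback are hypothetical stand-ins, and only the shape of the chain is meant to carry over.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

class ChainSketch {

    /** Hypothetical stand-in for a stream node; NOT the Kafka Streams KStream. */
    static final class MiniStream<T> {
        private final List<Consumer<T>> downstream = new ArrayList<>();

        /** Push one record to every stage attached to this node. */
        void emit(T record) {
            for (Consumer<T> stage : downstream) {
                stage.accept(record);
            }
        }

        <R> MiniStream<R> map(Function<T, R> f) {
            MiniStream<R> out = new MiniStream<>();
            downstream.add(r -> out.emit(f.apply(r)));
            return out;
        }

        MiniStream<T> filter(Predicate<T> p) {
            MiniStream<T> out = new MiniStream<>();
            downstream.add(r -> { if (p.test(r)) out.emit(r); });
            return out;
        }

        /** Custom mid-chain stage: may forward zero or more records downstream. */
        <R> MiniStream<R> transform(BiConsumer<T, Consumer<R>> stage) {
            MiniStream<R> out = new MiniStream<>();
            downstream.add(r -> stage.accept(r, out::emit));
            return out;
        }

        /** Custom terminal stage: returns nothing, so nothing can be chained after it. */
        void process(Consumer<T> sink) {
            downstream.add(sink);
        }
    }

    /** Builds map -> custom -> filter -> custom -> sink and runs the input through it. */
    static List<String> run(List<String> input) {
        MiniStream<String> source = new MiniStream<>();
        List<String> sink = new ArrayList<>();

        source.map(String::trim)
              // first custom stage: split a line and forward each word
              .<String>transform((line, forward) -> {
                  for (String word : line.split(" ")) {
                      forward.accept(word);
                  }
              })
              .filter(word -> !word.isEmpty())
              // second custom stage: forward an upper-cased copy
              .<String>transform((word, forward) -> forward.accept(word.toUpperCase()))
              // terminal stage, standing in for a final process() or to(topicName)
              .process(sink::add);

        for (String record : input) {
            source.emit(record);
        }
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(" a b ", "c")));
    }
}
```

In the real DSL the two mid-chain process() calls from the question would become transform() calls, with the state store name passed as the trailing argument in the same way.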

