Posted to users@kafka.apache.org by Yuto KAWAMURA <ka...@gmail.com> on 2016/04/01 10:20:05 UTC

Kafka Streams: context.forward() with downstream name

When I tried to implement a task that dispatches records to downstream
processors or sinks, it looked like relying on
context.forward(K, V, int childIndex) is the only way right now.
Why is this method implemented using childIndex (which is just an index
into the children "List", based on the order of builder.addProcessor()
calls) instead of the child name (the first argument to
add{Processor,Sink})?
What is the concrete use case for forward(K, V, int childIndex), and does
it make sense to introduce another overload, forward(K, V, String
childName), for easier use?
Currently I have a use case like this in mind:
```
builder.addProcessor("DispatchProcess", new DispatchProcessorSupplier(), "Source");
builder.addProcessor("Process-A", new ProcessorASupplier(), "DispatchProcess");
builder.addProcessor("Process-B", new ProcessorBSupplier(), "DispatchProcess");

// in process(key, value)
if ("key-for-A".equals(key)) {
    context.forward(key, value, "Process-A");
} else if ("key-for-B".equals(key)) {
    context.forward(key, value, "Process-B");
}
```
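A minimal, Kafka-free model of the request, for illustration only: the parent keeps its children in registration order (as addProcessor() calls do), and a name-based forward resolves the name once and then delegates to the index-based path. ToyNode, forwardByIndex and forwardByName are hypothetical names, not Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical toy model, not the Kafka Streams API.
class ToyDispatchDemo {
    public static void main(String[] args) {
        run().forEach(System.out::println);
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        ToyNode dispatch = new ToyNode();
        // Children are registered in order, like builder.addProcessor() calls.
        dispatch.addChild("Process-A", (k, v) -> log.add("A got " + k + "=" + v));
        dispatch.addChild("Process-B", (k, v) -> log.add("B got " + k + "=" + v));

        // The dispatch logic from the example, written against child names.
        for (String key : new String[] {"key-for-A", "key-for-B", "other"}) {
            if ("key-for-A".equals(key)) {
                dispatch.forwardByName(key, "v1", "Process-A");
            } else if ("key-for-B".equals(key)) {
                dispatch.forwardByName(key, "v2", "Process-B");
            }
        }
        return log;
    }
}

class ToyNode {
    private final List<BiConsumer<String, String>> children = new ArrayList<>();
    private final Map<String, Integer> indexByName = new HashMap<>();

    void addChild(String name, BiConsumer<String, String> child) {
        if (indexByName.putIfAbsent(name, children.size()) != null) {
            throw new IllegalArgumentException("duplicate child: " + name);
        }
        children.add(child);
    }

    // Index-based forwarding: the caller must know the registration order.
    void forwardByIndex(String key, String value, int childIndex) {
        children.get(childIndex).accept(key, value);
    }

    // Name-based forwarding: resolve the name, then reuse the index path.
    void forwardByName(String key, String value, String childName) {
        Integer index = indexByName.get(childName);
        if (index == null) {
            throw new IllegalArgumentException("unknown child: " + childName);
        }
        forwardByIndex(key, value, index);
    }
}
```

An unknown name fails loudly here, instead of silently reaching whichever child happens to sit at a given index.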

Re: Kafka Streams: context.forward() with downstream name

Posted by josh gruenberg <jo...@gmail.com>.
Yes, sounds good, Guozhang, thanks. I'll create a JIRA today.

-josh

On Thu, Apr 14, 2016, 1:37 PM Guozhang Wang <wa...@gmail.com> wrote:

> Hi Josh,
>
> As we chatted offline, would you like to summarize your proposed Transform
> APIs in a separate JIRA so we can continue our discussions there?
>
> Guozhang
>
> On Tue, Apr 5, 2016 at 4:13 PM, Guozhang Wang <wa...@gmail.com> wrote:
>
> > Hi Josh,
> >
> > Re 1): transform() is only for use in the higher-level DSL, while in the
> > lower-level API people are expected to work with Processor only, which
> > for now uses context.forward() to send records to the downstream
> > processors.
> >
> > Re 2): I have a few questions about your proposal: with differently typed
> > key-value pairs, are they supposed to be forwarded to different child
> > processors? I.e. you would need to call "forward(K, V, String /* child
> > name */)" rather than "forward(K, V)", which forwards to all the child
> > processors, one by one. In this case the topology builder also needs to
> > be refactored, since we need to make sure the child processors exist when
> > the processors are defined, which could be a bit tricky to implement.
> >
> > Also, is this change aimed at making the lower-level Processor type-safe
> > as well? The main motivation for not supporting strongly typed Processors
> > is to allow users to flexibly connect processors in the topology without
> > worrying about the data types of each processor. As I mentioned above,
> > making specific forwards to downstream processors would likely defeat
> > this purpose.
> >
> >
> > Guozhang
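The builder concern above can be illustrated with a small hypothetical builder (ToyTopologyBuilder is not Kafka's TopologyBuilder): a dispatching node is declared before its children exist, so checking its named forward targets has to be deferred until build(), once the whole graph is known. Declaring forward targets up front is an assumption made only for this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy builder, not Kafka's TopologyBuilder.
class ToyTopologyDemo {
    public static void main(String[] args) {
        System.out.println(validTopology().build());
    }

    static ToyTopologyBuilder validTopology() {
        ToyTopologyBuilder b = new ToyTopologyBuilder();
        b.addNode("Source");
        // Declared before its children exist, so the target check must wait.
        b.addNode("DispatchProcess", Arrays.asList("Process-A", "Process-B"), "Source");
        b.addNode("Process-A", Collections.<String>emptyList(), "DispatchProcess");
        b.addNode("Process-B", Collections.<String>emptyList(), "DispatchProcess");
        return b;
    }
}

class ToyTopologyBuilder {
    private final Map<String, List<String>> children = new LinkedHashMap<>();
    private final Map<String, List<String>> forwardTargets = new HashMap<>();

    void addNode(String name, String... parents) {
        addNode(name, Collections.<String>emptyList(), parents);
    }

    void addNode(String name, List<String> targets, String... parents) {
        if (children.containsKey(name)) {
            throw new IllegalArgumentException("duplicate node: " + name);
        }
        // In this sketch, parents must already exist when a node is added.
        for (String parent : parents) {
            if (!children.containsKey(parent)) {
                throw new IllegalArgumentException("unknown parent: " + parent);
            }
        }
        children.put(name, new ArrayList<>());
        forwardTargets.put(name, targets);
        for (String parent : parents) {
            children.get(parent).add(name);
        }
    }

    // Deferred check: every named forward target must be an actual child.
    Map<String, List<String>> build() {
        for (Map.Entry<String, List<String>> e : forwardTargets.entrySet()) {
            for (String target : e.getValue()) {
                if (!children.get(e.getKey()).contains(target)) {
                    throw new IllegalStateException(e.getKey() + " forwards to non-child " + target);
                }
            }
        }
        return children;
    }
}
```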
> >
> >
> > On Tue, Apr 5, 2016 at 2:30 PM, josh gruenberg <jo...@gmail.com> wrote:
> >
> >> Hi Guozhang,
> >>
> >> I'll reply to your points in-line below:
> >>
> >> On Tue, Apr 5, 2016 at 10:23 AM Guozhang Wang <wa...@gmail.com> wrote:
> >>
> >> > Hi Josh,
> >> >
> >> > I think there are a few issues that we want to resolve here, which
> >> > could be orthogonal to each other.
> >> >
> >> > 1) one-to-many mapping in a transform() function that generates a
> >> > single stream (i.e. single-typed key-value pairs).
> >> >
> >> > Since transform() already enforces type-safe return values, one thing
> >> > we can do is to change the return value of the punctuate() function
> >> > from "null" to "R" as well. Then, for one-to-many mapping, one can
> >> > define R as Array<MyType>:
> >> >
> >> > stream.transform().flatMap(/* from Array<MyType> to MyType */)
> >> >
> >> >
> >> Interesting, I hadn't thought of returning an Iterable from a
> >> Transformer to achieve the one-to-many case. Regardless, my initial
> >> reaction is that this seems natural when you're working in the
> >> declarative DSL (where we can move the flatMap to the left of the
> >> one-to-many lambda), but cumbersome when writing a lower-level
> >> imperative Transformer/Processor (which then requires a subsequent
> >> flatMap to explode the arrays). I think I'd prefer a more imperative,
> >> side-effecting style of emission in this case; see my reply to #2 below.
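The two one-to-many styles being compared can be contrasted in plain Java, with no Kafka types: returning a collection that a later flatMap flattens, versus pushing each output through an injected callback. The class and method names are hypothetical; both paths yield the same records.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

// Hypothetical, Kafka-free comparison of two one-to-many styles.
class OneToManyDemo {
    // Style 1: return a collection per input; flatten it downstream.
    static List<String> splitReturning(String value) {
        return Arrays.asList(value.split(","));
    }

    // Style 2: emit each output imperatively through an injected callback.
    static void splitEmitting(String key, String value, BiConsumer<String, String> forwarder) {
        for (String part : value.split(",")) {
            forwarder.accept(key, part);
        }
    }

    static List<String> viaFlatMap(List<String> values) {
        return values.stream()
                .map(OneToManyDemo::splitReturning)
                .flatMap(List::stream)
                .collect(Collectors.toList());
    }

    static List<String> viaEmitter(List<String> values) {
        List<String> out = new ArrayList<>();
        for (String v : values) {
            splitEmitting("k", v, (k, part) -> out.add(part));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a,b", "c");
        System.out.println(viaFlatMap(input)); // [a, b, c]
        System.out.println(viaEmitter(input)); // [a, b, c]
    }
}
```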
> >>
> >>
> >> > 2) having a new function that takes one stream and generates multiple
> >> > streams with different key-value types.
> >> >
> >> > This is a good-to-have operator in the Streams DSL, and I think this
> >> > is your proposed new API in the previous email? I am not sure I
> >> > understand the "telescoping" arity completely though, so let me know
> >> > if I'm wrong.
> >> >
> >>
> >> Yes, sorry, I wrote my previous mail in a hurry this morning. By
> >> "telescoping", I meant defining processor/transformer interfaces for
> >> each supported output arity, with the corresponding output types
> >> expressed as generics:
> >>
> >>    - Processor1<K, V, K1, V1>
> >>    - Processor2<K, V, K1, V1, K2, V2>
> >>    - Processor3<K, V, K1, V1, K2, V2, K3, V3>
> >>
> >> (Yes, the number of generics here is getting unwieldy, but I don't
> >> immediately see a good way to avoid that while preserving type-safety.
> >> Some sort of cosmetic improvement would be nice!)
> >>
> >> Given this, the framework could inject type-safe "emitters" into the
> >> Processors for each output-stream:
> >>
> >> interface Processor2<K, V, K1, V1, K2, V2> {
> >>   void init(ProcessorContext context, Forwarder<K1, V1> output1,
> >>             Forwarder<K2, V2> output2);
> >>
> >>   // these can use the forwarders provided to init() to emit any
> >>   // number of values
> >>   void process(K key, V value);
> >>   void punctuate(long timestamp);
> >>   // ...
> >> }
> >>
> >> interface Forwarder<K, V> {
> >>   void forward(K key, V value);
> >> }
> >>
> >> ... then, in KStream<K,V>:
> >>
> >> <K1, V1, K2, V2> KStreamTuple2<K1, V1, K2, V2> process2(
> >>     Processor2<K, V, K1, V1, K2, V2> processor, String... stateStores);
> >>
> >> I haven't worked through all of the details, but I'm optimistic that
> >> this could work nicely to unify the Transformer and Processor APIs and
> >> address all of the described use cases (up to some arbitrarily chosen
> >> number of supported output streams).
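A compilable sketch of the proposed shape, with init() simplified to take only the two forwarders (the ProcessorContext parameter is dropped so the example runs without Kafka). Forwarder and Processor2 are modeled on the proposal; ParseProcessor is a hypothetical example. None of these are existing Kafka interfaces.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the proposed shape; these are not Kafka interfaces.
class Processor2Demo {
    // Runs the processor and returns {errorStream, numberStream}.
    static List<List<String>> run(String... lines) {
        List<String> errors = new ArrayList<>();
        List<String> numbers = new ArrayList<>();
        ParseProcessor p = new ParseProcessor();
        // Passing forwarders of the wrong types here would not compile.
        p.init((k, v) -> errors.add(k + ":" + v),
               (k, n) -> numbers.add(k + ":" + (n * 2)));
        for (int i = 0; i < lines.length; i++) {
            p.process("line" + i, lines[i]);
        }
        return Arrays.asList(errors, numbers);
    }

    public static void main(String[] args) {
        System.out.println(run("1", "x", " 21 "));
    }
}

interface Forwarder<K, V> {
    void forward(K key, V value);
}

interface Processor2<K, V, K1, V1, K2, V2> {
    // ProcessorContext omitted so the sketch runs without Kafka on the classpath.
    void init(Forwarder<K1, V1> output1, Forwarder<K2, V2> output2);

    void process(K key, V value);
}

// Output 1 carries unparseable lines; output 2 carries parsed integers.
class ParseProcessor implements Processor2<String, String, String, String, String, Integer> {
    private Forwarder<String, String> errors;
    private Forwarder<String, Integer> numbers;

    @Override
    public void init(Forwarder<String, String> output1, Forwarder<String, Integer> output2) {
        this.errors = output1;
        this.numbers = output2;
    }

    @Override
    public void process(String key, String value) {
        try {
            numbers.forward(key, Integer.parseInt(value.trim()));
        } catch (NumberFormatException e) {
            errors.forward(key, value);
        }
    }
}
```

Because Java has no variadic generics, each supported arity needs its own interface, which is why the list of type parameters keeps growing.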
> >>
> >>
> >> > 3) having a data-driven emission policy (this will be the building
> >> > block of session windows) as well as a time-driven emission policy.
> >> >
> >> > I am thinking about how to support this as well. One option is to use
> >> > the underlying process() function for data-driven emission: for
> >> > example, if there is a session-start / end flag, then create the
> >> > corresponding session record in state, and only emit upon the
> >> > session-end flag; and the underlying punctuate() function for
> >> > time-driven emission (we probably need to first refactor it to be
> >> > triggered by record timestamp instead of wallclock time).
> >> >
> >>
> >> Yes, I agree: data-driven emission could work just fine with process(),
> >> and delayed emission works nicely with punctuate(). I've also been
> >> meaning to mention the clear need for event-time punctuation, so I'm
> >> glad to hear that's on your radar! Watermarking will be important for
> >> session-windowing.
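A toy sessionizer combining the two emission policies, assuming each event carries its own timestamp: process() closes a session as soon as an explicit "end" event arrives (data-driven), and punctuate() closes sessions that have been idle longer than a gap measured against the supplied event time rather than the wall clock (time-driven). ToySessionizer and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical toy, not a Kafka API; timestamps are event times in ms.
class SessionDemo {
    static List<String> run() {
        List<String> out = new ArrayList<>();
        ToySessionizer s = new ToySessionizer(10, out::add);
        s.process("u1", "start", 0);
        s.process("u1", "click", 3);
        s.process("u1", "end", 5);    // data-driven: emitted immediately
        s.process("u2", "start", 4);
        s.punctuate(10);              // u2 idle for 6, not > gap: still open
        s.punctuate(20);              // u2 idle for 16 > gap: emitted
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}

class ToySessionizer {
    private static final class Session {
        final List<String> events = new ArrayList<>();
        long lastSeen;
    }

    private final long gap;
    private final Consumer<String> emit;
    private final Map<String, Session> open = new HashMap<>();

    ToySessionizer(long gap, Consumer<String> emit) {
        this.gap = gap;
        this.emit = emit;
    }

    // Data-driven emission: an explicit "end" event closes the session.
    void process(String key, String event, long timestamp) {
        Session s = open.computeIfAbsent(key, k -> new Session());
        s.events.add(event);
        s.lastSeen = timestamp;
        if ("end".equals(event)) {
            emit.accept(key + s.events);
            open.remove(key);
        }
    }

    // Time-driven emission: close sessions idle longer than the gap, judged
    // against the supplied event time rather than the wall clock.
    void punctuate(long timestamp) {
        Iterator<Map.Entry<String, Session>> it = open.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Session> e = it.next();
            if (timestamp - e.getValue().lastSeen > gap) {
                emit.accept(e.getKey() + e.getValue().events);
                it.remove();
            }
        }
    }
}
```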
> >>
> >> Thoughts?
> >>
> >> -josh
> >>
> >>
> >> >
> >> > On Tue, Apr 5, 2016 at 8:24 AM, josh gruenberg <jo...@gmail.com> wrote:
> >> >
> >> > > Hi all,
> >> > >
> >> > > Just chiming in with Yuto: I think the custom Processor becomes
> >> > > attractive in scenarios where a node in the graph may emit to a
> >> > > variety of downstream paths, possibly after some delay, depending on
> >> > > logic. This can probably often be achieved with the existing DSL using
> >> > > some combination of predicates and intermediate representations, but
> >> > > this involves contortions that feel cumbersome and probably leads to
> >> > > less intelligible code. I'm also not sure the current DSL can model
> >> > > scenarios where the transformation may be one-to-many, as in the last
> >> > > part of Yuto's example, or where the emission delay is data-driven, as
> >> > > in my earlier "sessionization" example.
> >> > >
> >> > > One idea I'd offer is to provide a mechanism for wiring in Processors
> >> > > with "telescoping" arity (e.g. support Processor1<I, O1>,
> >> > > Processor2<I, O1, O2>, etc.), and to provide each arity with type-safe
> >> > > forwarding interfaces for each output stream (e.g. Forwarder<T>). This
> >> > > assigns each output stream a clear ordinal, and suggests a
> >> > > corresponding type-safe return type for the DSL (e.g.
> >> > > KStreamTuple2<O1, O2>).
> >> > >
> >> > > I think this pattern could provide a unification of the 'Transformer'
> >> > > and 'Processor' APIs.
> >> > > This was what I had in mind for a PR we discussed earlier (for
> >> > > modifying the Transformer API), but the scope expanded beyond what I
> >> > > felt comfortable submitting without discussion, and I had to
> >> > > prioritize other efforts. Regardless, I could push a WIP branch to
> >> > > GitHub later today to illustrate, if you'd like to see it.
> >> > >
> >> > > HTH,
> >> > > -josh
> >> > >
> >> > > On Mon, Apr 4, 2016, 9:14 PM Guozhang Wang <wa...@gmail.com> wrote:
> >> > >
> >> > > > Thanks, Yuto, for your code snippet. Since you need to access a
> >> > > > customized external storage for metadata, that indeed cannot be
> >> > > > wrapped in any built-in operator in the Streams DSL yet, and your
> >> > > > code example in the previous email would be close to the best you
> >> > > > can do with the high-level DSL now.
> >> > > >
> >> > > > One minor improvement to your code above, though: instead of
> >> > > > calling map(... -> process()), you can call transform(), which still
> >> > > > allows you to provide a customized transformer function while also
> >> > > > giving you strong typing, assuming all three kinds of records have
> >> > > > the same key / value types.
> >> > > >
> >> > > > Guozhang
> >> > > >
> >> > > >
> >> > > >
> >> > > >
> >> > > > On Sun, Apr 3, 2016 at 10:48 PM, Yuto KAWAMURA <kawamuray.dadada@gmail.com> wrote:
> >> > > >
> >> > > > > 2016-04-04 7:20 GMT+09:00 Guozhang Wang <wa...@gmail.com>:
> >> > > > > > Hi Yuto,
> >> > > > > >
> >> > > > > > Is the destination topic embedded as part of the value in the
> >> > > > > > original "foo" topic? If so, could you just access that field
> >> > > > > > directly instead of mapping to a (key, value, destination)
> >> > > > > > triplet?
> >> > > > > >
> >> > > > >
> >> > > > > Nope. KeyValueWithDestination is just an example of output from
> >> > > > > the first Processor and is not included in the actual messages
> >> > > > > that the topic foo receives.
> >> > > > > Let me explain a more realistic use case. How can we write a
> >> > > > > Processor like the one below cleanly in the high-level DSL?
> >> > > > >
> >> > > > > ```java
> >> > > > > public class EventProcessor implements Processor<String, Event> {
> >> > > > > ...
> >> > > > >   @Override
> >> > > > >   public void process(String key, Event value) {
> >> > > > >       EventMetadata meta = getEventMetadataFromExternalStorage(value.getId());
> >> > > > >
> >> > > > >       if (isFieldACorrupted(meta, value.getFieldA())) {
> >> > > > >           // This event is corrupted! Let's evacuate it to the
> >> > > > >           // grave topic for further investigation.
> >> > > > >           context.forward(key, value, "CorruptedEventSink");
> >> > > > >       }
> >> > > > >       if (isFieldBCorrupted(meta, value.getFieldB())) {
> >> > > > >           // Another case of corruption, but maybe recoverable.
> >> > > > >           context.forward(key, value, "CorruptedEventRecoveryProcessor");
> >> > > > >       }
> >> > > > >
> >> > > > >       // One-to-many: one output message per Foo in the event.
> >> > > > >       for (Foo foo : value.getFoos()) {
> >> > > > >           context.forward(key, buildMessage(meta, foo), "FooProcessor");
> >> > > > >       }
> >> > > > >   }
> >> > > > > ...
> >> > > > > }
> >> > > > > ```
> >> > > > >
> >> > > > >
> >> > > > > > Guozhang
> >> > > > > >
> >> > > > > > On Sun, Apr 3, 2016 at 9:29 AM, Yuto KAWAMURA <kawamuray.dadada@gmail.com> wrote:
> >> > > > > >
> >> > > > > >> Hi Guozhang,
> >> > > > > >>
> >> > > > > >>
> >> > > > > >>
> >> > > > > >> 2016-04-02 3:29 GMT+09:00 Guozhang Wang <wangguoz@gmail.com>:
> >> > > > > >> > Hi Yuto,
> >> > > > > >> >
> >> > > > > >> > That is a good suggestion; the child index is not very
> >> > > > > >> > intuitive from a programmer's view, and we could even consider
> >> > > > > >> > replacing it with the processor name instead of overloading
> >> > > > > >> > it. Could you file a JIRA?
> >> > > > > >> >
> >> > > > > >>
> >> > > > > >> Yep :) https://issues.apache.org/jira/browse/KAFKA-3497
> >> > > > > >>
> >> > > > > >> > Also, I am wondering if you have looked at the higher-level
> >> > > > > >> > Streams DSL, and if so, could you let me know the limitations
> >> > > > > >> > of using those APIs in your case?
> >> > > > > >> >
> >> > > > > >>
> >> > > > > >> Well, I read through the high-level DSL interface but couldn't
> >> > > > > >> find an easy way to handle output from Processors that could
> >> > > > > >> issue multiple messages to arbitrary different destinations.
> >> > > > > >> Maybe it could be done with something like the code below, but
> >> > > > > >> it doesn't look good. Please let me know if you have any idea
> >> > > > > >> how to do this in an easier way.
> >> > > > > >>
> >> > > > > >> ```java
> >> > > > > >> class KeyValueWithDestination<K, V> {
> >> > > > > >>     K key;
> >> > > > > >>     V value;
> >> > > > > >>     String destination;
> >> > > > > >> }
> >> > > > > >>
> >> > > > > >> class DestinationPredicate<K, V>
> >> > > > > >>         implements Predicate<K, KeyValueWithDestination<K, V>> {
> >> > > > > >>     private final String destination;
> >> > > > > >>
> >> > > > > >>     DestinationPredicate(String destination) {
> >> > > > > >>         this.destination = destination;
> >> > > > > >>     }
> >> > > > > >>
> >> > > > > >>     @Override
> >> > > > > >>     public boolean test(K key, KeyValueWithDestination<K, V> value) {
> >> > > > > >>         return value.destination.equals(destination);
> >> > > > > >>     }
> >> > > > > >> }
> >> > > > > >>
> >> > > > > >> String[] destTopics = {"topicA", "topicB", "topicC"};
> >> > > > > >>
> >> > > > > >> // Generic arrays cannot be created directly, hence the raw
> >> > > > > >> // Predicate[] and the unchecked assignment.
> >> > > > > >> @SuppressWarnings("unchecked")
> >> > > > > >> Predicate<K, KeyValueWithDestination<K, V>>[] predicates =
> >> > > > > >>         Arrays.stream(destTopics)
> >> > > > > >>               .map(DestinationPredicate<K, V>::new)
> >> > > > > >>               .toArray(Predicate[]::new);
> >> > > > > >>
> >> > > > > >> // processor.process() is assumed to return a KeyValueWithDestination
> >> > > > > >> KStream<K, KeyValueWithDestination<K, V>>[] branches = builder.<K, V>stream("foo")
> >> > > > > >>         .map((key, value) -> new KeyValue<>(key, processor.process(key, value)))
> >> > > > > >>         .branch(predicates);
> >> > > > > >>
> >> > > > > >> for (int i = 0; i < branches.length; i++) {
> >> > > > > >>     branches[i].to(destTopics[i]);
> >> > > > > >> }
> >> > > > > >> ```
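This workaround relies on branch() placing each record into the first branch whose predicate matches and dropping records that match none (our reading of the KStream#branch documentation). The plain-Java model below, with hypothetical names, mirrors that rule and shows why a tagged record reaches at most one destination, whereas the EventProcessor example forwards a single event to several.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java model of first-match branching; hypothetical, not Kafka code.
class BranchDemo {
    static final String[] DEST_TOPICS = {"topicA", "topicB", "topicC"};

    // Records are tagged "destination:payload"; returns one list per topic.
    static List<List<String>> route(List<String> tagged) {
        List<Predicate<String>> predicates = new ArrayList<>();
        List<List<String>> branches = new ArrayList<>();
        for (String topic : DEST_TOPICS) {
            predicates.add(r -> r.startsWith(topic + ":"));
            branches.add(new ArrayList<>());
        }
        for (String rec : tagged) {
            for (int i = 0; i < predicates.size(); i++) {
                if (predicates.get(i).test(rec)) {
                    branches.get(i).add(rec); // first match wins
                    break;
                }
            }
            // A record matching no predicate is dropped.
        }
        return branches;
    }

    public static void main(String[] args) {
        System.out.println(route(Arrays.asList("topicB:x", "topicA:y", "topicZ:z")));
    }
}
```

To reach several destinations this way, one would presumably have to emit one tagged copy per destination first.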
> >> > > > > >>
> >> > > > > >>
> >> > > > > >> > Guozhang
> >> > > > > >> >
> >> > > > > >> >
> >> > > > > >> >
> >> > > > > >> >
> >> > > > > >> > --
> >> > > > > >> > -- Guozhang
> >> > > > > >>
> >> > > > > >
> >> > > > > >
> >> > > > > >
> >> > > > > > --
> >> > > > > > -- Guozhang
> >> > > > >
> >> > > >
> >> > > >
> >> > > >
> >> > > > --
> >> > > > -- Guozhang
> >> > > >
> >> > >
> >> >
> >> >
> >> >
> >> > --
> >> > -- Guozhang
> >> >
> >>
> >
> >
> >
> > --
> > -- Guozhang
> >
>
>
>
> --
> -- Guozhang
>

Re: Kafka Streams: context.forward() with downstream name

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Josh,

As we chatted offline, would you like to summarize your proposed Transform
APIs in a separate JIRA so we can continue our discussions there?

Guozhang




-- 
-- Guozhang

Re: Kafka Streams: context.forward() with downstream name

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Josh,

Re 1): transform is only for use in the higher-level DSL, while in the
lower-level APIs people are expected to work with Processor only, which for
now uses context.forward() to send records to the downstream processors.

Re 2): I have a few questions about your proposal. With differently typed
key-value pairs, are they supposed to be forwarded to different child
processors? That is, you would need to call
"forward(K, V, String /* child name */)" rather than "forward(K, V)", which
forwards to all the child processors, one by one. In that case the topology
builder also needs to be refactored, since we need to make sure the child
processors exist when the processors are defined, which could be a bit
tricky to implement.
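To make the broadcast, index-based, and name-based variants concrete, here is a toy, self-contained model. It is not the Kafka Streams API: `ToyContext`, `addChild`, and `DispatchDemo` are hypothetical names used only for illustration. It replays the dispatching snippet from Yuto's first mail. Note that in this sketch an unknown child name is only detected when forward() is called. That is exactly the gap Guozhang points out: a real implementation would want the topology builder to validate names up front.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical toy model of one processor node and its children; NOT the Kafka Streams API.
final class ToyContext<K, V> {
    // Children in registration order, keyed by the name given when wiring the topology.
    private final Map<String, BiConsumer<K, V>> children = new LinkedHashMap<>();

    void addChild(String name, BiConsumer<K, V> child) {
        children.put(name, child);
    }

    // Broadcast: every child receives the record, one by one, in registration order.
    void forward(K key, V value) {
        for (BiConsumer<K, V> child : children.values()) {
            child.accept(key, value);
        }
    }

    // Index-based: silently depends on the order in which children were registered.
    void forward(K key, V value, int childIndex) {
        new ArrayList<>(children.values()).get(childIndex).accept(key, value);
    }

    // Name-based, as proposed in this thread; an unknown name only fails at call time here.
    void forward(K key, V value, String childName) {
        BiConsumer<K, V> child = children.get(childName);
        if (child == null) {
            throw new IllegalArgumentException("unknown child: " + childName);
        }
        child.accept(key, value);
    }
}

// The dispatching processor from Yuto's first mail, expressed against the toy context.
final class DispatchDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        ToyContext<String, String> context = new ToyContext<>();
        context.addChild("Process-A", (k, v) -> log.add("A:" + k));
        context.addChild("Process-B", (k, v) -> log.add("B:" + k));
        for (String key : Arrays.asList("key-for-A", "key-for-B", "other")) {
            if ("key-for-A".equals(key)) {
                context.forward(key, "value", "Process-A");
            } else if ("key-for-B".equals(key)) {
                context.forward(key, "value", "Process-B");
            } else {
                context.forward(key, "value"); // no match: broadcast to all children
            }
        }
        return log;
    }
}
```

With the name-based overload, reordering the addChild calls does not change where "key-for-A" ends up. With the index-based overload, it would.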

Also, is this change aimed at making the lower-level Processor type-safe as
well? The main motivation for not supporting strongly typed Processors is to
allow users to flexibly connect processors in the topology without worrying
about the data types of each processor. As I mentioned above, making
specific forwards to downstream processors would likely defeat this purpose.


Guozhang


On Tue, Apr 5, 2016 at 2:30 PM, josh gruenberg <jo...@gmail.com> wrote:

> Hi Guozhang,
>
> I'll reply to your points in-line below:
>
> On Tue, Apr 5, 2016 at 10:23 AM Guozhang Wang <wa...@gmail.com> wrote:
>
> > Hi Josh,
> >
> > I think there are a few issues that we want to resolve here, which could
> be
> > orthogonal to each other.
> >
> > 1) one-to-many mapping in transform() function that generates a single
> > stream (i.e. single typed key-value pairs).
> >
> > Since transform() already enforces to make type-safe return values, one
> > thing we can do is to change the punctuate() function return value from
> > "null" to "R" as well. And then for one-to-many mapping one can then
> define
> > R as Array<MyType>
> >
> > stream.transform().flatMap(/* from Array<MyType> to MyType*/)
> >
> >
> Interesting, it hadn't thought of returning an Iterable from a Transformer
> to achieve the one-to-many case. Regardless, my initial reaction is that
> this seems natural when you're working in the declarative DSL (where we can
> move the flatMap to the left of the one-to-many lambda), but seems
> cumbersome when writing a lower-level imperative Transformer/Processor
> (which then requires a subsequent flatMap to explode the arrays). I think
> I'd prefer a more imperative, side-effecting style of emission in this
> case; see my reply to #2 below.
>
>
> > 2) having a new function that takes one stream, and generate multiple
> > streams with different key-value types.
> >
> > This is a good-to-have operator in the Streams DSL, and I think this is
> > your proposed new API in the previous email? I am not sure I
> > understand the "telescoping"
> > arity completely though, so let me know if I'm wrong.
> >
>
> Yes, sorry, I wrote my previous mail in a hurry this morning. By
> "telescoping", I meant defining processor/transformer interfaces for each
> supported output arity, with the corresponding output-types expressed as
> generics:
>
>    - Processor1<K, V, K1, V1>
>    - Processor2<K, V, K1, V1, K2, V2>
>    - Processor3<K, V, K1, V1, K2, V2, K3, V3>
>
> (Yes, the number of generics here is getting unwieldy, but I don't
> immediately see a good way to avoid that while preserving type-safety. Some
> sort of cosmetic improvement would be nice!)
>
> Given this, the framework could inject type-safe "emitters" into the
> Processors for each output-stream:
>
> interface Processor2<K, V, K1, V1, K2, V2> {
>   void init(ProcessorContext context, Forwarder<K1, V1> output1,
> Forwarder<K2, V2> output2);
>
>   // these can use the forwarders provided to init() to emit any number of
> values
>   void process(I input);
>   void punctuate(long timestamp);
>   // ...
> }
>
> interface Forwarder<K, V> {
>   void forward(K key, V value);
> }
>
> ... then, in KStream<K,V>:
>
> <K1, V1, K2, V2> KStreamTuple2<K1, V1, K2, V2> process2(Processor2<K, V,
> K1, V1, K2, V2> processor, String... stateStores);
>
> I haven't worked through all of the details, but I'm optimistic that this
> could work nicely to unify the Transformer and Processor APIs, and address
> all of the described use-cases (up to some arbitrarily-chosen number of
> supported output-streams).
>
>
> > 3) having data-driven emission policy (this will be the building block of
> > session windows) as well as time-drive emission policy.
> >
> > I am thinking about how to support this as well, one thing is that we can
> > use the underlying process() function for data-driven emission, for
> > example, if there is a session-start / end flag then create the
> > corresponding session record in state, and only emit upon session-end
> flag;
> > and the underlying punctuate() function for time-drive emission (we
> > probably need to first refactor it to be triggered by record timestamp
> > instead of wallclock time).
> >
>
> Yes, I agree: data-driven emission could work just fine with process(), and
> delayed emission works nicely with punctuate(). I've also been meaning to
> mention the clear need for event-time punctuation, so I'm glad to hear
> that's on your radar! Watermarking will be important for session-windowing.
>
> Thoughts?
>
> -josh
>
>
> >
> > On Tue, Apr 5, 2016 at 8:24 AM, josh gruenberg <jo...@gmail.com> wrote:
> >
> > > Hi all,
> > >
> > > Just chiming in with Yuto: I think the custom Processor becomes
> > attractive
> > > in scenarios where a node in the graph may emit to a variety of
> > downstream
> > > paths, possibly after some delay, depending on logic. This can probably
> > > often be achieved with the existing DSL using some combination of
> > > predicates and intermediate representations, but this involves
> > contortions
> > > that feel cumbersome, and probably leads to less intelligible code. I'm
> > > also not sure the current DSL can model scenarios where the
> > transformation
> > > may be one-to-many, as in the last part of Yuto's example, or where the
> > > emission-delay is data-driven, as in my earlier "sessionization"
> example.
> > >
> > > One idea I'd offer is to provide a mechanism for wiring in Processors
> > with
> > > "telescoping" arity (eg, support Processor1<I, O1>, Processor2<I, O1,
> > O2>,
> > > etc), and providing each arity with type-safe forwarding interfaces for
> > > each output stream (eg, Forwarder<T>). This assigns each output-stream
> a
> > > clear ordinal, and suggests a corresponding type-safe return-type for
> the
> > > DSL (eg, KStreamTuple2<O1, O2>).
> > >
> > > I think this pattern could provide a unification of the 'Transformer'
> and
> > > 'Processor' APIs.
> > > This was what I had in mind for a PR we discussed earlier (for
> modifying
> > > the Transformer API), but the scope expanded beyond what I felt
> > comfortable
> > > submitting without discussion, and I had to prioritize other efforts.
> > > Regardless, I could get a WIP branch pushed to github later today to
> > > illustrate if you'd like to see it.
> > >
> > > HTH,
> > > -josh
> > >
> > > On Mon, Apr 4, 2016, 9:14 PM Guozhang Wang <wa...@gmail.com> wrote:
> > >
> > > > Thanks Yuto for your code snippet. Since you need to access a
> > customized
> > > > external storage for metadata, that indeed cannot be wrapped in any
> > > > built-in operators in the Streams DSL yet, and your code example in
> the
> > > > previous email would be close to the best you can do with the
> > high-level
> > > > DSL now.
> > > >
> > > > One minor improvement from your above code, though, is that instead
> of
> > > > calling map(... -> process()) you can actually call transform(),
> which
> > > > still allows you to provide a customized transformer function, but it
> > > still
> > > > gives you strong typing assuming all these three kinds of records are
> > of
> > > > the same key / value types.
> > > >
> > > > Guozhang
> > > >
> > > >
> > > >
> > > >
> > > > On Sun, Apr 3, 2016 at 10:48 PM, Yuto KAWAMURA <
> > > kawamuray.dadada@gmail.com
> > > > >
> > > > wrote:
> > > >
> > > > > 2016-04-04 7:20 GMT+09:00 Guozhang Wang <wa...@gmail.com>:
> > > > > > Hi Yuto,
> > > > > >
> > > > > > Is the destination topic embedded as part of the value in the
> > > original
> > > > > > "foo" topic? If yes could you just access that field directly
> > instead
> > > > of
> > > > > > mapping to a (key, value, destination) triplet?
> > > > > >
> > > > >
> > > > > Nope. KeyValueWithDestination is just an example of output from the
> > > > > first Processor and is not included in actual messages that the
> topic
> > > > > foo received.
> > > > > Let me explain bit more realistic use-case. How can we write a
> > > > > Processor like below in High-level DSL cleanly?
> > > > >
> > > > > ```java
> > > > > public class EventProcessor implements Processor<String, Event> {
> > > > > ...
> > > > >   @Override
> > > > >   public void process(String key, Event value) {
> > > > >       EventMetadata meta =
> > > > > getEventMetadataFromExternalStorage(value.getId());
> > > > >
> > > > >       if (isFieldACorrupted(meta, value.getFieldA())) {
> > > > >           // This event is corrupted! let's evacuate it once to the
> > > > > grave topic for further investigation.
> > > > >           context.forward(key, value, "CorruptedEventSink");
> > > > >       }
> > > > >       if (isFieldBCorrupted(meta, value.getFieldB())) {
> > > > >           // Antoher case of corruption, but maybe recoverable.
> > > > >           context.forward(key, value,
> > > "CorruptedEventRecoveryProcessor");
> > > > >       }
> > > > >
> > > > >       for (Foo foo : event.getFoos()) {
> > > > >           context.forward(key, buildMessage(meta, foo),
> > > "FooProcessor");
> > > > >       }
> > > > >   }
> > > > > ...
> > > > > }
> > > > > ```
> > > > >
> > > > >
> > > > > > Guozhang
> > > > > >
> > > > > > On Sun, Apr 3, 2016 at 9:29 AM, Yuto KAWAMURA <
> > > > > kawamuray.dadada@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > >> Hi Guozhang,
> > > > > >>
> > > > > >>
> > > > > >>
> > > > > >> 2016-04-02 3:29 GMT+09:00 Guozhang Wang <wa...@gmail.com>:
> > > > > >> > Hi Yuto,
> > > > > >> >
> > > > > >> > That is a good suggestion, the child index is not very
> intuitive
> > > > from
> > > > > >> > programmer's view and we can even consider replacing it with
> the
> > > > > >> processor
> > > > > >> > name instead of overloading it. Could you file a JIRA?
> > > > > >> >
> > > > > >>
> > > > > >> Yep :) https://issues.apache.org/jira/browse/KAFKA-3497
> > > > > >>
> > > > > >> > Also I am wondering if you have looked at the higher-level
> > Streams
> > > > > DSL,
> > > > > >> and
> > > > > >> > if yes could let me know what are the limitations from using
> > that
> > > > > APIs in
> > > > > >> > your case?
> > > > > >> >
> > > > > >>
> > > > > >> Well, I read though high-level DSL interface but couldn't find
> an
> > > easy
> > > > > >> way to handle output from Processors which could issue multiple
> > > > > >> messages to arbitrary different destinations.
> > > > > >> Maybe it could be done by doing something like below but it
> > doesn't
> > > > > >> look good. Please let me know if you have any idea to do this in
> > > > > >> easier way.
> > > > > >>
> > > > > >> ```java
> > > > > >> class KeyValueWithDestination {
> > > > > >>     K key;
> > > > > >>     V value;
> > > > > >>     String destination;
> > > > > >> }
> > > > > >>
> > > > > >> class DestinationPredicate implements Predicate<K,
> > > > > >> KeyValueWithDestination> {
> > > > > >>     String destination;
> > > > > >>     @Override
> > > > > >>     public boolean test(K key, KeyValueWithDestination value) {
> > > > > >>         return value.destination.equals(destination);
> > > > > >>     }
> > > > > >> }
> > > > > >>
> > > > > >> String[] destTopics = {"topicA", "topicB", "topicC"};
> > > > > >>
> > > > > >> Predicate<K, KeyValueWithDestination>[] predicates =
> > > > > >>         Arrays.stream(destTopics).map(DestinationPredicate::new)
> > > > > >>                                  .toArray(Predicate<K,
> > > > > >> KeyValueWithDestination>::new);
> > > > > >>
> > > > > >> branches = builder.stream("foo")
> > > > > >>                   .map((key, value) -> processor.process(key,
> > value)
> > > > > >> /* => KeyValueWithDestination */)
> > > > > >>                   .branch(predicates);
> > > > > >>
> > > > > >> for (int i = 0; i < branches.length; i++) {
> > > > > >>     branches[i].to(destTopics[i]);
> > > > > >> }
> > > > > >> ```
> > > > > >>
> > > > > >>
> > > > > >> > Guozhang
> > > > > >> >
> > > > > >> > On Fri, Apr 1, 2016 at 1:20 AM, Yuto KAWAMURA <
> > > > > >> kawamuray.dadada@gmail.com>
> > > > > >> > wrote:
> > > > > >> >
> > > > > >> >> When I tried to implement a task which does kinda dispatching
> > to
> > > > > >> >> downstream processors or sinks, looks like relying on
> > > > > >> >> context.forward(K, V, int childIndex) is the only way now.
> > > > > >> >> I have a question why this method implemented using
> > > > childIndex(which
> > > > > >> >> is just an index of children "List" that based on order of
> > > > > >> >> builder.addProcessor() call) instead of child name(first
> > argument
> > > > to
> > > > > >> >> add{Processor,Sink}).
> > > > > >> >> I wanna ask what is the concrete use case of forward(K, V,
> int
> > > > > >> >> childIndex) and is it makes sense to introduce another
> > overload:
> > > > > >> >> forward(K, V, String childName) for much handy use.
> > > > > >> >> Currently I have a use-case like this in my mind:
> > > > > >> >> ```
> > > > > >> >> builder.addProcessor("DispatchProcess", new
> > > > > >> >> DispatchProcessorSupplier(), "Source");
> > > > > >> >> builder.addProcessor("Process-A", new ProcessorASupplier(),
> > > > > >> >> "DispatchProcess");
> > > > > >> >> builder.addProcessor("Process-B", new ProcessorBSupplier(),
> > > > > >> >> "DispatchProcess");
> > > > > >> >>
> > > > > >> >> // in process(key, value)
> > > > > >> >> if ("key-for-A".equals(key)) {
> > > > > >> >>     context.forward(key, value, "Process-A");
> > > > > >> >> } else if ("key-for-B".equals(key)) {
> > > > > >> >>     context.forward(key, value, "Process-B");
> > > > > >> >> }
> > > > > >> >> ```
> > > > > >> >>
> > > > > >> >
> > > > > >> >
> > > > > >> >
> > > > > >> > --
> > > > > >> > -- Guozhang
> > > > > >>
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang

Re: Kafka Streams: context.forward() with downstream name

Posted by josh gruenberg <jo...@gmail.com>.
Hi Guozhang,

I'll reply to your points in-line below:

On Tue, Apr 5, 2016 at 10:23 AM Guozhang Wang <wa...@gmail.com> wrote:

> Hi Josh,
>
> I think there are a few issues that we want to resolve here, which could be
> orthogonal to each other.
>
> 1) one-to-many mapping in transform() function that generates a single
> stream (i.e. single typed key-value pairs).
>
> Since transform() already enforces to make type-safe return values, one
> thing we can do is to change the punctuate() function return value from
> "null" to "R" as well. And then for one-to-many mapping one can then define
> R as Array<MyType>
>
> stream.transform().flatMap(/* from Array<MyType> to MyType*/)
>
>
Interesting, I hadn't thought of returning an Iterable from a Transformer
to achieve the one-to-many case. Regardless, my initial reaction is that
this seems natural when you're working in the declarative DSL (where we can
move the flatMap to the left of the one-to-many lambda), but seems
cumbersome when writing a lower-level imperative Transformer/Processor
(which then requires a subsequent flatMap to explode the arrays). I think
I'd prefer a more imperative, side-effecting style of emission in this
case; see my reply to #2 below.


> 2) having a new function that takes one stream, and generate multiple
> streams with different key-value types.
>
> This is a good-to-have operator in the Streams DSL, and I think this is
> your proposed new API in the previous email? I am not sure I
> understand the "telescoping"
> arity completely though, so let me know if I'm wrong.
>

Yes, sorry, I wrote my previous mail in a hurry this morning. By
"telescoping", I meant defining processor/transformer interfaces for each
supported output arity, with the corresponding output-types expressed as
generics:

   - Processor1<K, V, K1, V1>
   - Processor2<K, V, K1, V1, K2, V2>
   - Processor3<K, V, K1, V1, K2, V2, K3, V3>

(Yes, the number of generics here is getting unwieldy, but I don't
immediately see a good way to avoid that while preserving type-safety. Some
sort of cosmetic improvement would be nice!)

Given this, the framework could inject type-safe "emitters" into the
Processors for each output-stream:

interface Processor2<K, V, K1, V1, K2, V2> {
  void init(ProcessorContext context, Forwarder<K1, V1> output1, Forwarder<K2, V2> output2);

  // these can use the forwarders provided to init() to emit any number of values
  void process(K key, V value);
  void punctuate(long timestamp);
  // ...
}

interface Forwarder<K, V> {
  void forward(K key, V value);
}

... then, in KStream<K,V>:

<K1, V1, K2, V2> KStreamTuple2<K1, V1, K2, V2> process2(
    Processor2<K, V, K1, V1, K2, V2> processor, String... stateStores);

I haven't worked through all of the details, but I'm optimistic that this
could work nicely to unify the Transformer and Processor APIs, and address
all of the described use-cases (up to some arbitrarily-chosen number of
supported output-streams).
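A minimal compilable sketch of the two-output idea might look like the following. It assumes simplified versions of the proposed types. `Forwarder`, `Processor2`, `ParseProcessor`, and `Processor2Demo` are all hypothetical, and none of them exist in Kafka Streams. The ProcessorContext parameter is dropped so the sketch stands alone. The point it illustrates is that each output stream gets its own statically typed emitter.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical types sketching the proposal; none of them exist in Kafka Streams.
interface Forwarder<K, V> {
    void forward(K key, V value);
}

// Two-output "telescoping" processor. The proposal's ProcessorContext parameter
// is omitted here so the sketch compiles on its own.
interface Processor2<K, V, K1, V1, K2, V2> {
    void init(Forwarder<K1, V1> output1, Forwarder<K2, V2> output2);

    void process(K key, V value);
}

// Example: numeric strings go to output1 as Integers, everything else to output2 as an error message.
final class ParseProcessor implements Processor2<String, String, String, Integer, String, String> {
    private Forwarder<String, Integer> parsed;
    private Forwarder<String, String> errors;

    @Override
    public void init(Forwarder<String, Integer> output1, Forwarder<String, String> output2) {
        this.parsed = output1;
        this.errors = output2;
    }

    @Override
    public void process(String key, String value) {
        Integer number;
        try {
            number = Integer.parseInt(value);
        } catch (NumberFormatException e) {
            errors.forward(key, "not a number: " + value);
            return;
        }
        parsed.forward(key, number); // the compiler enforces an Integer value on this output
    }
}

// Stand-in for the framework: wires one collecting Forwarder per output stream.
final class Processor2Demo {
    static List<List<String>> run(List<String> values) {
        List<String> out1 = new ArrayList<>();
        List<String> out2 = new ArrayList<>();
        ParseProcessor processor = new ParseProcessor();
        processor.init((k, v) -> out1.add(k + "=" + v), (k, v) -> out2.add(k + ":" + v));
        for (int i = 0; i < values.size(); i++) {
            processor.process("key" + i, values.get(i));
        }
        List<List<String>> outputs = new ArrayList<>();
        outputs.add(out1);
        outputs.add(out2);
        return outputs;
    }
}
```

In the proposal, the framework would play Processor2Demo's role and expose the two outputs as the two halves of a KStreamTuple2.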


> 3) having data-driven emission policy (this will be the building block of
> session windows) as well as time-drive emission policy.
>
> I am thinking about how to support this as well, one thing is that we can
> use the underlying process() function for data-driven emission, for
> example, if there is a session-start / end flag then create the
> corresponding session record in state, and only emit upon session-end flag;
> and the underlying punctuate() function for time-drive emission (we
> probably need to first refactor it to be triggered by record timestamp
> instead of wallclock time).
>

Yes, I agree: data-driven emission could work just fine with process(), and
delayed emission works nicely with punctuate(). I've also been meaning to
mention the clear need for event-time punctuation, so I'm glad to hear
that's on your radar! Watermarking will be important for session-windowing.
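One possible shape for event-time punctuation is sketched below: a clock that advances only with record timestamps and fires once for every interval boundary it crosses. This is an assumption-laden toy, not how Kafka Streams schedules punctuate(). `StreamTimePunctuator` and `PunctuatorDemo` are hypothetical names. Choosing to catch up on every missed boundary, rather than firing once, is also a design assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

// Hypothetical sketch of event-time ("stream time") punctuation: the clock advances
// only with record timestamps, never with wall-clock time. Not a Kafka Streams class.
final class StreamTimePunctuator {
    private final long intervalMs;
    private final LongConsumer punctuate;        // receives the scheduled punctuation time
    private long streamTime = Long.MIN_VALUE;    // largest timestamp seen so far
    private long nextPunctuation = Long.MIN_VALUE;

    StreamTimePunctuator(long intervalMs, LongConsumer punctuate) {
        this.intervalMs = intervalMs;
        this.punctuate = punctuate;
    }

    // Call once per record, with that record's event timestamp.
    void onRecord(long timestamp) {
        if (timestamp <= streamTime) {
            return; // late or out-of-order records never move stream time backwards
        }
        streamTime = timestamp;
        if (nextPunctuation == Long.MIN_VALUE) {
            nextPunctuation = timestamp + intervalMs; // first record schedules the first punctuation
            return;
        }
        // Catch up on every interval boundary that stream time has crossed.
        while (streamTime >= nextPunctuation) {
            punctuate.accept(nextPunctuation);
            nextPunctuation += intervalMs;
        }
    }
}

final class PunctuatorDemo {
    // Feeds the given timestamps through a 100 ms punctuator and collects the firing times.
    static List<Long> run(long... timestamps) {
        List<Long> fired = new ArrayList<>();
        StreamTimePunctuator punctuator = new StreamTimePunctuator(100, t -> fired.add(t));
        for (long ts : timestamps) {
            punctuator.onRecord(ts);
        }
        return fired;
    }
}
```

For example, feeding timestamps 0, 50, 30, 120, 350 fires at 100, 200 and 300. The out-of-order record at 30 is ignored for clock purposes.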

Thoughts?

-josh


>
> On Tue, Apr 5, 2016 at 8:24 AM, josh gruenberg <jo...@gmail.com> wrote:
>
> > Hi all,
> >
> > Just chiming in with Yuto: I think the custom Processor becomes
> attractive
> > in scenarios where a node in the graph may emit to a variety of
> downstream
> > paths, possibly after some delay, depending on logic. This can probably
> > often be achieved with the existing DSL using some combination of
> > predicates and intermediate representations, but this involves
> contortions
> > that feel cumbersome, and probably leads to less intelligible code. I'm
> > also not sure the current DSL can model scenarios where the
> transformation
> > may be one-to-many, as in the last part of Yuto's example, or where the
> > emission-delay is data-driven, as in my earlier "sessionization" example.
> >
> > One idea I'd offer is to provide a mechanism for wiring in Processors
> with
> > "telescoping" arity (eg, support Processor1<I, O1>, Processor2<I, O1,
> O2>,
> > etc), and providing each arity with type-safe forwarding interfaces for
> > each output stream (eg, Forwarder<T>). This assigns each output-stream a
> > clear ordinal, and suggests a corresponding type-safe return-type for the
> > DSL (eg, KStreamTuple2<O1, O2>).
> >
> > I think this pattern could provide a unification of the 'Transformer' and
> > 'Processor' APIs.
> > This was what I had in mind for a PR we discussed earlier (for modifying
> > the Transformer API), but the scope expanded beyond what I felt
> comfortable
> > submitting without discussion, and I had to prioritize other efforts.
> > Regardless, I could get a WIP branch pushed to github later today to
> > illustrate if you'd like to see it.
> >
> > HTH,
> > -josh
> >
> > On Mon, Apr 4, 2016, 9:14 PM Guozhang Wang <wa...@gmail.com> wrote:
> >
> > > Thanks Yuto for your code snippet. Since you need to access a
> customized
> > > external storage for metadata, that indeed cannot be wrapped in any
> > > built-in operators in the Streams DSL yet, and your code example in the
> > > previous email would be close to the best you can do with the
> high-level
> > > DSL now.
> > >
> > > One minor improvement from your above code, though, is that instead of
> > > calling map(... -> process()) you can actually call transform(), which
> > > still allows you to provide a customized transformer function, but it
> > still
> > > gives you strong typing assuming all these three kinds of records are
> of
> > > the same key / value types.
> > >
> > > Guozhang
> > >
> > >
> > >
> > >
> > > On Sun, Apr 3, 2016 at 10:48 PM, Yuto KAWAMURA <
> > kawamuray.dadada@gmail.com
> > > >
> > > wrote:
> > >
> > > > 2016-04-04 7:20 GMT+09:00 Guozhang Wang <wa...@gmail.com>:
> > > > > Hi Yuto,
> > > > >
> > > > > Is the destination topic embedded as part of the value in the
> > original
> > > > > "foo" topic? If yes could you just access that field directly
> instead
> > > of
> > > > > mapping to a (key, value, destination) triplet?
> > > > >
> > > >
> > > > Nope. KeyValueWithDestination is just an example of output from the
> > > > first Processor and is not included in actual messages that the topic
> > > > foo received.
> > > > Let me explain bit more realistic use-case. How can we write a
> > > > Processor like below in High-level DSL cleanly?
> > > >
> > > > ```java
> > > > public class EventProcessor implements Processor<String, Event> {
> > > > ...
> > > >   @Override
> > > >   public void process(String key, Event value) {
> > > >       EventMetadata meta =
> > > > getEventMetadataFromExternalStorage(value.getId());
> > > >
> > > >       if (isFieldACorrupted(meta, value.getFieldA())) {
> > > >           // This event is corrupted! let's evacuate it once to the
> > > > grave topic for further investigation.
> > > >           context.forward(key, value, "CorruptedEventSink");
> > > >       }
> > > >       if (isFieldBCorrupted(meta, value.getFieldB())) {
> > > >           // Antoher case of corruption, but maybe recoverable.
> > > >           context.forward(key, value,
> > "CorruptedEventRecoveryProcessor");
> > > >       }
> > > >
> > > >       for (Foo foo : event.getFoos()) {
> > > >           context.forward(key, buildMessage(meta, foo),
> > "FooProcessor");
> > > >       }
> > > >   }
> > > > ...
> > > > }
> > > > ```
> > > >
> > > >
> > > > > Guozhang
> > > > >
> > > > > On Sun, Apr 3, 2016 at 9:29 AM, Yuto KAWAMURA <
> > > > kawamuray.dadada@gmail.com>
> > > > > wrote:
> > > > >
> > > > >> Hi Guozhang,
> > > > >>
> > > > >>
> > > > >>
> > > > >> 2016-04-02 3:29 GMT+09:00 Guozhang Wang <wa...@gmail.com>:
> > > > >> > Hi Yuto,
> > > > >> >
> > > > >> > That is a good suggestion, the child index is not very intuitive
> > > from
> > > > >> > programmer's view and we can even consider replacing it with the
> > > > >> processor
> > > > >> > name instead of overloading it. Could you file a JIRA?
> > > > >> >
> > > > >>
> > > > >> Yep :) https://issues.apache.org/jira/browse/KAFKA-3497
> > > > >>
> > > > >> > Also I am wondering if you have looked at the higher-level
> Streams
> > > > DSL,
> > > > >> and
> > > > >> > if yes could let me know what are the limitations from using
> that
> > > > APIs in
> > > > >> > your case?
> > > > >> >
> > > > >>
> > > > >> Well, I read though high-level DSL interface but couldn't find an
> > easy
> > > > >> way to handle output from Processors which could issue multiple
> > > > >> messages to arbitrary different destinations.
> > > > >> Maybe it could be done by doing something like below but it
> doesn't
> > > > >> look good. Please let me know if you have any idea to do this in
> > > > >> easier way.
> > > > >>
> > > > >> ```java
> > > > >> class KeyValueWithDestination {
> > > > >>     K key;
> > > > >>     V value;
> > > > >>     String destination;
> > > > >> }
> > > > >>
> > > > >> class DestinationPredicate implements Predicate<K,
> > > > >> KeyValueWithDestination> {
> > > > >>     String destination;
> > > > >>     @Override
> > > > >>     public boolean test(K key, KeyValueWithDestination value) {
> > > > >>         return value.destination.equals(destination);
> > > > >>     }
> > > > >> }
> > > > >>
> > > > >> String[] destTopics = {"topicA", "topicB", "topicC"};
> > > > >>
> > > > >> Predicate<K, KeyValueWithDestination>[] predicates =
> > > > >>         Arrays.stream(destTopics).map(DestinationPredicate::new)
> > > > >>                                  .toArray(Predicate<K,
> > > > >> KeyValueWithDestination>::new);
> > > > >>
> > > > >> branches = builder.stream("foo")
> > > > >>                   .map((key, value) -> processor.process(key,
> value)
> > > > >> /* => KeyValueWithDestination */)
> > > > >>                   .branch(predicates);
> > > > >>
> > > > >> for (int i = 0; i < branches.length; i++) {
> > > > >>     branches[i].to(destTopics[i]);
> > > > >> }
> > > > >> ```
> > > > >>
> > > > >>
> > > > >> > Guozhang
> > > > >> >
> > > > >> > On Fri, Apr 1, 2016 at 1:20 AM, Yuto KAWAMURA <
> > > > >> kawamuray.dadada@gmail.com>
> > > > >> > wrote:
> > > > >> >
> > > > >> >> When I tried to implement a task which does kinda dispatching
> to
> > > > >> >> downstream processors or sinks, looks like relying on
> > > > >> >> context.forward(K, V, int childIndex) is the only way now.
> > > > >> >> I have a question why this method implemented using
> > > childIndex(which
> > > > >> >> is just an index of children "List" that based on order of
> > > > >> >> builder.addProcessor() call) instead of child name(first
> argument
> > > to
> > > > >> >> add{Processor,Sink}).
> > > > >> >> I wanna ask what is the concrete use case of forward(K, V, int
> > > > >> >> childIndex) and is it makes sense to introduce another
> overload:
> > > > >> >> forward(K, V, String childName) for much handy use.
> > > > >> >> Currently I have a use-case like this in my mind:
> > > > >> >> ```
> > > > >> >> builder.addProcessor("DispatchProcess", new
> > > > >> >> DispatchProcessorSupplier(), "Source");
> > > > >> >> builder.addProcessor("Process-A", new ProcessorASupplier(),
> > > > >> >> "DispatchProcess");
> > > > >> >> builder.addProcessor("Process-B", new ProcessorBSupplier(),
> > > > >> >> "DispatchProcess");
> > > > >> >>
> > > > >> >> // in process(key, value)
> > > > >> >> if ("key-for-A".equals(key)) {
> > > > >> >>     context.forward(key, value, "Process-A");
> > > > >> >> } else if ("key-for-B".equals(key)) {
> > > > >> >>     context.forward(key, value, "Process-B");
> > > > >> >> }
> > > > >> >> ```
> > > > >> >>
> > > > >> >
> > > > >> >
> > > > >> >
> > > > >> > --
> > > > >> > -- Guozhang
> > > > >>
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>

Re: Kafka Streams: context.forward() with downstream name

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Josh,

I think there are a few issues that we want to resolve here, which could be
orthogonal to each other.

1) One-to-many mapping in a transform() function that generates a single
stream (i.e. key-value pairs of a single type).

Since transform() already enforces type-safe return values, one thing we
can do is to change the punctuate() function's return value from "null" to
"R" as well. Then, for one-to-many mapping, one can define R as
Array<MyType>:

stream.transform().flatMap(/* from Array<MyType> to MyType*/)
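The "return a collection, then flatMap" pattern can be illustrated with plain java.util.stream rather than the KStream API. `OneToManyDemo` and its comma-splitting step are hypothetical stand-ins for a user's transformer. The sketch only shows the shape of the data flow, not Kafka Streams behavior.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// Toy model using java.util.stream, not KStream: the "transformer" returns a List<R>
// per input record, and flatMap explodes each list into individual downstream records.
final class OneToManyDemo {
    // Hypothetical one-to-many step: split a comma-separated value into its parts.
    static final Function<String, List<String>> SPLIT = value -> Arrays.asList(value.split(","));

    static List<String> run(List<String> input) {
        return input.stream()
                .map(SPLIT)               // like transform(): one List<String> per input record
                .flatMap(List::stream)    // like flatMap(): from List<MyType> to MyType
                .collect(Collectors.toList());
    }
}
```

For instance, run(Arrays.asList("a,b", "c")) yields [a, b, c]. The intermediate lists are the extra step that makes this style feel indirect inside an imperative Processor.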

2) A new function that takes one stream and generates multiple streams with
different key-value types.

This is a good-to-have operator in the Streams DSL, and I think this is
your proposed new API in the previous email? I am not sure I understand the
"telescoping" arity completely though, so let me know if I'm wrong.

3) A data-driven emission policy (this will be the building block of
session windows) as well as a time-driven emission policy.

I am thinking about how to support this as well. One option is to use the
underlying process() function for data-driven emission: for example, if
there is a session-start / end flag, create the corresponding session
record in state and only emit upon the session-end flag. The underlying
punctuate() function would handle time-driven emission (we probably need to
first refactor it to be triggered by record timestamp instead of wallclock
time).
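A toy processor can combine both emission paths. It is a sketch under several assumptions. `SessionProcessor`, the "end" flag value, the inactivity gap, and the output format are all hypothetical. A HashMap stands in for a state store, and a list stands in for context.forward(). The timestamp passed to punctuate() is assumed to be record/stream time, as suggested above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical toy processor; flag values, gap, and output format are assumptions.
final class SessionProcessor {
    static final class Session {
        final long start;
        long lastSeen;
        int events;

        Session(long start) {
            this.start = start;
            this.lastSeen = start;
        }
    }

    private final long inactivityGapMs;
    private final Map<String, Session> state = new HashMap<>(); // stands in for a state store
    final List<String> emitted = new ArrayList<>();              // stands in for context.forward()

    SessionProcessor(long inactivityGapMs) {
        this.inactivityGapMs = inactivityGapMs;
    }

    // Data-driven emission: any record opens a session if none is open; an "end" flag emits it.
    void process(String key, String flag, long timestamp) {
        Session s = state.get(key);
        if (s == null) {
            s = new Session(timestamp);
            state.put(key, s);
        }
        s.lastSeen = timestamp;
        s.events++;
        if ("end".equals(flag)) {
            emit(key, s, "end");
            state.remove(key);
        }
    }

    // Time-driven emission: close sessions that have been idle longer than the gap.
    void punctuate(long timestamp) {
        Iterator<Map.Entry<String, Session>> it = state.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Session> e = it.next();
            if (timestamp - e.getValue().lastSeen > inactivityGapMs) {
                emit(e.getKey(), e.getValue(), "timeout");
                it.remove();
            }
        }
    }

    private void emit(String key, Session s, String reason) {
        emitted.add(key + "[" + s.start + "," + s.lastSeen + "] events=" + s.events + " (" + reason + ")");
    }
}
```

With a 100 ms gap, a "start, click, end" sequence for one key is emitted as soon as the end flag arrives. A key whose last record is at 40 is only emitted by a punctuate() at a timestamp later than 140.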


Guozhang





On Tue, Apr 5, 2016 at 8:24 AM, josh gruenberg <jo...@gmail.com> wrote:

> Hi all,
>
> Just chiming in with Yuto: I think the custom Processor becomes attractive
> in scenarios where a node in the graph may emit to a variety of downstream
> paths, possibly after some delay, depending on logic. This can probably
> often be achieved with the existing DSL using some combination of
> predicates and intermediate representations, but this involves contortions
> that feel cumbersome, and probably leads to less intelligible code. I'm
> also not sure the current DSL can model scenarios where the transformation
> may be one-to-many, as in the last part of Yuto's example, or where the
> emission-delay is data-driven, as in my earlier "sessionization" example.
>
> One idea I'd offer is to provide a mechanism for wiring in Processors with
> "telescoping" arity (eg, support Processor1<I, O1>, Processor2<I, O1, O2>,
> etc), and providing each arity with type-safe forwarding interfaces for
> each output stream (eg, Forwarder<T>). This assigns each output-stream a
> clear ordinal, and suggests a corresponding type-safe return-type for the
> DSL (eg, KStreamTuple2<O1, O2>).
>
> I think this pattern could provide a unification of the 'Transformer' and
> 'Processor' APIs.
> This was what I had in mind for a PR we discussed earlier (for modifying
> the Transformer API), but the scope expanded beyond what I felt comfortable
> submitting without discussion, and I had to prioritize other efforts.
> Regardless, I could get a WIP branch pushed to github later today to
> illustrate if you'd like to see it.
>
> HTH,
> -josh



-- 
-- Guozhang

Re: Kafka Streams: context.forward() with downstream name

Posted by josh gruenberg <jo...@gmail.com>.
Hi all,

Just chiming in with Yuto: I think the custom Processor becomes attractive
in scenarios where a node in the graph may emit to a variety of downstream
paths, possibly after some delay, depending on logic. This can probably
often be achieved with the existing DSL using some combination of
predicates and intermediate representations, but this involves contortions
that feel cumbersome, and probably leads to less intelligible code. I'm
also not sure the current DSL can model scenarios where the transformation
may be one-to-many, as in the last part of Yuto's example, or where the
emission-delay is data-driven, as in my earlier "sessionization" example.

One idea I'd offer is to provide a mechanism for wiring in Processors with
"telescoping" arity (e.g., support Processor1<I, O1>, Processor2<I, O1, O2>,
etc.), and to give each arity type-safe forwarding interfaces for
each output stream (e.g., Forwarder<T>). This assigns each output stream a
clear ordinal, and suggests a corresponding type-safe return type for the
DSL (e.g., KStreamTuple2<O1, O2>).
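A rough sketch of the "telescoping arity" idea, with plain Java interfaces standing in for a real API. `Forwarder` and `Processor2` follow the names used in the proposal above, while `CollectingForwarder` and `ParseOrReject` are hypothetical helpers for illustration. None of this is an existing Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.List;

// One typed output channel; in a real topology this would be wired to a child node.
interface Forwarder<T> {
    void forward(String key, T value);
}

// A processor with one input type and two independently typed outputs.
interface Processor2<I, O1, O2> {
    void process(String key, I value, Forwarder<O1> out1, Forwarder<O2> out2);
}

// Test double that simply records the values forwarded to it.
class CollectingForwarder<T> implements Forwarder<T> {
    final List<T> values = new ArrayList<>();

    @Override
    public void forward(String key, T value) {
        values.add(value);
    }
}

// Example: parseable integers go to output 1 (as Integer), anything else to output 2 (as String).
class ParseOrReject implements Processor2<String, Integer, String> {
    @Override
    public void process(String key, String value, Forwarder<Integer> parsed, Forwarder<String> rejected) {
        try {
            parsed.forward(key, Integer.parseInt(value));
        } catch (NumberFormatException e) {
            rejected.forward(key, value);
        }
    }
}

class TelescopingDemo {
    public static void main(String[] args) {
        CollectingForwarder<Integer> ok = new CollectingForwarder<>();
        CollectingForwarder<String> bad = new CollectingForwarder<>();
        ParseOrReject p = new ParseOrReject();
        for (String v : new String[] {"1", "x", "42"}) {
            p.process("k", v, ok, bad);
        }
        System.out.println(ok.values + " " + bad.values); // prints [1, 42] [x]
    }
}
```

Because each output has its own `Forwarder<T>`, the compiler rejects sending a String to the Integer output, which a single untyped `context.forward(...)` cannot do.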

I think this pattern could provide a unification of the 'Transformer' and
'Processor' APIs.
This was what I had in mind for a PR we discussed earlier (for modifying
the Transformer API), but the scope expanded beyond what I felt comfortable
submitting without discussion, and I had to prioritize other efforts.
Regardless, I could push a WIP branch to GitHub later today to
illustrate, if you'd like to see it.

HTH,
-josh

On Mon, Apr 4, 2016, 9:14 PM Guozhang Wang <wa...@gmail.com> wrote:

> Thanks Yuto for your code snippet. Since you need to access a customized
> external storage for metadata, that indeed cannot be wrapped in any
> built-in operators in the Streams DSL yet, and your code example in the
> previous email would be close to the best you can do with the high-level
> DSL now.
>
> One minor improvement from your above code, though, is that instead of
> calling map(... -> process()) you can actually call transform(), which
> still allows you to provide a customized transformer function, but it still
> gives you strong typing assuming all these three kinds of records are of
> the same key / value types.
>
> Guozhang

Re: Kafka Streams: context.forward() with downstream name

Posted by Guozhang Wang <wa...@gmail.com>.
Thanks Yuto for your code snippet. Since you need to access a custom
external store for metadata, that indeed cannot be wrapped in any of the
built-in operators of the Streams DSL yet, and the code example in your
previous email is probably close to the best you can do with the high-level
DSL right now.

One minor improvement to your code above, though: instead of calling
map(... -> process()), you can call transform(), which still lets you plug
in a custom transformer function while also giving you strong typing,
assuming all three kinds of records have the same key/value types.

Guozhang




On Sun, Apr 3, 2016 at 10:48 PM, Yuto KAWAMURA <ka...@gmail.com>
wrote:

> 2016-04-04 7:20 GMT+09:00 Guozhang Wang <wa...@gmail.com>:
> > Hi Yuto,
> >
> > Is the destination topic embedded as part of the value in the original
> > "foo" topic? If yes could you just access that field directly instead of
> > mapping to a (key, value, destination) triplet?
> >
>
> Nope. KeyValueWithDestination is just an example of output from the
> first Processor and is not included in actual messages that the topic
> foo received.
> Let me explain bit more realistic use-case. How can we write a
> Processor like below in High-level DSL cleanly?
>
> ```java
> public class EventProcessor implements Processor<String, Event> {
> ...
>   @Override
>   public void process(String key, Event value) {
>       EventMetadata meta =
> getEventMetadataFromExternalStorage(value.getId());
>
>       if (isFieldACorrupted(meta, value.getFieldA())) {
>           // This event is corrupted! let's evacuate it once to the
> grave topic for further investigation.
>           context.forward(key, value, "CorruptedEventSink");
>       }
>       if (isFieldBCorrupted(meta, value.getFieldB())) {
>           // Antoher case of corruption, but maybe recoverable.
>           context.forward(key, value, "CorruptedEventRecoveryProcessor");
>       }
>
>       for (Foo foo : event.getFoos()) {
>           context.forward(key, buildMessage(meta, foo), "FooProcessor");
>       }
>   }
> ...
> }
> ```



-- 
-- Guozhang

Re: Kafka Streams: context.forward() with downstream name

Posted by Yuto KAWAMURA <ka...@gmail.com>.
2016-04-04 7:20 GMT+09:00 Guozhang Wang <wa...@gmail.com>:
> Hi Yuto,
>
> Is the destination topic embedded as part of the value in the original
> "foo" topic? If yes could you just access that field directly instead of
> mapping to a (key, value, destination) triplet?
>

Nope. KeyValueWithDestination is just an example of the output of the
first Processor; it is not included in the actual messages that topic
foo receives.
Let me explain a slightly more realistic use case. How could we write a
Processor like the one below cleanly in the high-level DSL?

```java
// forward(key, value, childName) is the name-based overload proposed in KAFKA-3497.
public class EventProcessor implements Processor<String, Event> {
...
  @Override
  public void process(String key, Event value) {
      EventMetadata meta = getEventMetadataFromExternalStorage(value.getId());

      if (isFieldACorrupted(meta, value.getFieldA())) {
          // This event is corrupted! Evacuate it to the "grave" topic
          // for further investigation.
          context.forward(key, value, "CorruptedEventSink");
      }
      if (isFieldBCorrupted(meta, value.getFieldB())) {
          // Another kind of corruption, but possibly recoverable.
          context.forward(key, value, "CorruptedEventRecoveryProcessor");
      }

      // One-to-many: every Foo in the event becomes its own downstream message.
      for (Foo foo : value.getFoos()) {
          context.forward(key, buildMessage(meta, foo), "FooProcessor");
      }
  }
...
}
```
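The routing shape above can be exercised without Kafka by swapping the context for a recording stub. The plain-Java sketch below is only a model: `NamedForwardContext`, `RecordingContext`, `SimpleEvent`, and `SimpleEventRouter` are hypothetical names, and the external metadata lookups are replaced by two precomputed flags. It shows why a single map() does not fit: one input can yield zero, one, or several outputs, bound for different children.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical context that supports forwarding by child name only.
interface NamedForwardContext {
    void forward(String key, Object value, String childName);
}

// Test double: remembers which child each forward() call targeted.
class RecordingContext implements NamedForwardContext {
    final List<String> targets = new ArrayList<>();

    @Override
    public void forward(String key, Object value, String childName) {
        targets.add(childName);
    }
}

// Simplified event: the metadata checks are reduced to two precomputed flags.
class SimpleEvent {
    final boolean fieldACorrupted;
    final boolean fieldBCorrupted;
    final List<String> foos;

    SimpleEvent(boolean fieldACorrupted, boolean fieldBCorrupted, List<String> foos) {
        this.fieldACorrupted = fieldACorrupted;
        this.fieldBCorrupted = fieldBCorrupted;
        this.foos = foos;
    }
}

// Same dispatch logic as EventProcessor, minus the external storage.
class SimpleEventRouter {
    private final NamedForwardContext context;

    SimpleEventRouter(NamedForwardContext context) {
        this.context = context;
    }

    void process(String key, SimpleEvent value) {
        if (value.fieldACorrupted) {
            context.forward(key, value, "CorruptedEventSink");
        }
        if (value.fieldBCorrupted) {
            context.forward(key, value, "CorruptedEventRecoveryProcessor");
        }
        for (String foo : value.foos) {
            context.forward(key, foo, "FooProcessor");
        }
    }
}

class EventRouterDemo {
    public static void main(String[] args) {
        RecordingContext ctx = new RecordingContext();
        new SimpleEventRouter(ctx).process("k", new SimpleEvent(true, false, Arrays.asList("f1", "f2")));
        System.out.println(ctx.targets); // prints [CorruptedEventSink, FooProcessor, FooProcessor]
    }
}
```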



Re: Kafka Streams: context.forward() with downstream name

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Yuto,

Is the destination topic embedded as part of the value in the original
"foo" topic? If so, could you just access that field directly instead of
mapping to a (key, value, destination) triplet?

Guozhang

On Sun, Apr 3, 2016 at 9:29 AM, Yuto KAWAMURA <ka...@gmail.com>
wrote:

> Hi Guozhang,
>
>
>
> 2016-04-02 3:29 GMT+09:00 Guozhang Wang <wa...@gmail.com>:
> > Hi Yuto,
> >
> > That is a good suggestion, the child index is not very intuitive from
> > programmer's view and we can even consider replacing it with the
> processor
> > name instead of overloading it. Could you file a JIRA?
> >
>
> Yep :) https://issues.apache.org/jira/browse/KAFKA-3497
>
> > Also I am wondering if you have looked at the higher-level Streams DSL,
> and
> > if yes could let me know what are the limitations from using that APIs in
> > your case?
> >
>
> Well, I read though high-level DSL interface but couldn't find an easy
> way to handle output from Processors which could issue multiple
> messages to arbitrary different destinations.
> Maybe it could be done by doing something like below but it doesn't
> look good. Please let me know if you have any idea to do this in
> easier way.
>
> ```java
> class KeyValueWithDestination {
>     K key;
>     V value;
>     String destination;
> }
>
> class DestinationPredicate implements Predicate<K,
> KeyValueWithDestination> {
>     String destination;
>     @Override
>     public boolean test(K key, KeyValueWithDestination value) {
>         return value.destination.equals(destination);
>     }
> }
>
> String[] destTopics = {"topicA", "topicB", "topicC"};
>
> Predicate<K, KeyValueWithDestination>[] predicates =
>         Arrays.stream(destTopics).map(DestinationPredicate::new)
>                                  .toArray(Predicate<K,
> KeyValueWithDestination>::new);
>
> branches = builder.stream("foo")
>                   .map((key, value) -> processor.process(key, value)
> /* => KeyValueWithDestination */)
>                   .branch(predicates);
>
> for (int i = 0; i < branches.length; i++) {
>     branches[i].to(destTopics[i]);
> }
> ```



-- 
-- Guozhang

Re: Kafka Streams: context.forward() with downstream name

Posted by Yuto KAWAMURA <ka...@gmail.com>.
Hi Guozhang,



2016-04-02 3:29 GMT+09:00 Guozhang Wang <wa...@gmail.com>:
> Hi Yuto,
>
> That is a good suggestion, the child index is not very intuitive from
> programmer's view and we can even consider replacing it with the processor
> name instead of overloading it. Could you file a JIRA?
>

Yep :) https://issues.apache.org/jira/browse/KAFKA-3497

> Also I am wondering if you have looked at the higher-level Streams DSL, and
> if yes could let me know what are the limitations from using that APIs in
> your case?
>

Well, I read through the high-level DSL interface but couldn't find an easy
way to handle output from Processors that may emit multiple
messages to arbitrary, different destinations.
It could perhaps be done with something like the code below, but it doesn't
look good. Please let me know if you have any idea how to do this in an
easier way.

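```java
// K and V stand for the actual key/value types of topic "foo".
class KeyValueWithDestination<K, V> {
    final K key;
    final V value;
    final String destination;

    KeyValueWithDestination(K key, V value, String destination) {
        this.key = key;
        this.value = value;
        this.destination = destination;
    }
}

class DestinationPredicate<K, V> implements Predicate<K, KeyValueWithDestination<K, V>> {
    private final String destination;

    DestinationPredicate(String destination) {
        this.destination = destination;
    }

    @Override
    public boolean test(K key, KeyValueWithDestination<K, V> value) {
        return value.destination.equals(destination);
    }
}

String[] destTopics = {"topicA", "topicB", "topicC"};

// Generic arrays cannot be created directly, hence the raw array (unchecked assignment).
@SuppressWarnings("unchecked")
Predicate<K, KeyValueWithDestination<K, V>>[] predicates =
        Arrays.stream(destTopics)
              .map(dest -> new DestinationPredicate<K, V>(dest))
              .toArray(Predicate[]::new);

// map() must return a KeyValue; processor.process(...) is assumed to
// return a KeyValueWithDestination here.
KStream<K, KeyValueWithDestination<K, V>>[] branches = builder.<K, V>stream("foo")
        .map((key, value) -> new KeyValue<>(key, processor.process(key, value)))
        .branch(predicates);

for (int i = 0; i < branches.length; i++) {
    // Unwrap the original key/value before writing to the destination topic.
    branches[i].map((key, kvd) -> new KeyValue<>(kvd.key, kvd.value))
               .to(destTopics[i]);
}
```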
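Note that map() yields exactly one output per input, and branch() sends each record to at most one branch (the first matching predicate), so this wrapper approach cannot by itself emit one input to several destinations. A flatMap-style step that returns a list of tagged records, followed by routing on the tag, covers that case. The plain-Java sketch below models only that data flow, outside Kafka; `Tagged`, `TagAndRoute.route`, and the demo tagging rule are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// A value tagged with the output it should go to (mirrors KeyValueWithDestination).
class Tagged<V> {
    final String destination;
    final V value;

    Tagged(String destination, V value) {
        this.destination = destination;
        this.value = value;
    }
}

class TagAndRoute {
    // Models flatMap + routing: each input may yield any number of tagged outputs,
    // and each tagged output is collected under its destination, in arrival order.
    static <I, V> Map<String, List<V>> route(List<I> inputs, Function<I, List<Tagged<V>>> tagger) {
        Map<String, List<V>> byDestination = new LinkedHashMap<>();
        for (I input : inputs) {
            for (Tagged<V> t : tagger.apply(input)) {
                byDestination.computeIfAbsent(t.destination, d -> new ArrayList<>()).add(t.value);
            }
        }
        return byDestination;
    }
}

class TagAndRouteDemo {
    public static void main(String[] args) {
        Map<String, List<String>> out = TagAndRoute.route(
                Arrays.asList("a", "ab", ""),
                s -> {
                    List<Tagged<String>> tags = new ArrayList<>();
                    if (s.contains("a")) tags.add(new Tagged<>("topicA", s));
                    if (s.contains("b")) tags.add(new Tagged<>("topicB", s));
                    return tags;
                });
        System.out.println(out); // prints {topicA=[a, ab], topicB=[ab]}
    }
}
```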



Re: Kafka Streams: context.forward() with downstream name

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Yuto,

That is a good suggestion: the child index is not very intuitive from a
programmer's point of view, and we could even consider replacing it with the
processor name instead of adding an overload. Could you file a JIRA?
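One way to picture name-based forwarding: keep the ordered child list that index-based forwarding already relies on, and add a name-to-position lookup built from the names passed to `addProcessor`/`addSink`. The sketch below is a plain-Java model with hypothetical names (`RoutingNode`, `addChild`, `delivered`); it is not how Kafka Streams implements its processor nodes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical node: children are kept in wiring order (as with builder.addProcessor),
// plus a name -> position map so callers need not know that order.
class RoutingNode {
    private final List<String> childNames = new ArrayList<>();
    private final Map<String, Integer> indexByName = new HashMap<>();
    final List<String> delivered = new ArrayList<>(); // "child <- value" log for inspection

    void addChild(String name) {
        if (indexByName.putIfAbsent(name, childNames.size()) != null) {
            throw new IllegalArgumentException("duplicate child: " + name);
        }
        childNames.add(name);
    }

    // Index-based forwarding: correct only if the caller knows the wiring order.
    void forward(Object value, int childIndex) {
        delivered.add(childNames.get(childIndex) + " <- " + value);
    }

    // Name-based forwarding: resolve the name, then reuse the index path.
    void forward(Object value, String childName) {
        Integer index = indexByName.get(childName);
        if (index == null) {
            throw new IllegalArgumentException("not a child of this node: " + childName);
        }
        forward(value, index.intValue());
    }
}

class ForwardByNameDemo {
    public static void main(String[] args) {
        RoutingNode dispatch = new RoutingNode();
        dispatch.addChild("Process-A"); // wired first  -> index 0
        dispatch.addChild("Process-B"); // wired second -> index 1
        dispatch.forward("v1", "Process-B");
        dispatch.forward("v2", 0);
        System.out.println(dispatch.delivered); // prints [Process-B <- v1, Process-A <- v2]
    }
}
```

Resolving an unknown name fails loudly, whereas a stale index silently targets whichever child now sits at that position.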

Also, I am wondering whether you have looked at the higher-level Streams DSL,
and if so, could you let me know what limitations you ran into when using
those APIs in your case?

Guozhang

On Fri, Apr 1, 2016 at 1:20 AM, Yuto KAWAMURA <ka...@gmail.com>
wrote:

> When I tried to implement a task which does kinda dispatching to
> downstream processors or sinks, looks like relying on
> context.forward(K, V, int childIndex) is the only way now.
> I have a question why this method implemented using childIndex(which
> is just an index of children "List" that based on order of
> builder.addProcessor() call) instead of child name(first argument to
> add{Processor,Sink}).
> I wanna ask what is the concrete use case of forward(K, V, int
> childIndex) and is it makes sense to introduce another overload:
> forward(K, V, String childName) for much handy use.
> Currently I have a use-case like this in my mind:
> ```
> builder.addProcessor("DispatchProcess", new
> DispatchProcessorSupplier(), "Source");
> builder.addProcessor("Process-A", new ProcessorASupplier(),
> "DispatchProcess");
> builder.addProcessor("Process-B", new ProcessorBSupplier(),
> "DispatchProcess");
>
> // in process(key, value)
> if ("key-for-A".equals(key)) {
>     context.forward(key, value, "Process-A");
> } else if ("key-for-B".equals(key)) {
>     context.forward(key, value, "Process-B");
> }
> ```
>



-- 
-- Guozhang